From a6625ec5de9284fd674836dc92591a5b37f04cb2 Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 22 Feb 2026 11:10:42 +0000 Subject: [PATCH] fix: mini workflow fixes --- application/agents/base.py | 11 +- application/agents/workflow_agent.py | 21 +- .../agents/workflows/workflow_engine.py | 202 ++++++++++- application/api/user/agents/routes.py | 60 +--- application/api/user/workflows/routes.py | 75 +++- application/core/json_schema_utils.py | 34 ++ frontend/src/agents/NewAgent.tsx | 16 +- .../src/agents/workflow/WorkflowBuilder.tsx | 332 +++++++++++++++++- .../src/agents/workflow/WorkflowPreview.tsx | 65 ++-- .../workflow/components/PromptTextArea.tsx | 162 +++++++-- frontend/src/agents/workflow/nodes/index.tsx | 6 +- .../agents/workflow/workflowPreviewSlice.ts | 18 +- tests/agents/test_workflow_engine.py | 332 ++++++++++++++++++ tests/agents/test_workflow_template.py | 63 ++++ 14 files changed, 1261 insertions(+), 136 deletions(-) create mode 100644 application/core/json_schema_utils.py create mode 100644 tests/agents/test_workflow_engine.py create mode 100644 tests/agents/test_workflow_template.py diff --git a/application/agents/base.py b/application/agents/base.py index 49c84d33..2795847b 100644 --- a/application/agents/base.py +++ b/application/agents/base.py @@ -7,6 +7,10 @@ from bson.objectid import ObjectId from application.agents.tools.tool_action_parser import ToolActionParser from application.agents.tools.tool_manager import ToolManager +from application.core.json_schema_utils import ( + JsonSchemaValidationError, + normalize_json_schema_payload, +) from application.core.mongo_db import MongoDB from application.core.settings import settings from application.llm.handlers.handler_creator import LLMHandlerCreator @@ -63,7 +67,12 @@ class BaseAgent(ABC): llm_name if llm_name else "default" ) self.attachments = attachments or [] - self.json_schema = json_schema + self.json_schema = None + if json_schema is not None: + try: + self.json_schema = normalize_json_schema_payload(json_schema) + except JsonSchemaValidationError as exc: + logger.warning("Ignoring invalid JSON schema payload: %s", exc) self.limited_token_mode = limited_token_mode self.token_limit = token_limit self.limited_request_mode = limited_request_mode diff --git a/application/agents/workflow_agent.py b/application/agents/workflow_agent.py index 73e98be7..5c005df5 100644 --- a/application/agents/workflow_agent.py +++ b/application/agents/workflow_agent.py @@ -211,8 +211,21 @@ class WorkflowAgent(BaseAgent): def _serialize_state(self, state: Dict[str, Any]) -> Dict[str, Any]: serialized: Dict[str, Any] = {} for key, value in state.items(): - if isinstance(value, (str, int, float, bool, type(None))): - serialized[key] = value - else: - serialized[key] = str(value) + serialized[key] = self._serialize_state_value(value) return serialized + + def _serialize_state_value(self, value: Any) -> Any: + if isinstance(value, dict): + return { + str(dict_key): self._serialize_state_value(dict_value) + for dict_key, dict_value in value.items() + } + if isinstance(value, list): + return [self._serialize_state_value(item) for item in value] + if isinstance(value, tuple): + return [self._serialize_state_value(item) for item in value] + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, (str, int, float, bool, type(None))): + return value + return str(value) diff --git a/application/agents/workflows/workflow_engine.py b/application/agents/workflows/workflow_engine.py index e417e621..ef372f98 100644 --- a/application/agents/workflows/workflow_engine.py +++ b/application/agents/workflows/workflow_engine.py @@ -1,3 +1,4 @@ +import json import logging from datetime import datetime, timezone from typing import Any, Dict, Generator, List, Optional, TYPE_CHECKING @@ -13,6 +14,17 @@ from application.agents.workflows.schemas import ( WorkflowGraph, WorkflowNode, ) +from application.core.json_schema_utils import ( + JsonSchemaValidationError, + normalize_json_schema_payload, +) +from application.templates.namespaces import NamespaceManager +from application.templates.template_engine import TemplateEngine, TemplateRenderError + +try: + import jsonschema +except ImportError: # pragma: no cover - optional dependency in some deployments. + jsonschema = None if TYPE_CHECKING: from application.agents.base import BaseAgent @@ -20,6 +32,7 @@ logger = logging.getLogger(__name__) StateValue = Any WorkflowState = Dict[str, StateValue] +TEMPLATE_RESERVED_NAMESPACES = {"agent", "system", "source", "tools", "passthrough"} class WorkflowEngine: @@ -31,6 +44,8 @@ class WorkflowEngine: self.state: WorkflowState = {} self.execution_log: List[Dict[str, Any]] = [] self._condition_result: Optional[str] = None + self._template_engine = TemplateEngine() + self._namespace_manager = NamespaceManager() def execute( self, initial_inputs: WorkflowState, query: str @@ -174,7 +189,11 @@ class WorkflowEngine: def _execute_agent_node( self, node: WorkflowNode ) -> Generator[Dict[str, str], None, None]: - from application.core.model_utils import get_api_key_for_provider + from application.core.model_utils import ( + get_api_key_for_provider, + get_model_capabilities, + get_provider_from_model_id, + ) node_config = AgentNodeConfig(**node.config.get("config", node.config)) @@ -182,27 +201,50 @@ class WorkflowEngine: formatted_prompt = self._format_template(node_config.prompt_template) else: formatted_prompt = self.state.get("query", "") - node_llm_name = node_config.llm_name or self.agent.llm_name + node_json_schema = self._normalize_node_json_schema( + node_config.json_schema, node.title + ) + node_model_id = node_config.model_id or self.agent.model_id + node_llm_name = ( + node_config.llm_name + or get_provider_from_model_id(node_model_id or "") + or self.agent.llm_name + ) node_api_key = get_api_key_for_provider(node_llm_name) or self.agent.api_key + if node_json_schema and node_model_id: + model_capabilities = get_model_capabilities(node_model_id) + if model_capabilities and not model_capabilities.get( + "supports_structured_output", False + ): + raise ValueError( + f'Model "{node_model_id}" does not support structured output for node "{node.title}"' + ) + node_agent = WorkflowNodeAgentFactory.create( agent_type=node_config.agent_type, endpoint=self.agent.endpoint, llm_name=node_llm_name, - model_id=node_config.model_id or self.agent.model_id, + model_id=node_model_id, api_key=node_api_key, tool_ids=node_config.tools, prompt=node_config.system_prompt, chat_history=self.agent.chat_history, decoded_token=self.agent.decoded_token, - json_schema=node_config.json_schema, + json_schema=node_json_schema, ) - full_response = "" + full_response_parts: List[str] = [] + structured_response_parts: List[str] = [] + has_structured_response = False first_chunk = True for event in node_agent.gen(formatted_prompt): if "answer" in event: - full_response += event["answer"] + chunk = str(event["answer"]) + full_response_parts.append(chunk) + if event.get("structured"): + has_structured_response = True + structured_response_parts.append(chunk) if node_config.stream_to_user: if first_chunk and hasattr(self, "_has_streamed"): yield {"answer": "\n\n"} @@ -212,8 +254,33 @@ class WorkflowEngine: if node_config.stream_to_user: self._has_streamed = True - output_key = node_config.output_variable or f"node_{node.id}_output" - self.state[output_key] = full_response.strip() + full_response = "".join(full_response_parts).strip() + output_value: Any = full_response + if has_structured_response: + structured_response = "".join(structured_response_parts).strip() + response_to_parse = structured_response or full_response + parsed_success, parsed_structured = self._parse_structured_output( + response_to_parse + ) + output_value = parsed_structured if parsed_success else response_to_parse + if node_json_schema: + self._validate_structured_output(node_json_schema, output_value) + elif node_json_schema: + parsed_success, parsed_structured = self._parse_structured_output( + full_response + ) + if not parsed_success: + raise ValueError( + "Structured output was expected but response was not valid JSON" + ) + output_value = parsed_structured + self._validate_structured_output(node_json_schema, output_value) + + default_output_key = f"node_{node.id}_output" + self.state[default_output_key] = output_value + + if node_config.output_variable: + self.state[node_config.output_variable] = output_value def _execute_state_node( self, node: WorkflowNode @@ -254,13 +321,122 @@ class WorkflowEngine: formatted_output = self._format_template(output_template) yield {"answer": formatted_output} + def _parse_structured_output(self, raw_response: str) -> tuple[bool, Optional[Any]]: + normalized_response = raw_response.strip() + if not normalized_response: + return False, None + + try: + return True, json.loads(normalized_response) + except json.JSONDecodeError: + logger.warning( + "Workflow agent returned structured output that was not valid JSON" + ) + return False, None + + def _normalize_node_json_schema( + self, schema: Optional[Dict[str, Any]], node_title: str + ) -> Optional[Dict[str, Any]]: + if schema is None: + return None + try: + return normalize_json_schema_payload(schema) + except JsonSchemaValidationError as exc: + raise ValueError( + f'Invalid JSON schema for node "{node_title}": {exc}' + ) from exc + + def _validate_structured_output(self, schema: Dict[str, Any], output_value: Any) -> None: + if jsonschema is None: + logger.warning( + "jsonschema package is not available, skipping structured output validation" + ) + return + + try: + normalized_schema = normalize_json_schema_payload(schema) + except JsonSchemaValidationError as exc: + raise ValueError(f"Invalid JSON schema: {exc}") from exc + + try: + jsonschema.validate(instance=output_value, schema=normalized_schema) + except jsonschema.exceptions.ValidationError as exc: + raise ValueError(f"Structured output did not match schema: {exc.message}") from exc + except jsonschema.exceptions.SchemaError as exc: + raise ValueError(f"Invalid JSON schema: {exc.message}") from exc + def _format_template(self, template: str) -> str: - formatted = template + context = self._build_template_context() + try: + return self._template_engine.render(template, context) + except TemplateRenderError as e: + logger.warning( + "Workflow template rendering failed, using raw template: %s", str(e) + ) + return template + + def _build_template_context(self) -> Dict[str, Any]: + docs, docs_together = self._get_source_template_data() + passthrough_data = ( + self.state.get("passthrough") + if isinstance(self.state.get("passthrough"), dict) + else None + ) + tools_data = ( + self.state.get("tools") if isinstance(self.state.get("tools"), dict) else None + ) + + context = self._namespace_manager.build_context( + user_id=getattr(self.agent, "user", None), + request_id=getattr(self.agent, "request_id", None), + passthrough_data=passthrough_data, + docs=docs, + docs_together=docs_together, + tools_data=tools_data, + ) + + agent_context: Dict[str, Any] = {} for key, value in self.state.items(): - placeholder = f"{{{{{key}}}}}" - if placeholder in formatted and value is not None: - formatted = formatted.replace(placeholder, str(value)) - return formatted + if not isinstance(key, str): + continue + normalized_key = key.strip() + if not normalized_key: + continue + agent_context[normalized_key] = value + + context["agent"] = agent_context + + # Keep legacy top-level variables working while namespaced variables are adopted. + for key, value in agent_context.items(): + if key in TEMPLATE_RESERVED_NAMESPACES: + context[f"agent_{key}"] = value + continue + if key not in context: + context[key] = value + + return context + + def _get_source_template_data(self) -> tuple[Optional[List[Dict[str, Any]]], Optional[str]]: + docs = getattr(self.agent, "retrieved_docs", None) + if not isinstance(docs, list) or len(docs) == 0: + return None, None + + docs_together_parts: List[str] = [] + for doc in docs: + if not isinstance(doc, dict): + continue + text = doc.get("text") + if not isinstance(text, str): + continue + + filename = doc.get("filename") or doc.get("title") or doc.get("source") + if isinstance(filename, str) and filename.strip(): + docs_together_parts.append(f"{filename}\n{text}") + else: + docs_together_parts.append(text) + + docs_together = "\n\n".join(docs_together_parts) if docs_together_parts else None + return docs, docs_together def get_execution_summary(self) -> List[NodeExecutionLog]: return [ diff --git a/application/api/user/agents/routes.py b/application/api/user/agents/routes.py index d0c74923..64fa7bed 100644 --- a/application/api/user/agents/routes.py +++ b/application/api/user/agents/routes.py @@ -23,6 +23,10 @@ from application.api.user.base import ( workflow_nodes_collection, workflows_collection, ) +from application.core.json_schema_utils import ( + JsonSchemaValidationError, + normalize_json_schema_payload, +) from application.core.settings import settings from application.utils import ( check_required_fields, @@ -479,41 +483,15 @@ class CreateAgent(Resource): data["models"] = [] print(f"Received data: {data}") - # Validate JSON schema if provided - - if data.get("json_schema"): + # Validate and normalize JSON schema if provided + if "json_schema" in data: try: - # Basic validation - ensure it's a valid JSON structure - - json_schema = data.get("json_schema") - if not isinstance(json_schema, dict): - return make_response( - jsonify( - { - "success": False, - "message": "JSON schema must be a valid JSON object", - } - ), - 400, - ) - # Validate that it has either a 'schema' property or is itself a schema - - if "schema" not in json_schema and "type" not in json_schema: - return make_response( - jsonify( - { - "success": False, - "message": "JSON schema must contain either a 'schema' property or be a valid JSON schema with 'type' property", - } - ), - 400, - ) - except Exception as e: - current_app.logger.error(f"Invalid JSON schema: {e}") + data["json_schema"] = normalize_json_schema_payload( + data.get("json_schema") + ) + except JsonSchemaValidationError as exc: return make_response( - jsonify( - {"success": False, "message": "Invalid JSON schema format"} - ), + jsonify({"success": False, "message": f"JSON schema {exc}"}), 400, ) if data.get("status") not in ["draft", "published"]: @@ -732,6 +710,8 @@ class UpdateAgent(Resource): ), 400, ) + if data.get("json_schema") == "": + data["json_schema"] = None except Exception as err: current_app.logger.error( f"Error parsing request data: {err}", exc_info=True @@ -892,17 +872,15 @@ class UpdateAgent(Resource): elif field == "json_schema": json_schema = data.get("json_schema") if json_schema is not None: - if not isinstance(json_schema, dict): + try: + update_fields[field] = normalize_json_schema_payload( + json_schema + ) + except JsonSchemaValidationError as exc: return make_response( - jsonify( - { - "success": False, - "message": "JSON schema must be a valid object", - } - ), + jsonify({"success": False, "message": f"JSON schema {exc}"}), 400, ) - update_fields[field] = json_schema else: update_fields[field] = None elif field == "limited_token_mode": diff --git a/application/api/user/workflows/routes.py b/application/api/user/workflows/routes.py index 1b1a83ed..fc6006bd 100644 --- a/application/api/user/workflows/routes.py +++ b/application/api/user/workflows/routes.py @@ -1,7 +1,7 @@ """Workflow management routes.""" from datetime import datetime, timezone -from typing import Dict, List, Set +from typing import Any, Dict, List, Optional, Set from flask import current_app, request from flask_restx import Namespace, Resource @@ -11,6 +11,11 @@ from application.api.user.base import ( workflow_nodes_collection, workflows_collection, ) +from application.core.json_schema_utils import ( + JsonSchemaValidationError, + normalize_json_schema_payload, +) +from application.core.model_utils import get_model_capabilities from application.api.user.utils import ( check_resource_ownership, error_response, @@ -85,6 +90,50 @@ def fetch_graph_documents(collection, workflow_id: str, graph_version: int) -> L return docs +def validate_json_schema_payload( + json_schema: Any, +) -> tuple[Optional[Dict[str, Any]], Optional[str]]: + """Validate and normalize optional JSON schema payload for structured output.""" + if json_schema is None: + return None, None + try: + return normalize_json_schema_payload(json_schema), None + except JsonSchemaValidationError as exc: + return None, str(exc) + + +def normalize_agent_node_json_schemas(nodes: List[Dict]) -> List[Dict]: + """Normalize agent-node JSON schema payloads before persistence.""" + normalized_nodes: List[Dict] = [] + for node in nodes: + if not isinstance(node, dict): + normalized_nodes.append(node) + continue + + normalized_node = dict(node) + if normalized_node.get("type") != "agent": + normalized_nodes.append(normalized_node) + continue + + raw_config = normalized_node.get("data") + if not isinstance(raw_config, dict) or "json_schema" not in raw_config: + normalized_nodes.append(normalized_node) + continue + + normalized_config = dict(raw_config) + try: + normalized_config["json_schema"] = normalize_json_schema_payload( + raw_config.get("json_schema") + ) + except JsonSchemaValidationError: + # Validation runs before normalization; keep original on unexpected shape. + normalized_config["json_schema"] = raw_config.get("json_schema") + normalized_node["data"] = normalized_config + normalized_nodes.append(normalized_node) + + return normalized_nodes + + def validate_workflow_structure(nodes: List[Dict], edges: List[Dict]) -> List[str]: """Validate workflow graph structure.""" errors = [] @@ -216,6 +265,28 @@ def validate_workflow_structure(nodes: List[Dict], edges: List[Dict]) -> List[st f"must eventually reach an end node" ) + agent_nodes = [n for n in nodes if n.get("type") == "agent"] + for agent_node in agent_nodes: + agent_title = agent_node.get("title", agent_node.get("id", "unknown")) + raw_config = agent_node.get("data", {}) or {} + if not isinstance(raw_config, dict): + errors.append(f"Agent node '{agent_title}' has invalid configuration") + continue + normalized_schema, schema_error = validate_json_schema_payload( + raw_config.get("json_schema") + ) + has_json_schema = normalized_schema is not None + + model_id = raw_config.get("model_id") + if has_json_schema and isinstance(model_id, str) and model_id.strip(): + capabilities = get_model_capabilities(model_id.strip()) + if capabilities and not capabilities.get("supports_structured_output", False): + errors.append( + f"Agent node '{agent_title}' selected model does not support structured output" + ) + if schema_error: + errors.append(f"Agent node '{agent_title}' JSON schema {schema_error}") + for node in nodes: if not node.get("id"): errors.append("All nodes must have an id") @@ -301,6 +372,7 @@ class WorkflowList(Resource): return error_response( "Workflow validation failed", errors=validation_errors ) + nodes_data = normalize_agent_node_json_schemas(nodes_data) now = datetime.now(timezone.utc) workflow_doc = { @@ -391,6 +463,7 @@ class WorkflowDetail(Resource): return error_response( "Workflow validation failed", errors=validation_errors ) + nodes_data = normalize_agent_node_json_schemas(nodes_data) current_graph_version = get_workflow_graph_version(workflow) next_graph_version = current_graph_version + 1 diff --git a/application/core/json_schema_utils.py b/application/core/json_schema_utils.py new file mode 100644 index 00000000..1d9d8f04 --- /dev/null +++ b/application/core/json_schema_utils.py @@ -0,0 +1,34 @@ +from typing import Any, Dict, Optional + + +class JsonSchemaValidationError(ValueError): + """Raised when a JSON schema payload is invalid.""" + + +def normalize_json_schema_payload(json_schema: Any) -> Optional[Dict[str, Any]]: + """ + Normalize accepted JSON schema payload shapes to a plain schema object. + + Accepted inputs: + - None + - A raw schema object with a top-level "type" + - A wrapped payload with a top-level "schema" object + """ + if json_schema is None: + return None + + if not isinstance(json_schema, dict): + raise JsonSchemaValidationError("must be a valid JSON object") + + wrapped_schema = json_schema.get("schema") + if wrapped_schema is not None: + if not isinstance(wrapped_schema, dict): + raise JsonSchemaValidationError('field "schema" must be a valid JSON object') + return wrapped_schema + + if "type" not in json_schema: + raise JsonSchemaValidationError( + 'must include either a "type" or "schema" field' + ) + + return json_schema diff --git a/frontend/src/agents/NewAgent.tsx b/frontend/src/agents/NewAgent.tsx index 9b7d951e..7ff752bd 100644 --- a/frontend/src/agents/NewAgent.tsx +++ b/frontend/src/agents/NewAgent.tsx @@ -439,10 +439,24 @@ export default function NewAgent({ mode }: { mode: 'new' | 'edit' | 'draft' }) { const data = await response.json(); const transformed = modelService.transformModels(data.models || []); setAvailableModels(transformed); + + if (mode === 'new' && transformed.length > 0) { + const preferredDefaultModelId = + transformed.find((model) => model.id === data.default_model_id)?.id || + transformed[0].id; + + if (preferredDefaultModelId) { + setSelectedModelIds((prevSelectedModelIds) => + prevSelectedModelIds.size > 0 + ? prevSelectedModelIds + : new Set([preferredDefaultModelId]), + ); + } + } }; getTools(); getModels(); - }, [token]); + }, [token, mode]); // Validate folder_id from URL against user's folders useEffect(() => { diff --git a/frontend/src/agents/workflow/WorkflowBuilder.tsx b/frontend/src/agents/workflow/WorkflowBuilder.tsx index 1e451be7..b1b5faf6 100644 --- a/frontend/src/agents/workflow/WorkflowBuilder.tsx +++ b/frontend/src/agents/workflow/WorkflowBuilder.tsx @@ -94,6 +94,20 @@ interface UserTool { displayName: string; } +function validateJsonSchemaConfig(schema: unknown): string | null { + if (schema === undefined || schema === null) return null; + if (typeof schema !== 'object' || Array.isArray(schema)) { + return 'must be a valid JSON object'; + } + + const schemaObject = schema as Record; + if (!('schema' in schemaObject) && !('type' in schemaObject)) { + return 'must include either a "type" or "schema" field'; + } + + return null; +} + function createEmptyWorkflowAgent(): Agent { return { id: '', @@ -130,17 +144,46 @@ function parseSimpleCel(expression: string): { operator: string; value: string; } { - let match = expression.match( + const trimmedExpression = expression.trim(); + + let match = trimmedExpression.match( /^(\w+)\.(contains|startsWith)\(["'](.*)["']\)$/, ); if (match) return { variable: match[1], operator: match[2], value: match[3] }; - match = expression.match(/^(\w+)\s*(==|!=|>=|<=|>|<)\s*["'](.*)["']$/); + match = trimmedExpression.match(/^(\w+)\.(contains|startsWith)\((.*)\)$/); + if (match) { + const rawValue = match[3].trim(); + const unquotedValue = rawValue.replace(/^["'](.*)["']$/, '$1'); + return { + variable: match[1], + operator: match[2], + value: unquotedValue, + }; + } + + match = trimmedExpression.match(/^(contains|startsWith)\(["'](.*)["']\)$/); + if (match) return { variable: '', operator: match[1], value: match[2] }; + + match = trimmedExpression.match(/^(contains|startsWith)\((.*)\)$/); + if (match) { + const rawValue = match[2].trim(); + const unquotedValue = rawValue.replace(/^["'](.*)["']$/, '$1'); + return { variable: '', operator: match[1], value: unquotedValue }; + } + + match = trimmedExpression.match(/^(\w+)\s*(==|!=|>=|<=|>|<)\s*["'](.*)["']$/); if (match) return { variable: match[1], operator: match[2], value: match[3] }; - match = expression.match(/^(\w+)\s*(==|!=|>=|<=|>|<)\s*(.*)$/); + match = trimmedExpression.match(/^(==|!=|>=|<=|>|<)\s*["'](.*)["']$/); + if (match) return { variable: '', operator: match[1], value: match[2] }; + + match = trimmedExpression.match(/^(\w+)\s*(==|!=|>=|<=|>|<)\s*(.*)$/); if (match) return { variable: match[1], operator: match[2], value: match[3] }; + match = trimmedExpression.match(/^(==|!=|>=|<=|>|<)\s*(.*)$/); + if (match) return { variable: '', operator: match[1], value: match[2] }; + return { variable: '', operator: '==', value: '' }; } @@ -149,13 +192,24 @@ function buildSimpleCel( operator: string, value: string, ): string { - if (!variable) return ''; - const isNumeric = value !== '' && !isNaN(Number(value)); - const isBool = value === 'true' || value === 'false'; - const quoted = isNumeric || isBool ? value : `"${value}"`; - if (operator === 'contains') return `${variable}.contains(${quoted})`; - if (operator === 'startsWith') return `${variable}.startsWith(${quoted})`; - return `${variable} ${operator} ${quoted}`; + const trimmedValue = value.trim(); + const isNumeric = trimmedValue !== '' && !isNaN(Number(trimmedValue)); + const isBool = trimmedValue === 'true' || trimmedValue === 'false'; + const literalValue = + isNumeric || isBool ? trimmedValue : JSON.stringify(value); + const stringValue = JSON.stringify(value); + if (operator === 'contains') { + return variable + ? `${variable}.contains(${stringValue})` + : `contains(${stringValue})`; + } + if (operator === 'startsWith') { + return variable + ? `${variable}.startsWith(${stringValue})` + : `startsWith(${stringValue})`; + } + if (!variable) return `${operator} ${literalValue}`; + return `${variable} ${operator} ${literalValue}`; } function normalizeConditionCases(cases: ConditionCase[]): ConditionCase[] { @@ -283,7 +337,14 @@ function WorkflowBuilderInner() { >(null); const workflowSettingsRef = useRef(null); const [availableModels, setAvailableModels] = useState([]); + const [defaultAgentModelId, setDefaultAgentModelId] = useState(''); const [availableTools, setAvailableTools] = useState([]); + const [agentJsonSchemaDrafts, setAgentJsonSchemaDrafts] = useState< + Record + >({}); + const [agentJsonSchemaErrors, setAgentJsonSchemaErrors] = useState< + Record + >({}); const nodeTypes = useMemo( () => ({ @@ -404,8 +465,14 @@ function WorkflowBuilderInner() { }; if (type === 'agent') { + const defaultModelId = defaultAgentModelId || availableModels[0]?.id; + const defaultModelProvider = availableModels.find( + (model) => model.id === defaultModelId, + )?.provider; baseNode.data.config = { agent_type: 'classic', + model_id: defaultModelId, + llm_name: defaultModelProvider || '', system_prompt: 'You are a helpful assistant.', prompt_template: '', stream_to_user: true, @@ -430,7 +497,7 @@ function WorkflowBuilderInner() { setNodes((nds) => nds.concat(baseNode)); }, - [reactFlowInstance], + [reactFlowInstance, availableModels, defaultAgentModelId], ); const handleNodeClick = useCallback( @@ -449,6 +516,18 @@ function WorkflowBuilderInner() { (e) => e.source !== selectedNode.id && e.target !== selectedNode.id, ), ); + setAgentJsonSchemaDrafts((prev) => { + if (!(selectedNode.id in prev)) return prev; + const next = { ...prev }; + delete next[selectedNode.id]; + return next; + }); + setAgentJsonSchemaErrors((prev) => { + if (!(selectedNode.id in prev)) return prev; + const next = { ...prev }; + delete next[selectedNode.id]; + return next; + }); setSelectedNode(null); setShowNodeConfig(false); }, [selectedNode]); @@ -468,6 +547,49 @@ function WorkflowBuilderInner() { [selectedNode], ); + const handleAgentJsonSchemaChange = useCallback( + (text: string) => { + if (!selectedNode || selectedNode.type !== 'agent') return; + + const nodeId = selectedNode.id; + setAgentJsonSchemaDrafts((prev) => ({ ...prev, [nodeId]: text })); + + if (text.trim() === '') { + setAgentJsonSchemaErrors((prev) => ({ ...prev, [nodeId]: null })); + handleUpdateNodeData({ + config: { + ...(selectedNode.data.config || {}), + json_schema: undefined, + }, + }); + return; + } + + try { + const parsed = JSON.parse(text); + const validationError = validateJsonSchemaConfig(parsed); + setAgentJsonSchemaErrors((prev) => ({ + ...prev, + [nodeId]: validationError, + })); + if (!validationError) { + handleUpdateNodeData({ + config: { + ...(selectedNode.data.config || {}), + json_schema: parsed, + }, + }); + } + } catch { + setAgentJsonSchemaErrors((prev) => ({ + ...prev, + [nodeId]: 'must be valid JSON', + })); + } + }, + [handleUpdateNodeData, selectedNode], + ); + const handleUpload = useCallback((files: File[]) => { if (files && files.length > 0) { setImageFile(files[0]); @@ -564,7 +686,17 @@ function WorkflowBuilderInner() { const modelsResponse = await modelService.getModels(null); if (modelsResponse.ok) { const modelsData = await modelsResponse.json(); - setAvailableModels(modelService.transformModels(modelsData.models)); + const transformedModels = modelService.transformModels( + modelsData.models || [], + ); + setAvailableModels(transformedModels); + const preferredDefaultModel = + transformedModels.find( + (model) => model.id === modelsData.default_model_id, + )?.id || + transformedModels[0]?.id || + ''; + setDefaultAgentModelId(preferredDefaultModel); } const toolsResponse = await userService.getUserTools(null); @@ -579,6 +711,51 @@ function WorkflowBuilderInner() { loadModelsAndTools(); }, []); + useEffect(() => { + if (!selectedNode || selectedNode.type !== 'agent') return; + if (!defaultAgentModelId) return; + if (selectedNode.data.config?.model_id) return; + + handleUpdateNodeData({ + config: { + ...(selectedNode.data.config || {}), + model_id: defaultAgentModelId, + llm_name: + availableModels.find((model) => model.id === defaultAgentModelId) + ?.provider || '', + }, + }); + }, [ + selectedNode, + defaultAgentModelId, + availableModels, + handleUpdateNodeData, + ]); + + useEffect(() => { + if (!selectedNode || selectedNode.type !== 'agent') return; + const nodeId = selectedNode.id; + const rawSchema = selectedNode.data.config?.json_schema; + + setAgentJsonSchemaDrafts((prev) => { + if (prev[nodeId] !== undefined) return prev; + if (rawSchema === undefined || rawSchema === null) { + return { ...prev, [nodeId]: '' }; + } + + try { + return { ...prev, [nodeId]: JSON.stringify(rawSchema, null, 2) }; + } catch { + return { ...prev, [nodeId]: String(rawSchema) }; + } + }); + + setAgentJsonSchemaErrors((prev) => { + if (prev[nodeId] !== undefined) return prev; + return { ...prev, [nodeId]: validateJsonSchemaConfig(rawSchema) }; + }); + }, [selectedNode]); + useEffect(() => { const loadAgentDetails = async () => { if (!agentId) return; @@ -655,6 +832,8 @@ function WorkflowBuilderInner() { ); setWorkflowName(nextWorkflowName); setWorkflowDescription(nextWorkflowDescription); + setAgentJsonSchemaDrafts({}); + setAgentJsonSchemaErrors({}); setNodes(mappedNodes); setEdges(mappedEdges); setSavedWorkflowSignature( @@ -711,6 +890,33 @@ function WorkflowBuilderInner() { `Agent "${node.data?.title || node.id}" must have a model selected`, ); } + + const hasSchema = + config?.json_schema !== undefined && config?.json_schema !== null; + if (hasSchema && config?.model_id) { + const selectedModel = availableModels.find( + (model) => model.id === config.model_id, + ); + if (selectedModel && !selectedModel.supports_structured_output) { + errors.push( + `Agent "${node.data?.title || node.id}" selected model does not support structured output`, + ); + } + } + + const schemaValidationError = validateJsonSchemaConfig( + config?.json_schema, + ); + const draftSchemaError = agentJsonSchemaErrors[node.id]; + const effectiveSchemaError = + draftSchemaError !== undefined + ? draftSchemaError + : schemaValidationError; + if (effectiveSchemaError) { + errors.push( + `Agent "${node.data?.title || node.id}" JSON schema ${effectiveSchemaError}`, + ); + } }); if (startNodes.length === 1) { @@ -743,6 +949,7 @@ function WorkflowBuilderInner() { const conditionNodes = nodes.filter((n) => n.type === 'condition'); conditionNodes.forEach((node) => { const conditionTitle = node.data?.title || node.id; + const conditionMode = node.data?.config?.mode || 'simple'; const cases = (node.data?.config?.cases || []) as ConditionCase[]; if ( !cases.length || @@ -831,6 +1038,16 @@ function WorkflowBuilderInner() { `Condition "${conditionTitle}" case "${handle}" has a branch connection but no expression`, ); } + if (conditionMode === 'simple' && hasExpression) { + const parsedCondition = parseSimpleCel( + conditionCase.expression || '', + ); + if (!parsedCondition.variable.trim()) { + errors.push( + `Condition "${conditionTitle}" case "${handle}" must specify a variable in Simple mode`, + ); + } + } }); outgoing.forEach((edge) => { @@ -844,7 +1061,7 @@ function WorkflowBuilderInner() { }); return errors; - }, [workflowName, nodes, edges]); + }, [workflowName, nodes, edges, agentJsonSchemaErrors, availableModels]); const canManageAgent = Boolean(currentAgentId || currentAgent.id); const effectiveAgentId = currentAgentId || currentAgent.id || ''; @@ -1077,6 +1294,42 @@ function WorkflowBuilderInner() { ], ); + const selectedAgentJsonSchemaText = useMemo(() => { + if (!selectedNode || selectedNode.type !== 'agent') return ''; + + const draft = agentJsonSchemaDrafts[selectedNode.id]; + if (draft !== undefined) return draft; + + const schema = selectedNode.data.config?.json_schema; + if (schema === undefined || schema === null) return ''; + + try { + return JSON.stringify(schema, null, 2); + } catch { + return String(schema); + } + }, [selectedNode, agentJsonSchemaDrafts]); + + const selectedAgentJsonSchemaError = useMemo(() => { + if (!selectedNode || selectedNode.type !== 'agent') return null; + + const cachedError = agentJsonSchemaErrors[selectedNode.id]; + if (cachedError !== undefined) return cachedError; + + return validateJsonSchemaConfig(selectedNode.data.config?.json_schema); + }, [selectedNode, agentJsonSchemaErrors]); + + const selectedAgentModelSupportsStructuredOutput = useMemo(() => { + if (!selectedNode || selectedNode.type !== 'agent') return true; + const modelId = selectedNode.data.config?.model_id; + if (!modelId) return true; + + const selectedModel = availableModels.find((model) => model.id === modelId); + if (!selectedModel) return true; + + return selectedModel.supports_structured_output; + }, [selectedNode, availableModels]); + return ( <> @@ -1577,7 +1830,7 @@ function WorkflowBuilderInner() { nodes={nodes} edges={edges} selectedNodeId={selectedNode.id} - placeholder="Use {{variable}} for dynamic content" + placeholder="Use {{ agent.variable }} for dynamic content" />
+
+ + {!selectedAgentModelSupportsStructuredOutput && ( +

+ Selected model does not support structured + output. +

+ )} +