diff --git a/application/agents/workflows/cel_evaluator.py b/application/agents/workflows/cel_evaluator.py new file mode 100644 index 00000000..5f8bab37 --- /dev/null +++ b/application/agents/workflows/cel_evaluator.py @@ -0,0 +1,64 @@ +from typing import Any, Dict + +import celpy +import celpy.celtypes + + +class CelEvaluationError(Exception): + pass + + +def _convert_value(value: Any) -> Any: + if isinstance(value, bool): + return celpy.celtypes.BoolType(value) + if isinstance(value, int): + return celpy.celtypes.IntType(value) + if isinstance(value, float): + return celpy.celtypes.DoubleType(value) + if isinstance(value, str): + return celpy.celtypes.StringType(value) + if isinstance(value, list): + return celpy.celtypes.ListType([_convert_value(item) for item in value]) + if isinstance(value, dict): + return celpy.celtypes.MapType( + {celpy.celtypes.StringType(k): _convert_value(v) for k, v in value.items()} + ) + if value is None: + return celpy.celtypes.BoolType(False) + return celpy.celtypes.StringType(str(value)) + + +def build_activation(state: Dict[str, Any]) -> Dict[str, Any]: + return {k: _convert_value(v) for k, v in state.items()} + + +def evaluate_cel(expression: str, state: Dict[str, Any]) -> Any: + if not expression or not expression.strip(): + raise CelEvaluationError("Empty expression") + try: + env = celpy.Environment() + ast = env.compile(expression) + program = env.program(ast) + activation = build_activation(state) + result = program.evaluate(activation) + except celpy.CELEvalError as exc: + raise CelEvaluationError(f"CEL evaluation error: {exc}") from exc + except Exception as exc: + raise CelEvaluationError(f"CEL error: {exc}") from exc + return cel_to_python(result) + + +def cel_to_python(value: Any) -> Any: + if isinstance(value, celpy.celtypes.BoolType): + return bool(value) + if isinstance(value, celpy.celtypes.IntType): + return int(value) + if isinstance(value, celpy.celtypes.DoubleType): + return float(value) + if isinstance(value, celpy.celtypes.StringType): + return str(value) + if isinstance(value, celpy.celtypes.ListType): + return [cel_to_python(item) for item in value] + if isinstance(value, celpy.celtypes.MapType): + return {str(k): cel_to_python(v) for k, v in value.items()} + return value diff --git a/application/agents/workflows/schemas.py b/application/agents/workflows/schemas.py index a14d8cc6..5355b88e 100644 --- a/application/agents/workflows/schemas.py +++ b/application/agents/workflows/schemas.py @@ -1,6 +1,6 @@ from datetime import datetime, timezone from enum import Enum -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Union from bson import ObjectId from pydantic import BaseModel, ConfigDict, Field, field_validator @@ -12,6 +12,7 @@ class NodeType(str, Enum): AGENT = "agent" NOTE = "note" STATE = "state" + CONDITION = "condition" class AgentType(str, Enum): @@ -48,6 +49,25 @@ class AgentNodeConfig(BaseModel): json_schema: Optional[Dict[str, Any]] = None +class ConditionCase(BaseModel): + model_config = ConfigDict(extra="forbid", populate_by_name=True) + name: Optional[str] = None + expression: str = "" + source_handle: str = Field(..., alias="sourceHandle") + + +class ConditionNodeConfig(BaseModel): + model_config = ConfigDict(extra="allow") + mode: Literal["simple", "advanced"] = "simple" + cases: List[ConditionCase] = Field(default_factory=list) + + +class StateOperation(BaseModel): + model_config = ConfigDict(extra="forbid") + expression: str = "" + target_variable: str = "" + + class WorkflowEdgeCreate(BaseModel): model_config = ConfigDict(populate_by_name=True) id: str diff --git a/application/agents/workflows/workflow_engine.py b/application/agents/workflows/workflow_engine.py index 5c6a005c..e417e621 100644 --- a/application/agents/workflows/workflow_engine.py +++ b/application/agents/workflows/workflow_engine.py @@ -2,9 +2,11 @@ import logging from datetime import datetime, timezone from typing import Any, Dict, Generator, List, Optional, TYPE_CHECKING +from application.agents.workflows.cel_evaluator import CelEvaluationError, evaluate_cel from application.agents.workflows.node_agent import WorkflowNodeAgentFactory from application.agents.workflows.schemas import ( AgentNodeConfig, + ConditionNodeConfig, ExecutionStatus, NodeExecutionLog, NodeType, @@ -28,6 +30,7 @@ class WorkflowEngine: self.agent = agent self.state: WorkflowState = {} self.execution_log: List[Dict[str, Any]] = [] + self._condition_result: Optional[str] = None def execute( self, initial_inputs: WorkflowState, query: str @@ -98,6 +101,10 @@ class WorkflowEngine: if node.type == NodeType.END: break current_node_id = self._get_next_node_id(current_node_id) + if current_node_id is None and node.type != NodeType.END: + logger.warning( + f"Branch ended at node '{node.title}' ({node.id}) without reaching an end node" + ) steps += 1 if steps >= self.MAX_EXECUTION_STEPS: logger.warning( @@ -121,10 +128,20 @@ class WorkflowEngine: } def _get_next_node_id(self, current_node_id: str) -> Optional[str]: + node = self.graph.get_node_by_id(current_node_id) edges = self.graph.get_outgoing_edges(current_node_id) - if edges: - return edges[0].target_id - return None + if not edges: + return None + + if node and node.type == NodeType.CONDITION and self._condition_result: + target_handle = self._condition_result + self._condition_result = None + for edge in edges: + if edge.source_handle == target_handle: + return edge.target_id + return None + + return edges[0].target_id def _execute_node( self, node: WorkflowNode @@ -136,6 +153,7 @@ class WorkflowEngine: NodeType.NOTE: self._execute_note_node, NodeType.AGENT: self._execute_agent_node, NodeType.STATE: self._execute_state_node, + NodeType.CONDITION: self._execute_condition_node, NodeType.END: self._execute_end_node, } @@ -158,7 +176,7 @@ class WorkflowEngine: ) -> Generator[Dict[str, str], None, None]: from application.core.model_utils import get_api_key_for_provider - node_config = AgentNodeConfig(**node.config) + node_config = AgentNodeConfig(**node.config.get("config", node.config)) if node_config.prompt_template: formatted_prompt = self._format_template(node_config.prompt_template) @@ -195,59 +213,42 @@ class WorkflowEngine: self._has_streamed = True output_key = node_config.output_variable or f"node_{node.id}_output" - self.state[output_key] = full_response + self.state[output_key] = full_response.strip() def _execute_state_node( self, node: WorkflowNode ) -> Generator[Dict[str, str], None, None]: - config = node.config - operations = config.get("operations", []) + config = node.config.get("config", node.config) + for op in config.get("operations", []): + expression = op.get("expression", "") + target_variable = op.get("target_variable", "") + if expression and target_variable: + self.state[target_variable] = evaluate_cel(expression, self.state) + yield from () - if operations: - for op in operations: - key = op.get("key") - operation = op.get("operation", "set") - value = op.get("value") + def _execute_condition_node( + self, node: WorkflowNode + ) -> Generator[Dict[str, str], None, None]: + config = ConditionNodeConfig(**node.config.get("config", node.config)) + matched_handle = None - if not key: - continue - if operation == "set": - formatted_value = ( - self._format_template(str(value)) - if isinstance(value, str) - else value - ) - self.state[key] = formatted_value - elif operation == "increment": - current = self.state.get(key, 0) - try: - self.state[key] = int(current) + int(value or 1) - except (ValueError, TypeError): - self.state[key] = 1 - elif operation == "append": - if key not in self.state: - self.state[key] = [] - if isinstance(self.state[key], list): - self.state[key].append(value) - else: - updates = config.get("updates", {}) - if not updates: - var_name = config.get("variable") - var_value = config.get("value") - if var_name and isinstance(var_name, str): - updates = {var_name: var_value or ""} - if isinstance(updates, dict): - for key, value in updates.items(): - if isinstance(value, str): - self.state[key] = self._format_template(value) - else: - self.state[key] = value + for case in config.cases: + if not case.expression.strip(): + continue + try: + if evaluate_cel(case.expression, self.state): + matched_handle = case.source_handle + break + except CelEvaluationError: + continue + + self._condition_result = matched_handle or "else" yield from () def _execute_end_node( self, node: WorkflowNode ) -> Generator[Dict[str, str], None, None]: - config = node.config + config = node.config.get("config", node.config) output_template = str(config.get("output_template", "")) if output_template: formatted_output = self._format_template(output_template) diff --git a/application/api/user/workflows/routes.py b/application/api/user/workflows/routes.py index 1521b457..1b1a83ed 100644 --- a/application/api/user/workflows/routes.py +++ b/application/api/user/workflows/routes.py @@ -1,7 +1,7 @@ """Workflow management routes.""" from datetime import datetime, timezone -from typing import Dict, List +from typing import Dict, List, Set from flask import current_app, request from flask_restx import Namespace, Resource @@ -102,6 +102,9 @@ def validate_workflow_structure(nodes: List[Dict], edges: List[Dict]) -> List[st errors.append("Workflow must have at least one end node") node_ids = {n.get("id") for n in nodes} + node_map = {n.get("id"): n for n in nodes} + end_ids = {n.get("id") for n in end_nodes} + for edge in edges: source_id = edge.get("source") target_id = edge.get("target") @@ -115,6 +118,104 @@ def validate_workflow_structure(nodes: List[Dict], edges: List[Dict]) -> List[st if not any(e.get("source") == start_id for e in edges): errors.append("Start node must have at least one outgoing edge") + condition_nodes = [n for n in nodes if n.get("type") == "condition"] + for cnode in condition_nodes: + cnode_id = cnode.get("id") + cnode_title = cnode.get("title", cnode_id) + outgoing = [e for e in edges if e.get("source") == cnode_id] + if len(outgoing) < 2: + errors.append( + f"Condition node '{cnode_title}' must have at least 2 outgoing edges" + ) + node_data = cnode.get("data", {}) or {} + cases = node_data.get("cases", []) + if not isinstance(cases, list): + cases = [] + if not cases or not any( + isinstance(c, dict) and str(c.get("expression", "")).strip() for c in cases + ): + errors.append( + f"Condition node '{cnode_title}' must have at least one case with an expression" + ) + + case_handles: Set[str] = set() + duplicate_case_handles: Set[str] = set() + for case in cases: + if not isinstance(case, dict): + continue + raw_handle = case.get("sourceHandle", "") + handle = raw_handle.strip() if isinstance(raw_handle, str) else "" + if not handle: + errors.append( + f"Condition node '{cnode_title}' has a case without a branch handle" + ) + continue + if handle in case_handles: + duplicate_case_handles.add(handle) + case_handles.add(handle) + + for handle in duplicate_case_handles: + errors.append( + f"Condition node '{cnode_title}' has duplicate case handle '{handle}'" + ) + + outgoing_by_handle: Dict[str, List[Dict]] = {} + for out_edge in outgoing: + raw_handle = out_edge.get("sourceHandle", "") + handle = raw_handle.strip() if isinstance(raw_handle, str) else "" + outgoing_by_handle.setdefault(handle, []).append(out_edge) + + for handle, handle_edges in outgoing_by_handle.items(): + if not handle: + errors.append( + f"Condition node '{cnode_title}' has an outgoing edge without sourceHandle" + ) + continue + if handle != "else" and handle not in case_handles: + errors.append( + f"Condition node '{cnode_title}' has a connection from unknown branch '{handle}'" + ) + if len(handle_edges) > 1: + errors.append( + f"Condition node '{cnode_title}' has multiple outgoing edges from branch '{handle}'" + ) + + if "else" not in outgoing_by_handle: + errors.append(f"Condition node '{cnode_title}' must have an 'else' branch") + + for case in cases: + if not isinstance(case, dict): + continue + raw_handle = case.get("sourceHandle", "") + handle = raw_handle.strip() if isinstance(raw_handle, str) else "" + if not handle: + continue + + raw_expression = case.get("expression", "") + has_expression = isinstance(raw_expression, str) and bool( + raw_expression.strip() + ) + has_outgoing = bool(outgoing_by_handle.get(handle)) + if has_expression and not has_outgoing: + errors.append( + f"Condition node '{cnode_title}' case '{handle}' has an expression but no outgoing edge" + ) + if not has_expression and has_outgoing: + errors.append( + f"Condition node '{cnode_title}' case '{handle}' has an outgoing edge but no expression" + ) + + for handle, handle_edges in outgoing_by_handle.items(): + if not handle: + continue + for out_edge in handle_edges: + target = out_edge.get("target") + if target and not _can_reach_end(target, edges, node_map, end_ids): + errors.append( + f"Branch '{handle}' of condition '{cnode_title}' " + f"must eventually reach an end node" + ) + for node in nodes: if not node.get("id"): errors.append("All nodes must have an id") @@ -124,6 +225,20 @@ def validate_workflow_structure(nodes: List[Dict], edges: List[Dict]) -> List[st return errors +def _can_reach_end( + node_id: str, edges: List[Dict], node_map: Dict, end_ids: set, visited: set = None +) -> bool: + if visited is None: + visited = set() + if node_id in end_ids: + return True + if node_id in visited or node_id not in node_map: + return False + visited.add(node_id) + outgoing = [e.get("target") for e in edges if e.get("source") == node_id] + return any(_can_reach_end(t, edges, node_map, end_ids, visited) for t in outgoing if t) + + def create_workflow_nodes( workflow_id: str, nodes_data: List[Dict], graph_version: int ) -> None: diff --git a/application/requirements.txt b/application/requirements.txt index 85d1e391..e99b8614 100644 --- a/application/requirements.txt +++ b/application/requirements.txt @@ -1,6 +1,7 @@ anthropic==0.75.0 boto3==1.42.17 beautifulsoup4==4.14.3 +cel-python==0.5.0 celery==5.6.0 cryptography==46.0.3 dataclasses-json==0.6.7 diff --git a/frontend/src/agents/types/workflow.ts b/frontend/src/agents/types/workflow.ts index a18cb430..8615714a 100644 --- a/frontend/src/agents/types/workflow.ts +++ b/frontend/src/agents/types/workflow.ts @@ -1,4 +1,20 @@ -export type NodeType = 'start' | 'end' | 'agent' | 'note' | 'state'; +export type NodeType = 'start' | 'end' | 'agent' | 'note' | 'state' | 'condition'; + +export interface ConditionCase { + name?: string; + expression: string; + sourceHandle: string; +} + +export interface ConditionNodeConfig { + mode: 'simple' | 'advanced'; + cases: ConditionCase[]; +} + +export interface StateOperationConfig { + expression: string; + target_variable: string; +} export interface WorkflowEdge { id: string; diff --git a/frontend/src/agents/workflow/WorkflowBuilder.tsx b/frontend/src/agents/workflow/WorkflowBuilder.tsx index 1593fd53..1e451be7 100644 --- a/frontend/src/agents/workflow/WorkflowBuilder.tsx +++ b/frontend/src/agents/workflow/WorkflowBuilder.tsx @@ -2,16 +2,23 @@ import 'reactflow/dist/style.css'; import { AlertCircle, + ChartColumn, Bot, Database, Flag, + GitBranch, + Loader2, + Link, Pencil, Play, + Plus, + Settings2, StickyNote, Trash2, X, } from 'lucide-react'; import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; +import { useSelector } from 'react-redux'; import { useNavigate, useParams, useSearchParams } from 'react-router-dom'; import ReactFlow, { addEdge, @@ -43,13 +50,29 @@ import { Sheet, SheetContent } from '@/components/ui/sheet'; import modelService from '../../api/services/modelService'; import userService from '../../api/services/userService'; import ArrowLeft from '../../assets/arrow-left.svg'; -import { WorkflowNode } from '../types/workflow'; +import { FileUpload } from '../../components/FileUpload'; +import AgentDetailsModal from '../../modals/AgentDetailsModal'; +import ConfirmationModal from '../../modals/ConfirmationModal'; +import { ActiveState } from '../../models/misc'; +import { selectToken } from '../../preferences/preferenceSlice'; +import { Agent } from '../types'; +import { ConditionCase, WorkflowNode } from '../types/workflow'; import MobileBlocker from './components/MobileBlocker'; import PromptTextArea from './components/PromptTextArea'; -import { AgentNode, EndNode, NoteNode, SetStateNode, StartNode } from './nodes'; +import { + AgentNode, + ConditionNode, + EndNode, + NoteNode, + SetStateNode, + StartNode, +} from './nodes'; import WorkflowPreview from './WorkflowPreview'; import type { Model } from '../../models/types'; + +const PRIMARY_ACTION_SPINNER_DELAY_MS = 180; + interface AgentNodeConfig { agent_type: 'classic' | 'react'; llm_name?: string; @@ -71,8 +94,156 @@ interface UserTool { displayName: string; } +function createEmptyWorkflowAgent(): Agent { + return { + id: '', + name: '', + description: '', + image: '', + source: '', + chunks: '2', + retriever: '', + prompt_id: '', + tools: [], + agent_type: 'workflow', + status: 'published', + }; +} + +function canReachEnd( + nodeId: string, + edges: Edge[], + nodeIds: Set, + endIds: Set, + visited: Set = new Set(), +): boolean { + if (endIds.has(nodeId)) return true; + if (visited.has(nodeId) || !nodeIds.has(nodeId)) return false; + visited.add(nodeId); + return edges + .filter((e) => e.source === nodeId) + .some((e) => canReachEnd(e.target, edges, nodeIds, endIds, visited)); +} + +function parseSimpleCel(expression: string): { + variable: string; + operator: string; + value: string; +} { + let match = expression.match( + /^(\w+)\.(contains|startsWith)\(["'](.*)["']\)$/, + ); + if (match) return { variable: match[1], operator: match[2], value: match[3] }; + + match = expression.match(/^(\w+)\s*(==|!=|>=|<=|>|<)\s*["'](.*)["']$/); + if (match) return { variable: match[1], operator: match[2], value: match[3] }; + + match = expression.match(/^(\w+)\s*(==|!=|>=|<=|>|<)\s*(.*)$/); + if (match) return { variable: match[1], operator: match[2], value: match[3] }; + + return { variable: '', operator: '==', value: '' }; +} + +function buildSimpleCel( + variable: string, + operator: string, + value: string, +): string { + if (!variable) return ''; + const isNumeric = value !== '' && !isNaN(Number(value)); + const isBool = value === 'true' || value === 'false'; + const quoted = isNumeric || isBool ? value : `"${value}"`; + if (operator === 'contains') return `${variable}.contains(${quoted})`; + if (operator === 'startsWith') return `${variable}.startsWith(${quoted})`; + return `${variable} ${operator} ${quoted}`; +} + +function normalizeConditionCases(cases: ConditionCase[]): ConditionCase[] { + const usedHandles = new Set(); + let nextIndex = 0; + + return cases.map((conditionCase) => { + const candidate = (conditionCase.sourceHandle || '').trim(); + if (candidate && !usedHandles.has(candidate)) { + usedHandles.add(candidate); + const match = candidate.match(/^case_(\d+)$/); + if (match) { + nextIndex = Math.max(nextIndex, Number(match[1]) + 1); + } + return conditionCase; + } + + while (usedHandles.has(`case_${nextIndex}`)) { + nextIndex += 1; + } + const generatedHandle = `case_${nextIndex}`; + usedHandles.add(generatedHandle); + nextIndex += 1; + + return { + ...conditionCase, + sourceHandle: generatedHandle, + }; + }); +} + +function getNextConditionHandle(cases: ConditionCase[]): string { + const usedHandles = new Set( + cases.map((conditionCase) => conditionCase.sourceHandle).filter(Boolean), + ); + const usedIndices = Array.from(usedHandles) + .map((handle) => handle.match(/^case_(\d+)$/)) + .filter((match): match is RegExpMatchArray => Boolean(match)) + .map((match) => Number(match[1])); + + let nextIndex = usedIndices.length > 0 ? Math.max(...usedIndices) + 1 : 0; + while (usedHandles.has(`case_${nextIndex}`)) { + nextIndex += 1; + } + + return `case_${nextIndex}`; +} + +function createWorkflowPayload( + name: string, + description: string, + workflowNodes: Node[], + workflowEdges: Edge[], +) { + return { + name, + description, + nodes: workflowNodes.map((node) => ({ + id: node.id, + type: node.type as + | 'start' + | 'end' + | 'agent' + | 'note' + | 'state' + | 'condition', + title: node.data.title || node.data.label || node.type, + position: node.position, + data: + node.type === 'agent' || + node.type === 'condition' || + node.type === 'state' + ? node.data.config + : node.data, + })), + edges: workflowEdges.map((edge) => ({ + id: edge.id, + source: edge.source, + target: edge.target, + sourceHandle: edge.sourceHandle || undefined, + targetHandle: edge.targetHandle || undefined, + })), + }; +} + function WorkflowBuilderInner() { const navigate = useNavigate(); + const token = useSelector(selectToken); const { agentId } = useParams<{ agentId?: string }>(); const [searchParams] = useSearchParams(); const folderId = searchParams.get('folder_id'); @@ -91,12 +262,25 @@ function WorkflowBuilderInner() { const [workflowDescription, setWorkflowDescription] = useState(''); const [showWorkflowSettings, setShowWorkflowSettings] = useState(false); const [isPublishing, setIsPublishing] = useState(false); + const [showPrimaryActionSpinner, setShowPrimaryActionSpinner] = + useState(false); const [publishErrors, setPublishErrors] = useState([]); const [errorContext, setErrorContext] = useState<'preview' | 'publish'>( 'publish', ); const [showNodeConfig, setShowNodeConfig] = useState(false); const [showPreview, setShowPreview] = useState(false); + const [deleteConfirmation, setDeleteConfirmation] = + useState('INACTIVE'); + const [agentDetails, setAgentDetails] = useState('INACTIVE'); + const [isDeletingAgent, setIsDeletingAgent] = useState(false); + const [currentAgent, setCurrentAgent] = useState( + createEmptyWorkflowAgent(), + ); + const [imageFile, setImageFile] = useState(null); + const [savedWorkflowSignature, setSavedWorkflowSignature] = useState< + string | null + >(null); const workflowSettingsRef = useRef(null); const [availableModels, setAvailableModels] = useState([]); const [availableTools, setAvailableTools] = useState([]); @@ -108,6 +292,7 @@ function WorkflowBuilderInner() { end: EndNode, note: NoteNode, state: SetStateNode, + condition: ConditionNode, }), [], ); @@ -229,8 +414,15 @@ function WorkflowBuilderInner() { } as AgentNodeConfig; } else if (type === 'state') { baseNode.data.title = 'Set State'; - baseNode.data.variable = ''; - baseNode.data.value = ''; + baseNode.data.config = { + operations: [{ expression: '', target_variable: '' }], + }; + } else if (type === 'condition') { + baseNode.data.title = 'If / Else'; + baseNode.data.config = { + mode: 'simple', + cases: [{ name: '', expression: '', sourceHandle: 'case_0' }], + }; } else if (type === 'note') { baseNode.data.title = 'Note'; baseNode.data.label = 'Note'; @@ -276,6 +468,39 @@ function WorkflowBuilderInner() { [selectedNode], ); + const handleUpload = useCallback((files: File[]) => { + if (files && files.length > 0) { + setImageFile(files[0]); + } + }, []); + + const navigateBackToAgents = useCallback(() => { + navigate(folderId ? `/agents?folder=${folderId}` : '/agents'); + }, [navigate, folderId]); + + const handleDeleteAgent = useCallback(async () => { + const agentToDelete = currentAgentId || currentAgent.id; + if (!agentToDelete) return; + setIsDeletingAgent(true); + try { + const response = await userService.deleteAgent(agentToDelete, token); + if (!response.ok) { + const errorData = await response.json().catch(() => ({})); + throw new Error(errorData.message || 'Failed to delete workflow agent'); + } + navigateBackToAgents(); + } catch (error) { + setPublishErrors([ + error instanceof Error + ? error.message + : 'Failed to delete workflow agent', + ]); + setErrorContext('publish'); + } finally { + setIsDeletingAgent(false); + } + }, [currentAgentId, currentAgent.id, token, navigateBackToAgents]); + useEffect(() => { if (publishErrors.length > 0) { const timer = setTimeout(() => { @@ -285,6 +510,19 @@ function WorkflowBuilderInner() { } }, [publishErrors.length]); + useEffect(() => { + if (!isPublishing) { + setShowPrimaryActionSpinner(false); + return; + } + + const spinnerTimer = window.setTimeout(() => { + setShowPrimaryActionSpinner(true); + }, PRIMARY_ACTION_SPINNER_DELAY_MS); + + return () => window.clearTimeout(spinnerTimer); + }, [isPublishing]); + useEffect(() => { const handleKeyDown = (e: KeyboardEvent) => { if (e.key === 'Delete' && selectedNode) { @@ -345,9 +583,14 @@ function WorkflowBuilderInner() { const loadAgentDetails = async () => { if (!agentId) return; try { - const response = await userService.getAgent(agentId, null); + const response = await userService.getAgent(agentId, token); if (!response.ok) throw new Error('Failed to fetch agent'); const agent = await response.json(); + setCurrentAgent({ + ...createEmptyWorkflowAgent(), + ...agent, + agent_type: 'workflow', + }); if (agent.agent_type === 'workflow' && agent.workflow) { setWorkflowId(agent.workflow); setCurrentAgentId(agent.id); @@ -359,52 +602,69 @@ function WorkflowBuilderInner() { } }; loadAgentDetails(); - }, [agentId]); + }, [agentId, token]); useEffect(() => { const loadWorkflow = async () => { if (!workflowId) return; try { - const response = await userService.getWorkflow(workflowId, null); + const response = await userService.getWorkflow(workflowId, token); if (!response.ok) throw new Error('Failed to fetch workflow'); const responseData = await response.json(); const { workflow, nodes: apiNodes, edges: apiEdges } = responseData; - setWorkflowName(workflow.name); - setWorkflowDescription(workflow.description || ''); - setNodes( - apiNodes.map((n: WorkflowNode) => { - const nodeData: Record = { - title: n.title, - label: n.title, - }; - if (n.type === 'agent' && n.data) { - nodeData.config = n.data; - } else if (n.data) { - Object.assign(nodeData, n.data); - } - return { - id: n.id, - type: n.type, - position: n.position, - data: nodeData, + const nextWorkflowName = workflow.name; + const nextWorkflowDescription = workflow.description || ''; + const mappedNodes = apiNodes.map((n: WorkflowNode) => { + const nodeData: Record = { + title: n.title, + label: n.title, + }; + if (n.type === 'agent' && n.data) { + nodeData.config = n.data; + } else if (n.type === 'condition' && n.data) { + nodeData.config = { + ...n.data, + cases: normalizeConditionCases(n.data.cases || []), }; + } else if (n.type === 'state' && n.data) { + nodeData.config = n.data; + } else if (n.data) { + Object.assign(nodeData, n.data); + } + return { + id: n.id, + type: n.type, + position: n.position, + data: nodeData, + }; + }); + const mappedEdges = apiEdges.map( + (e: { + id: string; + source: string; + target: string; + sourceHandle?: string; + targetHandle?: string; + }) => ({ + id: e.id, + source: e.source, + target: e.target, + sourceHandle: e.sourceHandle, + targetHandle: e.targetHandle, }), ); - setEdges( - apiEdges.map( - (e: { - id: string; - source: string; - target: string; - sourceHandle?: string; - targetHandle?: string; - }) => ({ - id: e.id, - source: e.source, - target: e.target, - sourceHandle: e.sourceHandle, - targetHandle: e.targetHandle, - }), + setWorkflowName(nextWorkflowName); + setWorkflowDescription(nextWorkflowDescription); + setNodes(mappedNodes); + setEdges(mappedEdges); + setSavedWorkflowSignature( + JSON.stringify( + createWorkflowPayload( + nextWorkflowName, + nextWorkflowDescription, + mappedNodes, + mappedEdges, + ), ), ); setTimeout(() => { @@ -419,7 +679,7 @@ function WorkflowBuilderInner() { } }; loadWorkflow(); - }, [workflowId, reactFlowInstance]); + }, [workflowId, reactFlowInstance, token]); const validateWorkflow = useCallback((): string[] => { const errors: string[] = []; @@ -434,6 +694,7 @@ function WorkflowBuilderInner() { } const endNodes = nodes.filter((n) => n.type === 'end'); + const endNodeIds = new Set(endNodes.map((n) => n.id)); if (endNodes.length === 0) { errors.push('Workflow must have at least one end node'); } @@ -479,81 +740,211 @@ function WorkflowBuilderInner() { } }); + const conditionNodes = nodes.filter((n) => n.type === 'condition'); + conditionNodes.forEach((node) => { + const conditionTitle = node.data?.title || node.id; + const cases = (node.data?.config?.cases || []) as ConditionCase[]; + if ( + !cases.length || + !cases.some((c: ConditionCase) => Boolean((c.expression || '').trim())) + ) { + errors.push( + `Condition "${conditionTitle}" must have at least one case with an expression`, + ); + } + + const caseHandles = new Set(); + const duplicateCaseHandles = new Set(); + cases.forEach((conditionCase: ConditionCase) => { + const handle = (conditionCase.sourceHandle || '').trim(); + if (!handle) { + errors.push( + `Condition "${conditionTitle}" has a case without a branch handle`, + ); + return; + } + if (caseHandles.has(handle)) { + duplicateCaseHandles.add(handle); + } + caseHandles.add(handle); + }); + duplicateCaseHandles.forEach((handle) => { + errors.push( + `Condition "${conditionTitle}" has duplicate case handle "${handle}"`, + ); + }); + + const outgoing = edges.filter((e) => e.source === node.id); + if (outgoing.length < 2) { + errors.push( + `Condition "${conditionTitle}" must have at least 2 outgoing connections`, + ); + } + + const outgoingByHandle = new Map(); + outgoing.forEach((edge) => { + const handle = (edge.sourceHandle || '').trim(); + const handleEdges = outgoingByHandle.get(handle); + if (handleEdges) { + handleEdges.push(edge); + return; + } + outgoingByHandle.set(handle, [edge]); + }); + + for (const [handle, handleEdges] of outgoingByHandle.entries()) { + if (!handle) { + errors.push( + `Condition "${conditionTitle}" has a connection without a branch handle`, + ); + continue; + } + if (handle !== 'else' && !caseHandles.has(handle)) { + errors.push( + `Condition "${conditionTitle}" has a connection from unknown branch "${handle}"`, + ); + } + if (handleEdges.length > 1) { + errors.push( + `Condition "${conditionTitle}" has multiple connections from branch "${handle}"`, + ); + } + } + + if (!outgoingByHandle.has('else')) { + errors.push(`Condition "${conditionTitle}" must have an Else branch`); + } + + cases.forEach((conditionCase: ConditionCase) => { + const handle = (conditionCase.sourceHandle || '').trim(); + if (!handle) return; + + const hasExpression = Boolean((conditionCase.expression || '').trim()); + const hasOutgoing = Boolean(outgoingByHandle.get(handle)?.length); + if (hasExpression && !hasOutgoing) { + errors.push( + `Condition "${conditionTitle}" case "${handle}" has an expression but no branch connection`, + ); + } + if (!hasExpression && hasOutgoing) { + errors.push( + `Condition "${conditionTitle}" case "${handle}" has a branch connection but no expression`, + ); + } + }); + + outgoing.forEach((edge) => { + if (!canReachEnd(edge.target, edges, nodeIds, endNodeIds)) { + const handle = edge.sourceHandle || 'branch'; + errors.push( + `Branch "${handle}" of condition "${conditionTitle}" must eventually reach an end node`, + ); + } + }); + }); + return errors; }, [workflowName, nodes, edges]); - const handlePublish = useCallback(async () => { - setPublishErrors([]); - setErrorContext('publish'); + const canManageAgent = Boolean(currentAgentId || currentAgent.id); + const effectiveAgentId = currentAgentId || currentAgent.id || ''; + const currentAgentImage = currentAgent.image || ''; - const validationErrors = validateWorkflow(); - if (validationErrors.length > 0) { - setPublishErrors(validationErrors); - return; - } + const buildWorkflowPayload = useCallback( + () => + createWorkflowPayload(workflowName, workflowDescription, nodes, edges), + [workflowName, workflowDescription, nodes, edges], + ); - setIsPublishing(true); - let createdWorkflowId: string | null = null; - try { - const workflowPayload = { - name: workflowName, - description: workflowDescription, - nodes: nodes.map((n) => ({ - id: n.id, - type: n.type as 'start' | 'end' | 'agent' | 'note' | 'state', - title: n.data.title || n.data.label || n.type, - position: n.position, - data: n.type === 'agent' ? n.data.config : n.data, - })), - edges: edges.map((e) => ({ - id: e.id, - source: e.source, - target: e.target, - sourceHandle: e.sourceHandle || undefined, - targetHandle: e.targetHandle || undefined, - })), - }; + const workflowPayloadSignature = useMemo( + () => JSON.stringify(buildWorkflowPayload()), + [buildWorkflowPayload], + ); - let savedWorkflowId = workflowId; - if (workflowId) { - const updateResponse = await userService.updateWorkflow( - workflowId, - workflowPayload, - null, - ); - if (!updateResponse.ok) { - const errorData = await updateResponse.json().catch(() => ({})); - throw new Error(errorData.message || 'Failed to update workflow'); - } + const hasSavableChanges = + canManageAgent && savedWorkflowSignature !== null + ? workflowPayloadSignature !== savedWorkflowSignature || + imageFile !== null + : false; - if (currentAgentId) { - const agentFormData = new FormData(); - agentFormData.append('name', workflowName); - agentFormData.append( - 'description', - workflowDescription || `Workflow agent: ${workflowName}`, + const persistWorkflow = useCallback( + async (navigateAfterSuccess: boolean): Promise => { + setPublishErrors([]); + setErrorContext('publish'); + + const validationErrors = validateWorkflow(); + if (validationErrors.length > 0) { + setPublishErrors(validationErrors); + return false; + } + + setIsPublishing(true); + let createdWorkflowId: string | null = null; + try { + const workflowPayload = buildWorkflowPayload(); + + let savedWorkflowId = workflowId; + if (workflowId) { + const updateResponse = await userService.updateWorkflow( + workflowId, + workflowPayload, + token, ); - agentFormData.append('status', 'published'); - const agentUpdateResponse = await userService.updateAgent( - currentAgentId, - agentFormData, - null, - ); - if (!agentUpdateResponse.ok) { - throw new Error('Failed to update agent'); + if (!updateResponse.ok) { + const errorData = await updateResponse.json().catch(() => ({})); + throw new Error(errorData.message || 'Failed to update workflow'); } + + if (effectiveAgentId) { + const agentFormData = new FormData(); + agentFormData.append('name', workflowName); + agentFormData.append( + 'description', + workflowDescription || `Workflow agent: ${workflowName}`, + ); + agentFormData.append('status', 'published'); + if (imageFile) { + agentFormData.append('image', imageFile); + } + const agentUpdateResponse = await userService.updateAgent( + effectiveAgentId, + agentFormData, + token, + ); + if (!agentUpdateResponse.ok) { + throw new Error('Failed to update agent'); + } + const updatedAgent = await agentUpdateResponse + .json() + .catch(() => null); + setCurrentAgent((prev) => ({ + ...prev, + ...(updatedAgent || {}), + id: effectiveAgentId, + name: workflowName, + description: + workflowDescription || `Workflow agent: ${workflowName}`, + image: updatedAgent?.image || prev.image || '', + })); + } + setImageFile(null); + setSavedWorkflowSignature(JSON.stringify(workflowPayload)); + if (navigateAfterSuccess) { + navigateBackToAgents(); + } + return true; } - } else { + const createResponse = await userService.createWorkflow( workflowPayload, - null, + token, ); if (!createResponse.ok) { const errorData = await createResponse.json().catch(() => ({})); const backendErrors = errorData.errors || []; if (backendErrors.length > 0) { setPublishErrors(backendErrors); - return; + return false; } throw new Error(errorData.message || 'Failed to create workflow'); } @@ -573,11 +964,14 @@ function WorkflowBuilderInner() { agentFormData.append('agent_type', 'workflow'); agentFormData.append('status', 'published'); agentFormData.append('workflow', savedWorkflowId || ''); + if (imageFile) { + agentFormData.append('image', imageFile); + } if (folderId) agentFormData.append('folder_id', folderId); const agentResponse = await userService.createAgent( agentFormData, - null, + token, ); if (!agentResponse.ok) { const errorData = await agentResponse.json().catch(() => ({})); @@ -586,45 +980,102 @@ function WorkflowBuilderInner() { const agentData = await agentResponse.json().catch(() => ({})); if (agentData?.id) { setCurrentAgentId(agentData.id); + setCurrentAgent({ + ...createEmptyWorkflowAgent(), + ...agentData, + id: agentData.id, + name: workflowName, + description: + workflowDescription || `Workflow agent: ${workflowName}`, + image: agentData.image || '', + workflow: savedWorkflowId || undefined, + agent_type: 'workflow', + status: 'published', + }); } - } - navigate(folderId ? `/agents?folder=${folderId}` : '/agents'); - } catch (error) { - if (createdWorkflowId) { - try { - const cleanupResponse = await userService.deleteWorkflow( - createdWorkflowId, - null, - ); - if (cleanupResponse.ok) { - setWorkflowId(null); - } - } catch (cleanupError) { - console.error( - 'Failed to clean up workflow after publish error:', - cleanupError, - ); + setImageFile(null); + setSavedWorkflowSignature(JSON.stringify(workflowPayload)); + if (navigateAfterSuccess) { + navigateBackToAgents(); } + return true; + } catch (error) { + if (createdWorkflowId) { + try { + const cleanupResponse = await userService.deleteWorkflow( + createdWorkflowId, + token, + ); + if (cleanupResponse.ok) { + setWorkflowId(null); + } + } catch (cleanupError) { + console.error( + 'Failed to clean up workflow after publish error:', + cleanupError, + ); + } + } + console.error('Failed to save workflow:', error); + setPublishErrors([ + error instanceof Error ? error.message : 'Failed to save workflow', + ]); + return false; + } finally { + setIsPublishing(false); } - console.error('Failed to publish workflow:', error); - setPublishErrors([ - error instanceof Error ? error.message : 'Failed to publish workflow', - ]); - } finally { - setIsPublishing(false); - } - }, [ - workflowName, - workflowDescription, - nodes, - edges, - navigate, - folderId, - workflowId, - currentAgentId, - validateWorkflow, - ]); + }, + [ + validateWorkflow, + buildWorkflowPayload, + workflowId, + token, + effectiveAgentId, + workflowName, + workflowDescription, + imageFile, + folderId, + navigateBackToAgents, + ], + ); + + const handleWorkflowSettingsDone = useCallback(() => { + setShowWorkflowSettings(false); + if (!canManageAgent || !hasSavableChanges || isPublishing) return; + void persistWorkflow(false); + }, [canManageAgent, hasSavableChanges, isPublishing, persistWorkflow]); + + const isPrimaryActionDisabled = + isPublishing || (canManageAgent && !hasSavableChanges); + const primaryActionLabel = canManageAgent ? 'Save' : 'Publish'; + + const handlePrimaryAction = useCallback(() => { + if (isPrimaryActionDisabled) return; + void persistWorkflow(!canManageAgent); + }, [isPrimaryActionDisabled, persistWorkflow, canManageAgent]); + + const agentForDetails = useMemo( + () => ({ + ...createEmptyWorkflowAgent(), + ...currentAgent, + id: effectiveAgentId, + name: workflowName, + description: workflowDescription || `Workflow agent: ${workflowName}`, + image: currentAgentImage, + agent_type: 'workflow', + status: currentAgent.status || 'published', + workflow: workflowId || currentAgent.workflow, + }), + [ + currentAgent, + effectiveAgentId, + workflowName, + workflowDescription, + currentAgentImage, + workflowId, + ], + ); return ( <> @@ -633,7 +1084,7 @@ function WorkflowBuilderInner() {
+
+ + {currentAgentImage && !imageFile && ( +
+ Agent image + + Current image + +
+ )} + setImageFile(null)} + uploadText={[ + { + text: 'Click to upload', + colorClass: 'text-violets-are-blue', + }, + { + text: ' or drag and drop', + colorClass: 'text-gray-500', + }, + ]} + className="rounded-lg border-2 border-dashed border-[#E5E5E5] p-3 text-center transition-colors dark:border-[#3A3A3A] dark:bg-[#2C2C2C]" + /> +

+ Image updates are included the next time you save. +

+
@@ -700,7 +1190,42 @@ function WorkflowBuilderInner() { )}
-
+
+ + {canManageAgent && ( + + )} + {canManageAgent && ( + + )} + {canManageAgent && ( + + )}
@@ -736,7 +1274,9 @@ function WorkflowBuilderInner() { {errorContext === 'preview' ? 'Unable to preview workflow' - : 'Unable to publish workflow'} + : canManageAgent + ? 'Unable to save workflow' + : 'Unable to publish workflow'}
    @@ -823,6 +1363,23 @@ function WorkflowBuilderInner() { +
    handleNodeDragStart(e, 'condition')} + > +
    + +
    +
    + + If / Else + + + Conditional branching + +
    +
    @@ -862,7 +1419,8 @@ function WorkflowBuilderInner() { {selectedNode.type === 'end' && 'End Node'} {selectedNode.type === 'agent' && 'AI Agent'} {selectedNode.type === 'note' && 'Note'} - {selectedNode.type === 'state' && 'Set State'} + {selectedNode.type === 'state' && 'Set global variables'} + {selectedNode.type === 'condition' && 'If / Else'} + )} + +