From 18e4e8db7d0637268b33d230cf326737cca11f47 Mon Sep 17 00:00:00 2001 From: Yunxiao Xu Date: Mon, 23 Feb 2026 05:44:35 -0800 Subject: [PATCH] feat(orchestrator): Integrate Orchestrator-Workers loop and human-in-the-loop interrupts --- backend/src/ea_chatbot/graph/workflow.py | 97 ++++++++++++------------ 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/backend/src/ea_chatbot/graph/workflow.py b/backend/src/ea_chatbot/graph/workflow.py index a312638..8b982d7 100644 --- a/backend/src/ea_chatbot/graph/workflow.py +++ b/backend/src/ea_chatbot/graph/workflow.py @@ -2,89 +2,92 @@ from langgraph.graph import StateGraph, END from ea_chatbot.graph.state import AgentState from ea_chatbot.graph.nodes.query_analyzer import query_analyzer_node from ea_chatbot.graph.nodes.planner import planner_node -from ea_chatbot.graph.nodes.coder import coder_node -from ea_chatbot.graph.nodes.error_corrector import error_corrector_node -from ea_chatbot.graph.nodes.executor import executor_node -from ea_chatbot.graph.nodes.summarizer import summarizer_node +from ea_chatbot.graph.nodes.delegate import delegate_node +from ea_chatbot.graph.nodes.reflector import reflector_node +from ea_chatbot.graph.nodes.synthesizer import synthesizer_node +from ea_chatbot.graph.workers.data_analyst.workflow import create_data_analyst_worker +from ea_chatbot.graph.workers.data_analyst.mapping import prepare_worker_input, merge_worker_output from ea_chatbot.graph.nodes.researcher import researcher_node from ea_chatbot.graph.nodes.clarification import clarification_node from ea_chatbot.graph.nodes.summarize_conversation import summarize_conversation_node -MAX_ITERATIONS = 3 +def data_analyst_worker_node(state: AgentState) -> dict: + """Wrapper node for the Data Analyst subgraph with state mapping.""" + worker_graph = create_data_analyst_worker() + worker_input = prepare_worker_input(state) + worker_result = worker_graph.invoke(worker_input) + return merge_worker_output(worker_result) -def router(state: AgentState) -> str: - """Route to the next node based on the analysis.""" +def main_router(state: AgentState) -> str: + """Route from query analyzer based on initial assessment.""" next_action = state.get("next_action") - if next_action == "plan": - return "planner" - elif next_action == "research": - return "researcher" - elif next_action == "clarify": + if next_action == "clarify": return "clarification" - else: - return END + return "planner" + +def delegation_router(state: AgentState) -> str: + """Route from delegate node to specific workers or synthesis.""" + next_action = state.get("next_action") + if next_action == "data_analyst": + return "data_analyst_worker" + elif next_action == "researcher": + return "researcher" + elif next_action == "summarize": + return "synthesizer" + return "synthesizer" def create_workflow(): - """Create the LangGraph workflow.""" + """Create the high-level Orchestrator workflow.""" workflow = StateGraph(AgentState) - # Add nodes + # Add Nodes workflow.add_node("query_analyzer", query_analyzer_node) workflow.add_node("planner", planner_node) - workflow.add_node("coder", coder_node) - workflow.add_node("error_corrector", error_corrector_node) + workflow.add_node("delegate", delegate_node) + workflow.add_node("data_analyst_worker", data_analyst_worker_node) workflow.add_node("researcher", researcher_node) + workflow.add_node("reflector", reflector_node) + workflow.add_node("synthesizer", synthesizer_node) workflow.add_node("clarification", clarification_node) - workflow.add_node("executor", executor_node) - workflow.add_node("summarizer", summarizer_node) workflow.add_node("summarize_conversation", summarize_conversation_node) # Set entry point workflow.set_entry_point("query_analyzer") - # Add conditional edges from query_analyzer + # Edges workflow.add_conditional_edges( "query_analyzer", - router, + main_router, { - "planner": "planner", - "researcher": "researcher", "clarification": "clarification", - END: END + "planner": "planner" } ) - # Linear flow for planning and coding - workflow.add_edge("planner", "coder") - workflow.add_edge("coder", "executor") + workflow.add_edge("planner", "delegate") - # Executor routing - def executor_router(state: AgentState) -> str: - if state.get("error"): - # Check for iteration limit to prevent infinite loops - if state.get("iterations", 0) >= MAX_ITERATIONS: - return "summarizer" - return "error_corrector" - return "summarizer" - workflow.add_conditional_edges( - "executor", - executor_router, + "delegate", + delegation_router, { - "error_corrector": "error_corrector", - "summarizer": "summarizer" + "data_analyst_worker": "data_analyst_worker", + "researcher": "researcher", + "synthesizer": "synthesizer" } ) - workflow.add_edge("error_corrector", "executor") + workflow.add_edge("data_analyst_worker", "reflector") + workflow.add_edge("researcher", "reflector") + workflow.add_edge("reflector", "delegate") - workflow.add_edge("researcher", "summarize_conversation") - workflow.add_edge("clarification", END) - workflow.add_edge("summarizer", "summarize_conversation") + workflow.add_edge("synthesizer", "summarize_conversation") workflow.add_edge("summarize_conversation", END) + workflow.add_edge("clarification", END) - # Compile the graph - app = workflow.compile() + # Compile the graph with human-in-the-loop interrupt + app = workflow.compile( + interrupt_before=["clarification"] + ) return app