feat(orchestrator): Integrate Orchestrator-Workers loop and human-in-the-loop interrupts

This commit is contained in:
Yunxiao Xu
2026-02-23 05:44:35 -08:00
parent 9fef4888b5
commit 18e4e8db7d

View File

@@ -2,89 +2,92 @@ from langgraph.graph import StateGraph, END
from ea_chatbot.graph.state import AgentState from ea_chatbot.graph.state import AgentState
from ea_chatbot.graph.nodes.query_analyzer import query_analyzer_node from ea_chatbot.graph.nodes.query_analyzer import query_analyzer_node
from ea_chatbot.graph.nodes.planner import planner_node from ea_chatbot.graph.nodes.planner import planner_node
from ea_chatbot.graph.nodes.coder import coder_node from ea_chatbot.graph.nodes.delegate import delegate_node
from ea_chatbot.graph.nodes.error_corrector import error_corrector_node from ea_chatbot.graph.nodes.reflector import reflector_node
from ea_chatbot.graph.nodes.executor import executor_node from ea_chatbot.graph.nodes.synthesizer import synthesizer_node
from ea_chatbot.graph.nodes.summarizer import summarizer_node from ea_chatbot.graph.workers.data_analyst.workflow import create_data_analyst_worker
from ea_chatbot.graph.workers.data_analyst.mapping import prepare_worker_input, merge_worker_output
from ea_chatbot.graph.nodes.researcher import researcher_node from ea_chatbot.graph.nodes.researcher import researcher_node
from ea_chatbot.graph.nodes.clarification import clarification_node from ea_chatbot.graph.nodes.clarification import clarification_node
from ea_chatbot.graph.nodes.summarize_conversation import summarize_conversation_node from ea_chatbot.graph.nodes.summarize_conversation import summarize_conversation_node
MAX_ITERATIONS = 3 def data_analyst_worker_node(state: AgentState) -> dict:
"""Wrapper node for the Data Analyst subgraph with state mapping."""
worker_graph = create_data_analyst_worker()
worker_input = prepare_worker_input(state)
worker_result = worker_graph.invoke(worker_input)
return merge_worker_output(worker_result)
def router(state: AgentState) -> str: def main_router(state: AgentState) -> str:
"""Route to the next node based on the analysis.""" """Route from query analyzer based on initial assessment."""
next_action = state.get("next_action") next_action = state.get("next_action")
if next_action == "plan": if next_action == "clarify":
return "planner"
elif next_action == "research":
return "researcher"
elif next_action == "clarify":
return "clarification" return "clarification"
else: return "planner"
return END
def delegation_router(state: AgentState) -> str:
"""Route from delegate node to specific workers or synthesis."""
next_action = state.get("next_action")
if next_action == "data_analyst":
return "data_analyst_worker"
elif next_action == "researcher":
return "researcher"
elif next_action == "summarize":
return "synthesizer"
return "synthesizer"
def create_workflow(): def create_workflow():
"""Create the LangGraph workflow.""" """Create the high-level Orchestrator workflow."""
workflow = StateGraph(AgentState) workflow = StateGraph(AgentState)
# Add nodes # Add Nodes
workflow.add_node("query_analyzer", query_analyzer_node) workflow.add_node("query_analyzer", query_analyzer_node)
workflow.add_node("planner", planner_node) workflow.add_node("planner", planner_node)
workflow.add_node("coder", coder_node) workflow.add_node("delegate", delegate_node)
workflow.add_node("error_corrector", error_corrector_node) workflow.add_node("data_analyst_worker", data_analyst_worker_node)
workflow.add_node("researcher", researcher_node) workflow.add_node("researcher", researcher_node)
workflow.add_node("reflector", reflector_node)
workflow.add_node("synthesizer", synthesizer_node)
workflow.add_node("clarification", clarification_node) workflow.add_node("clarification", clarification_node)
workflow.add_node("executor", executor_node)
workflow.add_node("summarizer", summarizer_node)
workflow.add_node("summarize_conversation", summarize_conversation_node) workflow.add_node("summarize_conversation", summarize_conversation_node)
# Set entry point # Set entry point
workflow.set_entry_point("query_analyzer") workflow.set_entry_point("query_analyzer")
# Add conditional edges from query_analyzer # Edges
workflow.add_conditional_edges( workflow.add_conditional_edges(
"query_analyzer", "query_analyzer",
router, main_router,
{ {
"planner": "planner",
"researcher": "researcher",
"clarification": "clarification", "clarification": "clarification",
END: END "planner": "planner"
} }
) )
# Linear flow for planning and coding workflow.add_edge("planner", "delegate")
workflow.add_edge("planner", "coder")
workflow.add_edge("coder", "executor")
# Executor routing
def executor_router(state: AgentState) -> str:
if state.get("error"):
# Check for iteration limit to prevent infinite loops
if state.get("iterations", 0) >= MAX_ITERATIONS:
return "summarizer"
return "error_corrector"
return "summarizer"
workflow.add_conditional_edges( workflow.add_conditional_edges(
"executor", "delegate",
executor_router, delegation_router,
{ {
"error_corrector": "error_corrector", "data_analyst_worker": "data_analyst_worker",
"summarizer": "summarizer" "researcher": "researcher",
"synthesizer": "synthesizer"
} }
) )
workflow.add_edge("error_corrector", "executor") workflow.add_edge("data_analyst_worker", "reflector")
workflow.add_edge("researcher", "reflector")
workflow.add_edge("reflector", "delegate")
workflow.add_edge("researcher", "summarize_conversation") workflow.add_edge("synthesizer", "summarize_conversation")
workflow.add_edge("clarification", END)
workflow.add_edge("summarizer", "summarize_conversation")
workflow.add_edge("summarize_conversation", END) workflow.add_edge("summarize_conversation", END)
workflow.add_edge("clarification", END)
# Compile the graph # Compile the graph with human-in-the-loop interrupt
app = workflow.compile() app = workflow.compile(
interrupt_before=["clarification"]
)
return app return app