feat(orchestrator): Implement Delegate node for task routing

This commit is contained in:
Yunxiao Xu
2026-02-23 05:28:02 -08:00
parent 575e1a2e53
commit ff9b443bfe
2 changed files with 74 additions and 0 deletions

View File

@@ -0,0 +1,18 @@
from ea_chatbot.graph.state import AgentState
from ea_chatbot.utils.logging import get_logger
def delegate_node(state: AgentState) -> dict:
"""Determine which worker subgraph to call next based on the checklist."""
checklist = state.get("checklist", [])
current_step = state.get("current_step", 0)
logger = get_logger("orchestrator:delegate")
if not checklist or current_step >= len(checklist):
logger.info("Checklist complete or empty. Routing to summarizer.")
return {"next_action": "summarize"}
task_info = checklist[current_step]
worker = task_info.get("worker", "data_analyst")
logger.info(f"Delegating next task to worker: {worker}")
return {"next_action": worker}

View File

@@ -0,0 +1,56 @@
from ea_chatbot.graph.nodes.delegate import delegate_node
from ea_chatbot.graph.state import AgentState
def test_delegate_node_data_analyst():
"""Verify that the delegate node routes to data_analyst."""
state = AgentState(
checklist=[{"task": "Analyze data", "worker": "data_analyst"}],
current_step=0,
messages=[],
question="test",
analysis={},
next_action="",
iterations=0,
vfs={},
plots=[],
dfs={}
)
result = delegate_node(state)
assert result["next_action"] == "data_analyst"
def test_delegate_node_researcher():
"""Verify that the delegate node routes to researcher."""
state = AgentState(
checklist=[{"task": "Search web", "worker": "researcher"}],
current_step=0,
messages=[],
question="test",
analysis={},
next_action="",
iterations=0,
vfs={},
plots=[],
dfs={}
)
result = delegate_node(state)
assert result["next_action"] == "researcher"
def test_delegate_node_finished():
"""Verify that the delegate node routes to summarize if checklist is complete."""
state = AgentState(
checklist=[{"task": "Task 1", "worker": "data_analyst"}],
current_step=1, # Already finished the only task
messages=[],
question="test",
analysis={},
next_action="",
iterations=0,
vfs={},
plots=[],
dfs={}
)
result = delegate_node(state)
assert result["next_action"] == "summarize"