diff --git a/backend/src/ea_chatbot/graph/nodes/delegate.py b/backend/src/ea_chatbot/graph/nodes/delegate.py new file mode 100644 index 0000000..787d436 --- /dev/null +++ b/backend/src/ea_chatbot/graph/nodes/delegate.py @@ -0,0 +1,18 @@ +from ea_chatbot.graph.state import AgentState +from ea_chatbot.utils.logging import get_logger + +def delegate_node(state: AgentState) -> dict: + """Determine which worker subgraph to call next based on the checklist.""" + checklist = state.get("checklist", []) + current_step = state.get("current_step", 0) + logger = get_logger("orchestrator:delegate") + + if not checklist or current_step >= len(checklist): + logger.info("Checklist complete or empty. Routing to summarizer.") + return {"next_action": "summarize"} + + task_info = checklist[current_step] + worker = task_info.get("worker", "data_analyst") + + logger.info(f"Delegating next task to worker: {worker}") + return {"next_action": worker} diff --git a/backend/tests/test_orchestrator_delegate.py b/backend/tests/test_orchestrator_delegate.py new file mode 100644 index 0000000..418dac0 --- /dev/null +++ b/backend/tests/test_orchestrator_delegate.py @@ -0,0 +1,56 @@ +from ea_chatbot.graph.nodes.delegate import delegate_node +from ea_chatbot.graph.state import AgentState + +def test_delegate_node_data_analyst(): + """Verify that the delegate node routes to data_analyst.""" + state = AgentState( + checklist=[{"task": "Analyze data", "worker": "data_analyst"}], + current_step=0, + messages=[], + question="test", + analysis={}, + next_action="", + iterations=0, + vfs={}, + plots=[], + dfs={} + ) + + result = delegate_node(state) + assert result["next_action"] == "data_analyst" + +def test_delegate_node_researcher(): + """Verify that the delegate node routes to researcher.""" + state = AgentState( + checklist=[{"task": "Search web", "worker": "researcher"}], + current_step=0, + messages=[], + question="test", + analysis={}, + next_action="", + iterations=0, + vfs={}, + plots=[], + dfs={} + ) + + result = delegate_node(state) + assert result["next_action"] == "researcher" + +def test_delegate_node_finished(): + """Verify that the delegate node routes to summarize if checklist is complete.""" + state = AgentState( + checklist=[{"task": "Task 1", "worker": "data_analyst"}], + current_step=1, # Already finished the only task + messages=[], + question="test", + analysis={}, + next_action="", + iterations=0, + vfs={}, + plots=[], + dfs={} + ) + + result = delegate_node(state) + assert result["next_action"] == "summarize"