diff --git a/backend/src/ea_chatbot/graph/nodes/delegate.py b/backend/src/ea_chatbot/graph/nodes/delegate.py
index 787d436..35c0ba5 100644
--- a/backend/src/ea_chatbot/graph/nodes/delegate.py
+++ b/backend/src/ea_chatbot/graph/nodes/delegate.py
@@ -5,14 +5,24 @@ def delegate_node(state: AgentState) -> dict:
     """Determine which worker subgraph to call next based on the checklist."""
     checklist = state.get("checklist", [])
     current_step = state.get("current_step", 0)
+    iterations = state.get("iterations", 0)
     logger = get_logger("orchestrator:delegate")
 
     if not checklist or current_step >= len(checklist):
         logger.info("Checklist complete or empty. Routing to summarizer.")
         return {"next_action": "summarize"}
 
+    # Enforce the retry budget: give up on this step once 3 attempts have been recorded
+    if iterations >= 3:
+        logger.error(f"Max retries reached for task {current_step}. Routing to summary with failure.")
+        return {
+            "next_action": "summarize",
+            "iterations": 0, # Reset so the next turn starts with a fresh retry budget
+            "summary": f"Failed to complete task {current_step} after {iterations} attempts."
+        }
+
     task_info = checklist[current_step]
     worker = task_info.get("worker", "data_analyst")
 
-    logger.info(f"Delegating next task to worker: {worker}")
+    logger.info(f"Delegating next task to worker: {worker} (Attempt {iterations + 1})")
     return {"next_action": worker}
diff --git a/backend/src/ea_chatbot/graph/nodes/planner.py b/backend/src/ea_chatbot/graph/nodes/planner.py
index fc14492..de7e858 100644
--- a/backend/src/ea_chatbot/graph/nodes/planner.py
+++ b/backend/src/ea_chatbot/graph/nodes/planner.py
@@ -45,6 +45,7 @@ def planner_node(state: AgentState) -> dict:
         return {
             "checklist": checklist,
             "current_step": 0,
+            "iterations": 0, # Reset iteration counter for the new plan
             "summary": response.reflection # Use reflection as initial summary
         }
     except Exception as e:
diff --git a/backend/src/ea_chatbot/graph/nodes/reflector.py b/backend/src/ea_chatbot/graph/nodes/reflector.py
index 6299f32..6b75ab7 100644
--- a/backend/src/ea_chatbot/graph/nodes/reflector.py
+++ b/backend/src/ea_chatbot/graph/nodes/reflector.py
@@ -44,20 +44,20 @@ If there were major errors or the output is missing critical data requested in t
             logger.info("[bold green]Sub-task satisfied.[/bold green] Advancing plan.")
             return {
                 "current_step": current_step + 1,
+                "iterations": 0, # Fresh retry counter for the next task
                 "next_action": "delegate"
             }
         else:
             logger.warning(f"[bold yellow]Sub-task NOT satisfied.[/bold yellow] Reason: {response.reasoning}")
-            # Do NOT advance the step. This triggers a retry of the same task.
-            # In a more advanced version, we might route to a 'planner' for revision.
+            # Do NOT advance the step: the same task is retried. Count the failed attempt so delegate_node can enforce its retry budget.
return { - "current_step": current_step, + "iterations": state.get("iterations", 0) + 1, "next_action": "delegate" } except Exception as e: logger.error(f"Failed to reflect: {str(e)}") - # On error, do not advance to be safe + # On error, increment iterations to avoid infinite loop if LLM is stuck return { - "current_step": current_step, + "iterations": state.get("iterations", 0) + 1, "next_action": "delegate" } diff --git a/backend/tests/test_review_fix_unbounded_loop.py b/backend/tests/test_review_fix_unbounded_loop.py new file mode 100644 index 0000000..311157d --- /dev/null +++ b/backend/tests/test_review_fix_unbounded_loop.py @@ -0,0 +1,85 @@ +import pytest +from unittest.mock import MagicMock, patch +from ea_chatbot.graph.workflow import create_workflow +from ea_chatbot.graph.state import AgentState +from langchain_core.messages import AIMessage, HumanMessage + +def test_orchestrator_loop_retry_budget(): + """Verify that the orchestrator loop is bounded and terminates after max retries.""" + + mock_analyzer = MagicMock() + mock_planner = MagicMock() + mock_delegate = MagicMock() + mock_worker = MagicMock() + mock_reflector = MagicMock() + mock_synthesizer = MagicMock() + + # 1. Analyzer: Proceed to planning + mock_analyzer.return_value = {"next_action": "plan"} + + # 2. Planner: One task + mock_planner.return_value = { + "checklist": [{"task": "Unsolvable Task", "worker": "data_analyst"}], + "current_step": 0, + "iterations": 0 + } + + # We'll use the REAL delegate and reflector logic to verify the fix + # But we mock the LLM calls inside them if necessary. + # Actually, it's easier to just mock the node return values but follow the logic. 
+ + from ea_chatbot.graph.nodes.delegate import delegate_node + from ea_chatbot.graph.nodes.reflector import reflector_node + + # Mocking the LLM inside reflector to always be unsatisfied + with patch("ea_chatbot.graph.nodes.reflector.get_llm_model") as mock_get_llm: + mock_llm = MagicMock() + # Mark as NOT satisfied + mock_llm.with_structured_output.return_value.invoke.return_value = MagicMock(satisfied=False, reasoning="Still bad.") + mock_get_llm.return_value = mock_llm + + app = create_workflow( + query_analyzer=mock_analyzer, + planner=mock_planner, + # delegate=delegate_node, # Use real + data_analyst_worker=mock_worker, + # reflector=reflector_node, # Use real + synthesizer=mock_synthesizer + ) + + # Mock worker to return something + mock_worker.return_value = {"result": "Bad Output", "messages": [AIMessage(content="Bad")]} + mock_synthesizer.return_value = {"messages": [AIMessage(content="Failure Summary")], "next_action": "end"} + + initial_state = AgentState( + messages=[HumanMessage(content="test")], + question="test", + analysis={}, + next_action="", + iterations=0, + checklist=[], + current_step=0, + vfs={}, + plots=[], + dfs={} + ) + + # Run the graph. If fix works, it should hit iterations=3 and route to synthesizer. + # We use a recursion_limit higher than our retry budget but low enough to fail fast if unbounded. + final_state = app.invoke(initial_state, config={"recursion_limit": 20}) + + # Assertions + # 1. We tried 3 times (iterations 0, 1, 2) and failed on 3rd. + # Wait, delegate routes to summarize when iterations >= 3. + # Reflector increments iterations. 
+ # Loop: + # Start: it=0 + # Delegate (it=0) -> Worker -> Reflector (fail, it=1) -> Delegate (it=1) + # Delegate (it=1) -> Worker -> Reflector (fail, it=2) -> Delegate (it=2) + # Delegate (it=2) -> Worker -> Reflector (fail, it=3) -> Delegate (it=3) + # Delegate (it=3) -> Summarize (it=0) + + assert final_state["iterations"] == 0 # Reset in delegate or handled in synthesizer + # Check if we hit the failure summary + assert "Failed to complete task" in final_state["summary"] + assert mock_worker.call_count == 3