fix(orchestrator): Enforce retry budget to prevent unbounded loops
This commit is contained in:
@@ -5,14 +5,24 @@ def delegate_node(state: AgentState) -> dict:
|
|||||||
"""Determine which worker subgraph to call next based on the checklist."""
|
"""Determine which worker subgraph to call next based on the checklist."""
|
||||||
checklist = state.get("checklist", [])
|
checklist = state.get("checklist", [])
|
||||||
current_step = state.get("current_step", 0)
|
current_step = state.get("current_step", 0)
|
||||||
|
iterations = state.get("iterations", 0)
|
||||||
logger = get_logger("orchestrator:delegate")
|
logger = get_logger("orchestrator:delegate")
|
||||||
|
|
||||||
if not checklist or current_step >= len(checklist):
|
if not checklist or current_step >= len(checklist):
|
||||||
logger.info("Checklist complete or empty. Routing to summarizer.")
|
logger.info("Checklist complete or empty. Routing to summarizer.")
|
||||||
return {"next_action": "summarize"}
|
return {"next_action": "summarize"}
|
||||||
|
|
||||||
|
# Enforce retry budget
|
||||||
|
if iterations >= 3:
|
||||||
|
logger.error(f"Max retries reached for task {current_step}. Routing to summary with failure.")
|
||||||
|
return {
|
||||||
|
"next_action": "summarize",
|
||||||
|
"iterations": 0, # Reset for next turn
|
||||||
|
"summary": f"Failed to complete task {current_step} after {iterations} attempts."
|
||||||
|
}
|
||||||
|
|
||||||
task_info = checklist[current_step]
|
task_info = checklist[current_step]
|
||||||
worker = task_info.get("worker", "data_analyst")
|
worker = task_info.get("worker", "data_analyst")
|
||||||
|
|
||||||
logger.info(f"Delegating next task to worker: {worker}")
|
logger.info(f"Delegating next task to worker: {worker} (Attempt {iterations + 1})")
|
||||||
return {"next_action": worker}
|
return {"next_action": worker}
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ def planner_node(state: AgentState) -> dict:
|
|||||||
return {
|
return {
|
||||||
"checklist": checklist,
|
"checklist": checklist,
|
||||||
"current_step": 0,
|
"current_step": 0,
|
||||||
|
"iterations": 0, # Reset iteration counter for the new plan
|
||||||
"summary": response.reflection # Use reflection as initial summary
|
"summary": response.reflection # Use reflection as initial summary
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -44,20 +44,20 @@ If there were major errors or the output is missing critical data requested in t
|
|||||||
logger.info("[bold green]Sub-task satisfied.[/bold green] Advancing plan.")
|
logger.info("[bold green]Sub-task satisfied.[/bold green] Advancing plan.")
|
||||||
return {
|
return {
|
||||||
"current_step": current_step + 1,
|
"current_step": current_step + 1,
|
||||||
|
"iterations": 0, # Reset for next task
|
||||||
"next_action": "delegate"
|
"next_action": "delegate"
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
logger.warning(f"[bold yellow]Sub-task NOT satisfied.[/bold yellow] Reason: {response.reasoning}")
|
logger.warning(f"[bold yellow]Sub-task NOT satisfied.[/bold yellow] Reason: {response.reasoning}")
|
||||||
# Do NOT advance the step. This triggers a retry of the same task.
|
# Do NOT advance the step. Increment iterations to track retries.
|
||||||
# In a more advanced version, we might route to a 'planner' for revision.
|
|
||||||
return {
|
return {
|
||||||
"current_step": current_step,
|
"iterations": state.get("iterations", 0) + 1,
|
||||||
"next_action": "delegate"
|
"next_action": "delegate"
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to reflect: {str(e)}")
|
logger.error(f"Failed to reflect: {str(e)}")
|
||||||
# On error, do not advance to be safe
|
# On error, increment iterations to avoid infinite loop if LLM is stuck
|
||||||
return {
|
return {
|
||||||
"current_step": current_step,
|
"iterations": state.get("iterations", 0) + 1,
|
||||||
"next_action": "delegate"
|
"next_action": "delegate"
|
||||||
}
|
}
|
||||||
|
|||||||
85
backend/tests/test_review_fix_unbounded_loop.py
Normal file
85
backend/tests/test_review_fix_unbounded_loop.py
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from ea_chatbot.graph.workflow import create_workflow
|
||||||
|
from ea_chatbot.graph.state import AgentState
|
||||||
|
from langchain_core.messages import AIMessage, HumanMessage
|
||||||
|
|
||||||
|
def test_orchestrator_loop_retry_budget():
|
||||||
|
"""Verify that the orchestrator loop is bounded and terminates after max retries."""
|
||||||
|
|
||||||
|
mock_analyzer = MagicMock()
|
||||||
|
mock_planner = MagicMock()
|
||||||
|
mock_delegate = MagicMock()
|
||||||
|
mock_worker = MagicMock()
|
||||||
|
mock_reflector = MagicMock()
|
||||||
|
mock_synthesizer = MagicMock()
|
||||||
|
|
||||||
|
# 1. Analyzer: Proceed to planning
|
||||||
|
mock_analyzer.return_value = {"next_action": "plan"}
|
||||||
|
|
||||||
|
# 2. Planner: One task
|
||||||
|
mock_planner.return_value = {
|
||||||
|
"checklist": [{"task": "Unsolvable Task", "worker": "data_analyst"}],
|
||||||
|
"current_step": 0,
|
||||||
|
"iterations": 0
|
||||||
|
}
|
||||||
|
|
||||||
|
# We'll use the REAL delegate and reflector logic to verify the fix
|
||||||
|
# But we mock the LLM calls inside them if necessary.
|
||||||
|
# Actually, it's easier to just mock the node return values but follow the logic.
|
||||||
|
|
||||||
|
from ea_chatbot.graph.nodes.delegate import delegate_node
|
||||||
|
from ea_chatbot.graph.nodes.reflector import reflector_node
|
||||||
|
|
||||||
|
# Mocking the LLM inside reflector to always be unsatisfied
|
||||||
|
with patch("ea_chatbot.graph.nodes.reflector.get_llm_model") as mock_get_llm:
|
||||||
|
mock_llm = MagicMock()
|
||||||
|
# Mark as NOT satisfied
|
||||||
|
mock_llm.with_structured_output.return_value.invoke.return_value = MagicMock(satisfied=False, reasoning="Still bad.")
|
||||||
|
mock_get_llm.return_value = mock_llm
|
||||||
|
|
||||||
|
app = create_workflow(
|
||||||
|
query_analyzer=mock_analyzer,
|
||||||
|
planner=mock_planner,
|
||||||
|
# delegate=delegate_node, # Use real
|
||||||
|
data_analyst_worker=mock_worker,
|
||||||
|
# reflector=reflector_node, # Use real
|
||||||
|
synthesizer=mock_synthesizer
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mock worker to return something
|
||||||
|
mock_worker.return_value = {"result": "Bad Output", "messages": [AIMessage(content="Bad")]}
|
||||||
|
mock_synthesizer.return_value = {"messages": [AIMessage(content="Failure Summary")], "next_action": "end"}
|
||||||
|
|
||||||
|
initial_state = AgentState(
|
||||||
|
messages=[HumanMessage(content="test")],
|
||||||
|
question="test",
|
||||||
|
analysis={},
|
||||||
|
next_action="",
|
||||||
|
iterations=0,
|
||||||
|
checklist=[],
|
||||||
|
current_step=0,
|
||||||
|
vfs={},
|
||||||
|
plots=[],
|
||||||
|
dfs={}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Run the graph. If fix works, it should hit iterations=3 and route to synthesizer.
|
||||||
|
# We use a recursion_limit higher than our retry budget but low enough to fail fast if unbounded.
|
||||||
|
final_state = app.invoke(initial_state, config={"recursion_limit": 20})
|
||||||
|
|
||||||
|
# Assertions
|
||||||
|
# 1. We tried 3 times (iterations 0, 1, 2) and failed on 3rd.
|
||||||
|
# Wait, delegate routes to summarize when iterations >= 3.
|
||||||
|
# Reflector increments iterations.
|
||||||
|
# Loop:
|
||||||
|
# Start: it=0
|
||||||
|
# Delegate (it=0) -> Worker -> Reflector (fail, it=1) -> Delegate (it=1)
|
||||||
|
# Delegate (it=1) -> Worker -> Reflector (fail, it=2) -> Delegate (it=2)
|
||||||
|
# Delegate (it=2) -> Worker -> Reflector (fail, it=3) -> Delegate (it=3)
|
||||||
|
# Delegate (it=3) -> Summarize (it=0)
|
||||||
|
|
||||||
|
assert final_state["iterations"] == 0 # Reset in delegate or handled in synthesizer
|
||||||
|
# Check if we hit the failure summary
|
||||||
|
assert "Failed to complete task" in final_state["summary"]
|
||||||
|
assert mock_worker.call_count == 3
|
||||||
Reference in New Issue
Block a user