# File: ea-chatbot-lg/backend/src/ea_chatbot/graph/workflow.py
# (Python source; listed as 93 lines, 3.1 KiB)

from langgraph.graph import StateGraph, END
from ea_chatbot.graph.state import AgentState
from ea_chatbot.graph.nodes.query_analyzer import query_analyzer_node
from ea_chatbot.graph.nodes.planner import planner_node
from ea_chatbot.graph.nodes.coder import coder_node
from ea_chatbot.graph.nodes.error_corrector import error_corrector_node
from ea_chatbot.graph.nodes.executor import executor_node
from ea_chatbot.graph.nodes.summarizer import summarizer_node
from ea_chatbot.graph.nodes.researcher import researcher_node
from ea_chatbot.graph.nodes.clarification import clarification_node
from ea_chatbot.graph.nodes.summarize_conversation import summarize_conversation_node
# Retry ceiling for the executor/error_corrector loop: once the state's
# "iterations" value reaches this number, a failed execution is routed to the
# summarizer instead of back to the error corrector.
MAX_ITERATIONS = 3
def router(state: AgentState) -> str:
    """Pick the node that should run after query analysis.

    Reads ``next_action`` from the state and translates it to a node name:
    ``"plan"`` -> ``"planner"``, ``"research"`` -> ``"researcher"`` and
    ``"clarify"`` -> ``"clarification"``. Any other value, including a
    missing key, yields ``END``.
    """
    requested = state.get("next_action")
    action_to_node = (
        ("plan", "planner"),
        ("research", "researcher"),
        ("clarify", "clarification"),
    )
    # Compare by equality, in the listed order; the first match wins.
    for action, node in action_to_node:
        if requested == action:
            return node
    return END
def create_workflow():
    """Assemble and compile the agent's LangGraph state graph.

    Graph layout:

    * ``query_analyzer`` is the entry point; ``router`` then selects
      ``planner``, ``researcher``, ``clarification`` or ``END``.
    * ``planner`` -> ``coder`` -> ``executor``.
    * After ``executor``, a failed run goes to ``error_corrector`` (which
      feeds back into ``executor``) until ``MAX_ITERATIONS`` is reached;
      otherwise the flow continues to ``summarizer``.
    * ``researcher`` and ``summarizer`` both lead to
      ``summarize_conversation``, which ends the graph.
    * ``clarification`` ends the graph directly.

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.
    """
    graph = StateGraph(AgentState)

    # Register each node under the name that the edges below refer to.
    node_table = (
        ("query_analyzer", query_analyzer_node),
        ("planner", planner_node),
        ("coder", coder_node),
        ("error_corrector", error_corrector_node),
        ("researcher", researcher_node),
        ("clarification", clarification_node),
        ("executor", executor_node),
        ("summarizer", summarizer_node),
        ("summarize_conversation", summarize_conversation_node),
    )
    for node_name, node_fn in node_table:
        graph.add_node(node_name, node_fn)

    graph.set_entry_point("query_analyzer")

    # Every value the router can return maps to the node of the same name.
    router_targets = ("planner", "researcher", "clarification", END)
    graph.add_conditional_edges(
        "query_analyzer",
        router,
        {target: target for target in router_targets},
    )

    # Straight-line path from planning through code generation to execution.
    for source, destination in (("planner", "coder"), ("coder", "executor")):
        graph.add_edge(source, destination)

    def executor_router(state: AgentState) -> str:
        """Choose between retrying via the error corrector and summarizing."""
        if not state.get("error"):
            return "summarizer"
        # Stop retrying once the iteration limit is reached, so the
        # executor/error_corrector cycle cannot loop forever.
        retries_exhausted = state.get("iterations", 0) >= MAX_ITERATIONS
        return "summarizer" if retries_exhausted else "error_corrector"

    graph.add_conditional_edges(
        "executor",
        executor_router,
        {branch: branch for branch in ("error_corrector", "summarizer")},
    )

    # Remaining fixed edges: the retry loop and the paths that end the graph.
    trailing_edges = (
        ("error_corrector", "executor"),
        ("researcher", "summarize_conversation"),
        ("clarification", END),
        ("summarizer", "summarize_conversation"),
        ("summarize_conversation", END),
    )
    for source, destination in trailing_edges:
        graph.add_edge(source, destination)

    return graph.compile()
# Build the compiled workflow once, at import time, and expose it as the
# module-level `app`.
app = create_workflow()