feat(orchestrator): Implement Synthesizer node for final worker results integration

This commit is contained in:
Yunxiao Xu
2026-02-23 05:42:55 -08:00
parent 37c353a249
commit 9fef4888b5
3 changed files with 102 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
from ea_chatbot.graph.state import AgentState
from ea_chatbot.config import Settings
from ea_chatbot.utils.llm_factory import get_llm_model
from ea_chatbot.utils.logging import get_logger, LangChainLoggingHandler
from ea_chatbot.graph.prompts.synthesizer import SYNTHESIZER_PROMPT
def synthesizer_node(state: AgentState) -> dict:
    """Synthesize the results from multiple workers into a final answer.

    Reads the original user question and the accumulated message history
    (expected to carry the worker summaries bubbled up by earlier nodes),
    asks the summarizer LLM to integrate them, and returns the final
    response message for the user.

    Args:
        state: Shared graph state; ``question`` and ``messages`` are read.

    Returns:
        dict: Partial state update containing the final ``messages`` entry
        and ``next_action`` set to ``"end"`` to terminate the graph.

    Raises:
        Exception: Re-raises any error from the LLM invocation after
        logging it with its traceback.
    """
    question = state["question"]
    # Worker summaries are assumed to have been appended to the message
    # history; we hand the whole history to the prompt rather than trying
    # to pick individual worker outputs apart.
    history = state.get("messages", [])

    settings = Settings()
    logger = get_logger("orchestrator:synthesizer")
    logger.info("Synthesizing final answer from worker results...")

    llm = get_llm_model(
        settings.summarizer_llm,
        callbacks=[LangChainLoggingHandler(logger=logger)]
    )

    # Provide the full history plus the original question; the static
    # worker_results string directs the model to read the summaries out of
    # the history placeholder instead of a separate field.
    messages = SYNTHESIZER_PROMPT.format_messages(
        question=question,
        history=history,
        worker_results="Review the worker summaries provided in the message history."
    )

    try:
        response = llm.invoke(messages)
    except Exception:
        # logger.exception captures the full traceback; the bare `raise`
        # preserves the original exception and its context for callers.
        logger.exception("Failed to synthesize final answer")
        raise

    logger.info("[bold green]Final synthesis complete.[/bold green]")
    # Return only the delta to be merged into the graph state.
    return {
        "messages": [response],
        "next_action": "end"
    }

View File

@@ -0,0 +1,27 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# System instructions for the final-synthesis step: the model merges the
# specialized workers' findings into one user-facing answer without leaking
# internal orchestration details (worker/checklist names).
SYNTHESIZER_SYSTEM = """You are a Lead Orchestrator for an Election Analytics Chatbot.
You have coordinated several specialized workers (Data Analysts, Researchers) to answer a user's complex query.
Your goal is to synthesize their individual findings into a single, cohesive, and comprehensive final response for the user.
**Guidelines:**
- Do NOT mention the internal 'workers' or 'checklist' names.
- Combine the data insights (from Data Analysts) and factual research (from Researchers) into a natural narrative.
- Ensure all numbers, dates, and names from the worker reports are included accurately.
- If any part of the plan failed, explain the status honestly but professionally.
- Present data in clear formats (tables, bullet points) where appropriate."""

# Human turn appended after the conversation history; it restates the
# original question and points at the worker results.
SYNTHESIZER_USER = """USER QUESTION:
{question}
EXECUTION SUMMARY (Results from specialized workers):
{worker_results}
Provide the final integrated response:"""

# Full chat template: system instructions, then the accumulated message
# history (where worker summaries live), then the synthesis request.
_SYNTHESIZER_MESSAGES = [
    ("system", SYNTHESIZER_SYSTEM),
    MessagesPlaceholder(variable_name="history"),
    ("human", SYNTHESIZER_USER),
]

SYNTHESIZER_PROMPT = ChatPromptTemplate.from_messages(_SYNTHESIZER_MESSAGES)

View File

@@ -0,0 +1,31 @@
from unittest.mock import MagicMock, patch
from ea_chatbot.graph.nodes.synthesizer import synthesizer_node
from ea_chatbot.graph.state import AgentState
from langchain_core.messages import AIMessage
def test_synthesizer_node_success():
    """Verify that the synthesizer node produces a final response.

    The LLM factory is patched so no real model is constructed; the node
    must invoke the mocked model exactly once and return a state update
    containing the synthesized message and the terminal routing action.
    """
    state = AgentState(
        messages=[AIMessage(content="Worker 1 found data."), AIMessage(content="Worker 2 searched web.")],
        question="What are the results?",
        checklist=[],
        current_step=0,
        iterations=0,
        vfs={},
        plots=[],
        dfs={},
        next_action="",
        analysis={}
    )
    # Mock the LLM factory so no real model is constructed or called.
    with patch("ea_chatbot.graph.nodes.synthesizer.get_llm_model") as mock_get_llm:
        mock_llm = MagicMock()
        mock_llm.invoke.return_value = AIMessage(content="Final synthesized answer.")
        mock_get_llm.return_value = mock_llm

        result = synthesizer_node(state)

        # The node must have actually routed the request through the LLM;
        # without this check a short-circuiting node would still pass.
        mock_llm.invoke.assert_called_once()

        # The update carries exactly one synthesized message and ends the graph.
        assert len(result["messages"]) == 1
        assert result["messages"][0].content == "Final synthesized answer."
        assert result["next_action"] == "end"