From 9fef4888b51d22e9ed2994c8b512a4aa8700d846 Mon Sep 17 00:00:00 2001
From: Yunxiao Xu
Date: Mon, 23 Feb 2026 05:42:55 -0800
Subject: [PATCH] feat(orchestrator): Implement Synthesizer node for final
 worker results integration

---
 .../src/ea_chatbot/graph/nodes/synthesizer.py | 44 +++++++++++++++++++
 .../ea_chatbot/graph/prompts/synthesizer.py   | 27 ++++++++++++
 .../tests/test_orchestrator_synthesizer.py    | 32 ++++++++++++++
 3 files changed, 103 insertions(+)
 create mode 100644 backend/src/ea_chatbot/graph/nodes/synthesizer.py
 create mode 100644 backend/src/ea_chatbot/graph/prompts/synthesizer.py
 create mode 100644 backend/tests/test_orchestrator_synthesizer.py

diff --git a/backend/src/ea_chatbot/graph/nodes/synthesizer.py b/backend/src/ea_chatbot/graph/nodes/synthesizer.py
new file mode 100644
index 0000000..e1b6f6b
--- /dev/null
+++ b/backend/src/ea_chatbot/graph/nodes/synthesizer.py
@@ -0,0 +1,44 @@
+from ea_chatbot.graph.state import AgentState
+from ea_chatbot.config import Settings
+from ea_chatbot.utils.llm_factory import get_llm_model
+from ea_chatbot.utils.logging import get_logger, LangChainLoggingHandler
+from ea_chatbot.graph.prompts.synthesizer import SYNTHESIZER_PROMPT
+
+def synthesizer_node(state: AgentState) -> dict:
+    """Synthesize the results from multiple workers into a final answer."""
+    question = state["question"]
+    history = state.get("messages", [])
+
+    # We look for the 'summary' from the last worker which might have cumulative info
+    # Or we can look at all messages in history bubbled up from workers.
+    # For now, let's assume the history contains all the worker summaries.
+
+    settings = Settings()
+    logger = get_logger("orchestrator:synthesizer")
+
+    logger.info("Synthesizing final answer from worker results...")
+
+    llm = get_llm_model(
+        settings.summarizer_llm,
+        callbacks=[LangChainLoggingHandler(logger=logger)]
+    )
+
+    # We provide the full history and the original question
+    messages = SYNTHESIZER_PROMPT.format_messages(
+        question=question,
+        history=history,
+        worker_results="Review the worker summaries provided in the message history."
+    )
+
+    try:
+        response = llm.invoke(messages)
+        logger.info("[bold green]Final synthesis complete.[/bold green]")
+
+        # Return the final message to be added to the state
+        return {
+            "messages": [response],
+            "next_action": "end"
+        }
+    except Exception as e:
+        logger.error(f"Failed to synthesize final answer: {str(e)}")
+        raise
diff --git a/backend/src/ea_chatbot/graph/prompts/synthesizer.py b/backend/src/ea_chatbot/graph/prompts/synthesizer.py
new file mode 100644
index 0000000..05e3e32
--- /dev/null
+++ b/backend/src/ea_chatbot/graph/prompts/synthesizer.py
@@ -0,0 +1,27 @@
+from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
+
+SYNTHESIZER_SYSTEM = """You are a Lead Orchestrator for an Election Analytics Chatbot.
+You have coordinated several specialized workers (Data Analysts, Researchers) to answer a user's complex query.
+
+Your goal is to synthesize their individual findings into a single, cohesive, and comprehensive final response for the user.
+
+**Guidelines:**
+- Do NOT mention the internal 'workers' or 'checklist' names.
+- Combine the data insights (from Data Analysts) and factual research (from Researchers) into a natural narrative.
+- Ensure all numbers, dates, and names from the worker reports are included accurately.
+- If any part of the plan failed, explain the status honestly but professionally.
+- Present data in clear formats (tables, bullet points) where appropriate."""
+
+SYNTHESIZER_USER = """USER QUESTION:
+{question}
+
+EXECUTION SUMMARY (Results from specialized workers):
+{worker_results}
+
+Provide the final integrated response:"""
+
+SYNTHESIZER_PROMPT = ChatPromptTemplate.from_messages([
+    ("system", SYNTHESIZER_SYSTEM),
+    MessagesPlaceholder(variable_name="history"),
+    ("human", SYNTHESIZER_USER),
+])
diff --git a/backend/tests/test_orchestrator_synthesizer.py b/backend/tests/test_orchestrator_synthesizer.py
new file mode 100644
index 0000000..f5b046d
--- /dev/null
+++ b/backend/tests/test_orchestrator_synthesizer.py
@@ -0,0 +1,32 @@
+from unittest.mock import MagicMock, patch
+from ea_chatbot.graph.nodes.synthesizer import synthesizer_node
+from ea_chatbot.graph.state import AgentState
+from langchain_core.messages import AIMessage
+
+def test_synthesizer_node_success():
+    """Verify that the synthesizer node produces a final response."""
+    state = AgentState(
+        messages=[AIMessage(content="Worker 1 found data."), AIMessage(content="Worker 2 searched web.")],
+        question="What are the results?",
+        checklist=[],
+        current_step=0,
+        iterations=0,
+        vfs={},
+        plots=[],
+        dfs={},
+        next_action="",
+        analysis={}
+    )
+
+    # Mock both the LLM factory and Settings so the test needs no real config/model
+    with patch("ea_chatbot.graph.nodes.synthesizer.get_llm_model") as mock_get_llm, \
+         patch("ea_chatbot.graph.nodes.synthesizer.Settings"):
+        mock_llm = MagicMock()
+        mock_llm.invoke.return_value = AIMessage(content="Final synthesized answer.")
+        mock_get_llm.return_value = mock_llm
+
+        result = synthesizer_node(state)
+
+        assert len(result["messages"]) == 1
+        assert result["messages"][0].content == "Final synthesized answer."
+        assert result["next_action"] == "end"