From 02d93120e0ff4d6f425669f6cde0327ff46df611 Mon Sep 17 00:00:00 2001 From: Yunxiao Xu Date: Mon, 23 Feb 2026 16:44:06 -0800 Subject: [PATCH] feat(api): Update Chat Stream Protocol for Orchestrator Architecture --- backend/src/ea_chatbot/api/routers/agent.py | 11 ++++++----- backend/tests/api/test_agent_stream.py | 13 +++++++------ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/backend/src/ea_chatbot/api/routers/agent.py b/backend/src/ea_chatbot/api/routers/agent.py index 788a87d..df03c19 100644 --- a/backend/src/ea_chatbot/api/routers/agent.py +++ b/backend/src/ea_chatbot/api/routers/agent.py @@ -56,7 +56,8 @@ async def stream_agent_events( initial_state, config, version="v2", - checkpointer=checkpointer + checkpointer=checkpointer, + subgraphs=True ): kind = event.get("event") name = event.get("name") @@ -71,8 +72,8 @@ async def stream_agent_events( "data": data } - # Buffer assistant chunks (summarizer and researcher might stream) - if kind == "on_chat_model_stream" and node_name in ["summarizer", "researcher", "clarification"]: + # Buffer assistant chunks (synthesizer and clarification might stream) + if kind == "on_chat_model_stream" and node_name in ["synthesizer", "clarification"]: chunk = data.get("chunk", "") # Use utility to safely extract text content from the chunk chunk_data = convert_to_json_compatible(chunk) @@ -83,7 +84,7 @@ async def stream_agent_events( assistant_chunks.append(str(chunk_data)) # Buffer and encode plots - if kind == "on_chain_end" and name == "executor": + if kind == "on_chain_end" and name == "data_analyst_worker": output = data.get("output", {}) if isinstance(output, dict) and "plots" in output: plots = output["plots"] @@ -95,7 +96,7 @@ async def stream_agent_events( output_event["data"]["encoded_plots"] = encoded_plots # Collect final response from terminal nodes - if kind == "on_chain_end" and name in ["summarizer", "researcher", "clarification"]: + if kind == "on_chain_end" and name in ["synthesizer", "clarification"]: output = data.get("output", {}) if isinstance(output, dict) and "messages" in output: last_msg = output["messages"][-1] diff --git a/backend/tests/api/test_agent_stream.py b/backend/tests/api/test_agent_stream.py index f81c5d9..f839799 100644 --- a/backend/tests/api/test_agent_stream.py +++ b/backend/tests/api/test_agent_stream.py @@ -36,24 +36,25 @@ async def test_stream_agent_events_all_features(): # Stream chunk { "event": "on_chat_model_stream", - "metadata": {"langgraph_node": "summarizer"}, + "metadata": {"langgraph_node": "synthesizer"}, "data": {"chunk": AIMessage(content="Hello ")} }, { "event": "on_chat_model_stream", - "metadata": {"langgraph_node": "summarizer"}, + "metadata": {"langgraph_node": "synthesizer"}, "data": {"chunk": AIMessage(content="world")} }, - # Plot event + # Plot event - with nested subgraph it might bubble up or come directly from data_analyst_worker + # Let's mock it coming from the data_analyst_worker on_chain_end { "event": "on_chain_end", - "name": "executor", + "name": "data_analyst_worker", "data": {"output": {"plots": [fig]}} }, # Final response { "event": "on_chain_end", - "name": "summarizer", + "name": "synthesizer", "data": {"output": {"messages": [AIMessage(content="Hello world final")]}} }, # Summary update @@ -91,7 +92,7 @@ async def test_stream_agent_events_all_features(): assert any(r.get("type") == "on_chat_model_stream" for r in results) # Verify plot was encoded - plot_event = next(r for r in results if r.get("name") == "executor") + plot_event = next(r for r in results if r.get("name") == "data_analyst_worker") assert "encoded_plots" in plot_event["data"] assert len(plot_event["data"]["encoded_plots"]) == 1