Streaming Agent Responses in LangGraph: Tokens, Events, and Real-Time UI Integration
Stream LangGraph agents in real time: astream_events, token-level output, StreamWriter, and FastAPI SSE frontend integration.
By Abstract Algorithms

TLDR: Stream agents token by token with astream_events; wire to FastAPI SSE for zero-spinner UX.
⏳ The 25-Second Spinner: Why Streaming Is a UX Requirement, Not a Nice-to-Have
Your agent takes 25 seconds to respond. Users abandon after 8 seconds. You add streaming: same 25 seconds of total compute, but the first token appears in 0.4 seconds. Abandonment drops to near zero.
This is not a performance optimisation. It is a perception shift. Waiting for the whole answer and watching it appear word by word are cognitively different experiences. Streaming makes computation feel interactive even when it isn't faster.
In LangGraph's world, "streaming" covers three distinct things:
| What streams | Granularity | Primary audience |
| --- | --- | --- |
| Graph state | Full snapshot after each node | Debugging, backend orchestration |
| Node deltas | Changed keys only, per node | Lightweight status updates |
| LLM tokens + events | Individual tokens + tool lifecycle events | Frontend UIs, real-time dashboards |
Understanding which layer you need, and which API exposes it, is the first decision. Getting it wrong means either drowning your frontend in over-large payloads or leaving users staring at a blank screen while your agent executes seven tool calls.
This post walks through all three streaming modes, explains how LangGraph generates the event stream internally, and ends with a working FastAPI SSE endpoint that renders a streaming research assistant word by word in the browser.
🔀 Three Streaming Modes: values, updates, and astream_events Compared
LangGraph exposes streaming through the synchronous stream() method (whose stream_mode argument selects values or updates) and the asynchronous astream_events() method. The mode you pass determines what you receive on each iteration.
```python
# Sync: full state snapshot after each node
for state in graph.stream(input, stream_mode="values"):
    print(state)

# Sync: only changed keys after each node
for delta in graph.stream(input, stream_mode="updates"):
    print(delta)

# Async: fine-grained event stream (tokens, tool calls, custom events)
async for event in graph.astream_events(input, version="v2"):
    print(event)
```
| Mode | Payload per iteration | Includes LLM tokens? | Overhead | When to use |
| --- | --- | --- | --- | --- |
| `stream("values")` | Full `StateSnapshot` | No | Low | Admin consoles, debug logging |
| `stream("updates")` | `{"node_name": {changed_keys}}` | No | Very low | Progress indicators, node-level UI |
| `astream_events()` | Event dict with type, data, metadata | Yes | Medium | Token-by-token frontend rendering |
stream(stream_mode="values") yields the entire graph state after every node completes. If your state carries large lists (e.g., accumulated tool results), each yield is a large payload. Useful for checkpointing or feeding downstream orchestrators.
stream(stream_mode="updates") yields only what changed. If the search node adds a search_results key, you receive {"search": {"search_results": [...]}}. This is the natural choice for a "Node X completed" progress bar.
astream_events() is the only method that exposes LLM tokens. It wraps every node's execution in LangChain's callback infrastructure and emits a stream of typed event dicts. The version="v2" parameter is required for LangGraph graphs and enables richer metadata including langgraph_node and langgraph_step.
⚙️ Token Streaming with astream_events: Filtering, Extracting, and Handling Events
The raw event stream from astream_events() is noisy: it includes start/end events for chains, tools, and the LLM itself. In practice you filter to the events that matter for your use case.
Step 1: Iterate the event stream

```python
async for event in graph.astream_events(
    {"messages": [HumanMessage(content=user_query)]},
    version="v2",
    config={"configurable": {"thread_id": session_id}},
):
    event_type = event["event"]
    node_name = event.get("metadata", {}).get("langgraph_node", "")
```
Step 2: Filter by event type
LangGraph surfaces the following core event types:
| Event type | Fires when |
| --- | --- |
| `on_chain_start` / `on_chain_end` | A LangGraph node begins or completes |
| `on_chat_model_start` | The LLM receives its prompt |
| `on_chat_model_stream` | Each token chunk from the LLM |
| `on_chat_model_end` | The LLM finishes its response |
| `on_tool_start` / `on_tool_end` | A tool is invoked and returns |
| `on_custom_event` | A node emitted a manual event via StreamWriter |
Step 3: Extract token content

```python
if event_type == "on_chat_model_stream":
    chunk = event["data"]["chunk"]
    token = chunk.content  # str; may be empty for tool-call deltas
    if token:
        yield token  # push to SSE, websocket, etc.
```
Step 4: Scope to a specific node
In a multi-node graph you usually only want tokens from the final answer node, not the reasoning node:
```python
if event_type == "on_chat_model_stream" and node_name == "synthesize":
    token = event["data"]["chunk"].content
    if token:
        yield f"data: {token}\n\n"
```
Step 5: Handle tool-call lifecycle events

```python
if event_type == "on_tool_start":
    tool_name = event["name"]
    yield f"data: [TOOL:{tool_name}]\n\n"  # surface "Searching…" to the UI
if event_type == "on_tool_end":
    yield f"data: [TOOL_DONE:{event['name']}]\n\n"
```
This gives your frontend enough signal to render spinners around tool calls and swap them for results once the tool completes.
🧠 Deep Dive: How LangGraph's Streaming Pipeline Works
The Internals
LangGraph builds on LangChain's Runnable abstraction. Every node in a LangGraph graph is a RunnableLambda or RunnableSequence under the hood. When you call astream_events(), LangGraph:
1. Creates an `AsyncCallbackManager` and attaches it to the `RunnableConfig` passed to each node.
2. The callback manager listens for `on_llm_new_token`, `on_tool_start`, `on_tool_end`, and other LCEL lifecycle hooks.
3. The hooks publish typed event dicts into an `asyncio.Queue`.
4. A generator coroutine drains that queue and yields events to your `async for` loop.
The event dict schema looks like:
```python
{
    "event": "on_chat_model_stream",
    "name": "ChatOpenAI",
    "run_id": "3f8a...",
    "tags": ["seq:step:2"],
    "metadata": {
        "langgraph_node": "synthesize",
        "langgraph_step": 3,
        "thread_id": "session-abc",
    },
    "data": {
        "chunk": AIMessageChunk(content="The"),
    },
}
```
The run_id ties a start event to its corresponding end event, which is useful for measuring per-node latency in production tracing.
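As a sketch of that correlation, a small tracker (a hypothetical helper, not part of LangGraph) can pair `*_start` and `*_end` events by `run_id` to measure per-call latency:

```python
# Hypothetical latency tracker: pairs *_start and *_end events by run_id.
import time


class RunLatencyTracker:
    """Collects per-run latency by correlating start/end events."""

    def __init__(self):
        self._starts = {}    # run_id -> monotonic start timestamp
        self.latencies = {}  # run_id -> (event name, elapsed seconds)

    def observe(self, event: dict) -> None:
        kind = event["event"]
        run_id = event["run_id"]
        if kind.endswith("_start"):
            self._starts[run_id] = time.monotonic()
        elif kind.endswith("_end") and run_id in self._starts:
            elapsed = time.monotonic() - self._starts.pop(run_id)
            self.latencies[run_id] = (event.get("name", ""), elapsed)
```

Feed every event from `astream_events()` through `observe()` and you get per-node and per-LLM-call timings for free, keyed by the same `run_id` your tracing backend sees.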
Backpressure occurs when your consumer (the SSE loop, the browser write) is slower than the LLM produces tokens. The asyncio.Queue buffers unconsumed events in memory. Under sustained load with a slow client, this queue can grow unboundedly. The mitigation is a bounded queue with a timeout or a backpressure-aware transport (HTTP/2 flow control, WebSocket congestion signals).
Performance Analysis
| Metric | `stream("values")` | `stream("updates")` | `astream_events()` |
| --- | --- | --- | --- |
| Time to first yield | After first node completes | After first node completes | At first LLM token (~400 ms typical) |
| Payload size/yield | Full state (can be large) | Changed keys only (small) | Single token dict (~300–600 bytes) |
| Event overhead | Negligible | Negligible | ~0.5–2 ms per event (callback dispatch) |
| Async required? | No (sync `stream()`) | No (sync `stream()`) | Yes (`astream_events()` is async-only) |
| Typical TTFT | 2–15 s (node boundary) | 2–15 s (node boundary) | 0.3–0.8 s (first token) |
The critical observation: TTFT with astream_events() is almost entirely determined by the LLM's time-to-first-token, not by graph traversal. For a two-node graph (search → synthesize), the user sees text appear ~400 ms after the synthesize node starts the LLM call, even if search took 18 seconds.
The callback dispatch overhead is measurable at very high throughput (>100 events/sec), but for typical agent workloads (10–40 tokens/sec) it is negligible compared to network RTT and LLM latency.
Mathematical Model
Let T be total agent latency (fixed), T_TTFT be time to first token, and T_total be time to last token. Define perceived latency P as:
$$P = \alpha \cdot T_{\text{TTFT}} + (1 - \alpha) \cdot T_{\text{total}}$$
where α ∈ [0.7, 0.9] is an empirically measured perceptual weighting factor: humans weight the wait-before-anything-happens far more heavily than the time to read the rest of the answer.
With batch delivery (no streaming), T_TTFT = T_total = 25 s, so P = 25 s.
With token streaming at 40 tokens/sec starting at T_TTFT = 0.4 s and T_total = 25 s:
$$P = 0.8 \times 0.4 + 0.2 \times 25 = 0.32 + 5.0 = 5.32 \text{ s}$$
The user's perceived wait drops from 25 seconds to roughly 5 seconds (an ~80% reduction) with identical total compute. The lever is entirely the user's attention shifting from "waiting for something to start" to "reading while it continues".
This model also explains why a fast-streaming but verbose agent (high T_total) still feels better than a slow-starting but concise one. The first token is the psychological unlock.
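The model is easy to sanity-check numerically. A minimal sketch, using α = 0.8 as in the worked example above:

```python
# Perceived-latency model from above: P = alpha * T_ttft + (1 - alpha) * T_total
def perceived_latency(t_ttft: float, t_total: float, alpha: float = 0.8) -> float:
    """Perceived wait in seconds, weighting time-to-first-token by alpha."""
    return alpha * t_ttft + (1 - alpha) * t_total


batch = perceived_latency(25.0, 25.0)    # no streaming: first and last token coincide
streamed = perceived_latency(0.4, 25.0)  # streaming: first token at 0.4 s
# batch ≈ 25.0 s, streamed ≈ 5.32 s: roughly an 80% drop in perceived wait
```

Plugging in your own measured TTFT and total latency gives a quick estimate of how much a streaming rollout is worth before you build it.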
🌊 The Streaming Event Flow: From Node Execution to Browser Pixel
```mermaid
graph TD
    A["🌐 Browser\nEventSource /stream?q=..."] -->|"HTTP GET → SSE request"| B["⚡ FastAPI\nStreamingResponse"]
    B -->|"graph.astream_events(input, version='v2')"| C["🔁 LangGraph Graph\nagent → tools → agent"]
    C -->|"node: agent\nChatOpenAI streaming=True"| D["🤖 LLM\nToken generation"]
    C -->|"node: tools\nTavilySearch / custom tools"| E["🔧 Tool Execution\non_tool_start / on_tool_end"]
    D -->|"on_chat_model_stream\n{chunk: AIMessageChunk}"| C
    E -->|"tool result dict"| C
    C -->|"event dict\n{event, metadata, data}"| B
    B -->|"data: token\\n\\n"| A
    C -->|"on_chain_end"| B
    B -->|"data: [DONE]\\n\\n"| A
```
Each SSE frame is a UTF-8 string prefixed with data: and terminated by \n\n. The browser's EventSource API fires an onmessage callback for each frame, which appends the token to the DOM. No WebSocket handshake, no polling: a plain HTTP response that never closes until the agent finishes.
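The frame format is small enough to capture in a helper. This sketch mirrors the newline-escaping convention used in the FastAPI example later (one logical token per data: line); it is an illustration, not a required API:

```python
# Minimal SSE frame formatter (sketch): one "data:" line per frame.
# Newlines inside the payload are escaped so the \n\n frame boundary
# stays unambiguous; the client un-escapes them on receipt.
def sse_frame(payload: str) -> str:
    safe = payload.replace("\n", "\\n")  # keep one logical token per frame
    return f"data: {safe}\n\n"


sse_frame("Hello")      # → "data: Hello\n\n"
sse_frame("a\nb")       # → "data: a\\nb\n\n"
```

The SSE spec also allows multi-line payloads as consecutive data: lines; escaping is simply the lower-friction choice when each frame is a single token.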
🏗️ Multi-Agent Streaming: Subgraphs, Parallel Fan-Out, and Per-Agent Filtering
Single-agent streaming is straightforward. Multi-agent graphs introduce three complications: event multiplexing (tokens from multiple agents interleave), subgraph event bubbling (subgraph events propagate to the parent stream), and parallel branch synchronisation (fan-out nodes produce concurrent event streams).
Subgraph event bubbling
When you compile a subgraph and embed it as a node in a parent graph, events from inside the subgraph automatically appear in the parent's astream_events() stream. The langgraph_node metadata reflects the subgraph node name, not the parent node name. Use the full node path to disambiguate:
```python
# Subgraph "researcher" contains a node "search".
# In the parent stream, events appear as:
event["metadata"]["langgraph_node"]  # → "search" (subgraph node)
event["metadata"]["checkpoint_ns"]   # → "researcher:" (subgraph namespace)

# Correct filter for the subgraph's LLM tokens:
if (kind == "on_chat_model_stream"
        and event["metadata"].get("checkpoint_ns", "").startswith("researcher:")):
    yield token
```
Parallel fan-out with Send
LangGraph's Send API launches multiple parallel branches. Each branch runs concurrently, so their on_chat_model_stream events interleave in arrival order, not graph order. Tag each token with its source branch:
```python
# (inside the same event loop as above; json is imported at module top)
elif kind == "on_chat_model_stream":
    node = event["metadata"].get("langgraph_node", "unknown")
    token = event["data"]["chunk"].content
    if token:
        yield f"data: {json.dumps({'node': node, 'token': token})}\n\n"
```
The frontend routes each node to its own output lane, so the user sees three agents typing simultaneously, the same pattern used in multi-agent research tools like GPT Researcher.
Scaling consideration: each parallel branch maintains its own callback queue. Under high concurrency (>10 parallel agents), the total number of in-flight events grows linearly with branch count. Use an explicit asyncio.Semaphore to cap concurrent branches if your SSE server is memory-constrained.
Common misconceptions
| Misconception | Reality |
| --- | --- |
| "Streaming is only useful for the final LLM response" | Tool-call start/end events are equally valuable: they give users a real-time map of the agent's reasoning steps |
| "I can add streaming after the fact without changing the graph" | True for event-stream mode, but only if `streaming=True` was set on every LLM node from the start |
| "WebSockets are always better than SSE for streaming" | SSE is unidirectional, simpler, and works through every HTTP proxy. Use WebSockets only when you need the client to send mid-stream messages (e.g., "stop generating") |
| "`astream_events()` guarantees event order across parallel nodes" | No: events across concurrent nodes are interleaved by asyncio scheduling; only within-node ordering is guaranteed |
🌍 Real-World Applications: How ChatGPT, Claude, and Copilot Stream Responses
ChatGPT uses OpenAI's /v1/chat/completions endpoint with stream: true. The response is an SSE stream of JSON delta objects ({"choices":[{"delta":{"content":"The"}}]}). The frontend concatenates deltas into the visible message. Error recovery: if the stream drops mid-response, the UI shows a "regenerate" button; it does not attempt to resume from the partial output.
Claude (Anthropic) streams content_block_delta events over text/event-stream. It additionally emits message_start, content_block_start, and message_delta events, giving the client richer structure to render thinking blocks separately from final answers. Anthropic's extended thinking feature streams the <thinking> tokens under a thinking content block before the final text block begins.
GitHub Copilot Chat runs inside VS Code's extension host. It uses LangChain's streaming callback system internally and pipes tokens to VS Code's ChatResponseStream API. The key difference from browser SSE: the token consumer is a language-server protocol client running in the same process, which means backpressure is handled by Node.js stream backpressure rather than HTTP flow control.
Operational pattern shared by all three: they emit a structured terminal event ([DONE], message_stop, copilot_done) so the client knows the stream is complete without waiting for the HTTP connection to close. This matters when using HTTP/1.1 keep-alive connections where the connection might be reused.
⚖️ Trade-offs and Failure Modes: Partial Output, Error Mid-Stream, and Backpressure
Partial output on error. If your graph raises an exception inside a node after 200 tokens have already been sent, the user has already seen partial text. You cannot unsend it. The mitigation is to emit a structured error event (data: {"type":"error","message":"..."}\n\n) and have the frontend replace the partial message with an error state rather than leaving the half-answer visible.
Slow consumer / backpressure. A user's browser tab backgrounded or a mobile device on a flaky connection will stop consuming the SSE stream. The server-side asyncio.Queue buffers events in memory. Under high concurrency this is a memory leak vector. Use a bounded asyncio.Queue(maxsize=512) with a put_nowait + asyncio.TimeoutError guard, and close the generator if the consumer falls too far behind.
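A minimal sketch of that guard, assuming a `forward_events` helper sits between the `astream_events()` producer and the per-client queue (names and the 5-second grace period are illustrative):

```python
# Bounded-buffer guard (sketch): stop buffering for a consumer that lags
# more than `maxsize` events behind, instead of growing memory forever.
import asyncio


class SlowConsumerError(Exception):
    """Raised when the SSE client falls too far behind the producer."""


async def forward_events(source, queue: asyncio.Queue, timeout: float = 5.0):
    async for event in source:
        try:
            queue.put_nowait(event)  # fast path: buffer has room
        except asyncio.QueueFull:
            try:
                # Give the consumer `timeout` seconds to drain one slot.
                await asyncio.wait_for(queue.put(event), timeout)
            except asyncio.TimeoutError:
                raise SlowConsumerError("consumer too slow; closing stream")
```

On `SlowConsumerError`, close the SSE response; the client's EventSource reconnect (or your resume logic) takes over from there.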
Error mid-stream in a subgraph. Multi-agent graphs where a subgraph raises a ToolException will surface on_chain_error events. Your event loop must handle these explicitly:
```python
if event["event"] == "on_chain_error":
    error_msg = str(event["data"].get("error", "Unknown error"))
    yield f'data: {{"type":"error","message":"{error_msg}"}}\n\n'
    return
```
Without this handler, the generator silently terminates and the browser SSE connection closes with no error signal to the user.
Event ordering across concurrent nodes. LangGraph supports parallel node execution (Send API, fanout). Events from concurrent nodes are interleaved in the order their callbacks fire; there is no global ordering guarantee across concurrent branches. If your frontend renders multiple agent streams side by side, tag each event with langgraph_node and route to the correct UI lane.
Debugging difficulty. Async event streams are harder to trace than synchronous call stacks. Use run_id correlation (present on every event) to group a single LLM call's start/stream/end events together in your logging pipeline (Langfuse, LangSmith, OpenTelemetry).
🧭 Decision Guide: stream() vs astream() vs astream_events() – When to Use Each
| Situation | Recommendation |
| --- | --- |
| Use `stream("values")` | You need the full state after each node for checkpointing, admin inspection, or feeding results to a downstream pipeline. Token latency is not a concern. |
| Use `stream("updates")` | You want a lightweight "Node X finished" progress indicator and don't need token-level updates. Sync context (CLI tool, batch pipeline). |
| Use `astream_events()` | You are building a real-time UI. You need individual tokens, tool-call lifecycle signals, or custom progress events. Async context required. |
| Avoid `astream_events()` when | Your graph runs entirely non-LLM logic (pure Python computation). The event overhead adds latency with no user-visible benefit. Use `stream("updates")` instead. |
| Edge case: multi-agent subgraphs | Call `astream_events()` at the top-level orchestrator. Events from subgraphs bubble up automatically. Use `event["metadata"]["langgraph_node"]` to filter which subgraph's tokens to surface. If you need to stream a subgraph in isolation, call `astream_events()` directly on the compiled subgraph with the same config. |
🧪 Practical Example: FastAPI SSE Endpoint With Streaming Research Assistant
This example builds a tool-calling LangGraph agent (an agent → tools → agent loop) and exposes it as a streaming SSE endpoint, with a JavaScript EventSource client that renders tokens word by word as they arrive. The research-assistant scenario was chosen because it exercises two meaningfully different event profiles: the tools node emits tool-call lifecycle events, while the agent node emits token-by-token text chunks, so you see both event types in one runnable example. As you read through the token_stream async generator, watch the event["event"] filter: every on_chat_model_stream event that passes the check is one token yielded to the browser, and that is the complete streaming pipeline in its minimal form.
Graph definition
```python
# graph.py
from langgraph.graph import StateGraph, MessagesState, END
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.prebuilt import ToolNode

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
tools = [TavilySearchResults(max_results=3)]
llm_with_tools = llm.bind_tools(tools)


def call_model(state: MessagesState):
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}


def should_continue(state: MessagesState):
    last = state["messages"][-1]
    return "tools" if last.tool_calls else END


builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.set_entry_point("agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
graph = builder.compile()
```
> Important: pass `streaming=True` to `ChatOpenAI`. Without it, the LLM buffers its full response internally before releasing any tokens, making `astream_events()` no more responsive than `ainvoke()`.
FastAPI SSE endpoint
```python
# api.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage

from graph import graph

app = FastAPI()


async def token_stream(query: str):
    """Async generator: yields SSE-formatted tokens."""
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content=query)]},
        version="v2",
    ):
        kind = event["event"]
        node = event.get("metadata", {}).get("langgraph_node", "")

        # Surface tool-call progress
        if kind == "on_tool_start":
            yield f'data: {{"type":"tool_start","name":"{event["name"]}"}}\n\n'

        # Stream tokens from the agent node only (not intermediate reasoning)
        elif kind == "on_chat_model_stream" and node == "agent":
            token = event["data"]["chunk"].content
            if token:
                # Escape newlines so each SSE frame carries one logical token
                safe = token.replace("\n", "\\n")
                yield f"data: {safe}\n\n"

        # Propagate errors
        elif kind == "on_chain_error":
            msg = str(event["data"].get("error", "Agent error"))
            yield f'data: {{"type":"error","message":"{msg}"}}\n\n'
            return

    # Signal completion. Note: checking on_chain_end for the "agent" node
    # would fire after the *first* agent pass (before tools run), so we
    # emit [DONE] once the event stream is exhausted, i.e. the run is over.
    yield "data: [DONE]\n\n"


@app.get("/stream")
async def stream_response(q: str):
    return StreamingResponse(
        token_stream(q),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        },
    )
```
Frontend: consuming the SSE stream
```javascript
// frontend.js
const output = document.getElementById("output");
const source = new EventSource(`/stream?q=${encodeURIComponent(userQuery)}`);
let buffer = "";

source.onmessage = (e) => {
  if (e.data === "[DONE]") { source.close(); return; }
  const payload = e.data;
  try {
    const parsed = JSON.parse(payload);
    if (parsed.type === "tool_start") {
      output.insertAdjacentHTML("beforeend",
        `<span class="tool-badge">🔍 ${parsed.name}…</span>`);
    } else if (parsed.type === "error") {
      output.insertAdjacentHTML("beforeend",
        `<span class="error">${parsed.message}</span>`);
      source.close();
    }
  } catch {
    // Plain token: append to text
    buffer += payload.replace(/\\n/g, "\n");
    output.textContent = buffer;
  }
};

source.onerror = () => {
  output.insertAdjacentHTML("beforeend", "<em>Connection lost.</em>");
  source.close();
};
```
The result: tool-call badges appear as the agent searches, then tokens flow into the text area word by word. The user sees activity within 400 milliseconds of hitting Enter.
🛠️ StreamWriter: Custom Event Injection From Inside Your Nodes
astream_events() exposes LLM and tool lifecycle events automatically. But what if you want to emit custom progress messages from inside your own node logic, for example "Loaded 47 documents" between two tool calls?
LangGraph provides get_stream_writer() for exactly this:
```python
# nodes.py
from langgraph.config import get_stream_writer
from langgraph.graph import MessagesState
from langgraph.types import StreamWriter


def synthesize_with_progress(state: MessagesState) -> dict:
    writer: StreamWriter = get_stream_writer()
    writer({"type": "progress", "message": "Ranking search results…"})
    ranked = rank_results(state["search_results"])
    writer({"type": "progress", "message": f"Synthesising from {len(ranked)} sources…"})
    response = llm.invoke(build_prompt(ranked, state["messages"]))
    return {"messages": [response]}
```
These calls emit on_custom_event events into the astream_events() stream:
```python
# In your SSE generator:
elif kind == "on_custom_event" and node == "synthesize_with_progress":
    payload = event["data"]  # the dict you passed to writer()
    if payload.get("type") == "progress":
        msg = payload["message"]
        yield f'data: {{"type":"progress","message":"{msg}"}}\n\n'
```
Key rules for StreamWriter:
- `get_stream_writer()` must be called inside a node function that is already being invoked within an `astream_events()` context. Calling it outside an active streaming context raises a `RuntimeError`.
- The writer is synchronous: you pass a dict directly, no `await` needed, even inside an async node.
- Custom events are only visible in `astream_events()`, not in `stream("values")` or `stream("updates")`.
- In multi-agent setups, custom events from a subgraph bubble up to the parent graph's `astream_events()` stream with the correct `langgraph_node` metadata.
StreamWriter is the right tool for progress reporting between tool calls: the gap where the LLM is silent but the agent is working. Without it, users see a blank stream for potentially 10–20 seconds while tool calls execute.
📝 Lessons Learned
1. streaming=True on the LLM is not optional. The single most common reason developers find astream_events() doesn't improve TTFT is forgetting to set streaming=True on the ChatOpenAI (or equivalent) constructor. Without it, the LLM callback only fires on_chat_model_end: no on_chat_model_stream events, no tokens.
2. Filter by node name, not just event type. In a multi-node graph, on_chat_model_stream fires for every LLM call across every node. If your planning node calls the LLM to decide what to do next, its tokens will appear in your frontend before the real answer tokens. Always scope to the node that produces user-facing output.
3. Always send a terminal event. SSE connections stay open until the server closes them. If your generator returns without sending [DONE], the browser's EventSource will re-connect every 3 seconds (the default retry interval). This creates ghost requests that re-run the agent. Emit [DONE] explicitly and call source.close() on the frontend.
4. Disable proxy and reverse-proxy buffering. Nginx, Caddy, and AWS ALB all buffer upstream responses by default. Streaming tokens accumulate in the proxy buffer and release in chunks, destroying the UX effect. Add X-Accel-Buffering: no (Nginx) and set appropriate proxy timeout headers.
5. astream_events() version matters. version="v1" (the default before LangGraph 0.2) does not populate langgraph_node in metadata. Always pass version="v2" for LangGraph graphs. Some LangChain LCEL chains only work with version="v1" β check the LangChain release notes when upgrading.
6. Run IDs are your production observability hook. Every event carries a run_id. Log it and pass it to your tracing backend (LangSmith, Langfuse). You can reconstruct the full token sequence, tool call latencies, and error events for any session from the run_id alone.
📌 TLDR: Summary and Key Takeaways
- Three streaming APIs, three granularities: `stream("values")` for full state, `stream("updates")` for node deltas, `astream_events()` for tokens and lifecycle events. Pick based on what the consumer needs, not what is easiest to implement.
- TTFT is the UX metric that matters: streaming at 40 tokens/sec with a 400 ms first-token delay reduces perceived latency by ~80% versus batch delivery, even with identical total compute time.
- `on_chat_model_stream` is the token event: filter by `event["event"] == "on_chat_model_stream"` and scope to the right node with `event["metadata"]["langgraph_node"]`.
- `streaming=True` on the LLM constructor is mandatory: without it, no `on_chat_model_stream` events fire regardless of which streaming API you call.
- StreamWriter bridges the silent gap: between tool calls, the LLM is quiet but the agent is working. Use `get_stream_writer()` to emit custom progress events so the user sees activity.
- FastAPI SSE is the simplest production-ready integration: `StreamingResponse` with `media_type="text/event-stream"` and `X-Accel-Buffering: no` covers 95% of web deployment scenarios. Graduate to WebSockets only if you need bidirectional communication.
- Always handle errors and send terminal events: unhandled `on_chain_error` events silently close the stream; missing `[DONE]` events trigger `EventSource` reconnects that re-run the agent.
Streaming is not a feature you add at the end β it is an architecture decision that shapes how you structure nodes, handle errors, and design your frontend contract.
❓ Practice Quiz
1. You call `graph.astream_events(input, version="v2")` but never see any `on_chat_model_stream` events, even though the agent is clearly calling the LLM. What is the most likely cause?
   - A) You forgot to pass `version="v2"`
   - B) You need to use `graph.stream()` instead of `astream_events()`
   - C) The `ChatOpenAI` instance was not initialised with `streaming=True`
   - D) `astream_events()` only works with tool-call events, not LLM tokens

   Correct Answer: C

2. Your LangGraph graph has three nodes: `plan`, `search`, and `answer`. You want to stream only the tokens that the `answer` node produces. Which filter is correct?
   - A) `if event["event"] == "on_chat_model_stream"`
   - B) `if event["event"] == "on_chat_model_stream" and event["name"] == "answer"`
   - C) `if event["event"] == "on_chat_model_stream" and event["metadata"]["langgraph_node"] == "answer"`
   - D) `if event["event"] == "on_chain_end" and event["metadata"]["langgraph_node"] == "answer"`

   Correct Answer: C

3. A user reports that your streaming agent re-runs automatically every few seconds after it finishes. No code change was made. What is the most likely cause and fix?
   - A) The `asyncio.Queue` overflowed; increase `maxsize`
   - B) The `EventSource` is reconnecting because no `[DONE]` terminal event was sent; add `yield "data: [DONE]\n\n"` and call `source.close()` on the frontend
   - C) `astream_events()` has a default retry loop; disable it with `retry=False`
   - D) FastAPI is not compatible with SSE; switch to WebSockets

   Correct Answer: B

4. Open-ended challenge: A product manager asks you to add "streaming" to a multi-agent LangGraph system where three specialist subgraphs run in parallel via `Send`. Describe how you would structure the SSE endpoint to simultaneously stream tokens from all three subgraphs, keep them visually distinct in the frontend, and handle the case where one subgraph errors while the other two succeed.