All Posts

Streaming Agent Responses in LangGraph: Tokens, Events, and Real-Time UI Integration

Stream LangGraph agents in real time: astream_events, token-level output, StreamWriter, and FastAPI SSE frontend integration.

Abstract AlgorithmsAbstract Algorithms
Β·Β·20 min read
Share
Share on X / Twitter
Share on LinkedIn
Copy link

TLDR: Stream agents token by token with astream_events; wire to FastAPI SSE for zero-spinner UX.


πŸ“– The 25-Second Spinner: Why Streaming Is a UX Requirement, Not a Nice-to-Have

Your agent takes 25 seconds to respond. Users abandon after 8 seconds. You add streaming β€” same 25 seconds of total compute, but the first token appears in 0.4 seconds. Abandonment drops to near zero.

This is not a performance optimisation. It is a perception shift. The user's subjective experience of waiting for the whole answer versus watching it appear word by word are cognitively different events. Streaming makes computation feel interactive even when it isn't faster.

In LangGraph's world, "streaming" covers three distinct things:

What streamsGranularityPrimary audience
Graph stateFull snapshot after each nodeDebugging, backend orchestration
Node deltasChanged keys only, per nodeLightweight status updates
LLM tokens + eventsIndividual tokens + tool lifecycle eventsFrontend UIs, real-time dashboards

Understanding which layer you need β€” and which API exposes it β€” is the first decision. Getting it wrong means either drowning your frontend in over-large payloads or leaving users staring at a blank screen while your agent executes seven tool calls.

This post walks through all three streaming modes, explains how LangGraph generates the event stream internally, and ends with a working FastAPI SSE endpoint that renders a streaming research assistant word by word in the browser.


πŸ” Three Streaming Modes: values, updates, and astream_events Compared

LangGraph exposes streaming through two sync and one async family of methods. The mode you pass determines what you receive on each iteration.

# Sync β€” full state snapshot after each node
for state in graph.stream(input, stream_mode="values"):
    print(state)

# Sync β€” only changed keys after each node
for delta in graph.stream(input, stream_mode="updates"):
    print(delta)

# Async β€” fine-grained event stream (tokens, tool calls, custom events)
async for event in graph.astream_events(input, version="v2"):
    print(event)
ModePayload per iterationIncludes LLM tokens?OverheadWhen to use
stream("values")Full StateSnapshot❌LowAdmin consoles, debug logging
stream("updates"){"node_name": {changed_keys}}❌Very lowProgress indicators, node-level UI
astream_events()Event dict with type, data, metadataβœ…MediumToken-by-token frontend rendering

stream(mode="values") yields the entire graph state after every node completes. If your state carries large lists (e.g., accumulated tool results), each yield is a large payload. Useful for checkpointing or feeding downstream orchestrators.

stream(mode="updates") yields only what changed. If the search node adds a search_results key, you receive {"search": {"search_results": [...]}}. This is the natural choice for a "Node X completed" progress bar.

astream_events() is the only method that exposes LLM tokens. It wraps every node's execution in LangChain's callback infrastructure and emits a stream of typed event dicts. The version="v2" parameter is required for LangGraph graphs and enables richer metadata including langgraph_node and langgraph_step.


βš™οΈ Token Streaming with astream_events: Filtering, Extracting, and Handling Events

The raw event stream from astream_events() is noisy β€” it includes start/end events for chains, tools, and the LLM itself. In practice you filter to the events that matter for your use case.

Step 1 β€” Iterate the event stream

async for event in graph.astream_events(
    {"messages": [HumanMessage(content=user_query)]},
    version="v2",
    config={"configurable": {"thread_id": session_id}},
):
    event_type = event["event"]
    node_name  = event.get("metadata", {}).get("langgraph_node", "")

Step 2 β€” Filter by event type

LangGraph surfaces the following core event types:

Event typeFires when
on_chain_start / on_chain_endA LangGraph node begins or completes
on_chat_model_startThe LLM receives its prompt
on_chat_model_streamEach token chunk from the LLM
on_chat_model_endThe LLM finishes its response
on_tool_start / on_tool_endA tool is invoked and returns
on_custom_eventA node emitted a manual event via StreamWriter

Step 3 β€” Extract token content

if event_type == "on_chat_model_stream":
    chunk = event["data"]["chunk"]
    token = chunk.content          # str β€” may be empty for tool-call deltas
    if token:
        yield token                # push to SSE, websocket, etc.

Step 4 β€” Scope to a specific node

In a multi-node graph you usually only want tokens from the final answer node, not the reasoning node:

if event_type == "on_chat_model_stream" and node_name == "synthesize":
    token = event["data"]["chunk"].content
    if token:
        yield f"data: {token}\n\n"

Step 5 β€” Handle tool-call lifecycle events

if event_type == "on_tool_start":
    tool_name = event["name"]
    yield f"data: [TOOL:{tool_name}]\n\n"   # surface "Searching…" to the UI

if event_type == "on_tool_end":
    yield f"data: [TOOL_DONE:{event['name']}]\n\n"

This gives your frontend enough signal to render spinners around tool calls and swap them for results once the tool completes.


🧠 Deep Dive: How LangGraph's Streaming Pipeline Works

The Internals

LangGraph builds on LangChain's Runnable abstraction. Every node in a LangGraph graph is a RunnableLambda or RunnableSequence under the hood. When you call astream_events(), LangGraph:

  1. Creates an AsyncCallbackManager and attaches it to the RunnableConfig passed to each node.
  2. The callback manager listens for on_llm_new_token, on_tool_start, on_tool_end, and other LCEL lifecycle hooks.
  3. Hooks publish typed event dicts into a asyncio.Queue.
  4. A generator coroutine drains that queue and yields events to your async for loop.

The event dict schema looks like:

{
    "event":    "on_chat_model_stream",
    "name":     "ChatOpenAI",
    "run_id":   "3f8a...",
    "tags":     ["seq:step:2"],
    "metadata": {
        "langgraph_node": "synthesize",
        "langgraph_step": 3,
        "thread_id":      "session-abc",
    },
    "data": {
        "chunk": AIMessageChunk(content="The"),
    },
}

The run_id ties a start event to its corresponding end event, which is useful for measuring per-node latency in production tracing.

Backpressure occurs when your consumer (the SSE loop, the browser write) is slower than the LLM produces tokens. The asyncio.Queue buffers unconsumed events in memory. Under sustained load with a slow client, this queue can grow unboundedly. The mitigation is a bounded queue with a timeout or a backpressure-aware transport (HTTP/2 flow control, WebSocket congestion signals).

Performance Analysis

Metricstream("values")stream("updates")astream_events()
Time to first yieldAfter first node completesAfter first node completesAt first LLM token (~400ms typical)
Payload size/yieldFull state (can be large)Changed keys only (small)Single token dict (~300–600 bytes)
Event overheadNegligibleNegligible~0.5–2 ms per event (callback dispatch)
Async required?No (sync stream())No (sync stream())Yes (astream_events() is async-only)
Typical TTFT2–15 s (node boundary)2–15 s (node boundary)0.3–0.8 s (first token)

The critical observation: TTFT with astream_events() is almost entirely determined by the LLM's time-to-first-token, not by graph traversal. For a two-node graph (search β†’ synthesize), the user sees text appear ~400ms after the synthesize node starts the LLM call, even if search took 18 seconds.

The callback dispatch overhead is measurable at very high throughput (>100 events/sec), but for typical agent workloads (10–40 tokens/sec) it is negligible compared to network RTT and LLM latency.

Mathematical Model

Let T be total agent latency (fixed), T_TTFT be time to first token, and T_total be time to last token. Define perceived latency P as:

$$P = \alpha \cdot T_{\text{TTFT}} + (1 - \alpha) \cdot T_{\text{total}}$$

where Ξ± ∈ [0.7, 0.9] is an empirically measured perceptual weighting factor β€” humans weight the wait-before-anything-happens far more heavily than the time to read the rest of the answer.

With batch delivery (no streaming): T_TTFT = T_total = 25 s β†’ P β‰ˆ 25 s.

With token streaming at 40 tokens/sec starting at T_TTFT = 0.4 s and T_total = 25 s:

$$P = 0.8 \times 0.4 + 0.2 \times 25 = 0.32 + 5.0 = 5.32 \text{ s}$$

The user's perceived wait drops from 25 seconds to roughly 5 seconds β€” an 80% reduction β€” with identical total compute. The lever is entirely the user's attention shifting from "waiting for something to start" to "reading while it continues".

This model also explains why a fast-streaming but verbose agent (high T_total) still feels better than a slow-starting but concise one. The first token is the psychological unlock.


πŸ“Š The Streaming Event Flow: From Node Execution to Browser Pixel

graph TD
    A["🌐 Browser\nEventSource /stream?q=..."] -->|"HTTP GET β€” SSE request"| B["⚑ FastAPI\nStreamingResponse"]
    B -->|"graph.astream_events(input, version='v2')"| C["πŸ”€ LangGraph Graph\nagent β†’ tools β†’ agent"]
    C -->|"node: agent\nChatOpenAI streaming=True"| D["πŸ€– LLM\nToken generation"]
    C -->|"node: tools\nTavilySearch / custom tools"| E["πŸ”§ Tool Execution\non_tool_start / on_tool_end"]
    D -->|"on_chat_model_stream\n{chunk: AIMessageChunk}"| C
    E -->|"tool result dict"| C
    C -->|"event dict\n{event, metadata, data}"| B
    B -->|"data: token\\n\\n"| A
    C -->|"on_chain_end"| B
    B -->|"data: [DONE]\\n\\n"| A

Each SSE frame is a UTF-8 string prefixed with data: and terminated by \n\n. The browser's EventSource API fires an onmessage callback for each frame, which appends the token to the DOM. No WebSocket handshake, no polling β€” a plain HTTP response that never closes until the agent finishes.


πŸ—οΈ Multi-Agent Streaming: Subgraphs, Parallel Fan-Out, and Per-Agent Filtering

Single-agent streaming is straightforward. Multi-agent graphs introduce three complications: event multiplexing (tokens from multiple agents interleave), subgraph event bubbling (subgraph events propagate to the parent stream), and parallel branch synchronisation (fan-out nodes produce concurrent event streams).

Subgraph event bubbling

When you compile a subgraph and embed it as a node in a parent graph, events from inside the subgraph automatically appear in the parent's astream_events() stream. The langgraph_node metadata reflects the subgraph node name, not the parent node name. Use the full node path to disambiguate:

# Subgraph "researcher" contains a node "search"
# In the parent stream, events appear as:
event["metadata"]["langgraph_node"]   # β†’ "search"  (subgraph node)
event["metadata"]["checkpoint_ns"]    # β†’ "researcher:"  (subgraph namespace)

# Correct filter for the subgraph's LLM tokens:
if (kind == "on_chat_model_stream"
        and event["metadata"].get("checkpoint_ns", "").startswith("researcher:")):
    yield token

Parallel fan-out with Send

LangGraph's Send API launches multiple parallel branches. Each branch runs concurrently, so their on_chat_model_stream events interleave in arrival order β€” not graph order. Tag each token with its source branch:

elif kind == "on_chat_model_stream":
    node  = event["metadata"].get("langgraph_node", "unknown")
    token = event["data"]["chunk"].content
    if token:
        import json
        yield f"data: {json.dumps({'node': node, 'token': token})}\n\n"

The frontend routes each node to its own output lane, so the user sees three agents typing simultaneously β€” the same pattern used in multi-agent research tools like GPT Researcher.

Scaling consideration: each parallel branch maintains its own callback queue. Under high concurrency (>10 parallel agents), the total number of in-flight events grows linearly with branch count. Use an explicit asyncio.Semaphore to cap concurrent branches if your SSE server is memory-constrained.

Common misconceptions

MisconceptionReality
"Streaming is only useful for the final LLM response"Tool-call start/end events are equally valuable β€” they give users a real-time map of the agent's reasoning steps
"I can add streaming after the fact without changing the graph"True for event-stream mode, but only if streaming=True was set on every LLM node from the start
"WebSockets are always better than SSE for streaming"SSE is unidirectional, simpler, and works through every HTTP proxy. Use WebSockets only when you need the client to send mid-stream messages (e.g., "stop generating")
"astream_events() guarantees event order across parallel nodes"No β€” events across concurrent nodes are interleaved by asyncio scheduling; only within-node ordering is guaranteed

🌍 Real-World Applications: How ChatGPT, Claude, and Copilot Stream Responses

ChatGPT uses OpenAI's /v1/chat/completions endpoint with stream: true. The response is an SSE stream of JSON delta objects ({"choices":[{"delta":{"content":"The"}}]}). The frontend concatenates deltas into the visible message. Error recovery: if the stream drops mid-response, the UI shows a "regenerate" button β€” it does not attempt to resume from the partial output.

Claude (Anthropic) streams content_block_delta events over text/event-stream. It additionally emits message_start, content_block_start, and message_delta events, giving the client richer structure to render thinking blocks separately from final answers. Anthropic's extended thinking feature streams the <thinking> tokens under a thinking content block before the final text block begins.

GitHub Copilot Chat runs inside VS Code's extension host. It uses LangChain's streaming callback system internally and pipes tokens to VS Code's ChatResponseStream API. The key difference from browser SSE: the token consumer is a language-server protocol client running in the same process, which means backpressure is handled by Node.js stream backpressure rather than HTTP flow control.

Operational pattern shared by all three: they emit a structured terminal event ([DONE], message_stop, copilot_done) so the client knows the stream is complete without waiting for the HTTP connection to close. This matters when using HTTP/1.1 keep-alive connections where the connection might be reused.


βš–οΈ Trade-offs and Failure Modes: Partial Output, Error Mid-Stream, and Backpressure

Partial output on error. If your graph raises an exception inside a node after 200 tokens have already been sent, the user has already seen partial text. You cannot unsend it. The mitigation is to emit a structured error event (data: {"type":"error","message":"..."}\n\n) and have the frontend replace the partial message with an error state rather than leaving the half-answer visible.

Slow consumer / backpressure. A user's browser tab backgrounded or a mobile device on a flaky connection will stop consuming the SSE stream. The server-side asyncio.Queue buffers events in memory. Under high concurrency this is a memory leak vector. Use a bounded asyncio.Queue(maxsize=512) with a put_nowait + asyncio.TimeoutError guard, and close the generator if the consumer falls too far behind.

Error mid-stream in a subgraph. Multi-agent graphs where a subgraph raises a ToolException will surface on_chain_error events. Your event loop must handle these explicitly:

if event["event"] == "on_chain_error":
    error_msg = str(event["data"].get("error", "Unknown error"))
    yield f"data: {{\"type\":\"error\",\"message\":\"{error_msg}\"}}\n\n"
    return

Without this handler, the generator silently terminates and the browser SSE connection closes with no error signal to the user.

Event ordering across concurrent nodes. LangGraph supports parallel node execution (Send API, fanout). Events from concurrent nodes are interleaved in the order their callbacks fire β€” there is no global ordering guarantee across concurrent branches. If your frontend renders multiple agent streams side by side, tag each event with langgraph_node and route to the correct UI lane.

Debugging difficulty. Async event streams are harder to trace than synchronous call stacks. Use run_id correlation (present on every event) to group a single LLM call's start/stream/end events together in your logging pipeline (Langfuse, LangSmith, OpenTelemetry).


🧭 Decision Guide: stream() vs astream() vs astream_events() β€” When to Use Each

SituationRecommendation
Use stream("values")You need the full state after each node for checkpointing, admin inspection, or feeding results to a downstream pipeline. Token latency is not a concern.
Use stream("updates")You want a lightweight "Node X finished" progress indicator and don't need token-level updates. Sync context (CLI tool, batch pipeline).
Use astream_events()You are building a real-time UI. You need individual tokens, tool-call lifecycle signals, or custom progress events. Async context required.
Avoid astream_events() whenYour graph runs entirely non-LLM logic (pure Python computation). The event overhead adds latency with no user-visible benefit. Use stream("updates") instead.
Edge case β€” multi-agent subgraphsCall astream_events() at the top-level orchestrator. Events from subgraphs bubble up automatically. Use event["metadata"]["langgraph_node"] to filter which subgraph's tokens to surface. If you need to stream a subgraph in isolation, call astream_events() directly on the compiled subgraph with the same config.

πŸ§ͺ Practical Example: FastAPI SSE Endpoint With Streaming Research Assistant

This example builds a two-node LangGraph agent (search β†’ synthesize) and exposes it as a streaming SSE endpoint, with a JavaScript EventSource client that renders tokens word by word as they arrive. The research-assistant scenario was chosen because it has two nodes with meaningfully different event profiles β€” the search_node emits tool call events, and the synthesize_node emits token-by-token text chunks β€” which lets you see both event types in one runnable example. As you read through the generate_stream async generator, watch the event["event"] filter: every on_chat_model_stream event that passes the check is one token yielded to the browser, and that is the complete streaming pipeline in its minimal form.

Graph definition

# graph.py
from langgraph.graph import StateGraph, MessagesState, END
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.prebuilt import ToolNode

llm   = ChatOpenAI(model="gpt-4o-mini", streaming=True)
tools = [TavilySearchResults(max_results=3)]
llm_with_tools = llm.bind_tools(tools)

def call_model(state: MessagesState):
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: MessagesState):
    last = state["messages"][-1]
    return "tools" if last.tool_calls else END

builder = StateGraph(MessagesState)
builder.add_node("agent",  call_model)
builder.add_node("tools",  ToolNode(tools))
builder.set_entry_point("agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
graph = builder.compile()

Important: pass streaming=True to ChatOpenAI. Without it, the LLM buffers its full response internally before releasing any tokens, making astream_events() no more responsive than ainvoke().

FastAPI SSE endpoint

# api.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from graph import graph

app = FastAPI()

async def token_stream(query: str):
    """Async generator: yields SSE-formatted tokens."""
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content=query)]},
        version="v2",
    ):
        kind = event["event"]
        node = event.get("metadata", {}).get("langgraph_node", "")

        # Surface tool-call progress
        if kind == "on_tool_start":
            yield f'data: {{"type":"tool_start","name":"{event["name"]}"}}\n\n'

        # Stream tokens from the agent node only (not intermediate reasoning)
        elif kind == "on_chat_model_stream" and node == "agent":
            token = event["data"]["chunk"].content
            if token:
                # Escape newlines so each SSE frame carries one logical token
                safe = token.replace("\n", "\\n")
                yield f"data: {safe}\n\n"

        # Signal completion
        elif kind == "on_chain_end" and node == "agent":
            yield "data: [DONE]\n\n"
            return

        # Propagate errors
        elif kind == "on_chain_error":
            msg = str(event["data"].get("error", "Agent error"))
            yield f'data: {{"type":"error","message":"{msg}"}}\n\n'
            return

@app.get("/stream")
async def stream_response(q: str):
    return StreamingResponse(
        token_stream(q),
        media_type="text/event-stream",
        headers={
            "Cache-Control":    "no-cache",
            "X-Accel-Buffering": "no",   # disable nginx buffering
        },
    )

Frontend β€” consuming the SSE stream

// frontend.js
const output  = document.getElementById("output");
const source  = new EventSource(`/stream?q=${encodeURIComponent(userQuery)}`);
let   buffer  = "";

source.onmessage = (e) => {
  if (e.data === "[DONE]") { source.close(); return; }

  const payload = e.data;
  try {
    const parsed = JSON.parse(payload);
    if (parsed.type === "tool_start") {
      output.insertAdjacentHTML("beforeend",
        `<span class="tool-badge">πŸ” ${parsed.name}…</span>`);
    } else if (parsed.type === "error") {
      output.insertAdjacentHTML("beforeend",
        `<span class="error">${parsed.message}</span>`);
      source.close();
    }
  } catch {
    // Plain token β€” append to text
    buffer += payload.replace(/\\n/g, "\n");
    output.textContent = buffer;
  }
};

source.onerror = () => {
  output.insertAdjacentHTML("beforeend", "<em>Connection lost.</em>");
  source.close();
};

The result: tool-call badges appear as the agent searches, then tokens flow into the text area word by word. The user sees activity within 400 milliseconds of hitting Enter.


πŸ› οΈ StreamWriter: Custom Event Injection From Inside Your Nodes

astream_events() exposes LLM and tool lifecycle events automatically. But what if you want to emit custom progress messages from inside your own node logic β€” for example, "Loaded 47 documents" between two tool calls?

LangGraph provides get_stream_writer() for exactly this:

# nodes.py
from langgraph.types import StreamWriter, get_stream_writer

def synthesize_with_progress(state: MessagesState) -> dict:
    writer: StreamWriter = get_stream_writer()

    writer({"type": "progress", "message": "Ranking search results…"})

    ranked = rank_results(state["search_results"])

    writer({"type": "progress", "message": f"Synthesising from {len(ranked)} sources…"})

    response = llm.invoke(build_prompt(ranked, state["messages"]))
    return {"messages": [response]}

These calls emit on_custom_event events into the astream_events() stream:

# In your SSE generator:
elif kind == "on_custom_event" and node == "synthesize_with_progress":
    payload = event["data"]          # the dict you passed to writer()
    if payload.get("type") == "progress":
        msg = payload["message"]
        yield f'data: {{"type":"progress","message":"{msg}"}}\n\n'

Key rules for StreamWriter:

  • get_stream_writer() must be called inside a node function that is already being invoked within a astream_events() context. Calling it outside an active streaming context raises a RuntimeError.
  • The writer is synchronous β€” you pass a dict directly, no await needed, even inside an async node.
  • Custom events are only visible in astream_events(), not in stream("values") or stream("updates").
  • In multi-agent setups, custom events from a subgraph bubble up to the parent graph's astream_events() stream with the correct langgraph_node metadata.

StreamWriter is the right tool for progress reporting between tool calls β€” the gap where the LLM is silent but the agent is working. Without it, users see a blank stream for potentially 10–20 seconds while tool calls execute.


πŸ“š Lessons Learned

1. streaming=True on the LLM is not optional. The single most common reason developers find astream_events() doesn't improve TTFT is forgetting to set streaming=True on the ChatOpenAI (or equivalent) constructor. Without it, the LLM callback only fires on_chat_model_end β€” no on_chat_model_stream events, no tokens.

2. Filter by node name, not just event type. In a multi-node graph, on_chat_model_stream fires for every LLM call across every node. If your planning node calls the LLM to decide what to do next, its tokens will appear in your frontend before the real answer tokens. Always scope to the node that produces user-facing output.

3. Always send a terminal event. SSE connections stay open until the server closes them. If your generator returns without sending [DONE], the browser's EventSource will re-connect every 3 seconds (the default retry interval). This creates ghost requests that re-run the agent. Emit [DONE] explicitly and call source.close() on the frontend.

4. Disable proxy and reverse-proxy buffering. Nginx, Caddy, and AWS ALB all buffer upstream responses by default. Streaming tokens accumulate in the proxy buffer and release in chunks, destroying the UX effect. Add X-Accel-Buffering: no (Nginx) and set appropriate proxy timeout headers.

5. astream_events() version matters. version="v1" (the default before LangGraph 0.2) does not populate langgraph_node in metadata. Always pass version="v2" for LangGraph graphs. Some LangChain LCEL chains only work with version="v1" β€” check the LangChain release notes when upgrading.

6. Run IDs are your production observability hook. Every event carries a run_id. Log it and pass it to your tracing backend (LangSmith, Langfuse). You can reconstruct the full token sequence, tool call latencies, and error events for any session from the run_id alone.


πŸ“Œ TLDR: Summary and Key Takeaways

  • Three streaming APIs, three granularities: stream("values") for full state, stream("updates") for node deltas, astream_events() for tokens and lifecycle events. Pick based on what the consumer needs, not what is easiest to implement.
  • TTFT is the UX metric that matters: streaming at 40 tokens/sec with a 400ms first-token delay reduces perceived latency by ~80% versus batch delivery, even with identical total compute time.
  • on_chat_model_stream is the token event: filter by event["event"] == "on_chat_model_stream" and scope to the right node with event["metadata"]["langgraph_node"].
  • streaming=True on the LLM constructor is mandatory: without it, no on_chat_model_stream events fire regardless of which streaming API you call.
  • StreamWriter bridges the silent gap: between tool calls, the LLM is quiet but the agent is working. Use get_stream_writer() to emit custom progress events so the user sees activity.
  • FastAPI SSE is the simplest production-ready integration: StreamingResponse with media_type="text/event-stream" and X-Accel-Buffering: no covers 95% of web deployment scenarios. Graduate to WebSockets only if you need bidirectional communication.
  • Always handle errors and send terminal events: unhandled on_chain_error events silently close the stream; missing [DONE] events trigger EventSource reconnects that re-run the agent.

Streaming is not a feature you add at the end β€” it is an architecture decision that shapes how you structure nodes, handle errors, and design your frontend contract.


πŸ“ Practice Quiz

  1. You call graph.astream_events(input, version="v2") but never see any on_chat_model_stream events, even though the agent is clearly calling the LLM. What is the most likely cause?

    • A) You forgot to pass version="v2"
    • B) You need to use graph.stream() instead of astream_events()
    • C) The ChatOpenAI instance was not initialised with streaming=True
    • D) astream_events() only works with tool-call events, not LLM tokens Correct Answer: C
  2. Your LangGraph graph has three nodes: plan, search, and answer. You want to stream only the tokens that the answer node produces. Which filter is correct?

    • A) if event["event"] == "on_chat_model_stream"
    • B) if event["event"] == "on_chat_model_stream" and event["name"] == "answer"
    • C) if event["event"] == "on_chat_model_stream" and event["metadata"]["langgraph_node"] == "answer"
    • D) if event["event"] == "on_chain_end" and event["metadata"]["langgraph_node"] == "answer" Correct Answer: C
  3. A user reports that your streaming agent re-runs automatically every few seconds after it finishes. No code change was made. What is the most likely cause and fix?

    • A) The asyncio.Queue overflowed β€” increase maxsize
    • B) The EventSource is reconnecting because no [DONE] terminal event was sent β€” add yield "data: [DONE]\n\n" and call source.close() on the frontend
    • C) astream_events() has a default retry loop β€” disable it with retry=False
    • D) FastAPI is not compatible with SSE β€” switch to WebSockets Correct Answer: B
  4. Open-ended challenge: A product manager asks you to add "streaming" to a multi-agent LangGraph system where three specialist subgraphs run in parallel via Send. Describe how you would structure the SSE endpoint to simultaneously stream tokens from all three subgraphs, keep them visually distinct in the frontend, and handle the case where one subgraph errors while the other two succeed.


Abstract Algorithms

Written by

Abstract Algorithms

@abstractalgorithms