Event streaming is the recommended streaming model for new applications talking to a LangSmith deployment. The LangGraph SDK (Python, JavaScript) opens a single subscription against the LangSmith Deployment API and exposes typed projections—messages, state, tool calls, subgraphs, output, and custom transformer extensions—that can be consumed concurrently from one run. Event streaming sits one level above the legacy streaming API, which exposes raw stream modes.
Event streaming requires langgraph-api>=0.9.0rc1 on the LangGraph Agent Server, with langgraph>=1.2.0 and langgraph-sdk>=0.3.14 (Python) or @langchain/langgraph-sdk>=1.9.2 (JavaScript). Servers on earlier versions continue to serve the legacy streaming API.

Quickstart

from langgraph_sdk import get_client

client = get_client(url=DEPLOYMENT_URL, api_key=API_KEY)

async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(
        input={"messages": [{"role": "user", "content": "What is 42 * 17?"}]},
    )

    async for message in thread.messages:
        async for token in message.text:
            print(token, end="", flush=True)

    final_state = await thread.output
For the in-process equivalent in LangGraph application code, see LangGraph event streaming.

What event streaming provides

The stream returned by client.threads.stream(...) exposes typed projections over one underlying event flow:
| Projection | Use |
| --- | --- |
| thread (iterable) | Iterate every protocol event. |
| thread.messages | Stream chat model messages, token deltas, reasoning, and tool-call argument chunks. |
| thread.values | Iterate state snapshots and await the final value. |
| thread.output | Await the final output. |
| thread.tool_calls (thread.toolCalls in JavaScript) | Observe tool invocations with assembled input, status, output, and errors. |
| thread.subgraphs | Discover and observe nested graph executions. |
| thread.subagents | Alias for thread.subgraphs. Use this name when working with Deep Agents subagent invocations. |
| thread.interrupts | Inspect human-in-the-loop interrupt payloads. |
| thread.interrupted | Check whether the run paused for human input. |
| thread.extensions | Consume custom stream transformer projections published on custom:<name> channels. |
Multiple consumers can read these projections concurrently. Reading thread.messages does not consume events needed by thread.values, thread.tool_calls, thread.subgraphs, or thread.output.
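One way to picture this non-consuming behavior is a fan-out: a single reader pulls events off the underlying flow and copies each one into a separate queue per consumer. The sketch below is independent of the SDK and is not a description of how the SDK is implemented; `fan_out`, `drain`, and the stand-in events are hypothetical names and data chosen for illustration.

```python
import asyncio
from typing import Any, AsyncIterator

_DONE = object()  # sentinel marking the end of the stream


async def fan_out(source: AsyncIterator[dict[str, Any]],
                  queues: dict[str, asyncio.Queue]) -> None:
    """Copy each event into the queue registered for its channel."""
    async for event in source:
        queue = queues.get(event["method"])
        if queue is not None:
            await queue.put(event)
    # Tell every consumer that no more events will arrive.
    for queue in queues.values():
        await queue.put(_DONE)


async def drain(queue: asyncio.Queue) -> list[dict[str, Any]]:
    """Read one queue until the sentinel arrives."""
    seen = []
    while (item := await queue.get()) is not _DONE:
        seen.append(item)
    return seen


async def demo() -> tuple[list, list]:
    async def source():
        # Stand-in events; real payloads come from the deployment.
        for seq, method in enumerate(["messages", "values", "messages"]):
            yield {"seq": seq, "method": method,
                   "params": {"namespace": [], "data": None}}

    queues = {"messages": asyncio.Queue(), "values": asyncio.Queue()}
    _, messages, values = await asyncio.gather(
        fan_out(source(), queues),
        drain(queues["messages"]),
        drain(queues["values"]),
    )
    return messages, values
```

Because each consumer owns its queue, a slow reader on one channel only grows its own backlog and never takes events away from another reader.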

Stream messages

Use thread.messages for chat model output:
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)

    async for message in thread.messages:
        text = await message.text
        usage = await message.usage

        print(text)
        print(usage)
message.text is both an async iterable and an awaitable. Iterate it for token-by-token output, or await it for the complete text. message.reasoning exposes reasoning deltas and message.tool_calls (message.toolCalls in JavaScript) exposes tool-call argument chunks. To consume text, reasoning, and tool-call chunks in exact arrival order, iterate the raw event stream instead of each projection separately.
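An object can be both an async iterable and an awaitable because Python treats these as two separate protocols: `__aiter__` backs `async for`, and `__await__` backs `await`. The sketch below only illustrates that idea. `TextStream` is a hypothetical class, not the SDK's implementation, and it replays a fixed list of deltas instead of reading from a network stream.

```python
import asyncio
from typing import Any, AsyncIterator, Generator


class TextStream:
    """Hypothetical stand-in: iterate for deltas, or await for the full text."""

    def __init__(self, deltas: list[str]) -> None:
        self._deltas = deltas

    async def _generate(self) -> AsyncIterator[str]:
        for delta in self._deltas:
            await asyncio.sleep(0)  # yield control, as a network read would
            yield delta

    def __aiter__(self) -> AsyncIterator[str]:
        # Supports: async for token in stream
        return self._generate()

    async def _collect(self) -> str:
        return "".join([delta async for delta in self._generate()])

    def __await__(self) -> Generator[Any, None, str]:
        # Supports: text = await stream
        return self._collect().__await__()


async def demo() -> tuple[list[str], str]:
    tokens = [token async for token in TextStream(["7", "1", "4"])]
    full = await TextStream(["7", "1", "4"])
    return tokens, full
```

Iterating suits incremental rendering, while awaiting suits code that only needs the finished string.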

Stream state

Use thread.values to stream full state snapshots after each step:
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)

    async for snapshot in thread.values:
        print(snapshot)

    final_state = await thread.output
thread.values is also awaitable. Awaiting thread.values resolves to the final state, equivalent to await thread.output.

Stream tool calls

thread.tool_calls (thread.toolCalls in JavaScript) exposes assembled tool invocations. Each handle carries the tool name, the assembled input, a status (started, finished, errored), and—when the tool ran—its output or error:
async for call in thread.tool_calls:
    print(call.name, await call.input)

    if call.status == "errored":
        print(await call.error)
    else:
        print(await call.output)
Tool events are correlated by tool call ID with the corresponding tool-call content blocks on thread.messages.

Stream subgraphs

Use thread.subgraphs to observe nested graph work without parsing namespace strings:
async for subgraph in thread.subgraphs:
    print(subgraph.name, subgraph.path)

    async for message in subgraph.messages:
        print(await message.text)
For Deep Agents deployments, prefer thread.subagents for subagent invocations—it exposes the subagent name, prompt, and per-subagent message and tool-call projections.

Stream output

Await thread.output for the final state once the run completes:
await thread.run.start(input=input)

final_state = await thread.output
thread.output shares its subscription with thread.values, so awaiting one does not require an extra round trip when the other is also being read.

Stream multiple projections

Run concurrent consumers when application code needs more than one projection at a time:
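import asyncio


# Each consumer takes the open thread stream and reads one projection.
async def consume_messages(thread):
    async for message in thread.messages:
        print(await message.text)


async def consume_tool_calls(thread):
    async for call in thread.tool_calls:
        print(call.name, call.status)


async def consume_subgraphs(thread):
    async for subgraph in thread.subgraphs:
        print(subgraph.path)


async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)

    # Run all three consumers against the same run at once.
    await asyncio.gather(
        consume_messages(thread),
        consume_tool_calls(thread),
        consume_subgraphs(thread),
    )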
Each projection opens a filtered subscription against the same thread, so concurrent reads do not increase server load beyond the channels actually consumed.

Resume after an interrupt

When a graph pauses for human input, inspect thread.interrupted and thread.interrupts, then resume by responding to the interrupt:
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)

    async for message in thread.messages:
        print(await message.text)

    if thread.interrupted:
        for interrupt in thread.interrupts:
            await thread.run.respond(
                {"decisions": [{"type": "approve"}]},
                interrupt_id=interrupt["interrupt_id"],
            )

    final_state = await thread.output

Join an active run

To attach to a run already in flight on a thread—after a page reload, in a separate worker, or from another client—open the thread stream with the existing thread_id and skip thread.run.start(). The deployment replays buffered events when the connection opens, so consumers reconstruct the run state from the beginning without missing any output.
from langgraph_sdk import get_client

client = get_client(url=DEPLOYMENT_URL, api_key=API_KEY)

async with client.threads.stream(
    thread_id=thread_id,
    assistant_id="agent",
) as thread:
    async for message in thread.messages:
        print(await message.text)

    final_state = await thread.output

Stream all protocol events

Iterate the stream object itself when application code needs the raw protocol event flow:
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)

    async for event in thread:
        print(event["method"], event["params"]["namespace"], event["params"]["data"])
To narrow the stream to specific channels, call thread.subscribe with the channel names:
async for event in thread.subscribe(["messages", "tools"]):
    ...
Each event is a ProtocolEvent envelope wrapping a channel-specific payload:
from typing import Any, NotRequired, TypedDict


class ProtocolEventParams(TypedDict):
    namespace: list[str]   # path of "<name>:<runtime_id>" segments; [] is the root
    timestamp: int         # wall-clock milliseconds; can drift, don't rely on for ordering
    data: Any              # channel-specific payload


class ProtocolEvent(TypedDict):
    seq: int               # strictly increasing within a run; use for ordering
    method: str            # channel name: "messages", "values", "tools", "lifecycle", "custom", ...
    params: ProtocolEventParams
    event_id: NotRequired[str]   # stable dedup ID; mirrored to the SSE `id:` field
The namespace is a path from the root graph to the scope that emitted the event. The root is the empty array. Each child execution adds one "name:runtime_id" segment, so a nested tool call inside a subgraph looks like ["researcher:6f4d", "tools:91ac"]. Filter raw events by namespace directly when only a specific subtree matters; thread.subgraphs already does this for nested graph executions.
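Filtering by namespace amounts to a prefix comparison on the path. The following is a minimal sketch that operates on plain event dictionaries shaped like ProtocolEvent; `parse_segment` and `in_subtree` are hypothetical helper names, and splitting on the first colon assumes that scope names do not themselves contain a colon.

```python
from typing import Any


def parse_segment(segment: str) -> tuple[str, str]:
    """Split one "name:runtime_id" namespace segment into its two parts."""
    # Assumption: the name part never contains ":".
    name, _, runtime_id = segment.partition(":")
    return name, runtime_id


def in_subtree(event: dict[str, Any], prefix: list[str]) -> bool:
    """True when the event was emitted at or below the scope named by prefix."""
    namespace = event["params"]["namespace"]
    return namespace[: len(prefix)] == prefix


# Stand-in events reusing the example path from the text above.
events = [
    {"seq": 1, "method": "messages",
     "params": {"namespace": [], "data": None}},
    {"seq": 2, "method": "messages",
     "params": {"namespace": ["researcher:6f4d"], "data": None}},
    {"seq": 3, "method": "tools",
     "params": {"namespace": ["researcher:6f4d", "tools:91ac"], "data": None}},
]

# Keep only events emitted by the researcher subgraph or anything nested in it.
subtree = [e["seq"] for e in events if in_subtree(e, ["researcher:6f4d"])]
```

An empty prefix matches every event, since the root namespace is the empty array.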

Channels and event lifecycle

Raw events flow on channels. The channel name appears as the event’s method; each channel emits a specific event shape.
| Channel | Purpose |
| --- | --- |
| values | Full graph state snapshots. |
| updates | Per-node state deltas. |
| messages | Content-block-centric chat model output. |
| tools | Tool call start, streamed output, finish, and error events. |
| lifecycle | Run, subgraph, and subagent status changes. |
| checkpoints | Lightweight checkpoint envelopes for branching and time travel. |
| input | Human-in-the-loop input requests and responses. |
| tasks | Pregel task creation and result events. |
| custom | User-defined payloads from graph code. |
| custom:<name> | Application-defined stream transformer output. |
The typed projections (thread.messages, thread.values, thread.tool_calls, and so on) are built from these channels.
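When reading raw events, the custom:<name> channels can be told apart from the plain custom channel by their method prefix. The sketch below groups payloads by extension name; `extension_name` and `group_extensions` are hypothetical helpers, and the channel name `custom:progress` is invented for the example.

```python
from collections import defaultdict
from typing import Any, Optional


def extension_name(method: str) -> Optional[str]:
    """Return <name> for a "custom:<name>" channel, otherwise None."""
    prefix = "custom:"
    return method[len(prefix):] if method.startswith(prefix) else None


def group_extensions(events: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Collect the payloads of custom:<name> events, keyed by <name>."""
    grouped: dict[str, list[Any]] = defaultdict(list)
    for event in events:
        name = extension_name(event["method"])
        if name is not None:
            grouped[name].append(event["params"]["data"])
    return dict(grouped)


# Stand-in events; "custom:progress" is a made-up channel name.
events = [
    {"seq": 1, "method": "custom:progress", "params": {"namespace": [], "data": 0.5}},
    {"seq": 2, "method": "custom", "params": {"namespace": [], "data": "note"}},
    {"seq": 3, "method": "custom:progress", "params": {"namespace": [], "data": 1.0}},
]
grouped = group_extensions(events)
```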

Messages

The messages channel models output as content blocks. The data.event field is one of message-start, content-block-start, content-block-delta, content-block-finish, message-finish. Content blocks have explicit boundaries: a block starts, emits zero or more deltas, and finishes before the next block in the same message starts. message-finish may include token usage; unrecoverable model-call failures arrive as message error events.
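The boundary rules above can be expressed as a small state machine over the data.event names of a single message. This is a sketch under two assumptions that the text does not state outright: that every block finishes before message-finish arrives, and that events for one message are checked in isolation. Error events are not modelled, and `check_block_boundaries` is a hypothetical helper.

```python
from typing import Iterable


def check_block_boundaries(event_names: Iterable[str]) -> int:
    """Validate the ordering of one message's events; return blocks completed.

    Raises ValueError on the first event that breaks the expected ordering.
    """
    in_message = False
    block_open = False
    completed = 0
    for name in event_names:
        if name == "message-start" and not in_message:
            in_message = True
        elif name == "content-block-start" and in_message and not block_open:
            block_open = True
        elif name == "content-block-delta" and block_open:
            pass  # zero or more deltas are allowed inside an open block
        elif name == "content-block-finish" and block_open:
            block_open = False
            completed += 1
        elif name == "message-finish" and in_message and not block_open:
            in_message = False
        else:
            raise ValueError(f"unexpected {name!r}")
    return completed


blocks = check_block_boundaries([
    "message-start",
    "content-block-start", "content-block-delta", "content-block-delta",
    "content-block-finish",
    "content-block-start", "content-block-finish",  # a block with no deltas
    "message-finish",
])
```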

Tools

The tools channel exposes tool execution. The data.event field is one of tool-started, tool-output-delta, tool-finished, tool-error. Tool events are correlated by tool call ID, so a tool execution can be joined back to its originating tool-call content block on the messages channel.
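Correlating by tool call ID can be sketched as a dictionary keyed on that ID. The text above does not name the payload field that carries the ID, so the sketch assumes a hypothetical `tool_call_id` key; check the Agent Protocol definitions for the real field name before relying on this. `tool_status` is likewise a hypothetical helper.

```python
from typing import Any

# ASSUMPTION: the tool call ID is carried as data["tool_call_id"].
# The real field name is defined by the protocol and may differ.
ID_FIELD = "tool_call_id"


def tool_status(events: list[dict[str, Any]]) -> dict[str, str]:
    """Map each tool call ID to the latest tools-channel event seen for it."""
    latest: dict[str, str] = {}
    # Order by seq so that "latest" does not depend on arrival order.
    for event in sorted(events, key=lambda e: e["seq"]):
        if event["method"] != "tools":
            continue
        data = event["params"]["data"]
        latest[data[ID_FIELD]] = data["event"]
    return latest


def _tool(seq: int, name: str, call_id: str) -> dict[str, Any]:
    """Build a stand-in tools event for the example below."""
    return {"seq": seq, "method": "tools",
            "params": {"namespace": [], "data": {"event": name, ID_FIELD: call_id}}}


events = [
    _tool(3, "tool-finished", "call_1"),
    _tool(1, "tool-started", "call_1"),
    _tool(2, "tool-started", "call_2"),
    _tool(4, "tool-error", "call_2"),
]
statuses = tool_status(events)
```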

Lifecycle

The lifecycle channel tracks root run, subgraph, and subagent status. The data.event field is one of started, completed, failed, interrupted. Lifecycle data may include an optional graph_name, error, and cause describing why a child scope started (parent tool call, fan-out send, edge transition).
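A consumer of raw events can keep a status board by recording the most recent lifecycle event per namespace. The sketch below uses only the method, namespace, and data.event fields described in this document; `scope_status` is a hypothetical helper and the events are stand-ins.

```python
from typing import Any


def scope_status(events: list[dict[str, Any]]) -> dict[tuple[str, ...], str]:
    """Return the most recent lifecycle event name for each namespace."""
    status: dict[tuple[str, ...], str] = {}
    for event in events:
        if event["method"] != "lifecycle":
            continue
        # Lists are not hashable, so the namespace path becomes a tuple key.
        scope = tuple(event["params"]["namespace"])
        status[scope] = event["params"]["data"]["event"]
    return status


def _lifecycle(seq: int, namespace: list[str], name: str) -> dict[str, Any]:
    """Build a stand-in lifecycle event for the example below."""
    return {"seq": seq, "method": "lifecycle",
            "params": {"namespace": namespace, "data": {"event": name}}}


events = [
    _lifecycle(1, [], "started"),
    _lifecycle(2, ["researcher:6f4d"], "started"),
    _lifecycle(3, ["researcher:6f4d"], "completed"),
    _lifecycle(4, [], "interrupted"),
]
board = scope_status(events)
```

The empty tuple is the root run, so `board[()]` reports whether the run as a whole is still active, finished, or waiting on input.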

Resume from last event

Event streams are resumable. The Agent Server buffers events per run, assigns each a strictly increasing seq and stable event_id, and replays from a cursor on reconnect. The SDK handles transient drops automatically: each open subscription tracks its highest observed seq, and on reconnect the SDK replays from that cursor and dedupes any events the server resends.

To resume across a process boundary (a page reload, a worker handoff, or a separate client), reopen the thread with the same thread_id. The server replays buffered events from the start of the run when a new subscription opens, and the SDK demultiplexes them into the same typed projections.
async with client.threads.stream(
    thread_id=thread_id,
    assistant_id="agent",
) as thread:
    async for event in thread:
        print(event["method"], event.get("event_id"))
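The SDK performs cursor tracking and deduplication itself, so application code normally does not need to. For code that stores or forwards raw events, the same idea can be sketched in a few lines: remember the highest seq seen for a run and skip anything at or below it. `Cursor` is a hypothetical class, not an SDK type, and the events are stand-ins that carry only seq.

```python
from typing import Any, Iterable, Iterator, Optional


class Cursor:
    """Tracks the highest seq seen for one run so replayed events are skipped."""

    def __init__(self) -> None:
        self.last_seq: Optional[int] = None

    def fresh(self, events: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
        """Yield only events newer than the cursor, advancing it as we go."""
        for event in events:
            if self.last_seq is None or event["seq"] > self.last_seq:
                self.last_seq = event["seq"]
                yield event


cursor = Cursor()

# First connection delivers seq 1 to 3 before dropping.
first = [e["seq"] for e in cursor.fresh([{"seq": 1}, {"seq": 2}, {"seq": 3}])]

# After reconnecting, the server resends 2 and 3 and continues with 4 and 5.
replayed = [e["seq"] for e in cursor.fresh(
    [{"seq": 2}, {"seq": 3}, {"seq": 4}, {"seq": 5}]
)]
```

Because seq is strictly increasing only within a run, a sketch like this needs one cursor per run.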
The wire-level event and command formats are defined in the Agent Protocol repository.