Event streaming is the recommended streaming model for new applications talking to a LangSmith deployment. The LangGraph SDK (Python, JavaScript) opens a single subscription against the LangSmith Deployment API and exposes typed projections—messages, state, tool calls, subgraphs, output, and custom transformer extensions—that can be consumed concurrently from one run.
Event streaming sits one level above the legacy streaming API, which exposes raw stream modes.
Event streaming requires langgraph-api>=0.9.0rc1 on the LangGraph Agent Server, with langgraph>=1.2.0 and langgraph-sdk>=0.3.14 (Python) or @langchain/langgraph-sdk>=1.9.2 (JavaScript). Servers on earlier versions continue to serve the legacy streaming API.
Quickstart
from langgraph_sdk import get_client
client = get_client(url=DEPLOYMENT_URL, api_key=API_KEY)
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(
        input={"messages": [{"role": "user", "content": "What is 42 * 17?"}]},
    )
    async for message in thread.messages:
        async for token in message.text:
            print(token, end="", flush=True)
    final_state = await thread.output
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({
  apiUrl: process.env.DEPLOYMENT_URL,
  apiKey: process.env.LANGSMITH_API_KEY,
});
const thread = client.threads.stream({ assistantId: "agent" });
await thread.run.start({
  input: { messages: [{ role: "user", content: "What is 42 * 17?" }] },
});
for await (const message of thread.messages) {
  for await (const token of message.text) {
    process.stdout.write(token);
  }
}
const finalState = await thread.output;
Event streaming uses two endpoints. Open the SSE subscription first, then send a run.start command on the same thread. The SDK does both for you, but at the wire level they are separate requests.
Create a thread:
curl --request POST \
--url <DEPLOYMENT_URL>/threads \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{}'
Open the event subscription. The body is a SubscribeParams envelope listing channels to filter (and optional namespaces / depth):
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["values", "updates", "messages", "tools", "lifecycle", "input", "checkpoints", "tasks", "custom"]}'
Send the run.start command on a second request to start the run. The command body is a JSON-RPC-style envelope with id, method, and params:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/commands \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{
"id": 1,
"method": "run.start",
"params": {
"assistant_id": "agent",
"input": {"messages": [{"role": "user", "content": "What is 42 * 17?"}]}
}
}'
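The same envelope can be built in code before posting it to /commands. The following is a minimal sketch rather than SDK code: `build_command` and the per-process id counter are hypothetical helpers, and the only details taken from this page are the `id`, `method`, and `params` fields. Whether the server requires ids to be unique or increasing is an assumption here.

```python
import itertools
import json

# Hypothetical helper: a per-process counter for command ids (assumed, not required by the docs).
_next_id = itertools.count(1)


def build_command(method: str, params: dict) -> str:
    """Serialize a JSON-RPC-style command envelope with id, method, and params."""
    return json.dumps({"id": next(_next_id), "method": method, "params": params})


# Body for the run.start request shown in the cURL example.
body = build_command(
    "run.start",
    {
        "assistant_id": "agent",
        "input": {"messages": [{"role": "user", "content": "What is 42 * 17?"}]},
    },
)
```

The resulting string is what the cURL example passes to `--data`.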
Each SSE frame's data: payload is a ProtocolEvent envelope; parse the events and dispatch by method to reconstruct the typed projections the SDK exposes.
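For clients that cannot use the SDK, parsing and dispatching might look like the sketch below. It assumes standard SSE framing (a blank line ends a frame, `data:` lines carry the JSON payload) and ignores `id:` lines. `iter_protocol_events` and `dispatch` are illustrative names, not part of any library.

```python
import json
from typing import Callable, Iterable, Iterator


def iter_protocol_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON payload per SSE frame."""
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line closes the current frame.
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # SSE allows one optional space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # `id:` and other fields are ignored in this sketch.
    if data_lines:
        yield json.loads("\n".join(data_lines))


def dispatch(events: Iterable[dict], handlers: dict[str, Callable[[dict], None]]) -> None:
    """Route each event to the handler registered for its channel (the method field)."""
    for event in events:
        handler = handlers.get(event["method"])
        if handler is not None:
            handler(event)
```

Feeding the decoded lines of the HTTP response body into `iter_protocol_events` and passing the result to `dispatch` gives one callback per channel.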
For the in-process equivalent in LangGraph application code, see LangGraph event streaming.
What event streaming provides
The stream returned by client.threads.stream(...) exposes typed projections over one underlying event flow:
| Projection | Use |
|---|---|
| thread (iterable) | Iterate every protocol event. |
| thread.messages | Stream chat model messages, token deltas, reasoning, and tool-call argument chunks. |
| thread.values | Iterate state snapshots and await the final value. |
| thread.output | Await the final output. |
| thread.toolCalls | Observe tool invocations with assembled input, status, output, and errors. |
| thread.subgraphs | Discover and observe nested graph executions. |
| thread.subagents | Alias for thread.subgraphs. Use this name when working with Deep Agents subagent invocations. |
| thread.interrupts | Inspect human-in-the-loop interrupt payloads. |
| thread.interrupted | Check whether the run paused for human input. |
| thread.extensions | Consume custom stream transformer projections published on custom:<name> channels. |
Multiple consumers can read these projections concurrently. Reading thread.messages does not consume events needed by thread.values, thread.toolCalls, thread.subgraphs, or thread.output.
Stream messages
Use thread.messages for chat model output:
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)
    async for message in thread.messages:
        text = await message.text
        usage = await message.usage
        print(text)
        print(usage)
const thread = client.threads.stream({ assistantId: "agent" });
await thread.run.start({ input });
for await (const message of thread.messages) {
  const text = await message.text;
  const usage = await message.usage;
  console.log(text);
  console.log(usage);
}
Open the subscription scoped to the messages channel:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["messages"]}'
Send run.start on /commands as shown in the Quickstart. Dispatch each event on params.data.event (message-start, content-block-start, content-block-delta, content-block-finish, message-finish) to reassemble each message and its content blocks.
message.text is both an async iterable and an awaitable. Iterate it for token-by-token output, or await it for the complete text. message.reasoning exposes reasoning deltas and message.tool_calls (message.toolCalls in JavaScript) exposes tool-call argument chunks. To consume text, reasoning, and tool-call chunks in exact arrival order, iterate the raw event stream instead of each projection separately.
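When working from raw events instead of the SDK, text reassembly can be sketched as follows. The delta shape (`index`, `delta.type`, `delta.text`) follows the raw frame example on this page. The sketch assumes at most one message is in flight per namespace and only collects text deltas. `assemble_text` is an illustrative name.

```python
from typing import Iterable, Iterator


def assemble_text(events: Iterable[dict]) -> Iterator[str]:
    """Yield the full text of each message once its message-finish event arrives."""
    # namespace -> {content block index -> list of text chunks}
    blocks: dict[tuple, dict[int, list[str]]] = {}
    for event in events:
        if event["method"] != "messages":
            continue
        scope = tuple(event["params"]["namespace"])
        data = event["params"]["data"]
        kind = data["event"]
        if kind == "message-start":
            blocks[scope] = {}
        elif kind == "content-block-delta":
            delta = data.get("delta", {})
            if delta.get("type") == "text":
                blocks.setdefault(scope, {}).setdefault(data["index"], []).append(delta["text"])
        elif kind == "message-finish":
            parts = blocks.pop(scope, {})
            # Join blocks in index order, chunks in arrival order.
            yield "".join("".join(parts[i]) for i in sorted(parts))
```

Reasoning and tool-call argument deltas would need their own branches; they are left out to keep the sketch short.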
Stream state
Use thread.values to stream full state snapshots after each step:
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)
    async for snapshot in thread.values:
        print(snapshot)
    final_state = await thread.output
const thread = client.threads.stream({ assistantId: "agent" });
await thread.run.start({ input });
for await (const snapshot of thread.values) {
  console.log(snapshot);
}
const finalState = await thread.output;
Open the subscription scoped to the values channel:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["values"]}'
Send run.start on /commands as shown in the Quickstart. Each event’s params.data is a full state snapshot.
thread.values is also awaitable. Awaiting thread.values resolves to the final state, equivalent to await thread.output.
Stream tool calls
thread.tool_calls (thread.toolCalls in JavaScript) exposes assembled tool invocations. Each handle carries the tool name, the assembled input, a status (started, finished, errored), and, when the tool ran, its output or error:
async for call in thread.tool_calls:
    print(call.name, await call.input)
    if call.status == "errored":
        print(await call.error)
    else:
        print(await call.output)
for await (const call of thread.toolCalls) {
  console.log(call.name, await call.input);
  if (call.status === "errored") {
    console.error(await call.error);
  } else {
    console.log(await call.output);
  }
}
Open the subscription scoped to the tools channel:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["tools"]}'
Send run.start on /commands as shown in the Quickstart. Dispatch on params.data.event (tool-started, tool-output-delta, tool-finished, tool-error); correlate by tool call ID with the corresponding tool-call content blocks on the messages channel.
Tool events are correlated by tool call ID with the corresponding tool-call content blocks on thread.messages.
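A raw-event consumer could track tool calls by ID roughly as sketched below. The event names and status values come from this page. The payload field names (`tool_call_id`, `name`, `delta`, `output`, `error`) are assumptions made for illustration and should be checked against the Agent Protocol definitions.

```python
from typing import Iterable


def track_tool_calls(events: Iterable[dict]) -> dict[str, dict]:
    """Fold tools-channel events into one record per tool call ID."""
    calls: dict[str, dict] = {}
    for event in events:
        if event["method"] != "tools":
            continue
        data = event["params"]["data"]
        # Assumed field name: the ID shared with the tool-call content block.
        call = calls.setdefault(
            data["tool_call_id"],
            {"name": None, "status": None, "chunks": [], "output": None, "error": None},
        )
        kind = data["event"]
        if kind == "tool-started":
            call["status"] = "started"
            call["name"] = data.get("name")
        elif kind == "tool-output-delta":
            call["chunks"].append(data.get("delta"))
        elif kind == "tool-finished":
            call["status"] = "finished"
            call["output"] = data.get("output")
        elif kind == "tool-error":
            call["status"] = "errored"
            call["error"] = data.get("error")
    return calls
```

The keys of the returned dictionary are the values to join against tool-call content blocks on the messages channel.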
Stream subgraphs
Use thread.subgraphs to observe nested graph work without parsing namespace strings:
async for subgraph in thread.subgraphs:
    print(subgraph.name, subgraph.path)
    async for message in subgraph.messages:
        print(await message.text)
for await (const subgraph of thread.subgraphs) {
  console.log(subgraph.name, subgraph.path);
  for await (const message of subgraph.messages) {
    console.log(await message.text);
  }
}
Subgraph activity is conveyed by the params.namespace path on every event. Open the subscription scoped to the lifecycle channel (and any channels you want to observe inside subgraphs):
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["lifecycle", "messages", "tools"]}'
Send run.start on /commands as shown in the Quickstart. Watch the lifecycle channel for started events with graph_name to discover new subgraphs, then filter subsequent events to that namespace prefix to observe per-subgraph work.
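The discover-then-filter approach can be sketched over already-decoded events. This is an illustration only: `discover_subgraphs` is a hypothetical helper, and it assumes a subgraph is announced by a non-root lifecycle started event that carries graph_name.

```python
from typing import Iterable


def discover_subgraphs(events: Iterable[dict]) -> dict[tuple, dict]:
    """Map each discovered subgraph namespace to its graph_name and the events beneath it."""
    subgraphs: dict[tuple, dict] = {}
    for event in events:
        namespace = tuple(event["params"]["namespace"])
        data = event["params"]["data"]
        if (
            event["method"] == "lifecycle"
            and namespace  # the root run has an empty namespace
            and data.get("event") == "started"
            and "graph_name" in data
        ):
            subgraphs.setdefault(namespace, {"graph_name": data["graph_name"], "events": []})
        # Attach the event to every known subgraph whose namespace is a prefix of it.
        for prefix, info in subgraphs.items():
            if namespace[: len(prefix)] == prefix:
                info["events"].append(event)
    return subgraphs
```

Events from nested scopes appear under each enclosing subgraph, which mirrors filtering by namespace prefix.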
For Deep Agents deployments, prefer thread.subagents for subagent invocations—it exposes the subagent name, prompt, and per-subagent message and tool-call projections.
Stream output
Await thread.output for the final state once the run completes:
await thread.run.start(input=input)
final_state = await thread.output
await thread.run.start({ input });
const finalState = await thread.output;
Open the subscription scoped to values and lifecycle:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["values", "lifecycle"]}'
Send run.start on /commands as shown in the Quickstart. Read until you observe a root-namespace lifecycle event with params.data.event == "completed"; the last preceding values event carries the final state.
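The read-until-completed rule can be sketched as a small fold over decoded events. `final_state_from_events` is a hypothetical helper. Restricting the tracked values events to the root namespace is an assumption made here so that subgraph snapshots are not mistaken for the final state.

```python
from typing import Any, Iterable


def final_state_from_events(events: Iterable[dict]) -> Any:
    """Return the last root values snapshot seen before the root run completes."""
    last_values = None
    for event in events:
        params = event["params"]
        is_root = params["namespace"] == []
        if event["method"] == "values" and is_root:
            last_values = params["data"]
        elif (
            event["method"] == "lifecycle"
            and is_root
            and params["data"].get("event") == "completed"
        ):
            return last_values
    raise RuntimeError("stream ended before the root run completed")
```

A failed or interrupted run never yields a root completed event, so the sketch raises instead of returning a partial state.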
thread.output shares its subscription with thread.values, so awaiting one does not require an extra round trip when the other is also being read.
Stream multiple projections
Run concurrent consumers when application code needs more than one projection at a time:
import asyncio
async def consume_messages():
    async for message in thread.messages:
        print(await message.text)
async def consume_tool_calls():
    async for call in thread.tool_calls:
        print(call.name, call.status)
async def consume_subgraphs():
    async for subgraph in thread.subgraphs:
        print(subgraph.path)
await asyncio.gather(consume_messages(), consume_tool_calls(), consume_subgraphs())
await Promise.all([
  (async () => {
    for await (const message of thread.messages) {
      console.log(await message.text);
    }
  })(),
  (async () => {
    for await (const call of thread.toolCalls) {
      console.log(call.name, call.status);
    }
  })(),
  (async () => {
    for await (const subgraph of thread.subgraphs) {
      console.log(subgraph.path);
    }
  })(),
]);
Open one subscription that covers every channel you want to consume:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["messages", "tools", "lifecycle"]}'
Send run.start on /commands as shown in the Quickstart. A single SSE subscription delivers every channel listed in the body. Dispatch on method to feed independent consumers; the SDK’s concurrent projections do the same demultiplexing client-side.
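One way to feed independent consumers from a single raw subscription is a queue per channel. The sketch below uses only asyncio; `demultiplex` and `drain` are illustrative names, and `None` is used as an end-of-stream sentinel by convention of this sketch, not of the protocol.

```python
import asyncio
from typing import AsyncIterator


async def demultiplex(events: AsyncIterator[dict], queues: dict[str, asyncio.Queue]) -> None:
    """Route each event to the queue registered for its channel (the method field)."""
    async for event in events:
        queue = queues.get(event["method"])
        if queue is not None:
            await queue.put(event)
    # Signal end of stream to every consumer.
    for queue in queues.values():
        await queue.put(None)


async def drain(queue: asyncio.Queue) -> list[dict]:
    """Example consumer: collect events until the sentinel arrives."""
    received = []
    while (event := await queue.get()) is not None:
        received.append(event)
    return received
```

Running `demultiplex` and one `drain` per queue under `asyncio.gather` keeps the consumers independent while a single task reads the stream.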
Each projection opens a filtered subscription against the same thread, so concurrent reads do not increase server load beyond the channels actually consumed.
Resume after an interrupt
When a graph pauses for human input, inspect thread.interrupted and thread.interrupts, then resume by responding to the interrupt:
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)
    async for message in thread.messages:
        print(await message.text)
    if thread.interrupted:
        for interrupt in thread.interrupts:
            await thread.run.respond(
                {"decisions": [{"type": "approve"}]},
                interrupt_id=interrupt["interrupt_id"],
            )
    final_state = await thread.output
const thread = client.threads.stream({ assistantId: "agent" });
await thread.run.start({ input });
for await (const message of thread.messages) {
  console.log(await message.text);
}
if (thread.interrupted) {
  for (const interrupt of thread.interrupts) {
    await thread.input.respond({
      namespace: interrupt.namespace,
      interrupt_id: interrupt.interruptId,
      response: { decisions: [{ type: "approve" }] },
    });
  }
}
const finalState = await thread.output;
Send the interrupt response as an input.respond command:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/commands \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{
"id": 2,
"method": "input.respond",
"params": {
"namespace": <INTERRUPT_NAMESPACE>,
"interrupt_id": "<INTERRUPT_ID>",
"response": {"decisions": [{"type": "approve"}]}
}
}'
Keep the original SSE connection open—the deployment continues emitting events on the same thread after the command lands.
Join an active run
To attach to a run already in flight on a thread—after a page reload, in a separate worker, or from another client—open the thread stream with the existing thread_id and skip thread.run.start(). The deployment replays buffered events when the connection opens, so consumers reconstruct the run state from the beginning without missing any output.
from langgraph_sdk import get_client
client = get_client(url=DEPLOYMENT_URL, api_key=API_KEY)
async with client.threads.stream(
    thread_id=thread_id,
    assistant_id="agent",
) as thread:
    async for message in thread.messages:
        print(await message.text)
    final_state = await thread.output
import { Client } from "@langchain/langgraph-sdk";
const client = new Client({ apiUrl: DEPLOYMENT_URL, apiKey: API_KEY });
const thread = client.threads.stream(threadId, { assistantId: "agent" });
for await (const message of thread.messages) {
  console.log(await message.text);
}
const finalState = await thread.output;
Open the event stream without sending a run.start command to attach as a passive observer:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["values", "updates", "messages", "tools", "lifecycle", "input", "checkpoints", "tasks", "custom"]}'
The server replays buffered events from the start of the run when the subscription opens.
Stream all protocol events
Iterate the stream object itself when application code needs the raw protocol event flow:
async with client.threads.stream(assistant_id="agent") as thread:
    await thread.run.start(input=input)
    async for event in thread:
        print(event["method"], event["params"]["namespace"], event["params"]["data"])
To narrow to specific channels, call subscribe on the thread:
async for event in thread.subscribe(["messages", "tools"]):
    ...
const thread = client.threads.stream({ assistantId: "agent" });
await thread.run.start({ input });
for await (const event of thread) {
  console.log(event.method, event.params.namespace, event.params.data);
}
To narrow to specific channels, call subscribe on the thread:
const sub = await thread.subscribe(["messages", "tools"]);
for await (const event of sub) {
  // ...
}
Open a subscription that covers every channel, then send run.start on /commands as shown in the Quickstart:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["values", "updates", "messages", "tools", "lifecycle", "input", "checkpoints", "tasks", "custom"]}'
The response is a stream of SSE frames. Each frame’s data: payload is a JSON ProtocolEvent; the SSE id: line carries the stable event_id.
id: 01HZ...
data: {"seq":1,"method":"lifecycle","params":{"namespace":[],"timestamp":1736...,"data":{"event":"started"}}}

id: 01HZ...
data: {"seq":2,"method":"messages","params":{"namespace":[],"timestamp":1736...,"data":{"event":"message-start","message":{...}}}}
Each event is a ProtocolEvent envelope wrapping a channel-specific payload:
from typing import Any, NotRequired, TypedDict
class ProtocolEventParams(TypedDict):
    namespace: list[str]  # path of "<name>:<runtime_id>" segments; [] is the root
    timestamp: int  # wall-clock milliseconds; can drift, don't rely on for ordering
    data: Any  # channel-specific payload
class ProtocolEvent(TypedDict):
    seq: int  # strictly increasing within a run; use for ordering
    method: str  # channel name: "messages", "values", "tools", "lifecycle", "custom", ...
    params: ProtocolEventParams
    event_id: NotRequired[str]  # stable dedup ID; mirrored to the SSE `id:` field
interface ProtocolEvent {
  readonly seq: number; // strictly increasing within a run; use for ordering
  readonly method: string; // channel name: "messages", "values", "tools", "lifecycle", "custom", ...
  readonly params: {
    readonly namespace: string[]; // path of "<name>:<runtime_id>" segments; [] is the root
    readonly timestamp: number; // wall-clock milliseconds; can drift, don't rely on for ordering
    readonly data: unknown; // channel-specific payload
  };
  readonly event_id?: string; // stable dedup ID; mirrored to the SSE `id:` field
}
Raw SSE frame:
id: 01HZQ8XK5N6F9M2A3B4C5D6E7F
data: {"seq":42,"method":"messages","params":{"namespace":["researcher:6f4d"],"timestamp":1736283600123,"data":{"event":"content-block-delta","index":0,"delta":{"type":"text","text":"Hello"}}},"event_id":"01HZQ8XK5N6F9M2A3B4C5D6E7F"}
The SSE id: line mirrors event_id for client-side deduplication. The wire protocol does not use the Last-Event-ID header for resumption — the Agent Server buffers events per run and replays them from the start of the run on every new subscription, and the SDK dedupes by seq and event_id client-side.
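Because the server replays from the start of the run, a client reading raw events needs its own duplicate filter. The class below is a minimal sketch of such a filter, assuming a single run per stream. `EventDeduper` is a hypothetical name and is not how the SDK is implemented.

```python
class EventDeduper:
    """Drop events already seen, keyed by event_id with seq as a fallback."""

    def __init__(self) -> None:
        self.highest_seq = 0
        self._seen_ids: set[str] = set()

    def accept(self, event: dict) -> bool:
        """Return True for a new event, False for a replayed duplicate."""
        event_id = event.get("event_id")
        if event_id is not None:
            if event_id in self._seen_ids:
                return False
            self._seen_ids.add(event_id)
        elif event["seq"] <= self.highest_seq:
            # event_id is optional, so fall back to the seq cursor.
            return False
        self.highest_seq = max(self.highest_seq, event["seq"])
        return True
```

Keeping one `EventDeduper` alive across reconnects lets a consumer process each replayed stream with `if deduper.accept(event): ...`.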
The namespace is a path from the root graph to the scope that emitted the event. The root is the empty array. Each child execution adds one "name:runtime_id" segment, so a nested tool call inside a subgraph looks like ["researcher:6f4d", "tools:91ac"]. Filter raw events by namespace directly when only a specific subtree matters; thread.subgraphs already does this for nested graph executions.
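Namespace handling on raw events reduces to two small operations, sketched here with hypothetical helper names. Splitting on the last colon is an assumption about how names and runtime IDs are joined.

```python
def parse_segment(segment: str) -> tuple[str, str]:
    """Split a "name:runtime_id" segment; the runtime ID is empty if no colon is present."""
    name, sep, runtime_id = segment.rpartition(":")
    if not sep:
        return segment, ""
    return name, runtime_id


def in_subtree(namespace: list[str], prefix: list[str]) -> bool:
    """True when the event's namespace sits at or below the given prefix."""
    return namespace[: len(prefix)] == prefix


nested = ["researcher:6f4d", "tools:91ac"]
names = [parse_segment(segment)[0] for segment in nested]
```

With the example path from the paragraph above, `names` holds the scope names without runtime IDs, and `in_subtree(nested, ["researcher:6f4d"])` selects everything emitted under that subgraph.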
Channels and event lifecycle
Raw events flow on channels. The channel name appears as the event’s method; each channel emits a specific event shape.
| Channel | Purpose |
|---|---|
| values | Full graph state snapshots. |
| updates | Per-node state deltas. |
| messages | Content-block-centric chat model output. |
| tools | Tool call start, streamed output, finish, and error events. |
| lifecycle | Run, subgraph, and subagent status changes. |
| checkpoints | Lightweight checkpoint envelopes for branching and time travel. |
| input | Human-in-the-loop input requests and responses. |
| tasks | Pregel task creation and result events. |
| custom | User-defined payloads from graph code. |
| custom:<name> | Application-defined stream transformer output. |
The typed projections (thread.messages, thread.values, thread.toolCalls, etc.) are built from these channels. The channel name appears as the method field on raw events when iterating the stream object directly.
Messages
The messages channel models output as content blocks. The data.event field is one of message-start, content-block-start, content-block-delta, content-block-finish, message-finish. Content blocks have explicit boundaries: a block starts, emits zero or more deltas, and finishes before the next block in the same message starts. message-finish may include token usage; unrecoverable model-call failures arrive as message error events.
Tools
The tools channel exposes tool execution. The data.event field is one of tool-started, tool-output-delta, tool-finished, tool-error. Tool events are correlated by tool call ID, so a tool execution can be joined back to its originating tool-call content block on the messages channel.
Lifecycle
The lifecycle channel tracks root run, subgraph, and subagent status. The data.event field is one of started, completed, failed, interrupted. Lifecycle data may include an optional graph_name, error, and cause describing why a child scope started (parent tool call, fan-out send, edge transition).
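A raw-event consumer can keep a per-scope status table from this channel. The following is a small sketch with a hypothetical function name; it records only the latest data.event per namespace and ignores graph_name, error, and cause.

```python
from typing import Iterable


def scope_statuses(events: Iterable[dict]) -> dict[tuple, str]:
    """Return the most recent lifecycle status for each namespace."""
    statuses: dict[tuple, str] = {}
    for event in events:
        if event["method"] != "lifecycle":
            continue
        namespace = tuple(event["params"]["namespace"])
        statuses[namespace] = event["params"]["data"]["event"]
    return statuses
```

The empty tuple key holds the root run's status, so `scope_statuses(events).get(())` answers whether the run as a whole started, completed, failed, or was interrupted.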
Resume from last event
Event streams are resumable. The Agent Server buffers events per run, assigns each a strictly increasing seq and stable event_id, and replays from a cursor on reconnect. The SDK handles transient drops automatically: each open subscription tracks its highest observed seq, and on reconnect the SDK replays from that cursor and dedupes any events the server resends.
To resume across a process boundary—a page reload, a worker handoff, or a separate client—reopen the thread with the same thread_id. The server replays buffered events from the start of the run when a new subscription opens, and the SDK demultiplexes them into the same typed projections.
async with client.threads.stream(
    thread_id=thread_id,
    assistant_id="agent",
) as thread:
    async for event in thread:
        print(event["method"], event.get("event_id"))
const thread = client.threads.stream(threadId, { assistantId: "agent" });
for await (const event of thread) {
  console.log(event.method, event.event_id);
}
Reopen the subscription to receive the replayed events:
curl --request POST \
--url <DEPLOYMENT_URL>/threads/<THREAD_ID>/stream/events \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>' \
--data '{"channels": ["values", "updates", "messages", "tools", "lifecycle", "input", "checkpoints", "tasks", "custom"]}'
Cursor handling (since) is an internal protocol detail used by the SDK on reconnect; it is not exposed as a parameter on the public stream-open call.
The wire-level event and command formats are defined in the Agent Protocol repository.