Originally published at kalyna.pro
Streaming sends Claude's response token by token as it's generated, instead of waiting for the full completion before showing anything. For a chat UI this is the difference between a user staring at a spinner for several seconds and seeing the first words appear within a few hundred milliseconds. The Claude API Tutorial introduces the basic stream.text_stream helper — this guide covers the full picture: the raw event stream, async streaming, error handling, and a complete FastAPI endpoint that streams Claude's output to a browser.
Prerequisites
pip install anthropic
# for the API endpoint example later:
pip install fastapi uvicorn
The Simple Way: text_stream
from anthropic import Anthropic
client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    # Print each text chunk as soon as it arrives
    for text in stream.text_stream:
        print(text, end="", flush=True)
    # Fetch the assembled Message while the stream is still open
    final_message = stream.get_final_message()
print(f"\n\nstop_reason: {final_message.stop_reason}")
print(f"output tokens: {final_message.usage.output_tokens}")
stream.get_final_message() returns the same Message object you'd get from a non-streaming call — complete content, stop_reason, and usage — without manually reassembling it from chunks.
The Raw Event Stream
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    # Iterating the stream itself yields every raw event, not just text
    for event in stream:
        print(event.type)
Event types, in order:
- message_start: initial Message shell with usage.input_tokens
- content_block_start: a new content block begins (text, tool_use, etc.)
- content_block_delta: incremental content, one of text_delta (.text), input_json_delta (.partial_json, for tool inputs), or thinking_delta
- content_block_stop: the block is complete
- message_delta: stop_reason and updated usage.output_tokens
- message_stop: stream finished
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about debugging."}],
) as stream:
    for event in stream:
        # Text arrives as text_delta payloads inside content_block_delta events
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            print(event.delta.text, end="", flush=True)
        # message_delta carries the updated output token count
        elif event.type == "message_delta":
            print(f"\n[tokens so far: {event.usage.output_tokens}]", end="")
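To make the event order concrete, here is a small sketch that folds a hand-written event sequence back into a message. The Event dataclass and the assemble function are illustrative stand-ins, not SDK types: real events are typed objects with nested fields such as event.delta.text, but the bookkeeping follows the same order.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Event:
    """Hypothetical, flattened stand-in for an SDK stream event."""
    type: str
    index: int = 0
    text: str = ""
    stop_reason: Optional[str] = None
    output_tokens: int = 0


def assemble(events):
    """Fold a raw event sequence into (block texts, stop_reason, output_tokens)."""
    blocks = {}          # block index -> list of text fragments
    stop_reason = None
    output_tokens = 0
    for ev in events:
        if ev.type == "content_block_start":
            blocks[ev.index] = []
        elif ev.type == "content_block_delta":
            blocks[ev.index].append(ev.text)
        elif ev.type == "message_delta":
            stop_reason = ev.stop_reason
            output_tokens = ev.output_tokens
    texts = ["".join(blocks[i]) for i in sorted(blocks)]
    return texts, stop_reason, output_tokens


events = [
    Event("message_start"),
    Event("content_block_start", index=0),
    Event("content_block_delta", index=0, text="Bug hides "),
    Event("content_block_delta", index=0, text="in plain sight"),
    Event("content_block_stop", index=0),
    Event("message_delta", stop_reason="end_turn", output_tokens=6),
    Event("message_stop"),
]
print(assemble(events))
```

This is roughly the work get_final_message() saves you from doing by hand.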
Async Streaming
import asyncio
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
async def main():
    # Same API as the sync client, with async with / async for
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about debugging."}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
asyncio.run(main())
Building a Streaming API Endpoint (FastAPI + SSE)
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic
app = FastAPI()
client = AsyncAnthropic()
@app.get("/chat")
async def chat(message: str):
    async def event_stream():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": message}],
        ) as stream:
            async for text in stream.text_stream:
                # JSON-encode each chunk so embedded newlines can't break SSE framing;
                # the browser calls JSON.parse(event.data) to recover the text
                yield f"data: {json.dumps(text)}\n\n"
        # Tell the client the response is complete
        yield "event: done\ndata: {}\n\n"
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
X-Accel-Buffering: no stops nginx from buffering the whole response — without it, "streaming" arrives in one burst at the end. On the frontend, read with fetch + a ReadableStream reader, or EventSource for GET endpoints.
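One detail of the SSE wire format is worth spelling out: it is line-based, and each event ends with a blank line, so a text chunk containing a newline has to be encoded or split across several data: lines to survive framing. A minimal sketch of a framing helper follows. The name format_sse is hypothetical, and the sketch relies on the SSE rule that a client joins multiple data: lines of one event with "\n".

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Frame one SSE event; every line of `data` gets its own `data:` field."""
    # Normalize line endings first: a bare "\r" would also end an SSE line
    data = data.replace("\r\n", "\n").replace("\r", "\n")
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # split("\n") keeps empty segments, so a trailing newline survives the round trip
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


print(repr(format_sse("line one\nline two")))
print(repr(format_sse("{}", event="done")))
```

Inside the generator this would be used as yield format_sse(text); a browser EventSource then sees the chunk in event.data with its newlines restored.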
Handling Errors and Interruptions
import anthropic
client = anthropic.Anthropic()
try:
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about debugging."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    # Network failure; whatever was printed so far is a partial response
    print("\n[connection lost: showing partial response]")
except anthropic.RateLimitError:
    # Must come before APIStatusError, which is its parent class
    print("\n[rate limited: retry shortly]")
except anthropic.APIStatusError as e:
    print(f"\n[API error {e.status_code}]")
If the client disconnects mid-response, exit the generator early so the SDK closes the stream; this stops billing for output tokens generated into the void. For long generations, give the endpoint a request: Request parameter, check await request.is_disconnected() periodically, and break if it returns true.
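The disconnect check can be sketched without a running server. In the snippet below, FakeRequest and fake_token_stream are hypothetical test doubles standing in for the FastAPI Request object and stream.text_stream; only the shape of the loop in event_stream is the point.

```python
import asyncio


class FakeRequest:
    """Hypothetical stand-in for a FastAPI/Starlette Request."""

    def __init__(self, disconnect_after: int):
        self._checks_left = disconnect_after

    async def is_disconnected(self) -> bool:
        # Report "still connected" for the first `disconnect_after` checks
        self._checks_left -= 1
        return self._checks_left < 0


async def fake_token_stream():
    """Hypothetical stand-in for stream.text_stream."""
    for chunk in ["one ", "two ", "three ", "four ", "five"]:
        await asyncio.sleep(0)
        yield chunk


async def event_stream(request, text_stream):
    """Relay chunks as SSE until the stream ends or the client goes away."""
    async for text in text_stream:
        if await request.is_disconnected():
            # In the real endpoint this loop sits inside
            # `async with client.messages.stream(...)`, so breaking out closes the stream
            break
        yield f"data: {text}\n\n"


async def collect(disconnect_after: int):
    request = FakeRequest(disconnect_after)
    return [chunk async for chunk in event_stream(request, fake_token_stream())]


print(asyncio.run(collect(disconnect_after=3)))
```

Checking on every chunk is the simplest option; for very chatty streams, checking every few chunks would cut the overhead at the cost of reacting slightly later.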
Streaming with Tool Use
Text still arrives via text_delta, tool arguments arrive incrementally via input_json_delta, and stream.get_final_message() gives fully-parsed tool_use blocks once the stream ends. See Claude API Function Calling for the complete tool-use loop — it works unchanged whether calls are streamed or not.
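If you do process raw events yourself, note that individual input_json_delta fragments are not valid JSON on their own: they have to be concatenated per content block and parsed once the block stops. A rough sketch, using plain dicts shaped like the streamed events rather than SDK objects; the collect_tool_inputs helper and the get_weather tool are made up for illustration.

```python
import json


def collect_tool_inputs(events):
    """Accumulate input_json_delta fragments per block and parse each at block end."""
    buffers = {}  # block index -> list of JSON fragments
    names = {}    # block index -> tool name
    calls = []
    for ev in events:
        kind = ev["type"]
        if kind == "content_block_start" and ev["content_block"]["type"] == "tool_use":
            names[ev["index"]] = ev["content_block"]["name"]
            buffers[ev["index"]] = []
        elif kind == "content_block_delta" and ev["delta"]["type"] == "input_json_delta":
            buffers[ev["index"]].append(ev["delta"]["partial_json"])
        elif kind == "content_block_stop" and ev["index"] in buffers:
            raw = "".join(buffers.pop(ev["index"]))
            # An empty buffer is treated as a call with no arguments
            calls.append((names.pop(ev["index"]), json.loads(raw) if raw else {}))
    return calls


events = [
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "tool_use", "name": "get_weather"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": '{"city": "Ky'}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": 'iv", "unit": "C"}'}},
    {"type": "content_block_stop", "index": 0},
]
print(collect_tool_inputs(events))
```

In most applications get_final_message() makes this unnecessary; the manual route mainly matters if you want to show tool arguments as they are being generated.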
Best Practices
- Use get_final_message() for stop_reason/usage instead of accumulating message_delta manually
- Use AsyncAnthropic in web backends; a sync stream blocks the event loop
- Set Cache-Control: no-cache and X-Accel-Buffering: no for SSE behind a proxy
- Detect client disconnects and stop generation early
- Streaming doesn't change pricing; tokens are billed the same either way
- Handle APIConnectionError, RateLimitError, and APIStatusError explicitly
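The point about sync streams blocking the event loop can be seen in a toy experiment. Everything here is a stand-in: sync_chunks and async_chunks imitate waiting on the network with time.sleep and asyncio.sleep, and heartbeat plays the role of other requests the server should keep serving in the meantime.

```python
import asyncio
import time


async def heartbeat(ticks):
    """Background task standing in for other work the server should keep doing."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


def sync_chunks():
    """Stand-in for a sync stream: blocks the thread while waiting for each chunk."""
    for chunk in ["a", "b", "c"]:
        time.sleep(0.03)
        yield chunk


async def async_chunks():
    """Stand-in for an async stream: yields control while waiting for each chunk."""
    for chunk in ["a", "b", "c"]:
        await asyncio.sleep(0.03)
        yield chunk


async def consume_sync():
    for _ in sync_chunks():
        pass


async def consume_async():
    async for _ in async_chunks():
        pass


async def ticks_during(consume):
    """Count how many heartbeat ticks happen while `consume` runs."""
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat task start
    before = len(ticks)
    await consume()
    after = len(ticks)
    task.cancel()
    return after - before


async def main():
    blocked = await ticks_during(consume_sync)
    cooperative = await ticks_during(consume_async)
    return blocked, cooperative


blocked, cooperative = asyncio.run(main())
print(f"heartbeat ticks during sync stream: {blocked}")
print(f"heartbeat ticks during async stream: {cooperative}")
```

The sync consumer never hands control back to the loop, so the heartbeat gets no ticks while it runs; the async consumer leaves room for several.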
Summary
- stream.text_stream yields plain text chunks for display
- Raw events: message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop
- get_final_message() returns the complete Message after streaming
- AsyncAnthropic + async with / async for for non-blocking backends
- FastAPI StreamingResponse + async generator → SSE to the browser
- Tool use streams the same way; input_json_delta carries tool arguments