Add support for streaming partial messages via include_partial_messages option (#168)

## Summary
- Adds support for streaming partial messages through the
`include_partial_messages` option
- Introduces `StreamEvent` message type to handle Anthropic API stream
events
- Enables real-time streaming of Claude's responses for building
interactive UIs

## Changes
- Added `StreamEvent` dataclass with proper structure matching
TypeScript SDK (uuid, session_id, event, parent_tool_use_id)
- Added `include_partial_messages` boolean option to `ClaudeCodeOptions`
- Updated message parser to handle `stream_event` message type
- Updated subprocess CLI transport to pass `--include-partial-messages`
flag when enabled
- Added example demonstrating partial message streaming usage

## Test plan
- [x] Verified CLI flag is passed correctly when
`include_partial_messages=True`
- [x] Confirmed `StreamEvent` structure matches TypeScript SDK
implementation
- [x] Added test for user parameter in transport
- [x] Example runs successfully with streaming events

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Dickson Tsai
2025-09-17 16:32:21 -07:00
committed by GitHub
parent 607921dcb0
commit f550c21a7e
5 changed files with 247 additions and 1 deletions

View File

@@ -0,0 +1,154 @@
"""End-to-end tests for include_partial_messages option with real Claude API calls.
These tests verify that the SDK properly handles partial message streaming,
including StreamEvent parsing and message interleaving.
"""
import asyncio
from typing import List, Any
import pytest
from claude_code_sdk import ClaudeSDKClient
from claude_code_sdk.types import (
ClaudeCodeOptions,
StreamEvent,
AssistantMessage,
SystemMessage,
ResultMessage,
ThinkingBlock,
TextBlock,
)
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_include_partial_messages_stream_events():
"""Test that include_partial_messages produces StreamEvent messages."""
options = ClaudeCodeOptions(
include_partial_messages=True,
model="claude-sonnet-4-20250514",
max_turns=2,
env={
"MAX_THINKING_TOKENS": "8000",
},
)
collected_messages: List[Any] = []
async with ClaudeSDKClient(options) as client:
# Send a simple prompt that will generate streaming response with thinking
await client.query("Think of three jokes, then tell one")
async for message in client.receive_response():
collected_messages.append(message)
# Verify we got the expected message types
message_types = [type(msg).__name__ for msg in collected_messages]
# Should have SystemMessage(init) at the start
assert message_types[0] == "SystemMessage"
assert isinstance(collected_messages[0], SystemMessage)
assert collected_messages[0].subtype == "init"
# Should have multiple StreamEvent messages
stream_events = [msg for msg in collected_messages if isinstance(msg, StreamEvent)]
assert len(stream_events) > 0, "No StreamEvent messages received"
# Check for expected StreamEvent types
event_types = [event.event.get("type") for event in stream_events]
assert "message_start" in event_types, "No message_start StreamEvent"
assert "content_block_start" in event_types, "No content_block_start StreamEvent"
assert "content_block_delta" in event_types, "No content_block_delta StreamEvent"
assert "content_block_stop" in event_types, "No content_block_stop StreamEvent"
assert "message_stop" in event_types, "No message_stop StreamEvent"
# Should have AssistantMessage messages with thinking and text
assistant_messages = [msg for msg in collected_messages if isinstance(msg, AssistantMessage)]
assert len(assistant_messages) >= 1, "No AssistantMessage received"
# Check for thinking block in at least one AssistantMessage
has_thinking = any(
any(isinstance(block, ThinkingBlock) for block in msg.content)
for msg in assistant_messages
)
assert has_thinking, "No ThinkingBlock found in AssistantMessages"
# Check for text block (the joke) in at least one AssistantMessage
has_text = any(
any(isinstance(block, TextBlock) for block in msg.content)
for msg in assistant_messages
)
assert has_text, "No TextBlock found in AssistantMessages"
# Should end with ResultMessage
assert isinstance(collected_messages[-1], ResultMessage)
assert collected_messages[-1].subtype == "success"
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_include_partial_messages_thinking_deltas():
"""Test that thinking content is streamed incrementally via deltas."""
options = ClaudeCodeOptions(
include_partial_messages=True,
model="claude-sonnet-4-20250514",
max_turns=2,
env={
"MAX_THINKING_TOKENS": "8000",
},
)
thinking_deltas = []
async with ClaudeSDKClient(options) as client:
await client.query("Think step by step about what 2 + 2 equals")
async for message in client.receive_response():
if isinstance(message, StreamEvent):
event = message.event
if event.get("type") == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "thinking_delta":
thinking_deltas.append(delta.get("thinking", ""))
# Should have received multiple thinking deltas
assert len(thinking_deltas) > 0, "No thinking deltas received"
# Combined thinking should form coherent text
combined_thinking = "".join(thinking_deltas)
assert len(combined_thinking) > 10, "Thinking content too short"
# Should contain some reasoning about the calculation
assert "2" in combined_thinking.lower(), "Thinking doesn't mention the numbers"
@pytest.mark.e2e
@pytest.mark.asyncio
async def test_partial_messages_disabled_by_default():
"""Test that partial messages are not included when option is not set."""
options = ClaudeCodeOptions(
# include_partial_messages not set (defaults to False)
model="claude-sonnet-4-20250514",
max_turns=2,
)
collected_messages: List[Any] = []
async with ClaudeSDKClient(options) as client:
await client.query("Say hello")
async for message in client.receive_response():
collected_messages.append(message)
# Should NOT have any StreamEvent messages
stream_events = [msg for msg in collected_messages if isinstance(msg, StreamEvent)]
assert len(stream_events) == 0, "StreamEvent messages present when partial messages disabled"
# Should still have the regular messages
assert any(isinstance(msg, SystemMessage) for msg in collected_messages)
assert any(isinstance(msg, AssistantMessage) for msg in collected_messages)
assert any(isinstance(msg, ResultMessage) for msg in collected_messages)

View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""
Example of using the "include_partial_messages" option to stream partial messages
from Claude Code SDK.
This feature allows you to receive stream events that contain incremental
updates as Claude generates responses. This is useful for:
- Building real-time UIs that show text as it's being generated
- Monitoring tool use progress
- Getting early results before the full response is complete
Note: Partial message streaming requires the CLI to support it, and the
messages will include StreamEvent messages interspersed with regular messages.
"""
import asyncio
from claude_code_sdk import ClaudeSDKClient
from claude_code_sdk.types import (
ClaudeCodeOptions,
StreamEvent,
AssistantMessage,
UserMessage,
SystemMessage,
ResultMessage,
)
async def main():
# Enable partial message streaming
options = ClaudeCodeOptions(
include_partial_messages=True,
model="claude-sonnet-4-20250514",
max_turns=2,
env={
"MAX_THINKING_TOKENS": "8000",
},
)
client = ClaudeSDKClient(options)
try:
await client.connect()
# Send a prompt that will generate a streaming response
# prompt = "Run a bash command to sleep for 5 seconds"
prompt = "Think of three jokes, then tell one"
print(f"Prompt: {prompt}\n")
print("=" * 50)
await client.query(prompt)
async for message in client.receive_response():
print(message)
finally:
await client.disconnect()
if __name__ == "__main__":
print("Partial Message Streaming Example")
print("=" * 50)
asyncio.run(main())

View File

@@ -9,6 +9,7 @@ from ..types import (
ContentBlock,
Message,
ResultMessage,
StreamEvent,
SystemMessage,
TextBlock,
ThinkingBlock,
@@ -154,5 +155,18 @@ def parse_message(data: dict[str, Any]) -> Message:
f"Missing required field in result message: {e}", data
) from e
case "stream_event":
try:
return StreamEvent(
uuid=data["uuid"],
session_id=data["session_id"],
event=data["event"],
parent_tool_use_id=data.get("parent_tool_use_id"),
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in stream_event message: {e}", data
) from e
case _:
raise MessageParseError(f"Unknown message type: {message_type}", data)

View File

@@ -150,6 +150,9 @@ class SubprocessCLITransport(Transport):
# String or Path format: pass directly as file path or JSON string
cmd.extend(["--mcp-config", str(self._options.mcp_servers)])
if self._options.include_partial_messages:
cmd.append("--include-partial-messages")
# Add extra args for future CLI flags
for flag, value in self._options.extra_args.items():
if value is None:

View File

@@ -265,7 +265,17 @@ class ResultMessage:
result: str | None = None
Message = UserMessage | AssistantMessage | SystemMessage | ResultMessage
@dataclass
class StreamEvent:
"""Stream event for partial message updates during streaming."""
uuid: str
session_id: str
event: dict[str, Any] # The raw Anthropic API stream event
parent_tool_use_id: str | None = None
Message = UserMessage | AssistantMessage | SystemMessage | ResultMessage | StreamEvent
@dataclass
@@ -302,6 +312,9 @@ class ClaudeCodeOptions:
user: str | None = None
# Partial message streaming support
include_partial_messages: bool = False
# SDK Control Protocol
class SDKControlInterruptRequest(TypedDict):