Make streaming implementation trio-compatible (#84)

## Summary
- Replace asyncio.create_task() with anyio task group for trio
compatibility
- Update client.py docstring example to use anyio.sleep
- Add trio example demonstrating multi-turn conversation

## Details
The SDK already uses anyio for most async operations, but one line was
using asyncio.create_task() which broke trio compatibility. This PR
fixes that by using anyio's task group API with proper lifecycle
management.

### Changes:
1. **subprocess_cli.py**: Replace asyncio.create_task() with anyio task
group, ensuring proper cleanup on disconnect
2. **client.py**: Update docstring example to use anyio.sleep instead of
asyncio.sleep
3. **streaming_mode_trio.py**: Add new example showing how to use the
SDK with trio

## Test plan
- [x] All existing tests pass
- [x] Manually tested with trio runtime (created test script that
successfully runs multi-turn conversation)
- [x] Linting and type checking pass

🤖 Generated with [Claude Code](https://claude.ai/code)
This commit is contained in:
Dickson Tsai
2025-07-22 23:31:42 -07:00
committed by GitHub
parent b25cdf81c9
commit 5f8351fce9
3 changed files with 91 additions and 4 deletions

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
Example of multi-turn conversation using trio with the Claude SDK.
This demonstrates how to use the ClaudeSDKClient with trio for interactive,
stateful conversations where you can send follow-up messages based on
Claude's responses.
"""
import trio
from claude_code_sdk import (
AssistantMessage,
ClaudeCodeOptions,
ClaudeSDKClient,
ResultMessage,
SystemMessage,
TextBlock,
UserMessage,
)
def display_message(msg):
"""Standardized message display function.
- UserMessage: "User: <content>"
- AssistantMessage: "Claude: <content>"
- SystemMessage: ignored
- ResultMessage: "Result ended" + cost if available
"""
if isinstance(msg, UserMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"User: {block.text}")
elif isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, SystemMessage):
# Ignore system messages
pass
elif isinstance(msg, ResultMessage):
print("Result ended")
async def multi_turn_conversation():
"""Example of a multi-turn conversation using trio."""
async with ClaudeSDKClient(
options=ClaudeCodeOptions(model="claude-3-5-sonnet-20241022")
) as client:
print("=== Multi-turn Conversation with Trio ===\n")
# First turn: Simple math question
print("User: What's 15 + 27?")
await client.query("What's 15 + 27?")
async for message in client.receive_response():
display_message(message)
print()
# Second turn: Follow-up calculation
print("User: Now multiply that result by 2")
await client.query("Now multiply that result by 2")
async for message in client.receive_response():
display_message(message)
print()
# Third turn: One more operation
print("User: Divide that by 7 and round to 2 decimal places")
await client.query("Divide that by 7 and round to 2 decimal places")
async for message in client.receive_response():
display_message(message)
print("\nConversation complete!")
if __name__ == "__main__":
trio.run(multi_turn_conversation)

View File

@@ -45,6 +45,7 @@ class SubprocessCLITransport(Transport):
self._pending_control_responses: dict[str, dict[str, Any]] = {}
self._request_counter = 0
self._close_stdin_after_prompt = close_stdin_after_prompt
self._task_group: anyio.abc.TaskGroup | None = None
def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
@@ -160,9 +161,9 @@ class SubprocessCLITransport(Transport):
if self._process.stdin:
self._stdin_stream = TextSendStream(self._process.stdin)
# Start streaming messages to stdin in background
import asyncio
asyncio.create_task(self._stream_to_stdin())
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
self._task_group.start_soon(self._stream_to_stdin)
else:
# String mode: close stdin immediately (backward compatible)
if self._process.stdin:
@@ -183,6 +184,12 @@ class SubprocessCLITransport(Transport):
if not self._process:
return
# Cancel task group if it exists
if self._task_group:
self._task_group.cancel_scope.cancel()
await self._task_group.__aexit__(None, None, None)
self._task_group = None
if self._process.returncode is None:
try:
self._process.terminate()

View File

@@ -63,7 +63,7 @@ class ClaudeSDKClient:
await client.query("Count to 1000")
# Interrupt after 2 seconds
await asyncio.sleep(2)
await anyio.sleep(2)
await client.interrupt()
# Send new instruction