diff --git a/examples/streaming_mode.py b/examples/streaming_mode.py old mode 100644 new mode 100755 index 239024d..73eb410 --- a/examples/streaming_mode.py +++ b/examples/streaming_mode.py @@ -4,10 +4,20 @@ Comprehensive examples of using ClaudeSDKClient for streaming mode. This file demonstrates various patterns for building applications with the ClaudeSDKClient streaming interface. + +The queries are intentionally simplistic. In reality, a query can be a more +complex task that Claude SDK uses its agentic capabilities and tools (e.g. run +bash commands, edit files, search the web, fetch web content) to accomplish. + +Usage: +./examples/streaming_mode.py - List the examples +./examples/streaming_mode.py all - Run all examples +./examples/streaming_mode.py basic_streaming - Run a specific example """ import asyncio import contextlib +import sys from claude_code_sdk import ( AssistantMessage, @@ -15,28 +25,48 @@ from claude_code_sdk import ( ClaudeSDKClient, CLIConnectionError, ResultMessage, + SystemMessage, TextBlock, + UserMessage, ) +def display_message(msg): + """Standardized message display function. + + - UserMessage: "User: " + - AssistantMessage: "Claude: " + - SystemMessage: ignored + - ResultMessage: "Result ended" + cost if available + """ + if isinstance(msg, UserMessage): + for block in msg.content: + if isinstance(block, TextBlock): + print(f"User: {block.text}") + elif isinstance(msg, AssistantMessage): + for block in msg.content: + if isinstance(block, TextBlock): + print(f"Claude: {block.text}") + elif isinstance(msg, SystemMessage): + # Ignore system messages + pass + elif isinstance(msg, ResultMessage): + print("Result ended") + + async def example_basic_streaming(): """Basic streaming with context manager.""" print("=== Basic Streaming Example ===") async with ClaudeSDKClient() as client: - # Send a message - await client.send_message("What is 2+2?") + print("User: What is 2+2?") + await client.query("What is 2+2?") # Receive complete response using the helper method async for msg in client.receive_response(): - if isinstance(msg, AssistantMessage): - for block in msg.content: - if isinstance(block, TextBlock): - print(f"Claude: {block.text}") - elif isinstance(msg, ResultMessage) and msg.total_cost_usd: - print(f"Cost: ${msg.total_cost_usd:.4f}") + display_message(msg) - print("Session ended\n") + print("\n") async def example_multi_turn_conversation(): @@ -46,26 +76,20 @@ async def example_multi_turn_conversation(): async with ClaudeSDKClient() as client: # First turn print("User: What's the capital of France?") - await client.send_message("What's the capital of France?") + await client.query("What's the capital of France?") # Extract and print response async for msg in client.receive_response(): - content_blocks = getattr(msg, 'content', []) - for block in content_blocks: - if isinstance(block, TextBlock): - print(f"{block.text}") + display_message(msg) # Second turn - follow-up print("\nUser: What's the population of that city?") - await client.send_message("What's the population of that city?") + await client.query("What's the population of that city?") async for msg in client.receive_response(): - content_blocks = getattr(msg, 'content', []) - for block in content_blocks: - if isinstance(block, TextBlock): - print(f"{block.text}") + display_message(msg) - print("\nConversation ended\n") + print("\n") async def example_concurrent_responses(): @@ -76,10 +100,7 @@ async def example_concurrent_responses(): # Background task to continuously receive messages async def receive_messages(): async for message in client.receive_messages(): - if isinstance(message, AssistantMessage): - for block in message.content: - if isinstance(block, TextBlock): - print(f"Claude: {block.text}") + display_message(message) # Start receiving in background receive_task = asyncio.create_task(receive_messages()) @@ -93,7 +114,7 @@ async def example_concurrent_responses(): for question in questions: print(f"\nUser: {question}") - await client.send_message(question) + await client.query(question) await asyncio.sleep(3) # Wait between messages # Give time for final responses @@ -104,7 +125,7 @@ async def example_concurrent_responses(): with contextlib.suppress(asyncio.CancelledError): await receive_task - print("\nSession ended\n") + print("\n") async def example_with_interrupt(): @@ -115,7 +136,7 @@ async def example_with_interrupt(): async with ClaudeSDKClient() as client: # Start a long-running task print("\nUser: Count from 1 to 100 slowly") - await client.send_message( + await client.query( "Count from 1 to 100 slowly, with a brief pause between each number" ) @@ -132,10 +153,10 @@ async def example_with_interrupt(): if isinstance(block, TextBlock): # Print first few numbers print(f"Claude: {block.text[:50]}...") - - # Stop when we get a result after interrupt - if isinstance(message, ResultMessage) and interrupt_sent: - break + elif isinstance(message, ResultMessage): + display_message(message) + if interrupt_sent: + break # Start consuming messages in the background consume_task = asyncio.create_task(consume_messages()) @@ -151,16 +172,13 @@ async def example_with_interrupt(): # Send new instruction after interrupt print("\nUser: Never mind, just tell me a quick joke") - await client.send_message("Never mind, just tell me a quick joke") + await client.query("Never mind, just tell me a quick joke") # Get the joke async for msg in client.receive_response(): - if isinstance(msg, AssistantMessage): - for block in msg.content: - if isinstance(block, TextBlock): - print(f"Claude: {block.text}") + display_message(msg) - print("\nSession ended\n") + print("\n") async def example_manual_message_handling(): @@ -168,7 +186,7 @@ async def example_manual_message_handling(): print("=== Manual Message Handling Example ===") async with ClaudeSDKClient() as client: - await client.send_message( + await client.query( "List 5 programming languages and their main use cases" ) @@ -180,6 +198,7 @@ async def example_manual_message_handling(): for block in message.content: if isinstance(block, TextBlock): text = block.text + print(f"Claude: {text}") # Custom logic: extract language names for lang in [ "Python", @@ -193,12 +212,12 @@ async def example_manual_message_handling(): if lang in text and lang not in languages_found: languages_found.append(lang) print(f"Found language: {lang}") - elif isinstance(message, ResultMessage): - print(f"\nTotal languages mentioned: {len(languages_found)}") + display_message(message) + print(f"Total languages mentioned: {len(languages_found)}") break - print("\nSession ended\n") + print("\n") async def example_with_options(): @@ -213,23 +232,75 @@ async def example_with_options(): ) async with ClaudeSDKClient(options=options) as client: - await client.send_message( + print("User: Create a simple hello.txt file with a greeting message") + await client.query( "Create a simple hello.txt file with a greeting message" ) tool_uses = [] async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): + display_message(msg) for block in msg.content: - if isinstance(block, TextBlock): - print(f"Claude: {block.text}") - elif hasattr(block, "name"): # ToolUseBlock + if hasattr(block, "name") and not isinstance( + block, TextBlock + ): # ToolUseBlock tool_uses.append(getattr(block, "name", "")) + else: + display_message(msg) if tool_uses: - print(f"\nTools used: {', '.join(tool_uses)}") + print(f"Tools used: {', '.join(tool_uses)}") - print("\nSession ended\n") + print("\n") + + +async def example_async_iterable_prompt(): + """Demonstrate send_message with async iterable.""" + print("=== Async Iterable Prompt Example ===") + + async def create_message_stream(): + """Generate a stream of messages.""" + print("User: Hello! I have multiple questions.") + yield { + "type": "user", + "message": {"role": "user", "content": "Hello! I have multiple questions."}, + "parent_tool_use_id": None, + "session_id": "qa-session", + } + + print("User: First, what's the capital of Japan?") + yield { + "type": "user", + "message": { + "role": "user", + "content": "First, what's the capital of Japan?", + }, + "parent_tool_use_id": None, + "session_id": "qa-session", + } + + print("User: Second, what's 15% of 200?") + yield { + "type": "user", + "message": {"role": "user", "content": "Second, what's 15% of 200?"}, + "parent_tool_use_id": None, + "session_id": "qa-session", + } + + async with ClaudeSDKClient() as client: + # Send async iterable of messages + await client.query(create_message_stream()) + + # Receive the three responses + async for msg in client.receive_response(): + display_message(msg) + async for msg in client.receive_response(): + display_message(msg) + async for msg in client.receive_response(): + display_message(msg) + + print("\n") async def example_error_handling(): @@ -242,7 +313,8 @@ async def example_error_handling(): await client.connect() # Send a message that will take time to process - await client.send_message("Run a bash sleep command for 60 seconds") + print("User: Run a bash sleep command for 60 seconds") + await client.query("Run a bash sleep command for 60 seconds") # Try to receive response with a short timeout try: @@ -255,11 +327,13 @@ async def example_error_handling(): if isinstance(block, TextBlock): print(f"Claude: {block.text[:50]}...") elif isinstance(msg, ResultMessage): - print("Received complete response") + display_message(msg) break except asyncio.TimeoutError: - print("\nResponse timeout after 10 seconds - demonstrating graceful handling") + print( + "\nResponse timeout after 10 seconds - demonstrating graceful handling" + ) print(f"Received {len(messages)} messages before timeout") except CLIConnectionError as e: @@ -272,24 +346,48 @@ async def example_error_handling(): # Always disconnect await client.disconnect() - print("\nSession ended\n") + print("\n") async def main(): - """Run all examples.""" - examples = [ - example_basic_streaming, - example_multi_turn_conversation, - example_concurrent_responses, - example_with_interrupt, - example_manual_message_handling, - example_with_options, - example_error_handling, - ] + """Run all examples or a specific example based on command line argument.""" + examples = { + "basic_streaming": example_basic_streaming, + "multi_turn_conversation": example_multi_turn_conversation, + "concurrent_responses": example_concurrent_responses, + "with_interrupt": example_with_interrupt, + "manual_message_handling": example_manual_message_handling, + "with_options": example_with_options, + "async_iterable_prompt": example_async_iterable_prompt, + "error_handling": example_error_handling, + } - for example in examples: - await example() - print("-" * 50 + "\n") + if len(sys.argv) < 2: + # List available examples + print("Usage: python streaming_mode.py ") + print("\nAvailable examples:") + print(" all - Run all examples") + for name in examples: + print(f" {name}") + sys.exit(0) + + example_name = sys.argv[1] + + if example_name == "all": + # Run all examples + for example in examples.values(): + await example() + print("-" * 50 + "\n") + elif example_name in examples: + # Run specific example + await examples[example_name]() + else: + print(f"Error: Unknown example '{example_name}'") + print("\nAvailable examples:") + print(" all - Run all examples") + for name in examples: + print(f" {name}") + sys.exit(1) if __name__ == "__main__": diff --git a/examples/streaming_mode_ipython.py b/examples/streaming_mode_ipython.py index 6b2b554..7265afa 100644 --- a/examples/streaming_mode_ipython.py +++ b/examples/streaming_mode_ipython.py @@ -4,6 +4,10 @@ IPython-friendly code snippets for ClaudeSDKClient streaming mode. These examples are designed to be copy-pasted directly into IPython. Each example is self-contained and can be run independently. + +The queries are intentionally simplistic. In reality, a query can be a more +complex task that Claude SDK uses its agentic capabilities and tools (e.g. run +bash commands, edit files, search the web, fetch web content) to accomplish. """ # ============================================================================ @@ -13,15 +17,14 @@ Each example is self-contained and can be run independently. from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock, ResultMessage async with ClaudeSDKClient() as client: - await client.send_message("What is 2+2?") + print("User: What is 2+2?") + await client.query("What is 2+2?") async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): for block in msg.content: if isinstance(block, TextBlock): print(f"Claude: {block.text}") - elif isinstance(msg, ResultMessage) and msg.total_cost_usd: - print(f"Cost: ${msg.total_cost_usd:.4f}") # ============================================================================ @@ -33,7 +36,8 @@ from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock async with ClaudeSDKClient() as client: async def send_and_receive(prompt): - await client.send_message(prompt) + print(f"User: {prompt}") + await client.query(prompt) async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): for block in msg.content: @@ -66,10 +70,12 @@ async def get_response(): # Use it multiple times -await client.send_message("What's 2+2?") +print("User: What's 2+2?") +await client.query("What's 2+2?") await get_response() -await client.send_message("What's 10*10?") +print("User: What's 10*10?") +await client.query("What's 10*10?") await get_response() # Don't forget to disconnect when done @@ -89,7 +95,8 @@ async with ClaudeSDKClient() as client: print("\n--- Sending initial message ---\n") # Send a long-running task - await client.send_message("Count from 1 to 100 slowly using bash sleep") + print("User: Count from 1 to 100, run bash sleep for 1 second in between") + await client.query("Count from 1 to 100, run bash sleep for 1 second in between") # Create a background task to consume messages messages_received = [] @@ -121,7 +128,7 @@ async with ClaudeSDKClient() as client: # Send a new message after interrupt print("\n--- After interrupt, sending new message ---\n") - await client.send_message("Just say 'Hello! I was interrupted.'") + await client.query("Just say 'Hello! I was interrupted.'") async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): @@ -138,7 +145,8 @@ from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock try: async with ClaudeSDKClient() as client: - await client.send_message("Run a bash sleep command for 60 seconds") + print("User: Run a bash sleep command for 60 seconds") + await client.query("Run a bash sleep command for 60 seconds") # Timeout after 20 seconds messages = [] @@ -156,6 +164,47 @@ except Exception as e: print(f"Error: {e}") +# ============================================================================ +# SENDING ASYNC ITERABLE OF MESSAGES +# ============================================================================ + +from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock + +async def message_generator(): + """Generate multiple messages as an async iterable.""" + print("User: I have two math questions.") + yield { + "type": "user", + "message": {"role": "user", "content": "I have two math questions."}, + "parent_tool_use_id": None, + "session_id": "math-session" + } + print("User: What is 25 * 4?") + yield { + "type": "user", + "message": {"role": "user", "content": "What is 25 * 4?"}, + "parent_tool_use_id": None, + "session_id": "math-session" + } + print("User: What is 100 / 5?") + yield { + "type": "user", + "message": {"role": "user", "content": "What is 100 / 5?"}, + "parent_tool_use_id": None, + "session_id": "math-session" + } + +async with ClaudeSDKClient() as client: + # Send async iterable instead of string + await client.query(message_generator()) + + async for msg in client.receive_response(): + if isinstance(msg, AssistantMessage): + for block in msg.content: + if isinstance(block, TextBlock): + print(f"Claude: {block.text}") + + # ============================================================================ # COLLECTING ALL MESSAGES INTO A LIST # ============================================================================ @@ -163,7 +212,8 @@ except Exception as e: from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock, ResultMessage async with ClaudeSDKClient() as client: - await client.send_message("What are the primary colors?") + print("User: What are the primary colors?") + await client.query("What are the primary colors?") # Collect all messages into a list messages = [msg async for msg in client.receive_response()] @@ -176,5 +226,3 @@ async with ClaudeSDKClient() as client: print(f"Claude: {block.text}") elif isinstance(msg, ResultMessage): print(f"Total messages: {len(messages)}") - if msg.total_cost_usd: - print(f"Cost: ${msg.total_cost_usd:.4f}") diff --git a/src/claude_code_sdk/client.py b/src/claude_code_sdk/client.py index dbf9aa6..3c24cb1 100644 --- a/src/claude_code_sdk/client.py +++ b/src/claude_code_sdk/client.py @@ -42,7 +42,7 @@ class ClaudeSDKClient: # Automatically connects with empty stream for interactive use async with ClaudeSDKClient() as client: # Send initial message - await client.send_message("Let's solve a math problem step by step") + await client.query("Let's solve a math problem step by step") # Receive and process response async for message in client.receive_messages(): @@ -50,7 +50,7 @@ class ClaudeSDKClient: break # Send follow-up based on response - await client.send_message("What's 15% of 80?") + await client.query("What's 15% of 80?") # Continue conversation... # Automatically disconnects @@ -60,14 +60,14 @@ class ClaudeSDKClient: ```python async with ClaudeSDKClient() as client: # Start a long task - await client.send_message("Count to 1000") + await client.query("Count to 1000") # Interrupt after 2 seconds await asyncio.sleep(2) await client.interrupt() # Send new instruction - await client.send_message("Never mind, what's 2+2?") + await client.query("Never mind, what's 2+2?") ``` Example - Manual connection: @@ -81,7 +81,7 @@ class ClaudeSDKClient: await client.connect(message_stream()) # Send additional messages dynamically - await client.send_message("What's the weather?") + await client.query("What's the weather?") async for message in client.receive_messages(): print(message) @@ -128,9 +128,9 @@ class ClaudeSDKClient: if message: yield message - async def send_message(self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default") -> None: + async def query(self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default") -> None: """ - Send a new message in streaming mode. + Send a new request in streaming mode. Args: prompt: Either a string message or an async iterable of message dictionaries @@ -186,7 +186,7 @@ class ClaudeSDKClient: Example: ```python async with ClaudeSDKClient() as client: - await client.send_message("What's the capital of France?") + await client.query("What's the capital of France?") async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): diff --git a/tests/test_streaming_client.py b/tests/test_streaming_client.py index 49a5291..884d7c4 100644 --- a/tests/test_streaming_client.py +++ b/tests/test_streaming_client.py @@ -116,8 +116,8 @@ class TestClaudeSDKClientStreaming: anyio.run(_test) - def test_send_message(self): - """Test sending a message.""" + def test_query(self): + """Test sending a query.""" async def _test(): with patch( @@ -127,7 +127,7 @@ class TestClaudeSDKClientStreaming: mock_transport_class.return_value = mock_transport async with ClaudeSDKClient() as client: - await client.send_message("Test message") + await client.query("Test message") # Verify send_request was called with correct format mock_transport.send_request.assert_called_once() @@ -151,7 +151,7 @@ class TestClaudeSDKClientStreaming: mock_transport_class.return_value = mock_transport async with ClaudeSDKClient() as client: - await client.send_message("Test", session_id="custom-session") + await client.query("Test", session_id="custom-session") call_args = mock_transport.send_request.call_args messages, options = call_args[0] @@ -166,7 +166,7 @@ class TestClaudeSDKClientStreaming: async def _test(): client = ClaudeSDKClient() with pytest.raises(CLIConnectionError, match="Not connected"): - await client.send_message("Test") + await client.query("Test") anyio.run(_test) @@ -360,7 +360,7 @@ class TestClaudeSDKClientStreaming: receive_task = asyncio.create_task(get_next_message()) # Send message while receiving - await client.send_message("Question 1") + await client.query("Question 1") # Wait for first message first_msg = await receive_task