Add state preservation support to FunctionCallSpec and HumanContactSpec

What I did
-----------

- Enabled state preservation in `FunctionCallSpec` and `HumanContactSpec`, allowing applications to maintain context across request lifecycles without storing state.
- Updated documentation and examples to demonstrate the new state preservation feature.

How I did it
-----------

- Added an optional `state` field to the models.
- Created new examples and updated existing ones to showcase state usage.
- Wrote tests to verify state is preserved across requests.
- Refactored session management in `async_cloud.py` for simplicity.
- [x] I have ensured `make check test` passes

How to verify it
-----------

- Run `make check test` to ensure all tests pass.
- Review the `app-statehooks.py` example for state preservation without webhooks.
- Check the updated documentation under the **State Preservation** section.

Description for the changelog
--------------

- Add state preservation support to `FunctionCallSpec` and `HumanContactSpec`, enabling applications to maintain context across requests without storing state.
This commit is contained in:
dexhorthy
2024-12-18 13:39:09 -08:00
parent 848330becf
commit a3d286a7a7
13 changed files with 520 additions and 55 deletions

View File

@@ -9,15 +9,29 @@ Please provide the following information:
-->
### What I did
## What I did
### How I did it
<!--
A succint description of the user-facing changes, at most 2-3 bullet points.
Less about the code changes that happened, more about the outcomes this work drives
-->
## How I did it
<!--
Describe the work you did, tests you ran, changes you made, etc
-->
- [ ] I have ensured `make check test` passes
### How to verify it
## How to verify it
### Description for the changelog
<!--
Describe how to test this, which examples to run to verify it, or anything
else that would be helpful to folks exploring this work
-->
## Description for the changelog
<!--
Write a short (one line) summary that describes the changes in this

View File

@@ -41,7 +41,9 @@ To get started with agent webhooks, you'll need to:
### Email Payload
For email webhooks, the payload will be look like
The webhook payload models are defined in [models_agent_webhook.py](https://github.com/humanlayer/humanlayer/blob/main/humanlayer/core/models_agent_webhook.py).
For email webhooks, the payload will look like
<ParamField body="email_payload" type="object">
<Expandable title="EmailPayload">
@@ -91,27 +93,54 @@ For email webhooks, the payload will be look like
a JSON example might look like
```json
## State Preservation
When making function calls or human contacts, you can include a state object that will be preserved and returned in webhooks. This allows your application to be stateless while maintaining context across the request lifecycle.
Example:
```python
# Store the current state when requesting approval
function_call = FunctionCall(
run_id="run_123",
call_id="call_456",
spec=FunctionCallSpec(
fn="send_email",
kwargs={"to": "user@example.com"},
state={
"conversation_history": previous_messages,
"user_preferences": preferences,
}
)
)
# The state will be returned in the webhook payload
# allowing you to restore the conversation context
```
## Next Stepsjson
{
"from_address": "overworked-admin@coolcompany.com",
"to_address": "cool-agent@humanlayer.dev",
"subject": "FWD: help",
"body": "can you add a ticket for this?",
"message_id": "<1234567890@mailsrv.coolcompany.com>",
"raw_email": "This is a placeholder for a test email - if this were a real email, it would include headers, etc.",
"datetime": "2024-01-02T00:00:00Z",
"previous_thread": [
{
"from_address": "needy-employee@coolcompany.com",
"to_address": ["overworked-admin@coolcompany.com"],
"cc_address": [],
"subject": "help",
"content": "i need some help",
"raw_email": "i need some help",
"datetime": "2024-01-01T00:00:00Z"
}
]
"from_address": "overworked-admin@coolcompany.com",
"to_address": "cool-agent@humanlayer.dev",
"subject": "FWD: help",
"body": "can you add a ticket for this?",
"message_id": "<1234567890@mailsrv.coolcompany.com>",
"raw_email": "This is a placeholder for a test email - if this were a real email, it would include headers, etc.",
"datetime": "2024-01-02T00:00:00Z",
"previous_thread": [
{
"from_address": "needy-employee@coolcompany.com",
"to_address": ["overworked-admin@coolcompany.com"],
"cc_address": [],
"subject": "help",
"content": "i need some help",
"raw_email": "i need some help",
"datetime": "2024-01-01T00:00:00Z"
}
]
}
```
### Using Ngrok for local development
@@ -119,3 +148,4 @@ a JSON example might look like
1. Start your local API server and note the port it's running on
2. Install ngrok and run `ngrok http <port>`
3. Copy the ngrok URL and use it as the callback URL in the HumanLayer dashboard
```

View File

@@ -1,5 +1,5 @@
random_id = curl-example-$(shell date +%s)
HUMANLAYER_API_BASE = https://api.humanlayer.dev/humanlayer/v1
HUMANLAYER_API_BASE ?= https://api.humanlayer.dev/humanlayer/v1
.PHONY: check-api-key
check-api-key:
@@ -9,6 +9,27 @@ check-api-key:
check-jq:
@if ! command -v jq &> /dev/null; then echo "jq is not installed"; exit 1; fi
.PHONY: test-large-context
test-large-context: check-api-key check-jq
curl -s -X POST $(HUMANLAYER_API_BASE)/function_calls \
-H "Authorization: Bearer ${HUMANLAYER_API_KEY}" \
-H "Content-Type: application/json" \
-d '{ \
"run_id": "curl-example", \
"call_id": "$(random_id)", \
"spec": { \
"fn": "send_email", \
"kwargs": { \
"to": "user@example.com", \
"subject": "Hello", \
"body": "This is a test email" \
}, \
"state": { \
"large_array": $(shell python3 -c "import json; print(json.dumps([i for i in range(100000)]))") \
} \
} \
}' | jq .
.PHONY: create-approval
create-approval: check-api-key check-jq
@curl -s -X POST $(HUMANLAYER_API_BASE)/function_calls \

View File

@@ -32,6 +32,17 @@ curl -X POST https://api.humanlayer.dev/humanlayer/v1/function_calls \
## Usage w/ Makefile
### Test Large Context Size Limit
Test the context size limit by sending a large payload:
```bash
export HUMANLAYER_API_KEY=<your-api-key>
make test-large-context
```
This will send a function call with a large array in the state field to test the context size limits.
### 1. Create an Approval Request
Create a new function call approval request:

View File

@@ -0,0 +1,235 @@
# a webhooks-free version
import json
import logging
from enum import Enum
from fastapi import BackgroundTasks, FastAPI
from typing import Any, Dict, Literal, Union
import marvin
from pydantic import BaseModel
from humanlayer import AsyncHumanLayer, FunctionCall, HumanContact
from humanlayer.core.models import ContactChannel, EmailContactChannel, HumanContactSpec
from humanlayer.core.models_agent_webhook import EmailMessage, EmailPayload
app = FastAPI(title="HumanLayer FastAPI Email Example", version="1.0.0")
logger = logging.getLogger(__name__)
# Root endpoint
@app.get("/")
async def root() -> Dict[str, str]:
return {
"message": "Welcome to a HumanLayer Email Example",
"instructions": "https://github.com/humanlayer/humanlayer/blob/main/examples/fastapi-email/README.md",
}
##############################
######## Biz Logic ########
##############################
class ClarificationRequest(BaseModel):
intent: Literal["request_more_information"]
message: str
class DraftCampaign(BaseModel):
intent: Literal["ready_to_draft_campaign"]
campaign: "Campaign"
class PublishCampaign(BaseModel):
intent: Literal["human_approved__campaign_ready_to_publish"]
campaign_id: int
async def determine_next_step(
thread: "Thread",
) -> ClarificationRequest | DraftCampaign | PublishCampaign:
"""determine the next step in the email thread"""
response: Union[ClarificationRequest, DraftCampaign, PublishCampaign] = await marvin.cast_async(
json.dumps([event.model_dump(mode="json") for event in thread.events]),
Union[ClarificationRequest, DraftCampaign, PublishCampaign],
instructions="""
determine if you have enough information to create a campaign, or if you need more input.
The campaign should be a list of gift boxes to include in a promotional campaign.
Once a campaign is drafted, a human will automatically review it. If the human approves,
and has not requested any changes, you should publish it.
""",
)
return response
class CampaignItem(BaseModel):
id: int
name: str
description: str
class Campaign(BaseModel):
id: int
url: str
items: list[CampaignItem]
def format_approval_message(self) -> str:
items_str = "\n".join(f"{item.name} - {item.description}" for item in self.items)
return f"""
The preview campaign is live at {self.url}
The items include:
{items_str}
Do you think this is good to publish?
"""
async def publish_campaign(campaign_id: int) -> None:
"""tool to publish a campaign"""
print(f"Published campaign {campaign_id}")
async def is_approval(message: str) -> bool:
"""check if the human approved the campaign"""
answer: str = await marvin.classify_async(
message,
[
"approved",
"rejected",
"unknown",
],
)
return answer == "approved"
##########################
######## CONTEXT #########
##########################
class EventType(Enum):
EMAIL_RECEIVED = "email_received"
REQUEST_MORE_INFORMATION = "request_more_information"
HUMAN_RESPONSE = "human_response"
CAMPAIGN_DRAFTED = "campaign_drafted"
CAMPAIGN_PUBLISHED = "campaign_published"
class Event(BaseModel):
type: EventType
data: Any # don't really care about this, it will just be context to the LLM
# todo you probably want to version this but for now lets assume we're not going to change the schema
class Thread(BaseModel):
initial_email: EmailPayload
# initial_slack_message: SlackMessage
events: list[Event]
def to_state(self) -> dict:
"""Convert thread to a state dict for preservation"""
return self.model_dump(mode="json")
@classmethod
def from_state(cls, state: dict) -> "Thread":
"""Restore thread from preserved state"""
return cls.model_validate(state)
##########################
######## Handlers ########
##########################
async def handle_continued_thread(thread: Thread) -> None:
humanlayer = AsyncHumanLayer(contact_channel=ContactChannel(email=thread.initial_email.as_channel()))
# maybe: if thread gets too long, summarize parts of it - your call!
# new_thread = maybe_summarize_parts_of_thread(thread)
logger.info(f"thread received, determining next step. Last event: {thread.events[-1].type}")
next_step = await determine_next_step(thread)
logger.info(f"next step: {next_step.intent}")
if next_step.intent == "request_more_information":
logger.info(f"requesting more information: {next_step.message}")
thread.events.append(Event(type=EventType.REQUEST_MORE_INFORMATION, data=next_step.message))
await humanlayer.create_human_contact(spec=HumanContactSpec(msg=next_step.message, state=thread.to_state()))
elif next_step.intent == "ready_to_draft_campaign":
campaign_info = next_step.campaign
logger.info(f"drafted campaign_info: {campaign_info.model_dump_json()}")
thread.events.append(Event(type=EventType.CAMPAIGN_DRAFTED, data=campaign_info))
await humanlayer.create_human_contact(
spec=HumanContactSpec(msg=campaign_info.format_approval_message(), state=thread.to_state())
)
elif next_step.intent == "human_approved__campaign_ready_to_publish":
campaign_id = next_step.campaign_id
logger.info(f"drafted campaign_info: {campaign_id}")
await publish_campaign(campaign_id)
thread.events.append(Event(type=EventType.CAMPAIGN_PUBLISHED, data=campaign_id))
await humanlayer.create_human_contact(
spec=HumanContactSpec(
msg="Approved and published campaign. Let me know if you wanna make any other changes!",
state=thread.to_state(),
)
)
logger.info(f"thread sent to humanlayer. Last event: {thread.events[-1].type}")
@app.post("/webhook/new-email-thread")
async def email_inbound(email_payload: EmailPayload, background_tasks: BackgroundTasks) -> Dict[str, Any]:
"""
route to kick off new processing thread from an email
"""
# test payload
if email_payload.is_test or email_payload.from_address == "overworked-admin@coolcompany.com":
logger.info("test payload received, skipping")
return {"status": "ok"}
logger.info(f"inbound email received: {email_payload.model_dump_json()}")
thread = Thread(initial_email=email_payload, events=[])
thread.events.append(Event(type=EventType.EMAIL_RECEIVED, data=email_payload))
background_tasks.add_task(handle_continued_thread, thread)
return {"status": "ok"}
@app.post("/webhook/human-response-on-existing-thread")
async def human_response(
human_response: FunctionCall | HumanContact, background_tasks: BackgroundTasks
) -> Dict[str, Any]:
"""
route to handle human responses
"""
if human_response.spec.state is not None:
thread = Thread.model_validate(human_response.spec.state)
else:
# decide what's the right way to handle this? probably logger.warn and proceed
raise ValueError("state is required")
if isinstance(human_response, HumanContact):
thread.events.append(
Event(type=EventType.HUMAN_RESPONSE, data={"human_response": human_response.status.response})
)
background_tasks.add_task(handle_continued_thread, thread)
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
uvicorn.run(app, host="0.0.0.0", port=8000) # noqa: S104

View File

@@ -137,23 +137,7 @@ def to_prompt(thread: Thread) -> str:
##########################
######## Handlers ########
##########################
class EmailMessage(BaseModel):
from_address: str
to_address: list[str]
cc_address: list[str]
subject: str
content: str
datetime: str
class EmailPayload(BaseModel):
from_address: str
to_address: str
subject: str
body: str
message_id: str
previous_thread: list[EmailMessage] | None = None
raw_email: str
from humanlayer.core.models_agent_webhook import EmailMessage, EmailPayload
async def handle_inbound_email(email_payload: EmailPayload) -> None:
@@ -238,7 +222,9 @@ async def get_human_feedback_on_campaign(campaign_info, humanlayer, thread) -> s
Do you think this is good to publish?
"""
approval_response = (await humanlayer.fetch_human_response(spec=HumanContactSpec(msg=msg))).as_completed()
approval_response = (
await humanlayer.fetch_human_response(spec=HumanContactSpec(msg=msg, state=thread.model_dump()))
).as_completed()
logger.info(f"approval_response: {approval_response}")
thread.events.append(Event(type=EventType.HUMAN_SUPPLIED_CAMPAIGN_DRAFT_FEEDBACK, data=approval_response))
return approval_response

View File

@@ -52,6 +52,8 @@ type FunctionCallSpec = {
// the contact channel to use to contact the human
channel?: ContactChannel
reject_options?: ResponseOption[]
// Optional state to be preserved across the request lifecycle
state?: Record<string, any>
}
type FunctionCall = {
@@ -68,6 +70,8 @@ type HumanContactSpec = {
// the contact channel to use to contact the human
channel?: ContactChannel
response_options?: ResponseOption[]
// Optional state to be preserved across the request lifecycle
state?: Record<string, any>
}
type HumanContactStatus = {

View File

@@ -302,6 +302,54 @@ def test_human_as_tool_forwards_contact_channel() -> None:
contacts.get.assert_called_once_with("generated-id")
def test_human_contact_preserves_state() -> None:
"""
test that the human contact preserves state through the request lifecycle
"""
mock_backend = Mock(spec=AgentBackend)
contacts = Mock(spec=AgentStore[HumanContact, HumanContactStatus])
mock_backend.contacts.return_value = contacts
state = {"conversation_history": ["hello", "world"]}
human_contact = HumanContact(
run_id="generated-id",
call_id="generated-id",
spec=HumanContactSpec(msg="What is your favorite color?", state=state),
)
contacts.add.return_value = human_contact
contacts.get.return_value = human_contact.model_copy(
update={
"status": HumanContactStatus(
response="Blue",
)
},
deep=True,
)
hl = HumanLayer(
backend=mock_backend,
genid=lambda x: "generated-id",
sleep=lambda x: None,
)
human_tool = hl.fetch_human_response(HumanContactSpec(msg="What is your favorite color?", state=state))
response = human_tool.as_completed()
assert response == "Blue"
contacts.add.assert_called_once_with(
HumanContact(
run_id="generated-id",
call_id="generated-id",
spec=HumanContactSpec(
msg="What is your favorite color?",
state=state,
),
)
)
contacts.get.assert_called_once_with("generated-id")
def test_create_human_contact_with_call_id() -> None:
"""
test that the create_human_contact method works when you supply a call_id

View File

@@ -290,6 +290,54 @@ def test_fetch_approval_forwards_contact_channel() -> None:
functions.get.assert_called_once_with("generated-id")
def test_fetch_approval_forwards_state() -> None:
"""
test that the fetch_approval method forwards the state,
even if not explicitly provided in the method call
"""
mock_backend = Mock(spec=AgentBackend)
functions = Mock(spec=AgentStore[FunctionCall, FunctionCallStatus])
mock_backend.functions.return_value = functions
state = {"conversation_history": ["hello", "world"]}
function_call = FunctionCall(
run_id="generated-id",
call_id="generated-id",
spec=FunctionCallSpec(fn="_fn_", kwargs={"bar": "baz"}, state=state),
)
functions.add.return_value = function_call
functions.get.return_value = function_call.model_copy(
deep=True,
update={"status": FunctionCallStatus(approved=True)},
)
hl = HumanLayer(
backend=mock_backend,
genid=lambda x: "generated-id",
sleep=lambda x: None,
)
resp = hl.fetch_approval(
FunctionCallSpec(fn="_fn_", kwargs={"bar": "baz"}, state=state),
).as_completed()
assert resp.approved is True
functions.add.assert_called_once_with(
FunctionCall(
run_id="generated-id",
call_id="generated-id",
spec=FunctionCallSpec(
fn="_fn_",
kwargs={"bar": "baz"},
state=state,
),
)
)
functions.get.assert_called_once_with("generated-id")
def test_create_function_call_with_call_id() -> None:
"""
test that the create_function_call method works when you supply a call_id

View File

@@ -349,6 +349,10 @@ class AsyncHumanLayer(BaseModel):
) -> FunctionCall:
"""Create a function call asynchronously"""
assert self.backend is not None, "create requires a backend, did you forget your HUMANLAYER_API_KEY?"
if not spec.channel:
spec.channel = self.contact_channel
call_id = call_id or self.genid("call")
call = FunctionCall(
run_id=self.run_id, # type: ignore
@@ -381,6 +385,10 @@ class AsyncHumanLayer(BaseModel):
) -> HumanContact:
"""Create a human contact request asynchronously"""
assert self.backend is not None, "create requires a backend, did you forget your HUMANLAYER_API_KEY?"
if not spec.channel:
spec.channel = self.contact_channel
call_id = call_id or self.genid("call")
contact = HumanContact(
run_id=self.run_id, # type: ignore

View File

@@ -35,24 +35,21 @@ class AsyncHumanLayerCloudConnection(BaseModel):
if not self.api_key:
raise ValueError("HUMANLAYER_API_KEY is required for cloud approvals")
async def get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(headers={"Authorization": f"Bearer {self.api_key}"})
return self._session
async def request(
self,
method: str,
path: str,
**kwargs: Any,
) -> Dict[str, Any]:
session = await self.get_session()
async with session.request(
method,
f"{self.api_base_url}{path}",
timeout=aiohttp.ClientTimeout(total=10),
**kwargs,
) as response:
async with (
aiohttp.ClientSession(headers={"Authorization": f"Bearer {self.api_key}"}) as session,
session.request(
method,
f"{self.api_base_url}{path}",
timeout=aiohttp.ClientTimeout(total=10),
**kwargs,
) as response,
):
response_json = await response.json()
logger.debug(
"response %d %s",

View File

@@ -164,6 +164,7 @@ class FunctionCallSpec(BaseModel):
kwargs: dict
channel: ContactChannel | None = None
reject_options: list[ResponseOption] | None = None
state: dict | None = None # Optional state to be preserved across the request lifecycle
class FunctionCallStatus(BaseModel):
@@ -219,6 +220,7 @@ class HumanContactSpec(BaseModel):
subject: str | None = None
channel: ContactChannel | None = None
response_options: list[ResponseOption] | None = None
state: dict | None = None # Optional state to be preserved across the request lifecycle
class HumanContactStatus(BaseModel):

View File

@@ -0,0 +1,61 @@
"""
Models for agent webhook payloads.
These models define the structure of payloads sent to agent webhooks when events occur,
such as receiving an email. They are used by the HumanLayer platform to serialize webhook
data in a consistent format that can be consumed by agent implementations.
For example, when an email is received, HumanLayer will send an EmailPayload to the
configured webhook endpoint containing the email content and metadata.
"""
from typing import List, Optional
from pydantic import BaseModel
from humanlayer.core.models import EmailContactChannel
class EmailMessage(BaseModel):
"""A message in an email thread"""
from_address: str
to_address: List[str]
cc_address: List[str]
subject: str
content: str
datetime: str
class EmailPayload(BaseModel):
"""Payload for email agent webhooks"""
from_address: str
to_address: str
subject: str
body: str
message_id: str
previous_thread: Optional[List[EmailMessage]] = None
raw_email: str
is_test: bool | None = None # will be set if the email is a test webhook from the server
def as_channel(self, context_about_user: str | None = None) -> EmailContactChannel:
return EmailContactChannel.in_reply_to(
from_address=self.from_address,
message_id=self.message_id,
subject=self.subject,
context_about_user=context_about_user,
)
class SlackMessage(BaseModel):
from_user_id: str
channel_id: str
content: str
message_id: str
class SlackThread(BaseModel):
thread_ts: str
channel_id: str
events: list[SlackMessage]