notify smaller (#49)

* notify smaller

* updates

* fix

* fix

---------

Co-authored-by: Kartik Sarangmath <kartiksarangmath@Kartiks-MacBook-Air.local>
This commit is contained in:
ksarangmath
2025-08-07 00:04:05 -07:00
committed by GitHub
parent f598cd165b
commit b5489fe68e
4 changed files with 286 additions and 10 deletions

View File

@@ -22,6 +22,8 @@ from ..db import (
submit_user_message,
delete_agent_instance,
update_agent_instance_name,
get_message_by_id,
get_instance_git_diff,
)
from ..models import (
AgentInstanceDetail,
@@ -180,21 +182,59 @@ async def stream_messages(
# Parse the JSON payload
data = json.loads(payload)
# Check event type and send appropriate SSE event
# Check event type and handle accordingly
event_type = data.get("event_type")
if event_type == "status_update":
# Send status_update event
# Status updates already have all needed data
yield f"event: status_update\ndata: {json.dumps(data)}\n\n"
elif event_type == "message_update":
# Send message_update event for frontend to handle
yield f"event: message_update\ndata: {json.dumps(data)}\n\n"
elif event_type == "git_diff_update":
# Send git_diff_update event for frontend to handle
yield f"event: git_diff_update\ndata: {json.dumps(data)}\n\n"
else:
# Regular message event (either message_insert or legacy without event_type)
elif event_type == "message_insert":
# Fetch and send new message
message_id = data.get("id")
if not message_id:
continue
message_data = get_message_by_id(db, UUID(message_id), user_id)
if message_data:
data.update(message_data)
yield f"event: message\ndata: {json.dumps(data)}\n\n"
elif event_type == "message_update":
# Fetch and send message update
message_id = data.get("id")
if not message_id:
continue
message_data = get_message_by_id(db, UUID(message_id), user_id)
if message_data:
# Preserve old_requires_user_input from notification
old_requires_user_input = data.get(
"old_requires_user_input"
)
data.update(message_data)
if old_requires_user_input is not None:
data["old_requires_user_input"] = (
old_requires_user_input
)
yield f"event: message_update\ndata: {json.dumps(data)}\n\n"
elif event_type == "git_diff_update":
# Fetch and send git diff update
instance_id_str = data.get("instance_id")
if not instance_id_str:
continue
diff_data = get_instance_git_diff(
db, UUID(instance_id_str), user_id
)
if diff_data:
data["git_diff"] = diff_data["git_diff"]
yield f"event: git_diff_update\ndata: {json.dumps(data)}\n\n"
except asyncio.TimeoutError:
# Send heartbeat to keep connection alive
yield f"event: heartbeat\ndata: {json.dumps({'timestamp': asyncio.get_event_loop().time()})}\n\n"

View File

@@ -8,6 +8,8 @@ from .queries import (
submit_user_message,
delete_agent_instance,
update_agent_instance_name,
get_message_by_id,
get_instance_git_diff,
)
from .user_agent_queries import (
create_user_agent,
@@ -28,6 +30,8 @@ __all__ = [
"submit_user_message",
"delete_agent_instance",
"update_agent_instance_name",
"get_message_by_id",
"get_instance_git_diff",
"create_user_agent",
"get_user_agents",
"update_user_agent",

View File

@@ -507,3 +507,46 @@ def update_agent_instance_name(
# Return the updated instance in the standard format
return _format_instance(instance)
def get_message_by_id(db: Session, message_id: UUID, user_id: UUID) -> dict | None:
"""
Get a single message by ID with user authorization check.
Returns the message data if authorized, None if not found or unauthorized.
"""
message = (
db.query(Message)
.join(AgentInstance, Message.agent_instance_id == AgentInstance.id)
.filter(Message.id == message_id, AgentInstance.user_id == user_id)
.first()
)
if not message:
return None
return {
"id": str(message.id),
"agent_instance_id": str(message.agent_instance_id),
"sender_type": message.sender_type.value,
"content": message.content,
"created_at": message.created_at.isoformat() + "Z",
"requires_user_input": message.requires_user_input,
"message_metadata": message.message_metadata,
}
def get_instance_git_diff(db: Session, instance_id: UUID, user_id: UUID) -> dict | None:
"""
Get the git diff for an agent instance with user authorization check.
Returns the git diff data if authorized, None if not found or unauthorized.
"""
instance = (
db.query(AgentInstance)
.filter(AgentInstance.id == instance_id, AgentInstance.user_id == user_id)
.first()
)
if not instance:
return None
return {"instance_id": str(instance.id), "git_diff": instance.git_diff}

View File

@@ -0,0 +1,189 @@
"""Fix large payload notifications
Revision ID: 9fe045ea7ad9
Revises: 9c1915ca1cd2
Create Date: 2025-08-06 21:34:06.523760
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "9fe045ea7ad9"
down_revision: Union[str, None] = "9c1915ca1cd2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Update notify_message_change to send lightweight notifications without content
op.execute("""
CREATE OR REPLACE FUNCTION notify_message_change() RETURNS trigger AS $$
DECLARE
channel_name text;
payload text;
event_type text;
BEGIN
-- Create channel name based on instance ID
channel_name := 'message_channel_' || NEW.agent_instance_id::text;
-- Determine event type
IF TG_OP = 'INSERT' THEN
event_type := 'message_insert';
ELSIF TG_OP = 'UPDATE' THEN
event_type := 'message_update';
END IF;
-- Create JSON payload WITHOUT the actual content (lightweight notification)
payload := json_build_object(
'event_type', event_type,
'id', NEW.id,
'agent_instance_id', NEW.agent_instance_id,
'sender_type', NEW.sender_type,
'created_at', to_char(NEW.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'),
'requires_user_input', NEW.requires_user_input,
'old_requires_user_input', CASE
WHEN TG_OP = 'UPDATE' THEN OLD.requires_user_input
ELSE NULL
END
)::text;
-- Send notification (quote channel name for UUIDs with hyphens)
EXECUTE format('NOTIFY %I, %L', channel_name, payload);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""")
# Update notify_instance_change to send lightweight git_diff notifications
op.execute("""
CREATE OR REPLACE FUNCTION notify_instance_change() RETURNS trigger AS $$
DECLARE
channel_name text;
payload text;
BEGIN
-- Create channel name based on instance ID
channel_name := 'message_channel_' || NEW.id::text;
-- Check if status changed
IF OLD.status IS DISTINCT FROM NEW.status THEN
-- Create JSON payload with status update data (keep as-is, already lightweight)
payload := json_build_object(
'event_type', 'status_update',
'instance_id', NEW.id,
'status', NEW.status,
'timestamp', to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"')
)::text;
-- Send notification (quote channel name for UUIDs with hyphens)
EXECUTE format('NOTIFY %I, %L', channel_name, payload);
END IF;
-- Check if git_diff changed
IF OLD.git_diff IS DISTINCT FROM NEW.git_diff THEN
-- Create JSON payload WITHOUT the actual git_diff (lightweight notification)
payload := json_build_object(
'event_type', 'git_diff_update',
'instance_id', NEW.id,
'timestamp', to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"')
)::text;
-- Send notification (quote channel name for UUIDs with hyphens)
EXECUTE format('NOTIFY %I, %L', channel_name, payload);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""")
def downgrade() -> None:
# Restore original notify_message_change with full content
op.execute("""
CREATE OR REPLACE FUNCTION notify_message_change() RETURNS trigger AS $$
DECLARE
channel_name text;
payload text;
event_type text;
BEGIN
-- Create channel name based on instance ID
channel_name := 'message_channel_' || NEW.agent_instance_id::text;
-- Determine event type
IF TG_OP = 'INSERT' THEN
event_type := 'message_insert';
ELSIF TG_OP = 'UPDATE' THEN
event_type := 'message_update';
END IF;
-- Create JSON payload with message data (original version with content)
payload := json_build_object(
'event_type', event_type,
'id', NEW.id,
'agent_instance_id', NEW.agent_instance_id,
'sender_type', NEW.sender_type,
'content', NEW.content,
'created_at', to_char(NEW.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'),
'requires_user_input', NEW.requires_user_input,
'message_metadata', NEW.message_metadata,
'old_requires_user_input', CASE
WHEN TG_OP = 'UPDATE' THEN OLD.requires_user_input
ELSE NULL
END
)::text;
-- Send notification (quote channel name for UUIDs with hyphens)
EXECUTE format('NOTIFY %I, %L', channel_name, payload);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""")
# Restore original notify_instance_change with full git_diff
op.execute("""
CREATE OR REPLACE FUNCTION notify_instance_change() RETURNS trigger AS $$
DECLARE
channel_name text;
payload text;
BEGIN
-- Create channel name based on instance ID
channel_name := 'message_channel_' || NEW.id::text;
-- Check if status changed
IF OLD.status IS DISTINCT FROM NEW.status THEN
-- Create JSON payload with status update data
payload := json_build_object(
'event_type', 'status_update',
'instance_id', NEW.id,
'status', NEW.status,
'timestamp', to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"')
)::text;
-- Send notification (quote channel name for UUIDs with hyphens)
EXECUTE format('NOTIFY %I, %L', channel_name, payload);
END IF;
-- Check if git_diff changed
IF OLD.git_diff IS DISTINCT FROM NEW.git_diff THEN
-- Create JSON payload with git_diff update data (original with full diff)
payload := json_build_object(
'event_type', 'git_diff_update',
'instance_id', NEW.id,
'git_diff', NEW.git_diff,
'timestamp', to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"')
)::text;
-- Send notification (quote channel name for UUIDs with hyphens)
EXECUTE format('NOTIFY %I, %L', channel_name, payload);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""")