Rename folder + remove coloring on omniparseragent printouts

This commit is contained in:
Thomas Dhome-Casanova
2025-01-29 22:44:23 -08:00
parent 746507b9d9
commit 7800a24b27
17 changed files with 0 additions and 1772 deletions

View File

@@ -1,17 +0,0 @@
"""
Define some colorful stuffs for better visualization in the chat.
"""
# Define the RGB colors for each letter
colors = {
'S': 'rgb(106, 158, 210)',
'h': 'rgb(111, 163, 82)',
'o': 'rgb(209, 100, 94)',
'w': 'rgb(238, 171, 106)',
'U': 'rgb(0, 0, 0)',
'I': 'rgb(0, 0, 0)',
}
colorful_text_vlm = "**OmniParser Agent**"
colorful_text_user = "**User**"

View File

@@ -1,374 +0,0 @@
"""
Entrypoint for Gradio, see https://gradio.app/
python app.py --windows_host_url xxxx:8006/ --omniparser_host_url localhost:8000
"""
import os
from datetime import datetime
from enum import StrEnum
from functools import partial
from pathlib import Path
from typing import cast
import argparse
import gradio as gr
from anthropic import APIResponse
from anthropic.types import TextBlock
from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock
from anthropic.types.tool_use_block import ToolUseBlock
from computer_use_demo.loop import (
APIProvider,
sampling_loop_sync,
)
from computer_use_demo.tools import ToolResult
CONFIG_DIR = Path("~/.anthropic").expanduser()
API_KEY_FILE = CONFIG_DIR / "api_key"
INTRO_TEXT = '''
🚀🤖✨ It's Play Time!
Welcome to the OmniParser+X Demo! X = [GPT-4o/4o-mini, Claude, Phi, Llama]. Let OmniParser turn your general purpose vision-langauge model to an AI agent.
Type a message and press submit to start OmniParser+X. Press the trash icon in the chat to clear the message history.
'''
def parse_arguments():
parser = argparse.ArgumentParser(description="Gradio App")
parser.add_argument("--windows_host_url", type=str, default='localhost:8006')
parser.add_argument("--omniparser_host_url", type=str, default="localhost:8000")
return parser.parse_args()
args = parse_arguments()
windows_host_url = args.windows_host_url
omniparser_host_url = args.omniparser_host_url
print(f"Windows host URL: {windows_host_url}")
print(f"OmniParser host URL: {omniparser_host_url}")
class Sender(StrEnum):
USER = "user"
BOT = "assistant"
TOOL = "tool"
def setup_state(state):
if "messages" not in state:
state["messages"] = []
if "model" not in state:
state["model"] = "omniparser + gpt-4o"
if "provider" not in state:
state["provider"] = "openai"
if "openai_api_key" not in state: # Fetch API keys from environment variables
state["openai_api_key"] = os.getenv("OPENAI_API_KEY", "")
if "anthropic_api_key" not in state:
state["anthropic_api_key"] = os.getenv("ANTHROPIC_API_KEY", "")
if "api_key" not in state:
state["api_key"] = ""
if "auth_validated" not in state:
state["auth_validated"] = False
if "responses" not in state:
state["responses"] = {}
if "tools" not in state:
state["tools"] = {}
if "only_n_most_recent_images" not in state:
state["only_n_most_recent_images"] = 2
if 'chatbot_messages' not in state:
state['chatbot_messages'] = []
async def main(state):
"""Render loop for Gradio"""
setup_state(state)
return "Setup completed"
def validate_auth(provider: APIProvider, api_key: str | None):
if provider == APIProvider.ANTHROPIC:
if not api_key:
return "Enter your Anthropic API key to continue."
if provider == APIProvider.BEDROCK:
import boto3
if not boto3.Session().get_credentials():
return "You must have AWS credentials set up to use the Bedrock API."
if provider == APIProvider.VERTEX:
import google.auth
from google.auth.exceptions import DefaultCredentialsError
if not os.environ.get("CLOUD_ML_REGION"):
return "Set the CLOUD_ML_REGION environment variable to use the Vertex API."
try:
google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
except DefaultCredentialsError:
return "Your google cloud credentials are not set up correctly."
def load_from_storage(filename: str) -> str | None:
"""Load data from a file in the storage directory."""
try:
file_path = CONFIG_DIR / filename
if file_path.exists():
data = file_path.read_text().strip()
if data:
return data
except Exception as e:
print(f"Debug: Error loading {filename}: {e}")
return None
def save_to_storage(filename: str, data: str) -> None:
"""Save data to a file in the storage directory."""
try:
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
file_path = CONFIG_DIR / filename
file_path.write_text(data)
# Ensure only user can read/write the file
file_path.chmod(0o600)
except Exception as e:
print(f"Debug: Error saving {filename}: {e}")
def _api_response_callback(response: APIResponse[BetaMessage], response_state: dict):
response_id = datetime.now().isoformat()
response_state[response_id] = response
def _tool_output_callback(tool_output: ToolResult, tool_id: str, tool_state: dict):
tool_state[tool_id] = tool_output
def chatbot_output_callback(message, chatbot_state, hide_images=False, sender="bot"):
def _render_message(message: str | BetaTextBlock | BetaToolUseBlock | ToolResult, hide_images=False):
print(f"_render_message: {str(message)[:100]}")
if isinstance(message, str):
return message
is_tool_result = not isinstance(message, str) and (
isinstance(message, ToolResult)
or message.__class__.__name__ == "ToolResult"
or message.__class__.__name__ == "CLIResult"
)
if not message or (
is_tool_result
and hide_images
and not hasattr(message, "error")
and not hasattr(message, "output")
): # return None if hide_images is True
return
# render tool result
if is_tool_result:
message = cast(ToolResult, message)
if message.output:
return message.output
if message.error:
return f"Error: {message.error}"
if message.base64_image and not hide_images:
# somehow can't display via gr.Image
# image_data = base64.b64decode(message.base64_image)
# return gr.Image(value=Image.open(io.BytesIO(image_data)))
return f'<img src="data:image/png;base64,{message.base64_image}">'
elif isinstance(message, BetaTextBlock) or isinstance(message, TextBlock):
return f"Analysis: {message.text}"
elif isinstance(message, BetaToolUseBlock) or isinstance(message, ToolUseBlock):
# return f"Tool Use: {message.name}\nInput: {message.input}"
return f"Next I will perform the following action: {message.input}"
else:
return message
def _truncate_string(s, max_length=500):
"""Truncate long strings for concise printing."""
if isinstance(s, str) and len(s) > max_length:
return s[:max_length] + "..."
return s
# processing Anthropic messages
message = _render_message(message, hide_images)
if sender == "bot":
chatbot_state.append((None, message))
else:
chatbot_state.append((message, None))
# Create a concise version of the chatbot state for printing
concise_state = [(_truncate_string(user_msg), _truncate_string(bot_msg))
for user_msg, bot_msg in chatbot_state]
# print(f"chatbot_output_callback chatbot_state: {concise_state} (truncated)")
def process_input(user_input, state):
# Append the user message to state["messages"]
state["messages"].append(
{
"role": Sender.USER,
"content": [TextBlock(type="text", text=user_input)],
}
)
# Append the user's message to chatbot_messages with None for the assistant's reply
state['chatbot_messages'].append((user_input, None))
yield state['chatbot_messages'] # Yield to update the chatbot UI with the user's message
print("state")
print(state)
# Run sampling_loop_sync with the chatbot_output_callback
for loop_msg in sampling_loop_sync(
model=state["model"],
provider=state["provider"],
messages=state["messages"],
output_callback=partial(chatbot_output_callback, chatbot_state=state['chatbot_messages'], hide_images=False),
tool_output_callback=partial(_tool_output_callback, tool_state=state["tools"]),
api_response_callback=partial(_api_response_callback, response_state=state["responses"]),
api_key=state["api_key"],
only_n_most_recent_images=state["only_n_most_recent_images"],
omniparser_url=omniparser_host_url
):
if loop_msg is None:
yield state['chatbot_messages']
print("End of task. Close the loop.")
break
yield state['chatbot_messages'] # Yield the updated chatbot_messages to update the chatbot UI
with gr.Blocks(theme=gr.themes.Default()) as demo:
gr.HTML("""
<style>
.no-padding {
padding: 0 !important;
}
.no-padding > div {
padding: 0 !important;
}
</style>
""")
state = gr.State({}) # Use Gradio's state management
setup_state(state.value) # Initialize the state
# Retrieve screen details
gr.Markdown("# OmniParser + ✖️ Demo")
if not os.getenv("HIDE_WARNING", False):
gr.Markdown(INTRO_TEXT)
with gr.Accordion("Settings", open=True):
with gr.Row():
with gr.Column():
model = gr.Dropdown(
label="Model",
choices=["omniparser + gpt-4o", "omniparser + phi35v", "claude-3-5-sonnet-20241022"],
value="omniparser + gpt-4o", # Set to one of the choices
interactive=True,
)
with gr.Column():
only_n_images = gr.Slider(
label="N most recent screenshots",
minimum=0,
maximum=10,
step=1,
value=2,
interactive=True
)
with gr.Row():
with gr.Column(1):
provider = gr.Dropdown(
label="API Provider",
choices=[option.value for option in APIProvider],
value="openai",
interactive=False,
)
with gr.Column(2):
api_key = gr.Textbox(
label="API Key",
type="password",
value=state.value.get("api_key", ""),
placeholder="Paste your API key here",
interactive=True,
)
with gr.Row():
with gr.Column(scale=8):
chat_input = gr.Textbox(show_label=False, placeholder="Type a message to send to Omniparser + X ...", container=False)
with gr.Column(scale=1, min_width=50):
submit_button = gr.Button(value="Send", variant="primary")
with gr.Row():
with gr.Column(scale=1):
chatbot = gr.Chatbot(label="Chatbot History", autoscroll=True, height=580)
with gr.Column(scale=3):
if not windows_host_url:
iframe = gr.HTML(
f'<iframe src="http://localhost:8006/vnc.html?view_only=1&autoconnect=1&resize=scale" width="100%" height="580" allow="fullscreen"></iframe>',
container=False,
elem_classes="no-padding"
)
else:
# machine_fqdn = socket.getfqdn()
# print('machine_fqdn:', machine_fqdn)
iframe = gr.HTML(
f'<iframe src="http://{windows_host_url}/vnc.html?view_only=1&autoconnect=1&resize=scale" width="100%" height="580" allow="fullscreen"></iframe>',
container=False,
elem_classes="no-padding"
)
def update_model(model_selection, state):
state["model"] = model_selection
print(f"Model updated to: {state['model']}")
if model_selection == "claude-3-5-sonnet-20241022":
provider_choices = [option.value for option in APIProvider if option.value != "openai"]
elif model_selection == "omniparser + gpt-4o" or model_selection == "omniparser + phi35v":
provider_choices = ["openai"]
else:
provider_choices = [option.value for option in APIProvider]
default_provider_value = provider_choices[0]
provider_interactive = len(provider_choices) > 1
api_key_placeholder = f"{default_provider_value.title()} API Key"
# Update state
state["provider"] = default_provider_value
state["api_key"] = state.get(f"{default_provider_value}_api_key", "")
# Calls to update other components UI
provider_update = gr.update(
choices=provider_choices,
value=default_provider_value,
interactive=provider_interactive
)
api_key_update = gr.update(
placeholder=api_key_placeholder,
value=state["api_key"]
)
return provider_update, api_key_update
def update_only_n_images(only_n_images_value, state):
state["only_n_most_recent_images"] = only_n_images_value
def update_provider(provider_value, state):
# Update state
state["provider"] = provider_value
state["api_key"] = state.get(f"{provider_value}_api_key", "")
# Calls to update other components UI
api_key_update = gr.update(
placeholder=f"{provider_value.title()} API Key",
value=state["api_key"]
)
return api_key_update
def update_api_key(api_key_value, state):
state["api_key"] = api_key_value
state[f'{state["provider"]}_api_key'] = api_key_value
def clear_chat(state):
# Reset message-related state
state["messages"] = []
state["responses"] = {}
state["tools"] = {}
state['chatbot_messages'] = []
return state['chatbot_messages']
model.change(fn=update_model, inputs=[model, state], outputs=[provider, api_key])
only_n_images.change(fn=update_only_n_images, inputs=[only_n_images, state], outputs=None)
provider.change(fn=update_provider, inputs=[provider, state], outputs=api_key)
api_key.change(fn=update_api_key, inputs=[api_key, state], outputs=None)
chatbot.clear(fn=clear_chat, inputs=[state], outputs=[chatbot])
submit_button.click(process_input, [chat_input, state], chatbot)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7888)

View File

@@ -1,182 +0,0 @@
"""
Agentic sampling loop that calls the Anthropic API and local implenmentation of anthropic-defined computer use tools.
"""
import asyncio
import platform
from collections.abc import Callable
from datetime import datetime
from enum import StrEnum
from typing import Any, cast
from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex, APIResponse
from anthropic.types import (
ToolResultBlockParam,
)
from anthropic.types.beta import (
BetaContentBlock,
BetaContentBlockParam,
BetaImageBlockParam,
BetaMessage,
BetaMessageParam,
BetaTextBlockParam,
BetaToolResultBlockParam,
)
from anthropic.types import TextBlock
from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock
from computer_use_demo.tools import ComputerTool, ToolCollection, ToolResult
from PIL import Image
from io import BytesIO
import gradio as gr
from typing import Dict
BETA_FLAG = "computer-use-2024-10-22"
class APIProvider(StrEnum):
ANTHROPIC = "anthropic"
BEDROCK = "bedrock"
VERTEX = "vertex"
SYSTEM_PROMPT = f"""<SYSTEM_CAPABILITY>
* You are utilizing a Windows system with internet access.
* The current date is {datetime.today().strftime('%A, %B %d, %Y')}.
</SYSTEM_CAPABILITY>
"""
class AnthropicActor:
def __init__(
self,
model: str,
provider: APIProvider,
api_key: str,
api_response_callback: Callable[[APIResponse[BetaMessage]], None],
max_tokens: int = 4096,
only_n_most_recent_images: int | None = None,
print_usage: bool = True,
):
self.model = model
self.provider = provider
self.api_key = api_key
self.api_response_callback = api_response_callback
self.max_tokens = max_tokens
self.only_n_most_recent_images = only_n_most_recent_images
self.tool_collection = ToolCollection(ComputerTool())
self.system = SYSTEM_PROMPT
self.total_token_usage = 0
self.total_cost = 0
self.print_usage = print_usage
# Instantiate the appropriate API client based on the provider
if provider == APIProvider.ANTHROPIC:
self.client = Anthropic(api_key=api_key)
elif provider == APIProvider.VERTEX:
self.client = AnthropicVertex()
elif provider == APIProvider.BEDROCK:
self.client = AnthropicBedrock()
def __call__(
self,
*,
messages: list[BetaMessageParam]
):
"""
Generate a response given history messages.
"""
if self.only_n_most_recent_images:
_maybe_filter_to_n_most_recent_images(messages, self.only_n_most_recent_images)
# Call the API synchronously
raw_response = self.client.beta.messages.with_raw_response.create(
max_tokens=self.max_tokens,
messages=messages,
model=self.model,
system=self.system,
tools=self.tool_collection.to_params(),
betas=["computer-use-2024-10-22"],
)
self.api_response_callback(cast(APIResponse[BetaMessage], raw_response))
response = raw_response.parse()
print(f"AnthropicActor response: {response}")
self.total_token_usage += response.usage.input_tokens + response.usage.output_tokens
self.total_cost += (response.usage.input_tokens * 3 / 1000000 + response.usage.output_tokens * 15 / 1000000)
if self.print_usage:
print(f"Claude total token usage so far: {self.total_token_usage}, total cost so far: $USD{self.total_cost}")
return response
def _maybe_filter_to_n_most_recent_images(
messages: list[BetaMessageParam],
images_to_keep: int,
min_removal_threshold: int = 10,
):
"""
With the assumption that images are screenshots that are of diminishing value as
the conversation progresses, remove all but the final `images_to_keep` tool_result
images in place, with a chunk of min_removal_threshold to reduce the amount we
break the implicit prompt cache.
"""
if images_to_keep is None:
return messages
tool_result_blocks = cast(
list[ToolResultBlockParam],
[
item
for message in messages
for item in (
message["content"] if isinstance(message["content"], list) else []
)
if isinstance(item, dict) and item.get("type") == "tool_result"
],
)
total_images = sum(
1
for tool_result in tool_result_blocks
for content in tool_result.get("content", [])
if isinstance(content, dict) and content.get("type") == "image"
)
images_to_remove = total_images - images_to_keep
# for better cache behavior, we want to remove in chunks
images_to_remove -= images_to_remove % min_removal_threshold
for tool_result in tool_result_blocks:
if isinstance(tool_result.get("content"), list):
new_content = []
for content in tool_result.get("content", []):
if isinstance(content, dict) and content.get("type") == "image":
if images_to_remove > 0:
images_to_remove -= 1
continue
new_content.append(content)
tool_result["content"] = new_content
if __name__ == "__main__":
pass
# client = Anthropic(api_key="")
# response = client.beta.messages.with_raw_response.create(
# max_tokens=4096,
# model="claude-3-5-sonnet-20241022",
# system=SYSTEM_PROMPT,
# # tools=ToolCollection(
# # ComputerTool(),
# # ).to_params(),
# betas=["computer-use-2024-10-22"],
# messages=[
# {"role": "user", "content": "click on (199, 199)."}
# ],
# )
# print(f"AnthropicActor response: {response.parse().usage.input_tokens+response.parse().usage.output_tokens}")

View File

@@ -1,107 +0,0 @@
import os
import logging
import base64
import requests
def is_image_path(text):
# Checking if the input text ends with typical image file extensions
image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif")
if text.endswith(image_extensions):
return True
else:
return False
def encode_image(image_path):
"""Encode image file to base64."""
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
def run_oai_interleaved(messages: list, system: str, llm: str, api_key: str, max_tokens=256, temperature=0):
api_key = api_key or os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY is not set")
headers = {"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"}
final_messages = [{"role": "system", "content": system}]
# image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
if type(messages) == list:
for item in messages:
contents = []
if isinstance(item, dict):
for cnt in item["content"]:
if isinstance(cnt, str):
if is_image_path(cnt):
base64_image = encode_image(cnt)
content = {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
else:
content = {"type": "text", "text": cnt}
else:
# in this case it is a text block from anthropic
content = {"type": "text", "text": str(cnt)}
contents.append(content)
message = {"role": 'user', "content": contents}
else: # str
contents.append({"type": "text", "text": item})
message = {"role": "user", "content": contents}
final_messages.append(message)
elif isinstance(messages, str):
final_messages = [{"role": "user", "content": messages}]
# import pdb; pdb.set_trace()
print("[oai] sending messages:", {"role": "user", "content": messages})
payload = {
"model": llm,
"messages": final_messages,
"max_tokens": max_tokens,
"temperature": temperature,
# "stop": stop,
}
# from IPython.core.debugger import Pdb; Pdb().set_trace()
response = requests.post(
"https://api.openai.com/v1/chat/completions", headers=headers, json=payload
)
try:
text = response.json()['choices'][0]['message']['content']
token_usage = int(response.json()['usage']['total_tokens'])
return text, token_usage
# return error message if the response is not successful
except Exception as e:
print(f"Error in interleaved openAI: {e}. This may due to your invalid OPENAI_API_KEY. Please check the response: {response.json()} ")
return response.json()
if __name__ == "__main__":
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY is not set")
text, token_usage = run_oai_interleaved(
messages= [{"content": [
"What is in the screenshot?",
"./tmp/outputs/screenshot_0b04acbb783d4706bc93873d17ba8c05.png"],
"role": "user"
}],
llm="gpt-4o-mini",
system="You are a helpful assistant",
api_key=api_key,
max_tokens=256,
temperature=0)
print(text, token_usage)
# There is an introduction describing the Calyx... 36986

View File

@@ -1,44 +0,0 @@
import requests
import base64
from pathlib import Path
from computer_use_demo.tools.screen_capture import get_screenshot
from computer_use_demo.agent.llm_utils.oai import encode_image
OUTPUT_DIR = "./tmp/outputs"
class OmniParserClient:
def __init__(self,
url: str) -> None:
self.url = url
def __call__(self,):
screenshot, screenshot_path = get_screenshot()
screenshot_path = str(screenshot_path)
image_base64 = encode_image(screenshot_path)
response = requests.post(self.url, json={"base64_image": image_base64})
response_json = response.json()
print('omniparser latency:', response_json['latency'])
som_image_data = base64.b64decode(response_json['som_image_base64'])
screenshot_path_uuid = Path(screenshot_path).stem.replace("screenshot_", "")
som_screenshot_path = f"{OUTPUT_DIR}/screenshot_som_{screenshot_path_uuid}.png"
with open(som_screenshot_path, "wb") as f:
f.write(som_image_data)
response_json['width'] = screenshot.size[0]
response_json['height'] = screenshot.size[1]
response_json['original_screenshot_base64'] = image_base64
response_json['screenshot_uuid'] = screenshot_path_uuid
response_json = self.reformat_messages(response_json)
return response_json
def reformat_messages(self, response_json: dict):
screen_info = ""
for idx, element in enumerate(response_json["parsed_content_list"]):
element['idx'] = idx
if element['type'] == 'text':
screen_info += f'ID: {idx}, Text: {element["content"]}\n'
elif element['type'] == 'icon':
screen_info += f'ID: {idx}, Icon: {element["content"]}\n'
response_json['screen_info'] = screen_info
return response_json

View File

@@ -1,302 +0,0 @@
import json
from collections.abc import Callable
from typing import cast, Callable
import uuid
from PIL import Image, ImageDraw
import base64
from io import BytesIO
from anthropic import APIResponse
from anthropic.types import ToolResultBlockParam
from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock, BetaMessageParam, BetaUsage
from computer_use_demo.agent.llm_utils.oai import run_oai_interleaved
from computer_use_demo.colorful_text import colorful_text_vlm
import time
import re
OUTPUT_DIR = "./tmp/outputs"
def extract_data(input_string, data_type):
# Regular expression to extract content starting from '```python' until the end if there are no closing backticks
pattern = f"```{data_type}" + r"(.*?)(```|$)"
# Extract content
# re.DOTALL allows '.' to match newlines as well
matches = re.findall(pattern, input_string, re.DOTALL)
# Return the first match if exists, trimming whitespace and ignoring potential closing backticks
return matches[0][0].strip() if matches else input_string
class VLMAgent:
def __init__(
self,
model: str,
provider: str,
api_key: str,
output_callback: Callable,
api_response_callback: Callable,
max_tokens: int = 4096,
only_n_most_recent_images: int | None = None,
print_usage: bool = True,
):
if model == "omniparser + gpt-4o":
self.model = "gpt-4o-2024-11-20"
else:
raise ValueError(f"Model {model} not supported")
self.provider = provider
self.api_key = api_key
self.api_response_callback = api_response_callback
self.max_tokens = max_tokens
self.only_n_most_recent_images = only_n_most_recent_images
self.output_callback = output_callback
self.print_usage = print_usage
self.total_token_usage = 0
self.total_cost = 0
self.system = ''
def __call__(self, messages: list, parsed_screen: list[str, list, dict]):
# Show results of Omniparser
image_base64 = parsed_screen['original_screenshot_base64']
latency_omniparser = parsed_screen['latency']
self.output_callback(f'Screenshot for {colorful_text_vlm}:\n<img src="data:image/png;base64,{image_base64}">',
sender="bot")
self.output_callback(f'Set of Marks Screenshot for {colorful_text_vlm}:\n<img src="data:image/png;base64,{parsed_screen["som_image_base64"]}">', sender="bot")
screen_info = str(parsed_screen['screen_info'])
# self.output_callback(f'Screen Info for {colorful_text_vlm}:\n{screen_info}', sender="bot")
self.output_callback(
f'<details>'
f' <summary>Screen Info for {colorful_text_vlm}</summary>'
f' <pre>{screen_info}</pre>'
f'</details>',
sender="bot"
)
screenshot_uuid = parsed_screen['screenshot_uuid']
screen_width, screen_height = parsed_screen['width'], parsed_screen['height']
# example parsed_screen: {"som_image_base64": dino_labled_img, "parsed_content_list": parsed_content_list, "screen_info"}
boxids_and_labels = parsed_screen["screen_info"]
system = self._get_system_prompt(boxids_and_labels)
# drop looping actions msg, byte image etc
planner_messages = messages
# import pdb; pdb.set_trace()
planner_messages = _keep_latest_images(planner_messages)
# if self.only_n_most_recent_images:
# _maybe_filter_to_n_most_recent_images(planner_messages, self.only_n_most_recent_images)
# print(f"filtered_messages: {planner_messages}\n\n", "full messages:", messages)
if isinstance(planner_messages[-1], dict):
if not isinstance(planner_messages[-1]["content"], list):
planner_messages[-1]["content"] = [planner_messages[-1]["content"]]
planner_messages[-1]["content"].append(f"{OUTPUT_DIR}/screenshot_{screenshot_uuid}.png")
planner_messages[-1]["content"].append(f"{OUTPUT_DIR}/screenshot_som_{screenshot_uuid}.png")
# print(f"Sending messages to VLMPlanner : {planner_messages}")
start = time.time()
if "gpt" in self.model:
vlm_response, token_usage = run_oai_interleaved(
messages=planner_messages,
system=system,
llm=self.model,
api_key=self.api_key,
max_tokens=self.max_tokens,
temperature=0,
)
print(f"oai token usage: {token_usage}")
self.total_token_usage += token_usage
self.total_cost += (token_usage * 0.15 / 1000000) # https://openai.com/api/pricing/
elif "phi" in self.model:
pass # TODO
else:
raise ValueError(f"Model {self.model} not supported")
latency_vlm = time.time() - start
self.output_callback(f"VLMPlanner latency: {latency_vlm}, Omniparser latency: {latency_omniparser}", sender="bot")
print(f"VLMPlanner response: {vlm_response}")
if self.print_usage:
print(f"VLMPlanner total token usage so far: {self.total_token_usage}. Total cost so far: $USD{self.total_cost:.5f}")
vlm_response_json = extract_data(vlm_response, "json")
vlm_response_json = json.loads(vlm_response_json)
# map "box_id" to "idx" in parsed_screen, and output the xy coordinate of bbox
try:
bbox = parsed_screen["parsed_content_list"][int(vlm_response_json["Box ID"])]["bbox"]
vlm_response_json["box_centroid_coordinate"] = [int((bbox[0] + bbox[2]) / 2 * screen_width), int((bbox[1] + bbox[3]) / 2 * screen_height)]
# draw a circle on the screenshot image to indicate the action
self.draw_action(vlm_response_json, image_base64)
except:
print("No Box ID in the response.")
# Convert the VLM output to a string for printing in chat
vlm_plan_str = ""
for key, value in vlm_response_json.items():
if key == "Reasoning":
vlm_plan_str += f'{value}'
else:
vlm_plan_str += f'\n{key}: {value}'
# self.output_callback(f"{colorful_text_vlm}:\n{vlm_plan_str}", sender="bot")
# construct the response so that anthropicExcutor can execute the tool
response_content = [BetaTextBlock(text=vlm_plan_str, type='text')]
if 'box_centroid_coordinate' in vlm_response_json:
move_cursor_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
input={'action': 'mouse_move', 'coordinate': vlm_response_json["box_centroid_coordinate"]},
name='computer', type='tool_use')
response_content.append(move_cursor_block)
if vlm_response_json["Next Action"] == "type":
click_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}', input={'action': 'left_click'}, name='computer', type='tool_use')
sim_content_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
input={'action': vlm_response_json["Next Action"], 'text': vlm_response_json["value"]},
name='computer', type='tool_use')
response_content.extend([click_block, sim_content_block])
elif vlm_response_json["Next Action"] == "None":
print("Task paused/completed.")
else:
sim_content_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
input={'action': vlm_response_json["Next Action"]},
name='computer', type='tool_use')
response_content.append(sim_content_block)
response_message = BetaMessage(id=f'toolu_{uuid.uuid4()}', content=response_content, model='', role='assistant', type='message', stop_reason='tool_use', usage=BetaUsage(input_tokens=0, output_tokens=0))
return response_message, vlm_response_json
def _api_response_callback(self, response: APIResponse):
self.api_response_callback(response)
def _get_system_prompt(self, screen_info: str = ""):
return f"""
You are using a Windows device.
You are able to use a mouse and keyboard to interact with the computer based on the given task and screenshot.
You can only interact with the desktop GUI (no terminal or application menu access).
You may be given some history plan and actions, this is the response from the previous loop.
You should carefully consider your plan base on the task, screenshot, and history actions.
Here is the list of all detected bounding boxes by IDs on the screen and their description:{screen_info}
Your available "Next Action" only include:
- type: type a string of text.
- left_click: Describe the ui element to be clicked.
- double_click: Describe the ui element to be double clicked.
- right_click: Describe the ui element to be right clicked.
- escape: Press an ESCAPE key.
- hover: Describe the ui element to be hovered.
- scroll_up: Scroll the screen up.
- scroll_down: Scroll the screen down.
- press: Describe the ui element to be pressed.
Based on the visual information from the screenshot image and the detected bounding boxes, please determine the next action, the Box ID you should operate on, and the value (if the action is 'type') in order to complete the task.
Output format:
```json
{{
"Reasoning": str, # describe what is in the current screen, taking into account the history, then describe your step-by-step thoughts on how to achieve the task, choose one action from available actions at a time.
"Next Action": "action_type, action description" | "None" # one action at a time, describe it in short and precisely.
'Box ID': n,
'value': "xxx" # if the action is type, you should provide the text to type.
}}
```
One Example:
```json
{{
"Reasoning": "The current screen shows google result of amazon, in previous action I have searched amazon on google. Then I need to click on the first search results to go to amazon.com.",
"Next Action": "left_click",
'Box ID': m,
}}
```
Another Example:
```json
{{
"Reasoning": "The current screen shows the front page of amazon. There is no previous action. Therefore I need to type "Apple watch" in the search bar.",
"Next Action": "type",
'Box ID': n,
'value': "Apple watch"
}}
```
IMPORTANT NOTES:
1. You should only give a single action at a time.
2. You should give an analysis to the current screen, and reflect on what has been done by looking at the history, then describe your step-by-step thoughts on how to achieve the task.
3. Attach the next action prediction in the "Next Action".
4. You should not include other actions, such as keyboard shortcuts.
5. When the task is completed, you should say "Next Action": "None" in the json field.
"""
def draw_action(self, vlm_response_json, image_base64):
# draw a circle using the coordinate in parsed_screen['som_image_base64']
image_data = base64.b64decode(image_base64)
image = Image.open(BytesIO(image_data))
draw = ImageDraw.Draw(image)
x, y = vlm_response_json["box_centroid_coordinate"]
radius = 30
draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill='red')
buffered = BytesIO()
image.save(buffered, format="PNG")
image_with_circle_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
self.output_callback(f'Action performed on the red circle with centroid ({x}, {y}), for {colorful_text_vlm}:\n<img src="data:image/png;base64,{image_with_circle_base64}">', sender="bot")
def _keep_latest_images(messages):
for i in range(len(messages)-1):
if isinstance(messages[i]["content"], list):
for cnt in messages[i]["content"]:
if isinstance(cnt, str):
if cnt.endswith((".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif")):
messages[i]["content"].remove(cnt)
return messages
def _maybe_filter_to_n_most_recent_images(
messages: list[BetaMessageParam],
images_to_keep: int,
min_removal_threshold: int = 10,
):
"""
With the assumption that images are screenshots that are of diminishing value as
the conversation progresses, remove all but the final `images_to_keep` tool_result
images in place, with a chunk of min_removal_threshold to reduce the amount we
break the implicit prompt cache.
"""
if images_to_keep is None:
return messages
tool_result_blocks = cast(
list[ToolResultBlockParam],
[
item
for message in messages
for item in (
message["content"] if isinstance(message["content"], list) else []
)
if isinstance(item, dict) and item.get("type") == "tool_result"
],
)
total_images = sum(
1
for tool_result in tool_result_blocks
for content in tool_result.get("content", [])
if isinstance(content, dict) and content.get("type") == "image"
)
images_to_remove = total_images - images_to_keep
# for better cache behavior, we want to remove in chunks
images_to_remove -= images_to_remove % min_removal_threshold
for tool_result in tool_result_blocks:
if isinstance(tool_result.get("content"), list):
new_content = []
for content in tool_result.get("content", []):
if isinstance(content, dict) and content.get("type") == "image":
if images_to_remove > 0:
images_to_remove -= 1
continue
new_content.append(content)
tool_result["content"] = new_content

View File

@@ -1,17 +0,0 @@
"""
Define some colorful stuffs for better visualization in the chat.
"""
# Define the RGB colors for each letter
colors = {
'S': 'rgb(106, 158, 210)',
'h': 'rgb(111, 163, 82)',
'o': 'rgb(209, 100, 94)',
'w': 'rgb(238, 171, 106)',
'U': 'rgb(0, 0, 0)',
'I': 'rgb(0, 0, 0)',
}
colorful_text_vlm = "**OmniParser Agent**"
colorful_text_user = "**User**"

View File

@@ -1,132 +0,0 @@
import asyncio
from typing import Any, Dict, cast
from collections.abc import Callable
from anthropic.types.beta import (
BetaContentBlock,
BetaContentBlockParam,
BetaImageBlockParam,
BetaMessage,
BetaMessageParam,
BetaTextBlockParam,
BetaToolResultBlockParam,
)
from anthropic.types import TextBlock
from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock
from ..tools import ComputerTool, ToolCollection, ToolResult
class AnthropicExecutor:
def __init__(
self,
output_callback: Callable[[BetaContentBlockParam], None],
tool_output_callback: Callable[[Any, str], None],
):
self.tool_collection = ToolCollection(
ComputerTool()
)
self.output_callback = output_callback
self.tool_output_callback = tool_output_callback
def __call__(self, response: BetaMessage, messages: list[BetaMessageParam]):
new_message = {
"role": "assistant",
"content": cast(list[BetaContentBlockParam], response.content),
}
if new_message not in messages:
messages.append(new_message)
else:
print("new_message already in messages, there are duplicates.")
tool_result_content: list[BetaToolResultBlockParam] = []
for content_block in cast(list[BetaContentBlock], response.content):
self.output_callback(content_block, sender="bot")
# Execute the tool
if content_block.type == "tool_use":
# Run the asynchronous tool execution in a synchronous context
result = asyncio.run(self.tool_collection.run(
name=content_block.name,
tool_input=cast(dict[str, Any], content_block.input),
))
self.output_callback(result, sender="bot")
tool_result_content.append(
_make_api_tool_result(result, content_block.id)
)
self.tool_output_callback(result, content_block.id)
# Craft messages based on the content_block
# Note: to display the messages in the gradio, you should organize the messages in the following way (user message, bot message)
display_messages = _message_display_callback(messages)
# display_messages = []
# Send the messages to the gradio
for user_msg, bot_msg in display_messages:
# yield [user_msg, bot_msg], tool_result_content
yield [None, None], tool_result_content
if not tool_result_content:
return messages
return tool_result_content
def _message_display_callback(messages):
display_messages = []
for msg in messages:
try:
if isinstance(msg["content"][0], TextBlock):
display_messages.append((msg["content"][0].text, None)) # User message
elif isinstance(msg["content"][0], BetaTextBlock):
display_messages.append((None, msg["content"][0].text)) # Bot message
elif isinstance(msg["content"][0], BetaToolUseBlock):
display_messages.append((None, f"Tool Use: {msg['content'][0].name}\nInput: {msg['content'][0].input}")) # Bot message
elif isinstance(msg["content"][0], Dict) and msg["content"][0]["content"][-1]["type"] == "image":
display_messages.append((None, f'<img src="data:image/png;base64,{msg["content"][0]["content"][-1]["source"]["data"]}">')) # Bot message
else:
print(msg["content"][0])
except Exception as e:
print("error", e)
pass
return display_messages
def _make_api_tool_result(
result: ToolResult, tool_use_id: str
) -> BetaToolResultBlockParam:
"""Convert an agent ToolResult to an API ToolResultBlockParam."""
tool_result_content: list[BetaTextBlockParam | BetaImageBlockParam] | str = []
is_error = False
if result.error:
is_error = True
tool_result_content = _maybe_prepend_system_tool_result(result, result.error)
else:
if result.output:
tool_result_content.append(
{
"type": "text",
"text": _maybe_prepend_system_tool_result(result, result.output),
}
)
if result.base64_image:
tool_result_content.append(
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": result.base64_image,
},
}
)
return {
"type": "tool_result",
"content": tool_result_content,
"tool_use_id": tool_use_id,
"is_error": is_error,
}
def _maybe_prepend_system_tool_result(result: ToolResult, result_text: str):
if result.system:
result_text = f"<system>{result.system}</system>\n{result_text}"
return result_text

View File

@@ -1,129 +0,0 @@
"""
Agentic sampling loop that calls the Anthropic API and local implenmentation of anthropic-defined computer use tools.
"""
from collections.abc import Callable
from enum import StrEnum
from anthropic import APIResponse
from anthropic.types import (
TextBlock,
)
from anthropic.types.beta import (
BetaContentBlock,
BetaMessage,
BetaMessageParam
)
from computer_use_demo.tools import ToolResult
from computer_use_demo.agent.llm_utils.omniparserclient import OmniParserClient
from computer_use_demo.agent.anthropic_agent import AnthropicActor
from computer_use_demo.agent.vlm_agent import VLMAgent
from computer_use_demo.executor.anthropic_executor import AnthropicExecutor
BETA_FLAG = "computer-use-2024-10-22"
class APIProvider(StrEnum):
ANTHROPIC = "anthropic"
BEDROCK = "bedrock"
VERTEX = "vertex"
OPENAI = "openai"
PROVIDER_TO_DEFAULT_MODEL_NAME: dict[APIProvider, str] = {
APIProvider.ANTHROPIC: "claude-3-5-sonnet-20241022",
APIProvider.BEDROCK: "anthropic.claude-3-5-sonnet-20241022-v2:0",
APIProvider.VERTEX: "claude-3-5-sonnet-v2@20241022",
APIProvider.OPENAI: "gpt-4o",
}
def sampling_loop_sync(
*,
model: str,
provider: APIProvider | None,
messages: list[BetaMessageParam],
output_callback: Callable[[BetaContentBlock], None],
tool_output_callback: Callable[[ToolResult, str], None],
api_response_callback: Callable[[APIResponse[BetaMessage]], None],
api_key: str,
only_n_most_recent_images: int | None = 2,
max_tokens: int = 4096,
omniparser_url: str
):
"""
Synchronous agentic sampling loop for the assistant/tool interaction of computer use.
"""
print('in sampling_loop_sync, model:', model)
omniparser_client = OmniParserClient(url=f"http://{omniparser_url}/parse/")
if model == "claude-3-5-sonnet-20241022":
# Register Actor and Executor
actor = AnthropicActor(
model=model,
provider=provider,
api_key=api_key,
api_response_callback=api_response_callback,
max_tokens=max_tokens,
only_n_most_recent_images=only_n_most_recent_images
)
# from IPython.core.debugger import Pdb; Pdb().set_trace()
executor = AnthropicExecutor(
output_callback=output_callback,
tool_output_callback=tool_output_callback
)
elif model == "omniparser + gpt-4o" or model == "omniparser + phi35v":
actor = VLMAgent(
model=model,
provider=provider,
api_key=api_key,
api_response_callback=api_response_callback,
output_callback=output_callback,
)
executor = AnthropicExecutor(
output_callback=output_callback,
tool_output_callback=tool_output_callback,
)
else:
raise ValueError(f"Model {model} not supported")
print(f"Model Inited: {model}, Provider: {provider}")
tool_result_content = None
print(f"Start the message loop. User messages: {messages}")
if model == "claude-3-5-sonnet-20241022": # Anthropic loop
while True:
parsed_screen = omniparser_client() # parsed_screen: {"som_image_base64": dino_labled_img, "parsed_content_list": parsed_content_list, "screen_info"}
import pdb; pdb.set_trace()
screen_info_block = TextBlock(text='Below is the structured accessibility information of the current UI screen, which includes text and icons you can operate on, take these information into account when you are making the prediction for the next action. Note you will still need to take screenshot to get the image: \n' + parsed_screen['screen_info'], type='text')
# # messages[-1]['content'].append(screen_info_block)
screen_info_dict = {"role": "user", "content": [screen_info_block]}
messages.append(screen_info_dict)
tools_use_needed = actor(messages=messages)
for message, tool_result_content in executor(tools_use_needed, messages):
yield message
if not tool_result_content:
return messages
messages.append({"content": tool_result_content, "role": "user"})
elif model == "omniparser + gpt-4o" or model == "omniparser + phi35v":
while True:
parsed_screen = omniparser_client()
tools_use_needed, vlm_response_json = actor(messages=messages, parsed_screen=parsed_screen)
for message, tool_result_content in executor(tools_use_needed, messages):
yield message
if not tool_result_content:
return messages
# import pdb; pdb.set_trace()
# messages.append({"role": "user",
# "content": ["History plan:\n" + str(vlm_response_json['Reasoning'])]})
# messages.append({"content": tool_result_content, "role": "user"})

View File

@@ -1,12 +0,0 @@
from .base import CLIResult, ToolResult
from .collection import ToolCollection
from .computer import ComputerTool
from .screen_capture import get_screenshot
__ALL__ = [
CLIResult,
ComputerTool,
ToolCollection,
ToolResult,
get_screenshot,
]

View File

@@ -1,69 +0,0 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, fields, replace
from typing import Any
from anthropic.types.beta import BetaToolUnionParam
class BaseAnthropicTool(metaclass=ABCMeta):
"""Abstract base class for Anthropic-defined tools."""
@abstractmethod
def __call__(self, **kwargs) -> Any:
"""Executes the tool with the given arguments."""
...
@abstractmethod
def to_params(
self,
) -> BetaToolUnionParam:
raise NotImplementedError
@dataclass(kw_only=True, frozen=True)
class ToolResult:
"""Represents the result of a tool execution."""
output: str | None = None
error: str | None = None
base64_image: str | None = None
system: str | None = None
def __bool__(self):
return any(getattr(self, field.name) for field in fields(self))
def __add__(self, other: "ToolResult"):
def combine_fields(
field: str | None, other_field: str | None, concatenate: bool = True
):
if field and other_field:
if concatenate:
return field + other_field
raise ValueError("Cannot combine tool results")
return field or other_field
return ToolResult(
output=combine_fields(self.output, other.output),
error=combine_fields(self.error, other.error),
base64_image=combine_fields(self.base64_image, other.base64_image, False),
system=combine_fields(self.system, other.system),
)
def replace(self, **kwargs):
"""Returns a new ToolResult with the given fields replaced."""
return replace(self, **kwargs)
class CLIResult(ToolResult):
"""A ToolResult that can be rendered as a CLI output."""
class ToolFailure(ToolResult):
"""A ToolResult that represents a failure."""
class ToolError(Exception):
"""Raised when a tool encounters an error."""
def __init__(self, message):
self.message = message

View File

@@ -1,34 +0,0 @@
"""Collection classes for managing multiple tools."""
from typing import Any
from anthropic.types.beta import BetaToolUnionParam
from .base import (
BaseAnthropicTool,
ToolError,
ToolFailure,
ToolResult,
)
class ToolCollection:
"""A collection of anthropic-defined tools."""
def __init__(self, *tools: BaseAnthropicTool):
self.tools = tools
self.tool_map = {tool.to_params()["name"]: tool for tool in tools}
def to_params(
self,
) -> list[BetaToolUnionParam]:
return [tool.to_params() for tool in self.tools]
async def run(self, *, name: str, tool_input: dict[str, Any]) -> ToolResult:
tool = self.tool_map.get(name)
if not tool:
return ToolFailure(error=f"Tool {name} is invalid")
try:
return await tool(**tool_input)
except ToolError as e:
return ToolFailure(error=e.message)

View File

@@ -1,324 +0,0 @@
import base64
import time
from enum import StrEnum
from typing import Literal, TypedDict
from PIL import Image
from anthropic.types.beta import BetaToolComputerUse20241022Param
from .base import BaseAnthropicTool, ToolError, ToolResult
from .screen_capture import get_screenshot
import requests
import re
OUTPUT_DIR = "./tmp/outputs"
TYPING_DELAY_MS = 12
TYPING_GROUP_SIZE = 50
Action = Literal[
"key",
"type",
"mouse_move",
"left_click",
"left_click_drag",
"right_click",
"middle_click",
"double_click",
"screenshot",
"cursor_position",
]
class Resolution(TypedDict):
width: int
height: int
MAX_SCALING_TARGETS: dict[str, Resolution] = {
"XGA": Resolution(width=1024, height=768), # 4:3
"WXGA": Resolution(width=1280, height=800), # 16:10
"FWXGA": Resolution(width=1366, height=768), # ~16:9
}
class ScalingSource(StrEnum):
COMPUTER = "computer"
API = "api"
class ComputerToolOptions(TypedDict):
display_height_px: int
display_width_px: int
display_number: int | None
def chunks(s: str, chunk_size: int) -> list[str]:
return [s[i : i + chunk_size] for i in range(0, len(s), chunk_size)]
class ComputerTool(BaseAnthropicTool):
"""
A tool that allows the agent to interact with the screen, keyboard, and mouse of the current computer.
Adapted for Windows using 'pyautogui'.
"""
name: Literal["computer"] = "computer"
api_type: Literal["computer_20241022"] = "computer_20241022"
width: int
height: int
display_num: int | None
_screenshot_delay = 2.0
_scaling_enabled = True
@property
def options(self) -> ComputerToolOptions:
width, height = self.scale_coordinates(
ScalingSource.COMPUTER, self.width, self.height
)
return {
"display_width_px": width,
"display_height_px": height,
"display_number": self.display_num,
}
def to_params(self) -> BetaToolComputerUse20241022Param:
return {"name": self.name, "type": self.api_type, **self.options}
def __init__(self, is_scaling: bool = False):
super().__init__()
# Get screen width and height using Windows command
self.display_num = None
self.offset_x = 0
self.offset_y = 0
self.is_scaling = is_scaling
self.width, self.height = self.get_screen_size()
print(f"screen size: {self.width}, {self.height}")
self.key_conversion = {"Page_Down": "pagedown",
"Page_Up": "pageup",
"Super_L": "win",
"Escape": "esc"}
async def __call__(
self,
*,
action: Action,
text: str | None = None,
coordinate: tuple[int, int] | None = None,
**kwargs,
):
print(f"action: {action}, text: {text}, coordinate: {coordinate}, is_scaling: {self.is_scaling}")
if action in ("mouse_move", "left_click_drag"):
if coordinate is None:
raise ToolError(f"coordinate is required for {action}")
if text is not None:
raise ToolError(f"text is not accepted for {action}")
if not isinstance(coordinate, (list, tuple)) or len(coordinate) != 2:
raise ToolError(f"{coordinate} must be a tuple of length 2")
# if not all(isinstance(i, int) and i >= 0 for i in coordinate):
if not all(isinstance(i, int) for i in coordinate):
raise ToolError(f"{coordinate} must be a tuple of non-negative ints")
if self.is_scaling:
x, y = self.scale_coordinates(
ScalingSource.API, coordinate[0], coordinate[1]
)
else:
x, y = coordinate
# print(f"scaled_coordinates: {x}, {y}")
# print(f"offset: {self.offset_x}, {self.offset_y}")
# x += self.offset_x # TODO - check if this is needed
# y += self.offset_y
print(f"mouse move to {x}, {y}")
if action == "mouse_move":
self.send_to_vm(f"pyautogui.moveTo({x}, {y})")
return ToolResult(output=f"Moved mouse to ({x}, {y})")
elif action == "left_click_drag":
current_x, current_y = self.send_to_vm("pyautogui.position()")
self.send_to_vm(f"pyautogui.dragTo({x}, {y}, duration=0.5)")
return ToolResult(output=f"Dragged mouse from ({current_x}, {current_y}) to ({x}, {y})")
if action in ("key", "type"):
if text is None:
raise ToolError(f"text is required for {action}")
if coordinate is not None:
raise ToolError(f"coordinate is not accepted for {action}")
if not isinstance(text, str):
raise ToolError(output=f"{text} must be a string")
if action == "key":
# Handle key combinations
keys = text.split('+')
for key in keys:
key = self.key_conversion.get(key.strip(), key.strip())
key = key.lower()
self.send_to_vm(f"pyautogui.keyDown('{key}')") # Press down each key
for key in reversed(keys):
key = self.key_conversion.get(key.strip(), key.strip())
key = key.lower()
self.send_to_vm(f"pyautogui.keyUp('{key}')") # Release each key in reverse order
return ToolResult(output=f"Pressed keys: {text}")
elif action == "type":
self.send_to_vm(f"pyautogui.typewrite('{text}', interval={TYPING_DELAY_MS / 1000})") # Convert ms to seconds
self.send_to_vm("pyautogui.press('enter')")
screenshot_base64 = (await self.screenshot()).base64_image
return ToolResult(output=text, base64_image=screenshot_base64)
if action in (
"left_click",
"right_click",
"double_click",
"middle_click",
"screenshot",
"cursor_position",
"left_press",
):
if text is not None:
raise ToolError(f"text is not accepted for {action}")
if coordinate is not None:
raise ToolError(f"coordinate is not accepted for {action}")
if action == "screenshot":
return await self.screenshot()
elif action == "cursor_position":
x, y = self.send_to_vm("pyautogui.position()")
x, y = self.scale_coordinates(ScalingSource.COMPUTER, x, y)
return ToolResult(output=f"X={x},Y={y}")
else:
if action == "left_click":
self.send_to_vm("pyautogui.click()")
elif action == "right_click":
self.send_to_vm("pyautogui.rightClick()")
elif action == "middle_click":
self.send_to_vm("pyautogui.middleClick()")
elif action == "double_click":
self.send_to_vm("pyautogui.doubleClick()")
elif action == "left_press":
self.send_to_vm("pyautogui.mouseDown()")
time.sleep(1)
self.send_to_vm("pyautogui.mouseUp()")
return ToolResult(output=f"Performed {action}")
if action in ("scroll_up", "scroll_down"):
if action == "scroll_up":
self.send_to_vm("pyautogui.scroll(100)")
elif action == "scroll_down":
self.send_to_vm("pyautogui.scroll(-100)")
return ToolResult(output=f"Performed {action}")
raise ToolError(f"Invalid action: {action}")
def send_to_vm(self, action: str):
"""
Executes a python command on the server. Only return tuple of x,y when action is "pyautogui.position()"
"""
prefix = "import pyautogui; pyautogui.FAILSAFE = False;"
command_list = ["python", "-c", f"{prefix} {action}"]
parse = action == "pyautogui.position()"
if parse:
command_list[-1] = f"{prefix} print({action})"
try:
print(f"sending to vm: {command_list}")
response = requests.post(
f"http://localhost:5000/execute",
headers={'Content-Type': 'application/json'},
json={"command": command_list},
timeout=90
)
time.sleep(0.7) # avoid async error as actions take time to complete
print(f"action executed")
if response.status_code != 200:
raise ToolError(f"Failed to execute command. Status code: {response.status_code}")
if parse:
output = response.json()['output'].strip()
match = re.search(r'Point\(x=(\d+),\s*y=(\d+)\)', output)
if not match:
raise ToolError(f"Could not parse coordinates from output: {output}")
x, y = map(int, match.groups())
return x, y
except requests.exceptions.RequestException as e:
raise ToolError(f"An error occurred while trying to execute the command: {str(e)}")
async def screenshot(self):
if not hasattr(self, 'target_dimension'):
screenshot = self.padding_image(screenshot)
self.target_dimension = MAX_SCALING_TARGETS["WXGA"]
width, height = self.target_dimension["width"], self.target_dimension["height"]
screenshot, path = get_screenshot(resize=True, target_width=width, target_height=height)
time.sleep(0.7) # avoid async error as actions take time to complete
# return ToolResult()
return ToolResult(base64_image=base64.b64encode(path.read_bytes()).decode())
raise ToolError(f"Failed to take screenshot: {path} does not exist.")
def padding_image(self, screenshot):
"""Pad the screenshot to 16:10 aspect ratio, when the aspect ratio is not 16:10."""
_, height = screenshot.size
new_width = height * 16 // 10
padding_image = Image.new("RGB", (new_width, height), (255, 255, 255))
# padding to top left
padding_image.paste(screenshot, (0, 0))
return padding_image
def scale_coordinates(self, source: ScalingSource, x: int, y: int):
"""Scale coordinates to a target maximum resolution."""
if not self._scaling_enabled:
return x, y
ratio = self.width / self.height
target_dimension = None
for target_name, dimension in MAX_SCALING_TARGETS.items():
# allow some error in the aspect ratio - not ratios are exactly 16:9
if abs(dimension["width"] / dimension["height"] - ratio) < 0.02:
if dimension["width"] < self.width:
target_dimension = dimension
self.target_dimension = target_dimension
# print(f"target_dimension: {target_dimension}")
break
if target_dimension is None:
# TODO: currently we force the target to be WXGA (16:10), when it cannot find a match
target_dimension = MAX_SCALING_TARGETS["WXGA"]
self.target_dimension = MAX_SCALING_TARGETS["WXGA"]
# should be less than 1
x_scaling_factor = target_dimension["width"] / self.width
y_scaling_factor = target_dimension["height"] / self.height
if source == ScalingSource.API:
if x > self.width or y > self.height:
raise ToolError(f"Coordinates {x}, {y} are out of bounds")
# scale up
return round(x / x_scaling_factor), round(y / y_scaling_factor)
# scale down
return round(x * x_scaling_factor), round(y * y_scaling_factor)
def get_screen_size(self):
"""Return width and height of the screen"""
try:
response = requests.post(
f"http://localhost:5000/execute",
headers={'Content-Type': 'application/json'},
json={"command": ["python", "-c", "import pyautogui; print(pyautogui.size())"]},
timeout=90
)
if response.status_code != 200:
raise ToolError(f"Failed to get screen size. Status code: {response.status_code}")
output = response.json()['output'].strip()
match = re.search(r'Size\(width=(\d+),\s*height=(\d+)\)', output)
if not match:
raise ToolError(f"Could not parse screen size from output: {output}")
width, height = map(int, match.groups())
return width, height
except requests.exceptions.RequestException as e:
raise ToolError(f"An error occurred while trying to get screen size: {str(e)}")

View File

@@ -1,29 +0,0 @@
from pathlib import Path
from uuid import uuid4
import requests
from PIL import Image
from .base import BaseAnthropicTool, ToolError
from io import BytesIO
OUTPUT_DIR = "./tmp/outputs"
def get_screenshot(resize: bool = False, target_width: int = 1920, target_height: int = 1080):
"""Capture screenshot by requesting from HTTP endpoint - returns native resolution unless resized"""
output_dir = Path(OUTPUT_DIR)
output_dir.mkdir(parents=True, exist_ok=True)
path = output_dir / f"screenshot_{uuid4().hex}.png"
try:
response = requests.get('http://localhost:5000/screenshot')
if response.status_code != 200:
raise ToolError(f"Failed to capture screenshot: HTTP {response.status_code}")
# (1280, 800)
screenshot = Image.open(BytesIO(response.content))
if resize and screenshot.size != (target_width, target_height):
screenshot = screenshot.resize((target_width, target_height))
screenshot.save(path)
return screenshot, path
except Exception as e:
raise ToolError(f"Failed to capture screenshot: {str(e)}")