merge util files and remove duplicated library import
This commit is contained in:
@@ -1,25 +0,0 @@
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
|
||||
def get_latest_files(directory: str, file_types: list = ['.webm', '.zip']) -> Dict[str, Optional[str]]:
|
||||
"""Get the latest recording and trace files"""
|
||||
latest_files: Dict[str, Optional[str]] = {ext: None for ext in file_types}
|
||||
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
return latest_files
|
||||
|
||||
for file_type in file_types:
|
||||
try:
|
||||
matches = list(Path(directory).rglob(f"*{file_type}"))
|
||||
if matches:
|
||||
latest = max(matches, key=lambda p: p.stat().st_mtime)
|
||||
# Only return files that are complete (not being written)
|
||||
if time.time() - latest.stat().st_mtime > 1.0:
|
||||
latest_files[file_type] = str(latest)
|
||||
except Exception as e:
|
||||
print(f"Error getting latest {file_type} file: {e}")
|
||||
|
||||
return latest_files
|
||||
@@ -1,48 +0,0 @@
|
||||
import base64
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
from playwright.async_api import BrowserContext, Error as PlaywrightError
|
||||
|
||||
async def capture_screenshot(browser_context) -> str:
|
||||
"""Capture and encode a screenshot"""
|
||||
try:
|
||||
# Get the implementation context - handle both direct Playwright context and wrapped context
|
||||
context = browser_context
|
||||
if hasattr(browser_context, 'context'):
|
||||
context = browser_context.context
|
||||
|
||||
if not context:
|
||||
return "<div>No browser context available</div>"
|
||||
|
||||
# Get all pages
|
||||
pages = context.pages
|
||||
if not pages:
|
||||
return "<div>Waiting for page to be available...</div>"
|
||||
|
||||
# Use the first non-blank page or fallback to first page
|
||||
active_page = None
|
||||
for page in pages:
|
||||
if page.url != 'about:blank':
|
||||
active_page = page
|
||||
break
|
||||
|
||||
if not active_page and pages:
|
||||
active_page = pages[0]
|
||||
|
||||
if not active_page:
|
||||
return "<div>No active page available</div>"
|
||||
|
||||
# Take screenshot
|
||||
try:
|
||||
screenshot = await active_page.screenshot(
|
||||
type='jpeg',
|
||||
quality=75,
|
||||
scale="css"
|
||||
)
|
||||
encoded = base64.b64encode(screenshot).decode('utf-8')
|
||||
return f'<img src="data:image/jpeg;base64,{encoded}" style="width:100%; max-width:1200px; border:1px solid #ccc;">'
|
||||
except Exception as e:
|
||||
return f"<div class='error'>Screenshot failed: {str(e)}</div>"
|
||||
|
||||
except Exception as e:
|
||||
return f"<div class='error'>Screenshot error: {str(e)}</div>"
|
||||
@@ -4,9 +4,11 @@
|
||||
# @Email : wenshaoguo1026@gmail.com
|
||||
# @Project : browser-use-webui
|
||||
# @FileName: utils.py
|
||||
|
||||
import base64
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
|
||||
from langchain_anthropic import ChatAnthropic
|
||||
from langchain_google_genai import ChatGoogleGenerativeAI
|
||||
@@ -140,3 +142,69 @@ def encode_image(img_path):
|
||||
with open(img_path, "rb") as fin:
|
||||
image_data = base64.b64encode(fin.read()).decode("utf-8")
|
||||
return image_data
|
||||
|
||||
|
||||
def get_latest_files(directory: str, file_types: list = ['.webm', '.zip']) -> Dict[str, Optional[str]]:
|
||||
"""Get the latest recording and trace files"""
|
||||
latest_files: Dict[str, Optional[str]] = {ext: None for ext in file_types}
|
||||
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
return latest_files
|
||||
|
||||
for file_type in file_types:
|
||||
try:
|
||||
matches = list(Path(directory).rglob(f"*{file_type}"))
|
||||
if matches:
|
||||
latest = max(matches, key=lambda p: p.stat().st_mtime)
|
||||
# Only return files that are complete (not being written)
|
||||
if time.time() - latest.stat().st_mtime > 1.0:
|
||||
latest_files[file_type] = str(latest)
|
||||
except Exception as e:
|
||||
print(f"Error getting latest {file_type} file: {e}")
|
||||
|
||||
return latest_files
|
||||
|
||||
async def capture_screenshot(browser_context) -> str:
|
||||
"""Capture and encode a screenshot"""
|
||||
try:
|
||||
# Get the implementation context - handle both direct Playwright context and wrapped context
|
||||
context = browser_context
|
||||
if hasattr(browser_context, 'context'):
|
||||
context = browser_context.context
|
||||
|
||||
if not context:
|
||||
return "<div>No browser context available</div>"
|
||||
|
||||
# Get all pages
|
||||
pages = context.pages
|
||||
if not pages:
|
||||
return "<div>Waiting for page to be available...</div>"
|
||||
|
||||
# Use the first non-blank page or fallback to first page
|
||||
active_page = None
|
||||
for page in pages:
|
||||
if page.url != 'about:blank':
|
||||
active_page = page
|
||||
break
|
||||
|
||||
if not active_page and pages:
|
||||
active_page = pages[0]
|
||||
|
||||
if not active_page:
|
||||
return "<div>No active page available</div>"
|
||||
|
||||
# Take screenshot
|
||||
try:
|
||||
screenshot = await active_page.screenshot(
|
||||
type='jpeg',
|
||||
quality=75,
|
||||
scale="css"
|
||||
)
|
||||
encoded = base64.b64encode(screenshot).decode('utf-8')
|
||||
return f'<img src="data:image/jpeg;base64,{encoded}" style="width:100%; max-width:1200px; border:1px solid #ccc;">'
|
||||
except Exception as e:
|
||||
return f"<div class='error'>Screenshot failed: {str(e)}</div>"
|
||||
|
||||
except Exception as e:
|
||||
return f"<div class='error'>Screenshot error: {str(e)}</div>"
|
||||
31
webui.py
31
webui.py
@@ -5,43 +5,36 @@
|
||||
# @Project : browser-use-webui
|
||||
# @FileName: webui.py
|
||||
|
||||
import pdb
|
||||
import os
|
||||
import glob
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
import asyncio
|
||||
import argparse
|
||||
import gradio as gr
|
||||
import os
|
||||
import asyncio
|
||||
|
||||
from browser_use.agent.service import Agent
|
||||
from playwright.async_api import async_playwright
|
||||
from browser_use.browser.browser import Browser, BrowserConfig
|
||||
from browser_use.browser.context import (
|
||||
BrowserContextConfig,
|
||||
BrowserContextWindowSize,
|
||||
)
|
||||
from browser_use.agent.service import Agent
|
||||
from src.browser.custom_browser import CustomBrowser
|
||||
from src.controller.custom_controller import CustomController
|
||||
|
||||
from src.utils import utils
|
||||
from src.agent.custom_agent import CustomAgent
|
||||
from src.agent.custom_prompts import CustomSystemPrompt
|
||||
from src.browser.custom_browser import CustomBrowser
|
||||
from src.agent.custom_prompts import CustomSystemPrompt
|
||||
from src.browser.config import BrowserPersistenceConfig
|
||||
from src.browser.custom_context import BrowserContextConfig
|
||||
from src.controller.custom_controller import CustomController
|
||||
from src.utils import utils
|
||||
from src.utils.utils import update_model_dropdown
|
||||
from src.browser.config import BrowserPersistenceConfig
|
||||
from src.browser.custom_browser import CustomBrowser
|
||||
from browser_use.browser.browser import BrowserConfig
|
||||
from browser_use.browser.context import BrowserContextConfig, BrowserContextWindowSize
|
||||
from src.utils.utils import update_model_dropdown, get_latest_files, capture_screenshot
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
# Global variables for persistence
|
||||
_global_browser = None
|
||||
_global_browser_context = None
|
||||
_global_playwright = None
|
||||
from src.utils.file_utils import get_latest_files
|
||||
from src.utils.stream_utils import capture_screenshot
|
||||
|
||||
|
||||
async def run_browser_agent(
|
||||
agent_type,
|
||||
|
||||
Reference in New Issue
Block a user