Files
browser-use/browser_use/dom/service.py
Gregor Žunič 89c63fdd63 Added custom actions registry and fixed extraction layer (#20)
* Validator

* Test mind2web

* Cleaned up logger

* Pytest logger

* Cleaned up logger

* Disable flag for human input

* Multiple clicks per button

* Multiple clicks per button

* More structured system prompt

* Fields with description

* System prompt example

* One logger

* Cleaner logging

* Log step in step function

* Fix critical clicking error - wrong argument used

* Improved thought process of agent

* Improve system prompt

* Remove human input message

* Custom action registration

* Pydantic model for custom actions

* Pydantic model for custom output

* Runs through, model outputs functions, but not called yet

* Work in progress - description for custom actions

* Description works, but schema not yet

* Model can call the right action - but is not executed

* Separate is_controller_action and is_custom_action

* Works! Model can call custom function

* Use registry for action, but result is not fed back to model

* Include result in messages

* Works with custom function - but typing is not correct

* Renamed registry

* First test cases

* Captcha tests

* Pydantic for tests

* Improve prompts for multi-step

* System prompt structure

* Handle errors like validation error

* Refactor error handling in agent

* Refactor error handling in agent

* Improved logging

* Update view

* Fix click parameter to index

* Simplify dynamic actions

* Use run instead of step

* Rename history

* Rename AgentService to Agent

* Rename ControllerService to Controller

* Pytest file

* Rename get state

* Rename BrowserService

* reversed dom extraction recursion to while

* Rename use_vision

* Rename use_vision

* reversed dom tree items and made browser less annoying

* Renaming and fixing type errors

* Renamed class names for agent

* updated requirements

* Update prompt

* Action registration works for user and controller

* Fix done call by returning ActionResult

* Fix if result is none

* Rename AgentOutput and ActionModel

* Improved prompt Passes 6/8 tests from test_agent_actions

* Calculate token cost

* Improve display

* Simplified logger

* Test function calling

* created super simple xpath extraction algo

* Tests logging

* tiny fixes to dom extraction

* Remove test

* Don't log number of clicks

* Pytest file

* merged per element js checks

* Check if driver is still open

* super fast processing

* fixed agent planning and stuff

* Fix example

* Fix example

* Improve error

* Improved error correction

* New line for step

* small type error fixes

* Test for pydantic

* Fix line

* Removed sample

* fixed readme and examples

---------

Co-authored-by: magmueller <mamagnus00@gmail.com>
2024-11-15 21:42:02 +01:00

429 lines
12 KiB
Python

import json
import logging
from typing import Optional
from bs4 import BeautifulSoup, NavigableString, PageElement, Tag
from selenium import webdriver
from selenium.webdriver.remote.webelement import WebElement
from browser_use.dom.views import (
BatchCheckResults,
DomContentItem,
ElementCheckResult,
ElementState,
ProcessedDomContent,
TextCheckResult,
TextState,
)
from browser_use.utils import time_execution_sync
logger = logging.getLogger(__name__)
class DomService:
    """Turn the current page into an ordered list of visible elements and text.

    The page source is parsed with BeautifulSoup, candidate elements and text
    nodes are addressed by XPath, and the browser is then asked (through two
    batched JavaScript calls) which of those candidates are actually visible.
    """

    # Tags that are skipped together with their whole subtree.
    _DENIED_TAGS = frozenset({'svg', 'iframe', 'script', 'style', 'link', 'meta'})

    # Tag names treated as interactive regardless of their attributes.
    _INTERACTIVE_TAGS = frozenset(
        {
            'a',
            'button',
            'details',
            'embed',
            'input',
            'label',
            'menu',
            'menuitem',
            'object',
            'select',
            'textarea',
            'summary',
        }
    )

    # Values of `role` / `aria-role` that mark an element as interactive.
    _INTERACTIVE_ROLES = frozenset(
        {
            'button',
            'menu',
            'menuitem',
            'link',
            'checkbox',
            'radio',
            'slider',
            'tab',
            'tabpanel',
            'textbox',
            'combobox',
            'grid',
            'listbox',
            'option',
            'progressbar',
            'scrollbar',
            'searchbox',
            'switch',
            'tree',
            'treeitem',
            'spinbutton',
            'tooltip',
            'menuitemcheckbox',
            'menuitemradio',
        }
    )

    # Attributes echoed (length-capped) into the textual element description.
    _ESSENTIAL_ATTRIBUTES = (
        'id',
        'class',
        'href',
        'src',
        'readonly',
        'disabled',
        'checked',
        'selected',
        'role',
        'type',  # Important for inputs, buttons
        'name',  # Important for form elements
        'value',  # Current value of form elements
        'placeholder',  # Helpful for understanding input purpose
        'title',  # Additional descriptive text
        'alt',  # Alternative text for images
        'for',  # Important for label associations
        'autocomplete',  # Form field behavior
    )

    # Attribute-name prefixes that are always echoed, uncapped.
    _STATE_ATTRIBUTE_PREFIXES = ('aria-', 'data-')

    # Every generated XPath is anchored here so it addresses exactly one node.
    _XPATH_ROOT = '/html/body'

    def __init__(self, driver: webdriver.Chrome):
        """Store the driver whose current page will be inspected."""
        self.driver = driver
        # Reset on every extraction; nothing inside this class reads it.
        self.xpath_cache = {}

    def get_clickable_elements(self) -> ProcessedDomContent:
        """Process the driver's current page source and return the result."""
        # Clear xpath cache on each new DOM processing
        self.xpath_cache = {}
        html_content = self.driver.page_source
        return self._process_content(html_content)

    @time_execution_sync('--_process_content')
    def _process_content(self, html_content: str) -> ProcessedDomContent:
        """Build the ordered item list and selector map for `html_content`.

        Args:
            html_content: HTML of the page currently loaded in the driver.

        Returns:
            A ProcessedDomContent whose `items` are in document order and
            whose `selector_map` maps the index of every non-text item to its
            XPath.
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        interactive_elements, text_nodes, denied_elements = self._collect_candidates(soup)

        # Query the browser while the parsed tree is still unmodified: the text
        # check sends child positions that must line up with the live DOM.
        element_results = self._batch_check_elements(interactive_elements)
        text_results = self._batch_check_texts(text_nodes)

        # Only now drop the denied subtrees, so that e.g. <script> bodies do
        # not leak into the text extracted for their ancestors below.
        for denied in denied_elements:
            denied.decompose()

        # [(order, xpath, is_clickable, content, depth, is_text_only), ...]
        ordered_results: list[tuple[int, str, bool, str, int, bool]] = []

        # Process interactive elements
        for xpath, (element, order) in interactive_elements.items():
            if xpath not in element_results.elements:
                continue
            result = element_results.elements[xpath]
            if not (result.isVisible and result.isTopElement):
                continue
            text_content = self._extract_text_from_all_children(element)
            tag_name = element.name
            attributes = self._get_essential_attributes(element)
            output_string = f"<{tag_name}{' ' + attributes if attributes else ''}>{text_content}</{tag_name}>"
            ordered_results.append(
                (order, xpath, True, output_string, self._xpath_depth(xpath), False)
            )

        # Process text nodes
        for xpath, (text_node, order) in text_nodes.items():
            if xpath not in text_results.texts:
                continue
            if not text_results.texts[xpath].isVisible:
                continue
            text_content = self._cap_text_length(text_node.strip())
            if text_content:
                ordered_results.append(
                    (order, xpath, False, text_content, self._xpath_depth(xpath), True)
                )

        # Sort by original order
        ordered_results.sort(key=lambda entry: entry[0])

        # Build final output maintaining order
        output_items: list[DomContentItem] = []
        selector_map: dict[int, str] = {}
        for i, (_, xpath, _is_clickable, content, depth, is_text_only) in enumerate(
            ordered_results
        ):
            output_items.append(
                DomContentItem(
                    index=i,
                    text=content,
                    depth=depth,
                    is_text_only=is_text_only,
                )
            )
            # TODO: make this right, for now we add all elements (except text) to selector map
            if not is_text_only:
                selector_map[i] = xpath

        return ProcessedDomContent(items=output_items, selector_map=selector_map)

    def _collect_candidates(
        self, soup: BeautifulSoup
    ) -> tuple[
        dict[str, tuple[Tag, int]],
        dict[str, tuple[NavigableString, int]],
        list[Tag],
    ]:
        """Walk `<body>` in document order and collect what needs a browser check.

        Returns:
            A tuple `(interactive_elements, text_nodes, denied_elements)`:
            the first two map an XPath to `(node, order_of_appearance)`; the
            last lists the deny-listed tags that were skipped (their subtrees
            are not traversed).
        """
        interactive_elements: dict[str, tuple[Tag, int]] = {}
        text_nodes: dict[str, tuple[NavigableString, int]] = {}
        denied_elements: list[Tag] = []
        if not soup.body:
            return interactive_elements, text_nodes, denied_elements

        order = 0  # Track order of appearance
        # Explicit stack instead of recursion; children are pushed reversed so
        # that popping yields them in document order.
        stack: list[tuple[PageElement, Optional[str]]] = [
            (child, None) for child in reversed(list(soup.body.children))
        ]
        while stack:
            node, parent_xpath = stack.pop()

            if isinstance(node, Tag):
                if not self._is_element_accepted(node):
                    denied_elements.append(node)
                    continue

                position = self._sibling_position(node)
                xpath = f'{parent_xpath or self._XPATH_ROOT}/{node.name}[{position}]'
                stack.extend((child, xpath) for child in reversed(list(node.children)))

                if (
                    self._is_interactive_element(node) or self._is_leaf_element(node)
                ) and self._is_active(node):
                    interactive_elements[xpath] = (node, order)
                    order += 1

            elif isinstance(node, NavigableString) and node.strip():
                # Text directly under <body> has no parent xpath and is skipped.
                # NOTE(review): entries are keyed by the parent's xpath, so when
                # a parent has several non-empty text children only the last
                # one is kept.
                if parent_xpath:
                    text_nodes[parent_xpath] = (node, order)
                    order += 1

        return interactive_elements, text_nodes, denied_elements

    @staticmethod
    def _sibling_position(element: Tag) -> int:
        """Return the 1-based XPath position of `element` among same-named siblings.

        The search is by identity: BeautifulSoup's `==` compares markup, so
        `list.index` would give identical siblings the same position.
        """
        if element.parent is None:
            return 1
        same_named = element.parent.find_all(element.name, recursive=False)
        for position, sibling in enumerate(same_named, start=1):
            if sibling is element:
                return position
        return 1

    @staticmethod
    def _xpath_depth(xpath: str) -> int:
        """Return how many levels below `<body>` the xpath points (children of body are 1)."""
        return xpath.count('/') - 2

    def _batch_check_elements(self, elements: dict[str, tuple[Tag, int]]) -> BatchCheckResults:
        """Batch check all interactive elements at once.

        Args:
            elements: XPath -> (element, order); only the XPaths are sent to
                the browser.

        Returns:
            BatchCheckResults whose `elements` holds an entry for every XPath
            the script reported as visible and topmost. Empty when `elements`
            is empty or the script call fails (the error is logged).
        """
        if not elements:
            return BatchCheckResults(elements={}, texts={})

        check_script = """
            return (function() {
                const results = {};
                const elements = %s;

                for (const [xpath, elementData] of Object.entries(elements)) {
                    const element = document.evaluate(xpath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                    if (!element) continue;

                    // Check visibility
                    const isVisible = element.checkVisibility({
                        checkOpacity: true,
                        checkVisibilityCSS: true
                    });
                    if (!isVisible) continue;

                    // Check if topmost
                    const rect = element.getBoundingClientRect();
                    const points = [
                        {x: rect.left + rect.width * 0.25, y: rect.top + rect.height * 0.25},
                        {x: rect.left + rect.width * 0.75, y: rect.top + rect.height * 0.25},
                        {x: rect.left + rect.width * 0.25, y: rect.top + rect.height * 0.75},
                        {x: rect.left + rect.width * 0.75, y: rect.top + rect.height * 0.75},
                        {x: rect.left + rect.width / 2, y: rect.top + rect.height / 2}
                    ];

                    const isTopElement = points.some(point => {
                        const topEl = document.elementFromPoint(point.x, point.y);
                        let current = topEl;
                        while (current && current !== document.body) {
                            if (current === element) return true;
                            current = current.parentElement;
                        }
                        return false;
                    });

                    if (isTopElement) {
                        results[xpath] = {
                            xpath: xpath,
                            isVisible: true,
                            isTopElement: true
                        };
                    }
                }
                return results;
            })();
        """ % json.dumps({xpath: {} for xpath in elements})

        try:
            results = self.driver.execute_script(check_script)
            return BatchCheckResults(
                elements={xpath: ElementCheckResult(**data) for xpath, data in results.items()},
                texts={},
            )
        except Exception as e:
            # Best effort: a failed check yields no elements instead of aborting.
            logger.error('Error in batch element check: %s', e)
            return BatchCheckResults(elements={}, texts={})

    def _batch_check_texts(
        self, texts: dict[str, tuple[NavigableString, int]]
    ) -> BatchCheckResults:
        """Batch check all text nodes at once.

        Args:
            texts: parent XPath -> (text node, order). For every entry the
                position of the text node among its parent's children is sent
                to the browser, which measures that child node.

        Returns:
            BatchCheckResults whose `texts` holds an entry for every XPath the
            script reported as visible. Empty when `texts` is empty or the
            script call fails (the error is logged).
        """
        if not texts:
            return BatchCheckResults(elements={}, texts={})

        # Locate each node by identity: text nodes compare equal by content, so
        # `list.index` would pick the first sibling with the same text.
        child_positions = {
            xpath: {
                'index': next(
                    index
                    for index, child in enumerate(node.parent.children)
                    if child is node
                )
            }
            for xpath, (node, _order) in texts.items()
            if node.parent is not None
        }

        check_script = """
            return (function() {
                const results = {};
                const texts = %s;

                for (const [xpath, textData] of Object.entries(texts)) {
                    const parent = document.evaluate(xpath, document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                    if (!parent) continue;

                    try {
                        const range = document.createRange();
                        const textNode = parent.childNodes[textData.index];
                        range.selectNodeContents(textNode);
                        const rect = range.getBoundingClientRect();

                        const isVisible = (
                            rect.width !== 0 &&
                            rect.height !== 0 &&
                            rect.top >= 0 &&
                            rect.top <= window.innerHeight &&
                            parent.checkVisibility({
                                checkOpacity: true,
                                checkVisibilityCSS: true
                            })
                        );

                        if (isVisible) {
                            results[xpath] = {
                                xpath: xpath,
                                isVisible: true
                            };
                        }
                    } catch (e) {
                        continue;
                    }
                }
                return results;
            })();
        """ % json.dumps(child_positions)

        try:
            results = self.driver.execute_script(check_script)
            return BatchCheckResults(
                elements={},
                texts={xpath: TextCheckResult(**data) for xpath, data in results.items()},
            )
        except Exception as e:
            # Best effort: a failed check yields no texts instead of aborting.
            logger.error('Error in batch text check: %s', e)
            return BatchCheckResults(elements={}, texts={})

    def _cap_text_length(self, text: str, max_length: int = 250) -> str:
        """Shorten `text` to its first and last `max_length // 2` characters.

        Text of at most `max_length` characters is returned unchanged;
        otherwise the two halves are joined by '...' (so the result can be up
        to three characters longer than `max_length`).
        """
        if len(text) <= max_length:
            return text
        half_length = max_length // 2
        # Slice from an explicit offset: text[-0:] would be the whole string.
        return text[:half_length] + '...' + text[len(text) - half_length :]

    def _extract_text_from_all_children(self, element: Tag) -> str:
        """Return the element's text, one stripped string per line, length-capped.

        Each text fragment below `element` contributes exactly once;
        whitespace-only fragments are dropped.
        """
        return self._cap_text_length('\n'.join(element.stripped_strings))

    def _is_interactive_element(self, element: Tag) -> bool:
        """Check if element is interactive based on tag name and attributes."""
        return (
            element.name in self._INTERACTIVE_TAGS
            or element.get('role') in self._INTERACTIVE_ROLES
            or element.get('aria-role') in self._INTERACTIVE_ROLES
            or element.get('tabindex') == '0'
        )

    def _is_leaf_element(self, element: Tag) -> bool:
        """Check if element is a leaf: non-empty text held in a single string child."""
        if not element.get_text(strip=True):
            return False
        # A childless element has no text and was rejected above, so only the
        # single-string-child case can remain.
        children = list(element.children)
        return len(children) == 1 and isinstance(children[0], str)

    def _is_element_accepted(self, element: Tag) -> bool:
        """Check if element is accepted, i.e. its tag name is not deny-listed."""
        return element.name not in self._DENIED_TAGS

    def _get_essential_attributes(self, element: Tag) -> str:
        """
        Collects essential attributes from an element.

        Args:
            element: The BeautifulSoup Tag

        Returns:
            A string of space-separated `name="value"` pairs: the essential
            attributes present on the element (values capped to 25
            characters), followed by all `aria-`/`data-` attributes.
        """
        attrs = []

        # Collect essential attributes that are present
        for attr in self._ESSENTIAL_ATTRIBUTES:
            if attr not in element.attrs:
                continue
            value = element[attr]
            if isinstance(value, (list, tuple)):
                # Multi-valued attributes (e.g. class) arrive as a list.
                value = ' '.join(str(v) for v in value)
            elif not isinstance(value, str):
                value = str(value)
            attrs.append(f'{attr}="{self._cap_text_length(value, 25)}"')

        # Collect aria- and data- attributes
        for attr in element.attrs:
            if attr.startswith(self._STATE_ATTRIBUTE_PREFIXES):
                attrs.append(f'{attr}="{element[attr]}"')

        return ' '.join(attrs)

    def _is_active(self, element: Tag) -> bool:
        """Check if element is active (not disabled, hidden or aria-disabled)."""
        return not (
            element.get('disabled') is not None
            or element.get('hidden') is not None
            or element.get('aria-disabled') == 'true'
        )