feat: refactor ui code & add tests for caching (#9)

Refactoring UI Code (moving out of main.py)
Executing tests as part of CI
Adding a new test for caching
This commit is contained in:
Aymen
2023-12-25 09:03:01 +01:00
committed by GitHub
parent fd147e7d22
commit 66d5667f99
7 changed files with 276 additions and 139 deletions

28
.github/workflows/python-ci.yml vendored Normal file
View File

@@ -0,0 +1,28 @@
name: Python CI
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.x
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run tests
run: |
python -m unittest discover -s tests

2
.gitignore vendored
View File

@@ -47,3 +47,5 @@ coverage.xml
*.cover
*.py,cover
.h
*.db

View File

@@ -1,49 +1,73 @@
import sqlite3
import hashlib
import json
import sqlite3
import functools
## retrieved from https://www.kevinkatz.io/posts/memoize-to-sqlite
## Originally from https://www.kevinkatz.io/posts/memoize-to-sqlite
def memoize_to_sqlite(func_name:str, filename: str = "cache.db"):
def memoize_to_sqlite(func_name: str, filename: str = "cache.db"):
"""
Memoization decorator that caches the output of a method in a SQLite
database.
"""
db_conn = sqlite3.connect(filename)
print("opening database")
db_conn.execute(
"CREATE TABLE IF NOT EXISTS cache (hash TEXT PRIMARY KEY, result TEXT)"
)
db_conn.execute(
"CREATE INDEX IF NOT EXISTS cache_ndx on cache(hash)"
)
def memoize(func):
def decorator(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
# Compute the hash of the <function name>:<argument>
xs = f"{func_name}:{repr(tuple(args[1:]))}:{repr(kwargs)}".encode("utf-8")
arg_hash = hashlib.sha256(xs).hexdigest()
# Check if the result is already cached
cursor = db_conn.cursor()
cursor.execute(
"SELECT result FROM cache WHERE hash = ?", (arg_hash,)
)
row = cursor.fetchone()
if row is not None:
return json.loads(row[0])
# Compute the result and cache it
result = func(*args, **kwargs)
if func_name == "chat_completion":
print(result)
cursor.execute(
"INSERT INTO cache (hash, result) VALUES (?, ?)",
(arg_hash, json.dumps(result))
)
db_conn.commit()
return result
with SQLiteMemoization(filename) as memoizer:
return memoizer.fetch_or_compute(func, func_name, *args, **kwargs)
return wrapped
return memoize
return decorator
class SQLiteMemoization:
def __init__(self, filename):
self.filename = filename
self.connection = None
def __enter__(self):
self.connection = sqlite3.connect(self.filename)
self._initialize_database()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.connection.close()
self.connection = None
def _initialize_database(self):
self.connection.execute(
"CREATE TABLE IF NOT EXISTS cache (hash TEXT PRIMARY KEY, result TEXT)"
)
self.connection.execute(
"CREATE INDEX IF NOT EXISTS cache_ndx ON cache(hash)"
)
def fetch_or_compute(self, func, func_name, *args, **kwargs):
arg_hash = self._compute_hash(func_name, *args, **kwargs)
result = self._fetch_from_cache(arg_hash)
if result is not None:
return result
return self._compute_and_cache_result(func, arg_hash, *args, **kwargs)
def _compute_hash(self, func_name, *args, **kwargs):
data = f"{func_name}:{repr(args)}:{repr(kwargs)}".encode("utf-8")
return hashlib.sha256(data).hexdigest()
def _fetch_from_cache(self, arg_hash):
cursor = self.connection.cursor()
cursor.execute("SELECT result FROM cache WHERE hash = ?", (arg_hash,))
row = cursor.fetchone()
return json.loads(row[0]) if row else None
def _compute_and_cache_result(self, func, arg_hash, *args, **kwargs):
result = func(*args, **kwargs)
self._cache_result(arg_hash, result)
return result
def _cache_result(self, arg_hash, result):
cursor = self.connection.cursor()
cursor.execute(
"INSERT INTO cache (hash, result) VALUES (?, ?)",
(arg_hash, json.dumps(result))
)
self.connection.commit()

121
main.py
View File

@@ -1,120 +1,67 @@
import os
import time
import threading
from agents.microagent import MicroAgent
import time
from dotenv import load_dotenv
from colorama import Fore, Style
from agents.microagent_manager import MicroAgentManager
from utils.utility import get_env_variable, time_function
from prompt_management.prompts import USER_INPUTS, USER_INPUTS_SINGLE
from colorama import Fore, Style
from terminaltables import AsciiTable
from itertools import cycle
from utils.ui import clear_console, display_agent_info, display_agent_info, print_final_output, format_text
def clear_console():
"""Clears the console screen."""
os.system('cls' if os.name == 'nt' else 'clear')
# Constants
QUESTION_SET = [
"What is 5+9?",
"What is the population of Thailand?",
"What is the population of Sweden?",
"What is the population of Sweden and Thailand combined?"
]
def display_agent_info(manager, stop_event, outputs):
def initialize_manager(api_key):
"""
Continuously displays comprehensive information about the agents.
Initialize and return the MicroAgentManager with the given API key.
"""
animation = cycle(['🌑', '🌒', '🌓', '🌔', '🌕', '🌖', '🌗', '🌘'])
while not stop_event.is_set():
clear_console()
header = [
"👤 Agent",
"🔁 Evolve Count",
"💻 Code Executions",
"👥 Active Agents",
"📈 Usage Count",
"🌟 Depth",
"Working?",
"📝 Last Input",
"🚦 Status"
]
agents_data = [header]
agents = manager.get_agents()
for agent in agents:
active_agents = ", ".join(f"{k}->{v}" for k, v in agent.active_agents.items())
agents_data.append([
agent.purpose,
agent.evolve_count,
agent.number_of_code_executions,
active_agents,
agent.usage_count,
agent.depth,
"" if agent.working_agent else "",
agent.last_input,
agent.current_status
])
table = AsciiTable(agents_data)
print(Fore.CYAN + "🤖 \033[1m Agents Status:\033[0m \n" + Style.RESET_ALL)
print(table.table)
for output in outputs:
print(output)
print(f"\nAgents are running.. {next(animation)}\n", end='\r') # '\r' returns the cursor to the start of the line
time.sleep(1)
manager = MicroAgentManager(api_key)
manager.create_agents()
return manager
@time_function
def process_user_input(manager, user_input, outputs):
def process_user_input(manager, user_input):
"""
Processes a single user input and generates a response.
"""
agent = manager.get_or_create_agent("Bootstrap Agent", depth=1, sample_input=user_input)
return agent.respond(user_input)
def process_questions(manager, outputs):
"""
Process each question in the QUESTION_SET and append outputs.
"""
for question_number, user_input in enumerate(QUESTION_SET, start=1):
response = process_user_input(manager, user_input)
output_text = format_text(question_number, user_input, response)
outputs.append(output_text)
def main():
api_key = get_env_variable("OPENAI_KEY", raise_error=False)
load_dotenv()
api_key = get_env_variable("OPENAI_KEY")
if not api_key:
print(Fore.RED + "🚫 Error: OPENAI_KEY environment variable is not set." + Style.RESET_ALL)
print(f"{Fore.RED}🚫 Error: OPENAI_KEY environment variable is not set.{Style.RESET_ALL}")
return
manager = MicroAgentManager(api_key)
manager.create_agents()
manager = initialize_manager(api_key)
outputs = []
stop_event = threading.Event()
display_thread = threading.Thread(target=display_agent_info, args=(manager, stop_event, outputs))
display_thread.start()
question_number = 1
try:
for user_input in USER_INPUTS:
response = process_user_input(manager, user_input, outputs)
output_text = Fore.YELLOW + "\n\n🔍 Question " + str(question_number) +": "+ Style.RESET_ALL + f" {user_input}\n" + Fore.GREEN + "💡 Response:" + Style.RESET_ALL + f" {response}"
outputs += [output_text]
question_number += 1
try:
process_questions(manager, outputs)
finally:
time.sleep(5)
stop_event.set()
clear_console()
for output in outputs:
print(output)
for agent in manager.get_agents():
print("📊 Stats for " + agent.purpose + ":")
print("🔁 Evolve Count: " + str(agent.evolve_count))
print("💻 Code Executions: " + str(agent.number_of_code_executions))
print("👥 Active Agents: " + str(agent.active_agents))
print("📈 Usage Count: " + str(agent.usage_count))
print("🏔️ Max Depth: " + str(agent.max_depth))
print("🌟 Depth: " + str(agent.depth))
print("🛠️ Working Agent: " + str(agent.working_agent))
print("📝 Last Input: " + str(agent.last_input))
print("🚦 Status: " + str(agent.current_status))
print(Fore.MAGENTA + f"\nPrompt for {agent.purpose}:" + Style.RESET_ALL)
print(Fore.LIGHTMAGENTA_EX + agent.dynamic_prompt + "\n" + Style.RESET_ALL)
print_final_output(outputs, manager)
display_thread.join()
display_agent_info(manager, stop_event, outputs)
def microagent_factory(initial_prompt, purpose, api_key, depth, max_depth, bootstrap_agent):
return MicroAgent(initial_prompt, purpose, api_key, depth, max_depth, bootstrap_agent)
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
main()

View File

@@ -76,16 +76,4 @@ STATIC_PRE_PROMPT = (
"NEVER do any calculations yourself. Always write python code if you need to do calculations. (Even for simple calculations like 3+6) "
"Write code to solve the task. You can only use the following frameworks: numpy, requests, pandas, requests, beautifulsoup4, matplotlib, seaborn, sqlalchemy, pymysql, scipy, scikit-learn, statsmodels, click, python-dotenv, virtualenv, scrapy, oauthlib, tweepy, datetime, openpyxl, xlrd, loguru, pytest, paramiko, cryptography, lxml" +
AGENT_PART
)
USER_INPUTS_SINGLE = [
"What is the population of Thailand?",
"What is 5+9?",
]
USER_INPUTS = [
"What is 5+9?",
"What is the population of Thailand?",
"What is the population of Sweden?",
"What is the population of Sweden and Thailand combined?"
]
)

View File

@@ -0,0 +1,55 @@
import unittest
from unittest.mock import MagicMock
from integrations.memoize import SQLiteMemoization, memoize_to_sqlite
import uuid
import os
class TestSQLiteMemoization(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.db_file = 'test.db'
cls.memoizer = SQLiteMemoization(cls.db_file)
cls.memoizer.__enter__()
@classmethod
def tearDownClass(cls):
cls.memoizer.__exit__(None, None, None)
os.remove(cls.db_file)
def test_initialization_creates_cache_table(self):
cursor = self.memoizer.connection.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='cache'")
self.assertIsNotNone(cursor.fetchone(), "Cache table should be created during initialization")
def test_hash_computation_returns_string(self):
test_hash = self.memoizer._compute_hash('test_func', 1, 2, 3, key='value')
self.assertIsInstance(test_hash, str, "Hash computation should return a string")
def test_caching_mechanism_stores_and_retrieves_data_correctly(self):
test_hash = self.memoizer._compute_hash('test_func', 1, 2, 3)
self.memoizer._cache_result(test_hash, "test_result")
cached_result = self.memoizer._fetch_from_cache(test_hash)
self.assertEqual(cached_result, "test_result", "Caching mechanism should store and retrieve data correctly")
def test_memoization_decorator_caches_function_output(self):
return_pong = MagicMock(return_value="pong")
unique_arg = uuid.uuid4().hex
@memoize_to_sqlite('test_func', self.db_file)
def ping(arg):
return return_pong(arg)
self.assertEqual(ping(unique_arg), "pong")
return_pong.assert_called_once()
return_pong.reset_mock()
self.assertEqual(ping(unique_arg), "pong")
return_pong.assert_not_called()
def test_resource_management_closes_connection(self):
with SQLiteMemoization(self.db_file) as memoizer:
self.assertIsNotNone(memoizer.connection, "Connection should be established")
self.assertIsNone(memoizer.connection, "Connection should be closed after context manager exit")
if __name__ == '__main__':
unittest.main()

93
utils/ui.py Normal file
View File

@@ -0,0 +1,93 @@
import os
from colorama import Fore, Style
from terminaltables import AsciiTable
from itertools import cycle
import time
def clear_console():
"""Clears the console screen."""
os.system('cls' if os.name == 'nt' else 'clear')
def display_agent_info(manager, stop_event, outputs):
"""
Continuously displays comprehensive information about the agents.
"""
animation = cycle(['🌑', '🌒', '🌓', '🌔', '🌕', '🌖', '🌗', '🌘'])
while not stop_event.is_set():
clear_console()
header = [
"👤 Agent",
"🔁 Evolve Count",
"💻 Code Executions",
"👥 Active Agents",
"📈 Usage Count",
"🌟 Depth",
"Working?",
"📝 Last Input",
"🚦 Status"
]
agents_data = [header]
agents = manager.get_agents()
for agent in agents:
active_agents = ", ".join(f"{k}->{v}" for k, v in agent.active_agents.items())
agents_data.append([
agent.purpose,
agent.evolve_count,
agent.number_of_code_executions,
active_agents,
agent.usage_count,
agent.depth,
"" if agent.working_agent else "",
agent.last_input,
agent.current_status
])
table = AsciiTable(agents_data)
print(Fore.CYAN + "🤖 \033[1m Agents Status:\033[0m \n" + Style.RESET_ALL)
print(table.table)
for output in outputs:
print(output)
print(f"\nAgents are running.. {next(animation)}\n", end='\r') # '\r' returns the cursor to the start of the line
time.sleep(1)
def print_final_output(outputs, manager):
"""
Print final outputs and agent statistics.
"""
clear_console()
for output in outputs:
print(output)
for agent in manager.get_agents():
print_agent_statistics(agent)
def print_agent_statistics(agent):
"""
Print statistics for a given agent.
"""
print(f"📊 Stats for {agent.purpose}:")
stats = [
f"🔁 Evolve Count: {agent.evolve_count}",
f"💻 Code Executions: {agent.number_of_code_executions}",
f"👥 Active Agents: {agent.active_agents}",
f"📈 Usage Count: {agent.usage_count}",
f"🏔️ Max Depth: {agent.max_depth}",
f"🌟 Depth: {agent.depth}",
f"🛠️ Working Agent: {agent.working_agent}",
f"📝 Last Input: {agent.last_input}",
f"🚦 Status: {agent.current_status}",
f"{Fore.MAGENTA}\nPrompt for {agent.purpose}:{Style.RESET_ALL}",
f"{Fore.LIGHTMAGENTA_EX}{agent.dynamic_prompt}\n{Style.RESET_ALL}"
]
print('\n'.join(stats))
def format_text(question_number, user_input, response):
"""
Formats the text with color and style.
"""
formatted_text = f"{Fore.YELLOW}\n\n🔍 Question {question_number}: {Style.RESET_ALL} {user_input}\n{Fore.GREEN}💡 Response:{Style.RESET_ALL} {response}"
return formatted_text