mirror of
https://github.com/getzep/graphiti.git
synced 2024-09-08 19:13:11 +03:00
* chore: Add romeo runner * fix: Linter * wip * wip dump * chore: Update romeo parser * chore: Anthropic model fix * wip * allbirds * allbirds runner * format * wip * wip * mypy updates * update * remove r * update tests * format * wip * chore: Strategically update the message * rebase and fix import issues * Update package imports for graphiti_core in examples and utils * nits * chore: Update OpenAI GPT-4o model to gpt-4o-2024-08-06 * implement groq * improvments & linting * cleanup and nits * Refactor package imports for graphiti_core in examples and utils * Refactor package imports for graphiti_core in examples and utils * implement diskcache * remove debug stuff * log cache hit when debugging only * Improve LLM config. Fix bugs (#41) Refactor LLMConfig class to allow None values for model and base_url * chore: Resolve mc --------- Co-authored-by: paulpaliychuk <pavlo.paliychuk.ca@gmail.com> Co-authored-by: prestonrasmussen <prasmuss15@gmail.com>
54 lines
1.6 KiB
Python
54 lines
1.6 KiB
Python
import os
|
|
import re
|
|
|
|
|
|
def sanitize_text(text):
    """Return *text* reduced to ASCII letters, digits and single spaces.

    Every character that is not an ASCII letter, a digit or whitespace is
    dropped; the remaining whitespace runs are then collapsed to one space
    each, and leading/trailing whitespace is removed.
    """
    # Drop punctuation/symbols first, then let split()/join() normalise spacing.
    return ' '.join(re.sub(r'[^a-zA-Z0-9\s]', '', text).split())
|
|
|
|
|
|
def parse_script(filename):
    """Split a play script into ``(speaker, speech)`` tuples.

    The file is read line by line as UTF-8. A non-empty line written entirely
    in upper case that does not start with ``ACT`` or ``SCENE`` is treated as
    a speaker name and opens a new speech. Any other non-empty line that does
    not start with ``[`` is appended to the speech of the current speaker;
    such lines are ignored while no speaker has been seen yet.

    Both the speaker name and the joined speech are passed through
    ``sanitize_text`` before being stored.

    Returns:
        A list of ``(speaker, speech)`` tuples in file order.
    """
    messages = []
    speaker = None
    speech_lines = []

    with open(filename, encoding='utf-8') as script:
        for raw_line in script:
            text = raw_line.strip()
            if not text:
                # Blank lines neither start a speech nor belong to one.
                continue

            is_heading = text.startswith('ACT') or text.startswith('SCENE')
            if text.isupper() and not is_heading:
                # A new speaker begins: flush whatever the previous one said.
                if speaker:
                    speech = sanitize_text(' '.join(speech_lines))
                    messages.append((sanitize_text(speaker), speech))
                speaker, speech_lines = text, []
                continue

            # Lines opening with '[' are skipped; everything else is dialogue.
            if speaker and not text.startswith('['):
                speech_lines.append(text)

    # The final speech has no following speaker line to trigger its flush.
    if speaker:
        speech = sanitize_text(' '.join(speech_lines))
        messages.append((sanitize_text(speaker), speech))

    return messages
|
|
|
|
|
|
def get_hamilton_messages():
    """Parse ``hamilton.txt`` from this module's directory.

    Returns:
        Whatever ``parse_script`` returns for that file.
    """
    # Resolve the path against this file's directory, not the working directory.
    script_path = os.path.join(os.path.dirname(__file__), 'hamilton.txt')
    return parse_script(script_path)
|