add notification sounds

This commit is contained in:
Alex Cheema
2024-11-25 21:21:00 +04:00
parent 062f5e3e29
commit ca8d59a215

View File

@@ -9,6 +9,9 @@ from typing import List, Dict, Optional
from pathlib import Path
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import time
import simpleaudio as sa
from datetime import datetime
class AsyncCircleCIClient:
def __init__(self, token: str, project_slug: str):
@@ -104,6 +107,19 @@ class PackageSizeTracker:
self.setup_logging(debug)
self.client = AsyncCircleCIClient(token, project_slug)
self.logger = logging.getLogger("PackageSizeTracker")
self.last_data_hash = None
self.notification_sound_path = Path(__file__).parent / "notification.wav"
# Sound file paths - replace these with your actual sound files
sounds_dir = Path(__file__).parent / "sounds"
self.sounds = {
'lines_up': sounds_dir / "lines_increased.wav",
'lines_down': sounds_dir / "lines_decreased.wav",
'tokens_up': sounds_dir / "tokens_increased.wav",
'tokens_down': sounds_dir / "tokens_decreased.wav",
'size_up': sounds_dir / "size_increased.wav",
'size_down': sounds_dir / "size_decreased.wav"
}
def setup_logging(self, debug: bool):
level = logging.DEBUG if debug else logging.INFO
@@ -882,6 +898,104 @@ class PackageSizeTracker:
print("\n")
def _calculate_data_hash(self, data: List[Dict]) -> str:
"""Calculate a hash of the data to detect changes"""
return hash(str(sorted([
(d.get('commit_hash'), d.get('timestamp'))
for d in data
])))
def _play_sound(self, sound_key: str):
"""Play a specific notification sound"""
try:
sound_path = self.sounds.get(sound_key)
if sound_path and sound_path.exists():
wave_obj = sa.WaveObject.from_wave_file(str(sound_path))
wave_obj.play()
else:
self.logger.warning(f"Sound file not found: {sound_key} at {sound_path}")
except Exception as e:
self.logger.error(f"Failed to play sound {sound_key}: {e}")
def _check_metrics_changes(self, current_data: List[Dict], previous_data: List[Dict]):
"""Check for specific metric changes and play appropriate sounds"""
if not previous_data:
return
# Get latest data points
current = current_data[-1]
previous = previous_data[-1]
# Check line count changes
if 'total_lines' in current and 'total_lines' in previous:
diff = current['total_lines'] - previous['total_lines']
if diff > 0:
self.logger.info(f"Lines of code increased by {diff:,}")
self._play_sound('lines_up')
elif diff < 0:
self.logger.info(f"Lines of code decreased by {abs(diff):,}")
self._play_sound('lines_down')
# Check tokens per second changes
if 'tokens_per_second' in current and 'tokens_per_second' in previous:
diff = current['tokens_per_second'] - previous['tokens_per_second']
if diff > 0:
self.logger.info(f"Tokens per second increased by {diff:.2f}")
self._play_sound('tokens_up')
elif diff < 0:
self.logger.info(f"Tokens per second decreased by {abs(diff):.2f}")
self._play_sound('tokens_down')
# Check package size changes
if 'total_size_mb' in current and 'total_size_mb' in previous:
diff = current['total_size_mb'] - previous['total_size_mb']
if diff > 0:
self.logger.info(f"Package size increased by {diff:.2f}MB")
self._play_sound('size_up')
elif diff < 0:
self.logger.info(f"Package size decreased by {abs(diff):.2f}MB")
self._play_sound('size_down')
async def run_dashboard(self, update_interval: int = 30):
"""Run the dashboard with periodic updates"""
self.logger.info(f"Starting real-time dashboard with {update_interval}s updates")
previous_data = None
while True:
try:
start_time = time.time()
# Collect new data
current_data = await self.collect_data()
if not current_data:
self.logger.warning("No data collected")
continue
# Generate report
report_path = self.generate_report(current_data)
if report_path:
self.logger.info(
f"Dashboard updated at {datetime.now().strftime('%H:%M:%S')}"
)
# Check for metric changes and play appropriate sounds
self._check_metrics_changes(current_data, previous_data)
# Update previous data
previous_data = current_data
# Calculate sleep time
elapsed = time.time() - start_time
sleep_time = max(0, update_interval - elapsed)
await asyncio.sleep(sleep_time)
except Exception as e:
self.logger.error(f"Error in dashboard update loop: {e}")
if self.debug:
raise
await asyncio.sleep(update_interval)
async def main():
token = os.getenv("CIRCLECI_TOKEN")
project_slug = os.getenv("CIRCLECI_PROJECT_SLUG")
@@ -894,15 +1008,10 @@ async def main():
tracker = PackageSizeTracker(token, project_slug, debug)
try:
data = await tracker.collect_data()
if not data:
print("No data found!")
return
report_path = tracker.generate_report(data)
if report_path:
print(f"\nDetailed report available at: {report_path}")
# Run the real-time dashboard instead of one-off collection
await tracker.run_dashboard()
except KeyboardInterrupt:
print("\nDashboard stopped by user")
except Exception as e:
logging.error(f"Error: {str(e)}")
if debug: