feat: add token tracking and ASCII art interface

Add comprehensive token usage tracking and beautiful ASCII art interface
to enhance user experience and provide cost transparency.

- Add TokenTracker class for real-time token usage monitoring
- Implement ASCII art banner with pyfiglet integration
- Add cost estimation based on OpenAI pricing models
- Display real-time token usage after each AI operation
- Add 'tokens' command in interactive mode for usage statistics
- Show session summary with detailed analytics at session end

- Integrate token tracking into OpenAIProvider with optional TokenTracker
- Add pyfiglet and tiktoken dependencies to pyproject.toml
- Create ascii_art.py module for banner display functionality
- Update main.py to initialize and manage token tracking sessions
- Add session management with unique session IDs
- Implement detailed usage tables with rich formatting

- Update README.md with token tracking features and examples
- Add ASCII art banner to welcome screen documentation
- Include cost transparency and usage analytics sections
- Update version to 0.2.5 in README examples
- Remove Windows from supported OS list in bug report template

- Add comprehensive unit tests for TokenTracker functionality
- Add unit tests for ASCII art display features
- Ensure proper error handling and edge cases covered

- Real-time cost estimates displayed after each operation
- Beautiful ASCII art welcome banner that adapts to terminal width
- Session-based token tracking with detailed summaries
- Interactive 'tokens' command for on-demand usage statistics
- Rich formatted tables for better readability

Breaking Changes: None
Dependencies Added: pyfiglet>=1.0.0, tiktoken>=0.5.0
This commit is contained in:
d-k-patel
2025-08-22 11:30:11 +05:30
parent 48b2d8482e
commit 5f54f63b13
10 changed files with 1233 additions and 8 deletions

View File

@@ -15,7 +15,7 @@ A clear and concise description of what the bug is.
3. Expected behavior: `...`
## Environment
- **OS**: [e.g., macOS 14.0, Ubuntu 22.04, Windows 11]
- **OS**: [e.g., macOS 14.0, Ubuntu 22.04]
- **Python version**: [e.g., 3.10.0]
- **aiclip version**: [e.g., 0.2.2]
- **ffmpeg version**: [e.g., 6.0]

View File

@@ -20,6 +20,8 @@
-**10x Faster**: Skip the documentation, Stack Overflow, and trial-and-error
- 🎯 **Battle-Tested**: Generates reliable, production-ready commands
- 🔄 **Smart Defaults**: Sensible codec and quality settings out of the box
- 🎨 **Beautiful Interface**: Colorful ASCII art and responsive terminal UI
- 📊 **Token Tracking**: Monitor your AI usage and costs in real-time
```bash
# Instead of this...
@@ -57,9 +59,19 @@ aiclip
```
```text
___ ____________ ________
/ | / _/ ____/ / / _/ __ \
/ /| | / // / / / / // /_/ /
/ ___ |_/ // /___/ /____/ // ____/
/_/ |_/___/\____/_____/___/_/
AI-Powered Video & Audio Processing
────────────────────────────────────────────────────────────
╭─────────────────────────────────────── Welcome to Interactive Mode ───────────────────────────────────────╮
│ │
│ ai-ffmpeg-cli v0.2.2
│ ai-ffmpeg-cli v0.2.5
│ │
│ AI-powered video and audio processing with natural language │
│ Type your request in plain English and let AI handle the ffmpeg complexity! │
@@ -82,6 +94,8 @@ aiclip
aiclip> convert this video to 720p
📊 parse_intent: 2,073 → 47 (2,120 total) | $0.0057
┏━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┓
┃ # ┃ Command ┃ Output ┃ Status ┃
┡━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━┩
@@ -186,6 +200,17 @@ aiclip --verbose "your command"
aiclip --output-dir /path/to/output "convert video.mp4 to 720p"
```
### Interactive Mode Features
```bash
# View token usage statistics anytime
aiclip> tokens
# Beautiful ASCII art welcome screen
# Responsive terminal UI that adapts to your screen width
# Real-time token usage tracking with cost estimates
```
### Subcommands and option placement
You can also use the explicit `nl` subcommand. Put global options before the subcommand:
@@ -225,6 +250,8 @@ AICLIP_OUTPUT_DIR=aiclip # Default output directory
- **Context Aware**: Scans your directory to suggest input files and durations
- **Organized Output**: All generated files are saved to a dedicated output directory
- **Duration Support**: Automatically handles time-based requests (e.g., "5 second GIF")
- **Token Transparency**: Real-time tracking of AI usage and costs
- **Responsive UI**: ASCII art and tables adapt to your terminal width
## 📁 Output Directory Management
@@ -251,6 +278,41 @@ aiclip "convert video.mp4 to 720p"
- 🧹 **Clean workspace**: Input files stay separate from outputs
- 📊 **Progress tracking**: See all your generated files at a glance
## 📊 Token Usage Tracking
aiclip provides complete transparency into your AI usage and costs:
```bash
# Real-time token usage displayed after each operation
aiclip> convert video.mp4 to 720p
📊 parse_intent: 2,073 → 47 (2,120 total) | $0.0057
# View detailed session statistics anytime
aiclip> tokens
Token Usage Summary
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Metric ┃ Value ┃ Details ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Session ID ┃ a1b2c3d4 ┃ Started at 14:30:15 │
│ Duration ┃ 45.2s ┃ Session duration │
│ Operations ┃ 3 ┃ Total operations performed │
│ Input Tokens ┃ 6,142 ┃ Total tokens sent to AI │
│ Output Tokens ┃ 156 ┃ Total tokens received from AI │
│ Total Tokens ┃ 6,298 ┃ Combined input + output │
│ Estimated Cost ┃ $0.0171 ┃ Based on OpenAI pricing │
└────────────────┴──────────┴───────────────────────────────┘
# Session summary displayed at the end of each session
```
**Features:**
- 💰 **Cost Transparency**: Real-time cost estimates based on current OpenAI pricing
- 📈 **Usage Analytics**: Track tokens per operation and session totals
- 🕒 **Session Tracking**: Monitor duration and operation counts
- 🎯 **Model-Specific**: Accurate token counting for GPT-4o, GPT-3.5-turbo, and more
- 📊 **Beautiful Tables**: Rich formatting that adapts to your terminal
## ⏱️ Duration and Time Handling
aiclip intelligently handles time-based requests for video and GIF creation:
@@ -314,6 +376,7 @@ make demo
- Ubuntu: `sudo apt install ffmpeg`
- Windows: Download from [ffmpeg.org](https://ffmpeg.org/)
- **OpenAI API key** for natural language processing
- **Terminal with color support** for the best visual experience
## 🆘 Troubleshooting
@@ -378,6 +441,8 @@ See our [Contributing Guide](CONTRIBUTING.md) to get started.
- 🔌 **Integrations**: GitHub Actions, Docker, CI/CD pipelines
- 🎬 **Enhanced Duration Support**: Better handling of time-based requests
- 📁 **Advanced Output Management**: Custom naming patterns and organization
- 📊 **Usage Analytics**: Historical token usage tracking and reporting
- 🎨 **Theme Customization**: Customizable color schemes and ASCII art fonts
## 📄 License

View File

@@ -52,7 +52,9 @@ dependencies = [
"openai>=1.37.0",
"python-dotenv>=1.0.0",
"pydantic>=2.0.0",
"typing-extensions>=4.8.0"
"typing-extensions>=4.8.0",
"pyfiglet>=1.0.0",
"tiktoken>=0.5.0"
]
[project.optional-dependencies]

View File

@@ -0,0 +1,355 @@
"""ASCII art module for ai-ffmpeg-cli.
This module provides colorful ASCII art displays using pyfiglet
with colors relevant to video and audio processing.
"""
from __future__ import annotations
import random
import shutil
import pyfiglet
from rich.console import Console
from rich.text import Text
# Initialize console for Rich output
console = Console()
def get_terminal_width() -> int:
"""Get the current terminal width.
Returns:
Terminal width in characters, defaults to 80 if cannot be determined
"""
try:
return shutil.get_terminal_size().columns
except (OSError, AttributeError):
return 80
def center_text(text: str, width: int) -> str:
"""Center-align text within a given width.
Args:
text: Text to center
width: Total width to center within
Returns:
Center-aligned text
"""
lines = text.split("\n")
centered_lines = []
for line in lines:
if line.strip():
# Calculate padding to center the line
padding = max(0, (width - len(line)) // 2)
centered_line = " " * padding + line
else:
centered_line = line
centered_lines.append(centered_line)
return "\n".join(centered_lines)
def get_ascii_art(text: str = "AICLIP", font: str = "slant") -> str:
"""Generate ASCII art using pyfiglet.
Args:
text: Text to convert to ASCII art
font: Pyfiglet font to use
Returns:
ASCII art string
"""
try:
fig = pyfiglet.Figlet(font=font)
ascii_art = fig.renderText(text)
# Center the ASCII art based on terminal width
terminal_width = get_terminal_width()
centered_art = center_text(ascii_art, terminal_width)
return centered_art
except Exception:
# Fallback to a simple ASCII art if pyfiglet fails
fallback_art = f"""
╔══════════════════════════════════════════════════════════════╗
{text}
╚══════════════════════════════════════════════════════════════╝
"""
terminal_width = get_terminal_width()
return center_text(fallback_art, terminal_width)
def display_colored_ascii_art(
text: str = "AICLIP",
font: str = "slant",
primary_color: str = "cyan",
secondary_color: str = "blue",
accent_color: str = "green",
) -> None:
"""Display colorful ASCII art with video/audio processing theme colors.
Args:
text: Text to display as ASCII art
font: Pyfiglet font to use
primary_color: Main color for the text (cyan - represents video/technology)
secondary_color: Secondary color for accents (blue - represents audio/water)
accent_color: Accent color for highlights (green - represents success/processing)
"""
ascii_art = get_ascii_art(text, font)
# Create colored text with gradient effect
colored_text = Text()
# Split the ASCII art into lines
lines = ascii_art.split("\n")
for i, line in enumerate(lines):
if not line.strip():
colored_text.append("\n")
continue
# Create a gradient effect using different colors
for j, char in enumerate(line):
if char.isspace():
colored_text.append(char)
else:
# Alternate between primary and secondary colors for gradient effect
if (i + j) % 3 == 0:
colored_text.append(char, style=f"bold {primary_color}")
elif (i + j) % 3 == 1:
colored_text.append(char, style=f"bold {secondary_color}")
else:
colored_text.append(char, style=f"bold {accent_color}")
colored_text.append("\n")
# Display the colored ASCII art
console.print(colored_text)
# Add a subtitle with accent color, center-aligned
subtitle = Text()
subtitle.append("AI-Powered Video & Audio Processing", style=f"italic {accent_color}")
console.print(subtitle, justify="center")
console.print()
def display_welcome_banner() -> None:
"""Display a welcome banner with ASCII art and project information."""
# Display the main ASCII art
display_colored_ascii_art()
# Add a decorative line, center-aligned
terminal_width = get_terminal_width()
line_length = min(60, terminal_width - 4) # Leave some margin
padding = max(0, (terminal_width - line_length) // 2)
decorative_line = " " * padding + "" * line_length
console.print(decorative_line, style="dim cyan")
console.print()
def get_random_font() -> str:
"""Get a random pyfiglet font for variety.
Returns:
Random font name
"""
fonts = [
"slant",
"banner",
"big",
"block",
"bubble",
"digital",
"isometric1",
"isometric2",
"isometric3",
"isometric4",
"letters",
"alligator",
"dotmatrix",
"bubblehead",
"bulbhead",
"chunky",
"coinstak",
"colossal",
"crawford",
"diamond",
"epic",
"fender",
"fourtops",
"goofy",
"graceful",
"hollywood",
"invita",
"isometric1",
"isometric2",
"isometric3",
"isometric4",
"italic",
"larry3d",
"lcd",
"lean",
"letters",
"alligator2",
"alligator3",
"alphabet",
"arrows",
"avatar",
"banner3-D",
"banner3",
"banner4",
"barbwire",
"basic",
"bell",
"bigchief",
"binary",
"block",
"bubble",
"caligraphy",
"caligraphy2",
"catwalk",
"chunky",
"coinstak",
"colossal",
"computer",
"contessa",
"contrast",
"cosmic",
"cosmike",
"crawford",
"crawford2",
"crazy",
"cricket",
"cyberlarge",
"cybermedium",
"cybersmall",
"diamond",
"digital",
"doh",
"doom",
"dotmatrix",
"drpepper",
"eftichess",
"eftifont",
"eftipiti",
"eftirobot",
"eftitalic",
"eftiwall",
"eftiwater",
"epic",
"fender",
"fourtops",
"fuzzy",
"goofy",
"gothic",
"graceful",
"gradient",
"graffiti",
"hollywood",
"invita",
"isometric1",
"isometric2",
"isometric3",
"isometric4",
"italic",
"ivrit",
"jacky",
"katakana",
"kban",
"larry3d",
"lcd",
"lean",
"letters",
"linux",
"lockergnome",
"madrid",
"marquee",
"maxfour",
"mike",
"mini",
"mirror",
"mnemonic",
"morse",
"moscow",
"nancyj",
"nancyj-fancy",
"nancyj-underlined",
"nipples",
"ntgreek",
"o8",
"ogre",
"pawp",
"peaks",
"pebbles",
"pepper",
"poison",
"puffy",
"pyramid",
"rectangles",
"relief",
"relief2",
"rev",
"roman",
"rot13",
"rounded",
"rowancap",
"rozzo",
"runic",
"runyc",
"sblood",
"script",
"serifcap",
"shadow",
"short",
"slant",
"slide",
"slscript",
"small",
"smisome1",
"smkeyboard",
"smscript",
"smshadow",
"smslant",
"smtengwar",
"speed",
"stampatello",
"standard",
"starwars",
"stellar",
"stop",
"straight",
"tanja",
"tengwar",
"term",
"thick",
"thin",
"threepoint",
"ticks",
"ticksslant",
"tiles",
"tinker-toy",
"tombstone",
"trek",
"tsalagi",
"twopoint",
"univers",
"usaflag",
"wavy",
"weird",
]
return random.choice(fonts)
def display_dynamic_banner() -> None:
"""Display a dynamic banner with random font and colors."""
# Choose a random font for variety
font = get_random_font()
# Display with the chosen font
display_colored_ascii_art(font=font)

View File

@@ -7,6 +7,7 @@ to parse natural language prompts into structured ffmpeg intents.
from __future__ import annotations
import json
from typing import TYPE_CHECKING
from typing import Any
from pydantic import ValidationError
@@ -16,6 +17,9 @@ from .credential_security import sanitize_error_message
from .custom_exceptions import ParseError
from .intent_models import FfmpegIntent
if TYPE_CHECKING:
from .token_tracker import TokenTracker
# Create secure logger that masks sensitive information
logger = create_secure_logger(__name__)
@@ -155,12 +159,13 @@ class OpenAIProvider(LLMProvider):
including error handling and response processing.
"""
def __init__(self, api_key: str, model: str) -> None:
def __init__(self, api_key: str, model: str, token_tracker: TokenTracker | None = None) -> None:
"""Initialize OpenAI provider with API key and model.
Args:
api_key: OpenAI API key for authentication
model: Model name to use for completions
token_tracker: Optional token tracker for monitoring usage
Raises:
Exception: When client initialization fails
@@ -173,6 +178,7 @@ class OpenAIProvider(LLMProvider):
try:
self.client = OpenAI(api_key=api_key)
self.model = model
self.token_tracker = token_tracker
except Exception as e:
# Sanitize error message to prevent API key exposure
sanitized_error = sanitize_error_message(str(e))
@@ -211,6 +217,30 @@ class OpenAIProvider(LLMProvider):
content = rsp.choices[0].message.content or "{}"
logger.debug(f"Received response length: {len(content)} characters")
# Track token usage if token tracker is available
if self.token_tracker and hasattr(rsp, "usage") and rsp.usage is not None:
input_tokens = rsp.usage.prompt_tokens if hasattr(rsp.usage, "prompt_tokens") else 0
output_tokens = (
rsp.usage.completion_tokens if hasattr(rsp.usage, "completion_tokens") else 0
)
# Calculate cost estimate
cost_estimate = self.token_tracker.get_cost_estimate(
self.model, input_tokens, output_tokens
)
# Track the operation
operation = self.token_tracker.track_operation(
operation="parse_intent",
model=self.model,
input_text=system + "\n" + user,
output_text=content,
cost_estimate=cost_estimate,
)
# Display real-time usage
self.token_tracker.display_realtime_usage(operation)
return content
except Exception as e:

View File

@@ -28,4 +28,4 @@ __all__ = [
"tenant_id",
]
__version__ = "0.2.3"
__version__ = "0.2.5"

View File

@@ -18,6 +18,7 @@ from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from .ascii_art import display_welcome_banner
from .command_builder import build_commands
from .config import AppConfig
from .config import load_config
@@ -30,6 +31,7 @@ from .custom_exceptions import ParseError
from .intent_router import route_intent
from .llm_client import LLMClient
from .llm_client import OpenAIProvider
from .token_tracker import TokenTracker
from .version_info import __version__
# Initialize console for Rich output
@@ -38,13 +40,19 @@ console = Console()
# Initialize logger
logger = logging.getLogger(__name__)
# Initialize global token tracker
token_tracker = TokenTracker()
# Initialize Typer app with completion disabled and support for invocation without subcommands
app = typer.Typer(add_completion=False, help="AI-powered ffmpeg CLI", invoke_without_command=True)
def _display_welcome_screen() -> None:
"""Display a beautiful welcome screen for the interactive mode."""
# Create welcome panel
# Display ASCII art banner
display_welcome_banner()
# Create welcome panel with version info
welcome_text = Text()
welcome_text.append("ai-ffmpeg-cli", style="bold white")
welcome_text.append(" v", style="dim")
@@ -239,6 +247,7 @@ def _display_help_tips() -> None:
"Try: 'add subtitles to video.mp4'",
"Type 'exit' or 'quit' to leave interactive mode",
"Use Ctrl+C to cancel any operation",
"Type 'tokens' to see usage statistics",
]
tip_text = Text()
@@ -259,6 +268,23 @@ def _display_help_tips() -> None:
console.print()
def _start_token_session(cfg: AppConfig) -> None:
"""Start a new token tracking session."""
import uuid
session_id = str(uuid.uuid4())[:8] # Short session ID
token_tracker.start_session(session_id, cfg.model)
logger.debug(f"Started token tracking session: {session_id}")
def _display_token_summary() -> None:
"""Display token usage summary at the end of session."""
if token_tracker.current_session and token_tracker.current_session.operations:
console.print()
token_tracker.display_session_summary()
token_tracker.display_detailed_usage()
def _setup_logging(verbose: bool) -> None:
"""Configure logging based on verbosity level.
@@ -326,6 +352,9 @@ def _main_impl(
if invoked_none:
if prompt is not None:
try:
# Start token tracking session for one-shot command
_start_token_session(cfg)
# Execute one-shot command: scan context, parse intent, build and execute
context = scan(show_summary=False) # Don't show summary for one-shot commands
client = _make_llm(cfg)
@@ -350,6 +379,10 @@ def _main_impl(
assume_yes=yes,
output_dir=Path(cfg.output_directory),
)
# Display token summary for one-shot command
_display_token_summary()
raise typer.Exit(code)
except (ParseError, BuildError, ExecError) as e:
console.print(f"[red]❌ Error:[/red] {e}")
@@ -357,8 +390,12 @@ def _main_impl(
else:
# No subcommand and no prompt: enter interactive mode
if ctx is not None:
# Start token tracking session
_start_token_session(cfg)
nl(ctx=ctx, prompt=None)
return
# Display token summary at the end
_display_token_summary()
return
except ConfigError as e:
console.print(f"[red]❌ Configuration Error:[/red] {e}")
raise typer.Exit(1) from e
@@ -432,7 +469,7 @@ def _make_llm(cfg: AppConfig) -> LLMClient:
try:
# This will validate the API key format and presence
api_key = cfg.get_api_key_for_client()
provider = OpenAIProvider(api_key=api_key, model=cfg.model)
provider = OpenAIProvider(api_key=api_key, model=cfg.model, token_tracker=token_tracker)
return LLMClient(provider)
except ConfigError:
# Re-raise config errors for proper error handling
@@ -532,6 +569,14 @@ def nl(
if not line or line.lower() in {"exit", "quit"}:
console.print("[yellow]Goodbye![/yellow]")
break
if line.lower() == "tokens":
# Display token usage statistics
if token_tracker.current_session and token_tracker.current_session.operations:
token_tracker.display_session_summary()
token_tracker.display_detailed_usage()
else:
console.print("[dim]No token usage data available yet.[/dim]")
continue
try:
handle_one(line)
except (ParseError, BuildError, ExecError) as e:

View File

@@ -0,0 +1,275 @@
"""Token tracking module for ai-ffmpeg-cli.
This module provides functionality to track token usage using tiktoken
and display usage statistics to users.
"""
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import field
from datetime import datetime
import tiktoken
from rich.console import Console
from rich.table import Table
from rich.text import Text
# Initialize console for Rich output
console = Console()
@dataclass
class TokenUsage:
"""Represents token usage for a single operation."""
operation: str
model: str
input_tokens: int
output_tokens: int
total_tokens: int
timestamp: datetime = field(default_factory=datetime.now)
cost_estimate: float | None = None
@property
def total_cost_estimate(self) -> float | None:
"""Calculate total cost estimate if available."""
if self.cost_estimate:
return self.cost_estimate
return None
@dataclass
class SessionTokenTracker:
"""Tracks token usage for a CLI session."""
session_id: str
start_time: datetime = field(default_factory=datetime.now)
operations: list[TokenUsage] = field(default_factory=list)
model: str = "gpt-4o"
def add_operation(self, operation: TokenUsage) -> None:
"""Add a token usage operation to the session."""
self.operations.append(operation)
@property
def total_input_tokens(self) -> int:
"""Get total input tokens for the session."""
return sum(op.input_tokens for op in self.operations)
@property
def total_output_tokens(self) -> int:
"""Get total output tokens for the session."""
return sum(op.output_tokens for op in self.operations)
@property
def total_tokens(self) -> int:
"""Get total tokens for the session."""
return sum(op.total_tokens for op in self.operations)
@property
def total_cost_estimate(self) -> float | None:
"""Get total cost estimate for the session."""
costs = [op.total_cost_estimate for op in self.operations if op.total_cost_estimate]
return sum(costs) if costs else None
@property
def session_duration(self) -> float:
"""Get session duration in seconds."""
return (datetime.now() - self.start_time).total_seconds()
class TokenTracker:
"""Main token tracking class."""
def __init__(self) -> None:
self.current_session: SessionTokenTracker | None = None
self.encoding_cache: dict[str, tiktoken.Encoding] = {}
def start_session(self, session_id: str, model: str = "gpt-4o") -> None:
"""Start a new token tracking session."""
self.current_session = SessionTokenTracker(session_id=session_id, model=model)
def get_encoding(self, model: str) -> tiktoken.Encoding:
"""Get tiktoken encoding for a model, with caching."""
if model not in self.encoding_cache:
try:
# Map common model names to their encoding
if "gpt-4" in model or "gpt-3.5" in model:
encoding_name = "cl100k_base" # GPT-4 and GPT-3.5-turbo
elif "gpt-3" in model:
encoding_name = "r50k_base" # GPT-3
else:
encoding_name = "cl100k_base" # Default to GPT-4 encoding
self.encoding_cache[model] = tiktoken.get_encoding(encoding_name)
except Exception:
# Fallback to GPT-4 encoding if model-specific encoding fails
self.encoding_cache[model] = tiktoken.get_encoding("cl100k_base")
return self.encoding_cache[model]
def count_tokens(self, text: str, model: str) -> int:
"""Count tokens in text for a specific model."""
if not text:
return 0
encoding = self.get_encoding(model)
return len(encoding.encode(text))
def track_operation(
self,
operation: str,
model: str,
input_text: str,
output_text: str,
cost_estimate: float | None = None,
) -> TokenUsage:
"""Track token usage for an operation."""
if not self.current_session:
# Create a default session if none exists
self.start_session("default", model)
input_tokens = self.count_tokens(input_text, model)
output_tokens = self.count_tokens(output_text, model)
total_tokens = input_tokens + output_tokens
usage = TokenUsage(
operation=operation,
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=total_tokens,
cost_estimate=cost_estimate,
)
if self.current_session:
self.current_session.add_operation(usage)
return usage
def display_session_summary(self) -> None:
"""Display a summary of token usage for the current session."""
if not self.current_session or not self.current_session.operations:
return
session = self.current_session
# Create summary table
summary_table = Table(
title="[bold cyan]Token Usage Summary[/bold cyan]",
show_header=True,
header_style="bold blue",
)
summary_table.add_column("Metric", style="bold cyan")
summary_table.add_column("Value", style="white", justify="right")
summary_table.add_column("Details", style="dim")
# Session info
summary_table.add_row(
"Session ID",
session.session_id,
f"Started at {session.start_time.strftime('%H:%M:%S')}",
)
summary_table.add_row("Duration", f"{session.session_duration:.1f}s", "Session duration")
summary_table.add_row(
"Operations", str(len(session.operations)), "Total operations performed"
)
# Token counts
summary_table.add_row(
"Input Tokens", f"{session.total_input_tokens:,}", "Total tokens sent to AI"
)
summary_table.add_row(
"Output Tokens",
f"{session.total_output_tokens:,}",
"Total tokens received from AI",
)
summary_table.add_row(
"Total Tokens", f"{session.total_tokens:,}", "Combined input + output"
)
# Cost estimate if available
if session.total_cost_estimate:
summary_table.add_row(
"Estimated Cost",
f"${session.total_cost_estimate:.4f}",
"Based on OpenAI pricing",
)
console.print(summary_table)
console.print()
def display_detailed_usage(self) -> None:
"""Display detailed token usage for each operation."""
if not self.current_session or not self.current_session.operations:
return
# Create detailed operations table
operations_table = Table(
title="[bold green]Detailed Token Usage[/bold green]",
show_header=True,
header_style="bold blue",
)
operations_table.add_column("#", style="bold cyan", justify="center")
operations_table.add_column("Operation", style="bold white")
operations_table.add_column("Model", style="cyan")
operations_table.add_column("Input", style="green", justify="right")
operations_table.add_column("Output", style="yellow", justify="right")
operations_table.add_column("Total", style="bold", justify="right")
operations_table.add_column("Time", style="dim")
for i, operation in enumerate(self.current_session.operations, 1):
operations_table.add_row(
str(i),
operation.operation,
operation.model,
f"{operation.input_tokens:,}",
f"{operation.output_tokens:,}",
f"{operation.total_tokens:,}",
operation.timestamp.strftime("%H:%M:%S"),
)
console.print(operations_table)
console.print()
def get_cost_estimate(self, model: str, input_tokens: int, output_tokens: int) -> float | None:
"""Get cost estimate for token usage based on OpenAI pricing."""
# OpenAI pricing (as of 2024, approximate)
pricing = {
"gpt-4o": {"input": 0.0025, "output": 0.01}, # per 1K tokens
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
}
# Find the closest model match
model_key = None
for key in pricing:
if key in model.lower():
model_key = key
break
if not model_key:
return None
input_cost = (input_tokens / 1000) * pricing[model_key]["input"]
output_cost = (output_tokens / 1000) * pricing[model_key]["output"]
return input_cost + output_cost
def display_realtime_usage(self, operation: TokenUsage) -> None:
"""Display real-time token usage for an operation."""
usage_text = Text()
usage_text.append("📊 ", style="bold cyan")
usage_text.append(f"{operation.operation}: ", style="bold white")
usage_text.append(f"{operation.input_tokens:,}{operation.output_tokens:,} ", style="dim")
usage_text.append(f"({operation.total_tokens:,} total)", style="green")
if operation.total_cost_estimate:
usage_text.append(f" | ${operation.total_cost_estimate:.4f}", style="yellow")
console.print(usage_text)

View File

@@ -0,0 +1,138 @@
"""Tests for ASCII art functionality."""
from unittest.mock import patch
from ai_ffmpeg_cli.ascii_art import center_text
from ai_ffmpeg_cli.ascii_art import display_colored_ascii_art
from ai_ffmpeg_cli.ascii_art import display_dynamic_banner
from ai_ffmpeg_cli.ascii_art import display_welcome_banner
from ai_ffmpeg_cli.ascii_art import get_ascii_art
from ai_ffmpeg_cli.ascii_art import get_random_font
from ai_ffmpeg_cli.ascii_art import get_terminal_width
class TestAsciiArt:
"""Test cases for ASCII art functionality."""
def test_get_ascii_art_default(self):
"""Test getting ASCII art with default parameters."""
result = get_ascii_art()
# Check that it's a non-empty string with ASCII art characters
assert isinstance(result, str)
assert len(result) > 0
# Should contain some ASCII art characters
assert any(char in result for char in ["/", "\\", "_", "|", "#", "="])
def test_get_ascii_art_custom_text(self):
"""Test getting ASCII art with custom text."""
result = get_ascii_art("TEST")
assert isinstance(result, str)
assert len(result) > 0
# Should contain some ASCII art characters
assert any(char in result for char in ["/", "\\", "_", "|", "#", "="])
def test_get_ascii_art_custom_font(self):
"""Test getting ASCII art with custom font."""
result = get_ascii_art("TEST", "banner")
assert isinstance(result, str)
assert len(result) > 0
# Should contain some ASCII art characters
assert any(char in result for char in ["/", "\\", "_", "|", "#", "="])
def test_get_ascii_art_fallback(self):
"""Test ASCII art fallback when pyfiglet fails."""
with patch("ai_ffmpeg_cli.ascii_art.pyfiglet.Figlet") as mock_figlet:
mock_figlet.side_effect = Exception("Font not found")
result = get_ascii_art("TEST")
assert "TEST" in result
assert "" in result # Check for fallback box characters
def test_get_random_font(self):
"""Test getting a random font."""
font = get_random_font()
assert isinstance(font, str)
assert len(font) > 0
def test_get_random_font_returns_valid_font(self):
"""Test that random font is from the valid font list."""
font = get_random_font()
# Since it's random, we can't guarantee it's in our small test list
# But we can test that it's a string and not empty
assert isinstance(font, str)
assert len(font) > 0
@patch("ai_ffmpeg_cli.ascii_art.console")
def test_display_colored_ascii_art(self, mock_console):
"""Test displaying colored ASCII art."""
display_colored_ascii_art("TEST")
# Verify that console.print was called
assert mock_console.print.called
@patch("ai_ffmpeg_cli.ascii_art.console")
def test_display_colored_ascii_art_custom_colors(self, mock_console):
"""Test displaying colored ASCII art with custom colors."""
display_colored_ascii_art(
"TEST", primary_color="red", secondary_color="yellow", accent_color="blue"
)
# Verify that console.print was called
assert mock_console.print.called
@patch("ai_ffmpeg_cli.ascii_art.console")
def test_display_welcome_banner(self, mock_console):
"""Test displaying welcome banner."""
display_welcome_banner()
# Verify that console.print was called multiple times
assert mock_console.print.call_count >= 2
@patch("ai_ffmpeg_cli.ascii_art.console")
def test_display_dynamic_banner(self, mock_console):
"""Test displaying dynamic banner."""
display_dynamic_banner()
# Verify that console.print was called
assert mock_console.print.called
def test_ascii_art_with_empty_text(self):
"""Test ASCII art with empty text."""
result = get_ascii_art("")
assert isinstance(result, str)
# Should still produce some output even with empty text
def test_ascii_art_with_special_characters(self):
"""Test ASCII art with special characters."""
result = get_ascii_art("AICLIP-123!")
assert isinstance(result, str)
assert len(result) > 0
@patch("ai_ffmpeg_cli.ascii_art.console")
def test_display_colored_ascii_art_with_long_text(self, mock_console):
"""Test displaying colored ASCII art with long text."""
long_text = "VERY_LONG_TEXT_FOR_TESTING"
display_colored_ascii_art(long_text)
# Verify that console.print was called
assert mock_console.print.called
def test_get_terminal_width(self):
"""Test getting terminal width."""
width = get_terminal_width()
assert isinstance(width, int)
assert width > 0
def test_center_text(self):
"""Test centering text."""
text = "Hello\nWorld"
centered = center_text(text, 20)
assert isinstance(centered, str)
assert "Hello" in centered
assert "World" in centered
def test_center_text_empty(self):
"""Test centering empty text."""
text = ""
centered = center_text(text, 20)
assert centered == ""
def test_center_text_short_width(self):
"""Test centering text when width is shorter than text."""
text = "Hello World"
centered = center_text(text, 5)
assert "Hello World" in centered

View File

@@ -0,0 +1,315 @@
"""Tests for token tracking functionality."""
from datetime import datetime
from unittest.mock import MagicMock
from unittest.mock import patch
from ai_ffmpeg_cli.token_tracker import SessionTokenTracker
from ai_ffmpeg_cli.token_tracker import TokenTracker
from ai_ffmpeg_cli.token_tracker import TokenUsage
class TestTokenUsage:
"""Test cases for TokenUsage class."""
def test_token_usage_creation(self):
"""Test creating a TokenUsage instance."""
usage = TokenUsage(
operation="test_operation",
model="gpt-4o",
input_tokens=100,
output_tokens=50,
total_tokens=150,
)
assert usage.operation == "test_operation"
assert usage.model == "gpt-4o"
assert usage.input_tokens == 100
assert usage.output_tokens == 50
assert usage.total_tokens == 150
assert usage.cost_estimate is None
def test_token_usage_with_cost(self):
"""Test TokenUsage with cost estimate."""
usage = TokenUsage(
operation="test_operation",
model="gpt-4o",
input_tokens=100,
output_tokens=50,
total_tokens=150,
cost_estimate=0.0025,
)
assert usage.total_cost_estimate == 0.0025
def test_token_usage_timestamp(self):
"""Test that TokenUsage has a timestamp."""
usage = TokenUsage(
operation="test_operation",
model="gpt-4o",
input_tokens=100,
output_tokens=50,
total_tokens=150,
)
assert isinstance(usage.timestamp, datetime)
class TestSessionTokenTracker:
"""Test cases for SessionTokenTracker class."""
def test_session_creation(self):
"""Test creating a session tracker."""
session = SessionTokenTracker(session_id="test123", model="gpt-4o")
assert session.session_id == "test123"
assert session.model == "gpt-4o"
assert len(session.operations) == 0
assert isinstance(session.start_time, datetime)
def test_add_operation(self):
"""Test adding operations to session."""
session = SessionTokenTracker(session_id="test123")
usage = TokenUsage(
operation="test_op",
model="gpt-4o",
input_tokens=100,
output_tokens=50,
total_tokens=150,
)
session.add_operation(usage)
assert len(session.operations) == 1
assert session.operations[0] == usage
def test_total_tokens_calculation(self):
"""Test total token calculations."""
session = SessionTokenTracker(session_id="test123")
# Add multiple operations
session.add_operation(TokenUsage("op1", "gpt-4o", 100, 50, 150))
session.add_operation(TokenUsage("op2", "gpt-4o", 200, 100, 300))
assert session.total_input_tokens == 300
assert session.total_output_tokens == 150
assert session.total_tokens == 450
def test_cost_calculation(self):
"""Test cost calculation with multiple operations."""
session = SessionTokenTracker(session_id="test123")
# Add operations with costs
op1 = TokenUsage("op1", "gpt-4o", 100, 50, 150, cost_estimate=0.001)
op2 = TokenUsage("op2", "gpt-4o", 200, 100, 300, cost_estimate=0.002)
session.add_operation(op1)
session.add_operation(op2)
assert session.total_cost_estimate == 0.003
def test_session_duration(self):
"""Test session duration calculation."""
session = SessionTokenTracker(session_id="test123")
# Duration should be a positive number
assert session.session_duration >= 0
class TestTokenTracker:
"""Test cases for TokenTracker class."""
def test_token_tracker_initialization(self):
"""Test TokenTracker initialization."""
tracker = TokenTracker()
assert tracker.current_session is None
assert isinstance(tracker.encoding_cache, dict)
assert len(tracker.encoding_cache) == 0
def test_start_session(self):
"""Test starting a new session."""
tracker = TokenTracker()
tracker.start_session("test123", "gpt-4o")
assert tracker.current_session is not None
assert tracker.current_session.session_id == "test123"
assert tracker.current_session.model == "gpt-4o"
@patch("ai_ffmpeg_cli.token_tracker.tiktoken.get_encoding")
def test_get_encoding_gpt4(self, mock_get_encoding):
"""Test getting encoding for GPT-4 model."""
mock_encoding = MagicMock()
mock_get_encoding.return_value = mock_encoding
tracker = TokenTracker()
encoding = tracker.get_encoding("gpt-4o")
assert encoding == mock_encoding
mock_get_encoding.assert_called_with("cl100k_base")
@patch("ai_ffmpeg_cli.token_tracker.tiktoken.get_encoding")
def test_get_encoding_gpt3(self, mock_get_encoding):
"""Test getting encoding for GPT-3 model."""
mock_encoding = MagicMock()
mock_get_encoding.return_value = mock_encoding
tracker = TokenTracker()
encoding = tracker.get_encoding("gpt-3")
assert encoding == mock_encoding
mock_get_encoding.assert_called_with("r50k_base")
@patch("ai_ffmpeg_cli.token_tracker.tiktoken.get_encoding")
def test_get_encoding_caching(self, mock_get_encoding):
"""Test that encodings are cached."""
mock_encoding = MagicMock()
mock_get_encoding.return_value = mock_encoding
tracker = TokenTracker()
# First call
encoding1 = tracker.get_encoding("gpt-4o")
# Second call should use cache
encoding2 = tracker.get_encoding("gpt-4o")
assert encoding1 == encoding2
# Should only be called once due to caching
mock_get_encoding.assert_called_once_with("cl100k_base")
def test_count_tokens_empty(self):
"""Test counting tokens in empty text."""
tracker = TokenTracker()
count = tracker.count_tokens("", "gpt-4o")
assert count == 0
def test_count_tokens_with_text(self):
"""Test counting tokens in text."""
tracker = TokenTracker()
# Mock the encoding to return a known token count
mock_encoding = MagicMock()
mock_encoding.encode.return_value = [1, 2, 3, 4, 5] # 5 tokens
tracker.encoding_cache["gpt-4o"] = mock_encoding
count = tracker.count_tokens("Hello world", "gpt-4o")
assert count == 5
def test_track_operation(self):
"""Test tracking an operation."""
tracker = TokenTracker()
tracker.start_session("test123", "gpt-4o")
# Mock encoding for token counting
mock_encoding = MagicMock()
mock_encoding.encode.return_value = [1, 2, 3] # 3 tokens for input
tracker.encoding_cache["gpt-4o"] = mock_encoding
usage = tracker.track_operation(
operation="test_op",
model="gpt-4o",
input_text="Hello",
output_text="World",
cost_estimate=0.001,
)
assert usage.operation == "test_op"
assert usage.model == "gpt-4o"
assert usage.input_tokens == 3
assert usage.output_tokens == 3 # "World" also has 3 tokens
assert usage.total_tokens == 6
assert usage.cost_estimate == 0.001
# Check that it was added to the session
assert len(tracker.current_session.operations) == 1
def test_track_operation_no_session(self):
"""Test tracking operation when no session exists."""
tracker = TokenTracker()
# Mock encoding
mock_encoding = MagicMock()
mock_encoding.encode.return_value = [1, 2]
tracker.encoding_cache["gpt-4o"] = mock_encoding
tracker.track_operation(
operation="test_op", model="gpt-4o", input_text="Hello", output_text="World"
)
# Should create a default session
assert tracker.current_session is not None
assert tracker.current_session.session_id == "default"
assert len(tracker.current_session.operations) == 1
def test_get_cost_estimate_gpt4o(self):
"""Test cost estimation for GPT-4o."""
tracker = TokenTracker()
cost = tracker.get_cost_estimate("gpt-4o", 1000, 500)
# GPT-4o pricing: input $0.0025/1K, output $0.01/1K
expected_cost = (1000 / 1000) * 0.0025 + (500 / 1000) * 0.01
assert cost == expected_cost
def test_get_cost_estimate_gpt35(self):
"""Test cost estimation for GPT-3.5-turbo."""
tracker = TokenTracker()
cost = tracker.get_cost_estimate("gpt-3.5-turbo", 1000, 500)
# GPT-3.5-turbo pricing: input $0.0005/1K, output $0.0015/1K
expected_cost = (1000 / 1000) * 0.0005 + (500 / 1000) * 0.0015
assert cost == expected_cost
def test_get_cost_estimate_unknown_model(self):
"""Test cost estimation for unknown model."""
tracker = TokenTracker()
cost = tracker.get_cost_estimate("unknown-model", 1000, 500)
assert cost is None
@patch("ai_ffmpeg_cli.token_tracker.console")
def test_display_session_summary(self, mock_console):
"""Test displaying session summary."""
tracker = TokenTracker()
tracker.start_session("test123", "gpt-4o")
# Add some operations
tracker.track_operation("op1", "gpt-4o", "input1", "output1", 0.001)
tracker.track_operation("op2", "gpt-4o", "input2", "output2", 0.002)
tracker.display_session_summary()
# Verify that console.print was called
assert mock_console.print.called
@patch("ai_ffmpeg_cli.token_tracker.console")
def test_display_detailed_usage(self, mock_console):
"""Test displaying detailed usage."""
tracker = TokenTracker()
tracker.start_session("test123", "gpt-4o")
# Add some operations
tracker.track_operation("op1", "gpt-4o", "input1", "output1")
tracker.track_operation("op2", "gpt-4o", "input2", "output2")
tracker.display_detailed_usage()
# Verify that console.print was called
assert mock_console.print.called
@patch("ai_ffmpeg_cli.token_tracker.console")
def test_display_realtime_usage(self, mock_console):
"""Test displaying real-time usage."""
tracker = TokenTracker()
usage = TokenUsage(
operation="test_op",
model="gpt-4o",
input_tokens=100,
output_tokens=50,
total_tokens=150,
cost_estimate=0.001,
)
tracker.display_realtime_usage(usage)
# Verify that console.print was called
assert mock_console.print.called
# Note: get_terminal_width and center_text functions are tested in test_ascii_art.py
# since they are part of the ascii_art module