Merge pull request #64 from ShorthillsAI/main

Add Batch Processing and Enhanced Markdown Features
zrguo
2025-07-29 17:06:09 +08:00
committed by GitHub
9 changed files with 3765 additions and 111 deletions

docs/batch_processing.md

@@ -0,0 +1,341 @@
# Batch Processing
This document describes the batch processing feature for RAG-Anything, which allows you to process multiple documents in parallel for improved throughput.
## Overview
Batch processing lets you parse multiple documents concurrently, which can significantly improve throughput for large document collections. It provides parallel execution, real-time progress tracking, per-file error handling, and flexible configuration options.
## Key Features
- **Parallel Processing**: Process multiple files concurrently using thread pools
- **Progress Tracking**: Real-time progress bars with `tqdm`
- **Error Handling**: Comprehensive error reporting and recovery
- **Flexible Input**: Support for files, directories, and recursive search
- **Configurable Workers**: Adjustable number of parallel workers
- **Installation Check Bypass**: Optional skip for environments with package conflicts
## Installation
```bash
# Basic installation
pip install raganything[all]
# Required for batch processing
pip install tqdm
```
## Usage
### Basic Batch Processing
```python
from raganything.batch_parser import BatchParser
# Create batch parser
batch_parser = BatchParser(
parser_type="mineru", # or "docling"
max_workers=4,
show_progress=True,
timeout_per_file=300,
skip_installation_check=False # Set to True if having parser installation issues
)
# Process multiple files
result = batch_parser.process_batch(
file_paths=["doc1.pdf", "doc2.docx", "folder/"],
output_dir="./batch_output",
parse_method="auto",
recursive=True
)
# Check results
print(result.summary())
print(f"Success rate: {result.success_rate:.1f}%")
print(f"Processing time: {result.processing_time:.2f} seconds")
```
### Asynchronous Batch Processing
```python
import asyncio
from raganything.batch_parser import BatchParser
async def async_batch_processing():
batch_parser = BatchParser(
parser_type="mineru",
max_workers=4,
show_progress=True
)
# Process files asynchronously
result = await batch_parser.process_batch_async(
file_paths=["doc1.pdf", "doc2.docx"],
output_dir="./output",
parse_method="auto"
)
return result
# Run async processing
result = asyncio.run(async_batch_processing())
```
### Integration with RAG-Anything
```python
from raganything import RAGAnything
rag = RAGAnything()
# Process documents with batch functionality
result = rag.process_documents_batch(
file_paths=["doc1.pdf", "doc2.docx"],
output_dir="./output",
max_workers=4,
show_progress=True
)
print(f"Processed {len(result.successful_files)} files successfully")
```
### Process Documents with RAG Integration
```python
# Process documents in batch and then add them to RAG
result = await rag.process_documents_with_rag_batch(
file_paths=["doc1.pdf", "doc2.docx"],
output_dir="./output",
max_workers=4,
show_progress=True
)
print(f"Processed {result['successful_rag_files']} files with RAG")
print(f"Total processing time: {result['total_processing_time']:.2f} seconds")
```
### Command Line Interface
```bash
# Basic batch processing
python -m raganything.batch_parser path/to/docs/ --output ./output --workers 4
# With specific parser
python -m raganything.batch_parser path/to/docs/ --parser mineru --method auto
# Without progress bar
python -m raganything.batch_parser path/to/docs/ --output ./output --no-progress
# Help
python -m raganything.batch_parser --help
```
## Configuration
### Environment Variables
```env
# Batch processing configuration
MAX_CONCURRENT_FILES=4
SUPPORTED_FILE_EXTENSIONS=.pdf,.docx,.doc,.pptx,.ppt,.xlsx,.xls,.txt,.md
RECURSIVE_FOLDER_PROCESSING=true
PARSER_OUTPUT_DIR=./parsed_output
```
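These variables are read by RAG-Anything's configuration. If you drive `BatchParser` directly, you can map them yourself; the sketch below is illustrative (it loads the values with `os.getenv` and is not the library's own loader):

```python
import os

from raganything.batch_parser import BatchParser

# Illustrative mapping of the environment variables above onto BatchParser
# arguments; adjust to match how you manage configuration.
max_workers = int(os.getenv("MAX_CONCURRENT_FILES", "4"))
recursive = os.getenv("RECURSIVE_FOLDER_PROCESSING", "true").lower() == "true"
output_dir = os.getenv("PARSER_OUTPUT_DIR", "./parsed_output")

batch_parser = BatchParser(parser_type="mineru", max_workers=max_workers)
result = batch_parser.process_batch(
    file_paths=["./documents"],
    output_dir=output_dir,
    parse_method="auto",
    recursive=recursive,
)
print(result.summary())
```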
### BatchParser Parameters
- **parser_type**: `"mineru"` or `"docling"` (default: `"mineru"`)
- **max_workers**: Number of parallel workers (default: `4`)
- **show_progress**: Show progress bar (default: `True`)
- **timeout_per_file**: Timeout per file in seconds (default: `300`)
- **skip_installation_check**: Skip parser installation check (default: `False`)
## Supported File Types
- **PDF files**: `.pdf`
- **Office documents**: `.doc`, `.docx`, `.ppt`, `.pptx`, `.xls`, `.xlsx`
- **Images**: `.png`, `.jpg`, `.jpeg`, `.bmp`, `.tiff`, `.tif`, `.gif`, `.webp`
- **Text files**: `.txt`, `.md`
## API Reference
### BatchProcessingResult
```python
@dataclass
class BatchProcessingResult:
successful_files: List[str] # Successfully processed files
failed_files: List[str] # Failed files
total_files: int # Total number of files
processing_time: float # Total processing time in seconds
errors: Dict[str, str] # Error messages for failed files
output_dir: str # Output directory used
def summary(self) -> str: # Human-readable summary
@property
def success_rate(self) -> float: # Success rate as a percentage
```
### BatchParser Methods
```python
class BatchParser:
def __init__(self, parser_type: str = "mineru", max_workers: int = 4, ...):
"""Initialize batch parser"""
def get_supported_extensions(self) -> List[str]:
"""Get list of supported file extensions"""
def filter_supported_files(self, file_paths: List[str], recursive: bool = True) -> List[str]:
"""Filter files to only supported types"""
def process_batch(self, file_paths: List[str], output_dir: str, ...) -> BatchProcessingResult:
"""Process files in batch"""
async def process_batch_async(self, file_paths: List[str], output_dir: str, ...) -> BatchProcessingResult:
"""Process files in batch asynchronously"""
```
## Performance Considerations
### Memory Usage
- Each worker uses additional memory
- Recommended: 2-4 workers for most systems
- Monitor memory usage with large files
### CPU Usage
- Parallel processing utilizes multiple cores
- Optimal worker count depends on CPU cores and file sizes
- I/O may become bottleneck with many small files
### Recommended Settings
- **Small files** (< 1MB): Higher worker count (6-8)
- **Large files** (> 100MB): Lower worker count (2-3)
- **Mixed sizes**: Start with 4 workers and adjust (a sizing helper is sketched below)
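
If you want to automate that choice, here is a small, hypothetical helper that applies the thresholds above by inspecting the largest file in the batch (the cutoffs are the rough guidance from this section, not values from the library):

```python
from pathlib import Path

from raganything.batch_parser import BatchParser

def pick_worker_count(file_paths, default=4):
    """Rough heuristic based on the largest file in the batch."""
    sizes = [Path(p).stat().st_size for p in file_paths if Path(p).is_file()]
    largest = max(sizes, default=0)
    if largest > 100 * 1024 * 1024:  # any file over ~100 MB: favor low memory use
        return 2
    if 0 < largest < 1 * 1024 * 1024:  # everything under ~1 MB: more parallelism
        return 8
    return default

files = ["doc1.pdf", "doc2.docx"]
batch_parser = BatchParser(max_workers=pick_worker_count(files))
```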
## Troubleshooting
### Common Issues
#### Memory Errors
```python
# Solution: Reduce max_workers
batch_parser = BatchParser(max_workers=2)
```
#### Timeout Errors
```python
# Solution: Increase timeout_per_file
batch_parser = BatchParser(timeout_per_file=600) # 10 minutes
```
#### Parser Installation Issues
```python
# Solution: Skip installation check
batch_parser = BatchParser(skip_installation_check=True)
```
#### File Not Found Errors
- Check file paths and permissions
- Ensure input files exist (a quick pre-check is sketched below)
- Verify directory access rights
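
A quick pre-check along those lines, assuming you want to drop missing paths rather than let them fail inside the batch:

```python
from pathlib import Path

candidates = ["doc1.pdf", "doc2.docx", "missing.pdf"]
existing = [p for p in candidates if Path(p).exists()]
missing = sorted(set(candidates) - set(existing))
if missing:
    print(f"Skipping {len(missing)} missing path(s): {missing}")

result = batch_parser.process_batch(file_paths=existing, output_dir="./output")
```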
### Debug Mode
Enable debug logging for detailed information:
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# Create batch parser with debug logging
batch_parser = BatchParser(parser_type="mineru", max_workers=2)
```
### Error Handling
The batch processor provides comprehensive error handling:
```python
result = batch_parser.process_batch(file_paths=["doc1.pdf", "doc2.docx"])
# Check for errors
if result.failed_files:
print("Failed files:")
for file_path in result.failed_files:
error_message = result.errors.get(file_path, "Unknown error")
print(f" - {file_path}: {error_message}")
# Process only successful files
for file_path in result.successful_files:
print(f"Successfully processed: {file_path}")
```
## Examples
### Process Entire Directory
```python
from pathlib import Path
# Process all supported files in a directory
batch_parser = BatchParser(max_workers=4)
directory_path = Path("./documents")
result = batch_parser.process_batch(
file_paths=[str(directory_path)],
output_dir="./processed",
recursive=True # Include subdirectories
)
print(f"Processed {len(result.successful_files)} out of {result.total_files} files")
```
### Filter Files Before Processing
```python
# Get all files in directory
all_files = ["doc1.pdf", "image.png", "spreadsheet.xlsx", "unsupported.xyz"]
# Filter to supported files only
supported_files = batch_parser.filter_supported_files(all_files)
print(f"Will process {len(supported_files)} out of {len(all_files)} files")
# Process only supported files
result = batch_parser.process_batch(
file_paths=supported_files,
output_dir="./output"
)
```
### Custom Error Handling
```python
def process_with_retry(file_paths, max_retries=3):
"""Process files with retry logic"""
for attempt in range(max_retries):
result = batch_parser.process_batch(file_paths, "./output")
if not result.failed_files:
break # All files processed successfully
print(f"Attempt {attempt + 1}: {len(result.failed_files)} files failed")
file_paths = result.failed_files # Retry failed files
return result
```
## Best Practices
1. **Start with default settings** and adjust based on performance
2. **Monitor system resources** during batch processing
3. **Use appropriate worker counts** for your hardware
4. **Handle errors gracefully** with retry logic
5. **Test with small batches** before processing large collections
6. **Use skip_installation_check** if facing parser installation issues
7. **Enable progress tracking** for long-running operations
8. **Set appropriate timeouts** based on expected file processing times
## Conclusion
The batch processing feature significantly improves RAG-Anything's throughput for large document collections. It provides flexible configuration options, comprehensive error handling, and seamless integration with the existing RAG-Anything pipeline.

docs/enhanced_markdown.md

@@ -0,0 +1,552 @@
# Enhanced Markdown Conversion
This document describes the enhanced markdown conversion feature for RAG-Anything, which provides high-quality PDF generation from markdown files with multiple backend options and advanced styling.
## Overview
The enhanced markdown conversion feature provides professional-quality PDF generation from markdown files. It supports multiple conversion backends, advanced styling options, syntax highlighting, and seamless integration with RAG-Anything's document processing pipeline.
## Key Features
- **Multiple Backends**: WeasyPrint, Pandoc, and automatic backend selection
- **Advanced Styling**: Custom CSS, syntax highlighting, and professional layouts
- **Image Support**: Embedded images with proper scaling and positioning
- **Table Support**: Formatted tables with borders and professional styling
- **Code Highlighting**: Syntax highlighting for code blocks using Pygments
- **Custom Templates**: Support for custom CSS and document templates
- **Table of Contents**: Automatic TOC generation with navigation links
- **Professional Typography**: High-quality fonts and spacing
## Installation
### Required Dependencies
```bash
# Basic installation
pip install raganything[all]
# Required for enhanced markdown conversion
pip install markdown weasyprint pygments
```
### Optional Dependencies
```bash
# For Pandoc backend (system installation required)
# Ubuntu/Debian:
sudo apt-get install pandoc wkhtmltopdf
# macOS:
brew install pandoc wkhtmltopdf
# Or using conda:
conda install -c conda-forge pandoc wkhtmltopdf
```
### Backend-Specific Installation
#### WeasyPrint (Recommended)
```bash
# Install WeasyPrint with system dependencies
pip install weasyprint
# Ubuntu/Debian system dependencies:
sudo apt-get install -y build-essential python3-dev python3-pip \
python3-setuptools python3-wheel python3-cffi libcairo2 \
libpango-1.0-0 libpangocairo-1.0-0 libgdk-pixbuf2.0-0 \
libffi-dev shared-mime-info
```
#### Pandoc
- Download from: https://pandoc.org/installing.html
- Requires system-wide installation
- Used for complex document structures and LaTeX-quality output
## Usage
### Basic Conversion
```python
from raganything.enhanced_markdown import EnhancedMarkdownConverter, MarkdownConfig
# Create converter with default settings
converter = EnhancedMarkdownConverter()
# Convert markdown file to PDF
success = converter.convert_file_to_pdf(
input_path="document.md",
output_path="document.pdf",
method="auto" # Automatically select best available backend
)
if success:
print("✅ Conversion successful!")
else:
print("❌ Conversion failed")
```
### Advanced Configuration
```python
# Create custom configuration
config = MarkdownConfig(
page_size="A4", # A4, Letter, Legal, etc.
margin="1in", # CSS-style margins
font_size="12pt", # Base font size
line_height="1.5", # Line spacing
include_toc=True, # Generate table of contents
syntax_highlighting=True, # Enable code syntax highlighting
# Custom CSS styling
custom_css="""
body {
font-family: 'Georgia', serif;
color: #333;
}
h1 {
color: #2c3e50;
border-bottom: 2px solid #3498db;
padding-bottom: 0.3em;
}
code {
background-color: #f8f9fa;
padding: 2px 4px;
border-radius: 3px;
}
pre {
background-color: #f8f9fa;
border-left: 4px solid #3498db;
padding: 15px;
border-radius: 5px;
}
table {
border-collapse: collapse;
width: 100%;
margin: 1em 0;
}
th, td {
border: 1px solid #ddd;
padding: 8px 12px;
text-align: left;
}
th {
background-color: #f2f2f2;
font-weight: bold;
}
"""
)
converter = EnhancedMarkdownConverter(config)
```
### Backend Selection
```python
# Check available backends
converter = EnhancedMarkdownConverter()
backend_info = converter.get_backend_info()
print("Available backends:")
for backend, available in backend_info["available_backends"].items():
status = "✅" if available else "❌"
print(f" {status} {backend}")
print(f"Recommended backend: {backend_info['recommended_backend']}")
# Use specific backend
converter.convert_file_to_pdf(
input_path="document.md",
output_path="document.pdf",
method="weasyprint" # or "pandoc", "pandoc_system", "auto"
)
```
### Content Conversion
````python
# Convert markdown content directly (not from file)
markdown_content = """
# Sample Document
## Introduction
This is a **bold** statement with *italic* text.
## Code Example
```python
def hello_world():
print("Hello, World!")
return "Success"
```
## Table
| Feature | Status | Notes |
|---------|--------|-------|
| PDF Generation | ✅ | Working |
| Syntax Highlighting | ✅ | Pygments |
| Custom CSS | ✅ | Full support |
"""
success = converter.convert_markdown_to_pdf(
markdown_content=markdown_content,
output_path="sample.pdf",
method="auto"
)
````
### Command Line Interface
```bash
# Basic conversion
python -m raganything.enhanced_markdown document.md --output document.pdf
# With specific backend
python -m raganything.enhanced_markdown document.md --method weasyprint
# With custom CSS file
python -m raganything.enhanced_markdown document.md --css custom_style.css
# Show backend information
python -m raganything.enhanced_markdown --info
# Help
python -m raganything.enhanced_markdown --help
```
## Backend Comparison
| Backend | Pros | Cons | Best For | Quality |
|---------|------|------|----------|---------|
| **WeasyPrint** | • Excellent CSS support<br>• Fast rendering<br>• Great web-style layouts<br>• Python-based | • Limited LaTeX features<br>• Requires system deps | • Web-style documents<br>• Custom styling<br>• Fast conversion | ⭐⭐⭐⭐ |
| **Pandoc** | • Extensive features<br>• LaTeX-quality output<br>• Academic formatting<br>• Many input/output formats | • Slower conversion<br>• System installation<br>• Complex setup | • Academic papers<br>• Complex documents<br>• Publication quality | ⭐⭐⭐⭐⭐ |
| **Auto** | • Automatic selection<br>• Fallback support<br>• User-friendly | • May not use optimal backend | • General use<br>• Quick setup<br>• Development | ⭐⭐⭐⭐ |
## Configuration Options
### MarkdownConfig Parameters
```python
@dataclass
class MarkdownConfig:
# Page layout
page_size: str = "A4" # A4, Letter, Legal, A3, etc.
margin: str = "1in" # CSS margin format
font_size: str = "12pt" # Base font size
line_height: str = "1.5" # Line spacing multiplier
# Content options
include_toc: bool = True # Generate table of contents
syntax_highlighting: bool = True # Enable code highlighting
image_max_width: str = "100%" # Maximum image width
table_style: str = "..." # Default table CSS
# Styling
css_file: Optional[str] = None # External CSS file path
custom_css: Optional[str] = None # Inline CSS content
template_file: Optional[str] = None # Custom HTML template
# Output options
output_format: str = "pdf" # Currently only PDF supported
output_dir: Optional[str] = None # Output directory
# Metadata
metadata: Optional[Dict[str, str]] = None # Document metadata
```
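A short sketch combining a few of these options; the file names and metadata values are placeholders:

```python
from raganything.enhanced_markdown import EnhancedMarkdownConverter, MarkdownConfig

# External stylesheet plus document metadata instead of inline custom_css
config = MarkdownConfig(
    page_size="Letter",
    include_toc=True,
    css_file="styles/report.css",  # placeholder path
    metadata={"title": "Technical Report", "author": "Docs Team"},
)

converter = EnhancedMarkdownConverter(config)
converter.convert_file_to_pdf("report.md", "report.pdf", method="auto")
```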
### Supported Markdown Features
#### Basic Formatting
- **Headers**: `# ## ### #### ##### ######`
- **Emphasis**: `*italic*`, `**bold**`, `***bold italic***`
- **Links**: `[text](url)`, `[text][ref]`
- **Images**: `![alt](url)`, `![alt][ref]`
- **Lists**: Ordered and unordered, nested
- **Blockquotes**: `> quote`
- **Line breaks**: Double space or `\n\n`
#### Advanced Features
- **Tables**: GitHub-style tables with alignment
- **Code blocks**: Fenced code blocks with language specification
- **Inline code**: `backtick code`
- **Horizontal rules**: `---` or `***`
- **Footnotes**: `[^1]` references
- **Definition lists**: Term and definition pairs
- **Attributes**: `{#id .class key=value}`
#### Code Highlighting
````markdown
```python
def example_function():
"""This will be syntax highlighted"""
return "Hello, World!"
```
```javascript
function exampleFunction() {
// This will also be highlighted
return "Hello, World!";
}
```
````
## Integration with RAG-Anything
The enhanced markdown conversion integrates seamlessly with RAG-Anything:
```python
from raganything import RAGAnything
# Initialize RAG-Anything
rag = RAGAnything()
# Process markdown files - enhanced conversion is used automatically
await rag.process_document_complete("document.md")
# Batch processing with enhanced markdown conversion
result = rag.process_documents_batch(
file_paths=["doc1.md", "doc2.md", "doc3.md"],
output_dir="./output"
)
# The .md files will be converted to PDF using enhanced conversion
# before being processed by the RAG system
```
## Performance Considerations
### Conversion Speed
- **WeasyPrint**: ~1-3 seconds for typical documents
- **Pandoc**: ~3-10 seconds for typical documents
- **Large documents**: Time scales roughly linearly with content
### Memory Usage
- **WeasyPrint**: ~50-100MB per conversion
- **Pandoc**: ~100-200MB per conversion
- **Images**: Large images increase memory usage significantly
### Optimization Tips
1. **Resize large images** before embedding
2. **Use compressed images** (JPEG for photos, PNG for graphics)
3. **Limit concurrent conversions** to avoid memory issues (see the sketch below)
4. **Cache converted content** when processing multiple times
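
For tip 3, one way to cap concurrency when converting many markdown files is a small thread pool. This is a sketch; it creates one converter per task rather than assuming the converter is thread-safe:

```python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from raganything.enhanced_markdown import EnhancedMarkdownConverter

def convert_one(md_path: Path) -> bool:
    # One converter per task: no assumptions about thread safety
    converter = EnhancedMarkdownConverter()
    return converter.convert_file_to_pdf(str(md_path), str(md_path.with_suffix(".pdf")))

md_files = list(Path("./docs").glob("*.md"))
with ThreadPoolExecutor(max_workers=2) as pool:  # cap concurrent conversions
    results = list(pool.map(convert_one, md_files))

print(f"Converted {sum(results)} of {len(md_files)} files")
```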
## Examples
### Sample Markdown Document
````markdown
# Technical Documentation
## Table of Contents
[TOC]
## Overview
This document provides comprehensive technical specifications.
## Architecture
### System Components
1. **Parser Engine**: Handles document processing
2. **Storage Layer**: Manages data persistence
3. **Query Interface**: Provides search capabilities
### Code Implementation
```python
from raganything import RAGAnything
# Initialize system
rag = RAGAnything(config={
"working_dir": "./storage",
"enable_image_processing": True
})
# Process document
await rag.process_document_complete("document.pdf")
```
### Performance Metrics
| Component | Throughput | Latency | Memory |
|-----------|------------|---------|--------|
| Parser | 100 docs/hour | 36s avg | 2.5 GB |
| Storage | 1000 ops/sec | 1ms avg | 512 MB |
| Query | 50 queries/sec | 20ms avg | 1 GB |
## Integration Notes
> **Important**: Always validate input before processing.
## Conclusion
The enhanced system provides excellent performance for document processing workflows.
````
### Generated PDF Features
The enhanced markdown converter produces PDFs with:
- **Professional typography** with proper font selection and spacing
- **Syntax-highlighted code blocks** using Pygments
- **Formatted tables** with borders and alternating row colors
- **Clickable table of contents** with navigation links
- **Responsive images** that scale appropriately
- **Custom styling** through CSS
- **Proper page breaks** and margins
- **Document metadata** and properties
## Troubleshooting
### Common Issues
#### WeasyPrint Installation Problems
```bash
# Ubuntu/Debian: Install system dependencies
sudo apt-get update
sudo apt-get install -y build-essential python3-dev libcairo2 \
libpango-1.0-0 libpangocairo-1.0-0 libgdk-pixbuf2.0-0 \
libffi-dev shared-mime-info
# Then reinstall WeasyPrint
pip install --force-reinstall weasyprint
```
#### Pandoc Not Found
```bash
# Check if Pandoc is installed
pandoc --version
# Install Pandoc (Ubuntu/Debian)
sudo apt-get install pandoc wkhtmltopdf
# Or download from: https://pandoc.org/installing.html
```
#### CSS Issues
- Check CSS syntax in custom_css
- Verify CSS file paths exist
- Test CSS with simple HTML first
- Use browser developer tools to debug styling (or run the quick smoke test below)
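
One quick way to do that without leaving Python is to render a tiny document with your CSS before running a full conversion (a sketch; the CSS here is just an example):

```python
from raganything.enhanced_markdown import EnhancedMarkdownConverter, MarkdownConfig

config = MarkdownConfig(custom_css="body { font-family: Georgia, serif; }")
converter = EnhancedMarkdownConverter(config)

# Render a minimal document first; if this fails, debug the CSS before
# converting real content
ok = converter.convert_markdown_to_pdf("# CSS check\n\nHello.", "css_check.pdf")
print("CSS renders fine" if ok else "Conversion failed - check the CSS")
```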
#### Image Problems
- Ensure images are accessible (correct paths)
- Check image file formats (PNG, JPEG, GIF supported)
- Verify image file permissions
- Consider image size and format optimization
#### Font Issues
```python
# Use web-safe fonts
config = MarkdownConfig(
custom_css="""
body {
font-family: 'Arial', 'Helvetica', sans-serif;
}
"""
)
```
### Debug Mode
Enable detailed logging for troubleshooting:
```python
import logging
# Enable debug logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create converter with debug logging
converter = EnhancedMarkdownConverter()
result = converter.convert_file_to_pdf("test.md", "test.pdf")
```
### Error Handling
```python
def robust_conversion(input_path, output_path):
"""Convert with fallback backends"""
converter = EnhancedMarkdownConverter()
# Try backends in order of preference
backends = ["weasyprint", "pandoc", "auto"]
for backend in backends:
try:
success = converter.convert_file_to_pdf(
input_path=input_path,
output_path=output_path,
method=backend
)
if success:
print(f"✅ Conversion successful with {backend}")
return True
except Exception as e:
print(f"❌ {backend} failed: {str(e)}")
continue
print("❌ All backends failed")
return False
```
## API Reference
### EnhancedMarkdownConverter
```python
class EnhancedMarkdownConverter:
def __init__(self, config: Optional[MarkdownConfig] = None):
"""Initialize converter with optional configuration"""
def convert_file_to_pdf(self, input_path: str, output_path: str, method: str = "auto") -> bool:
"""Convert markdown file to PDF"""
def convert_markdown_to_pdf(self, markdown_content: str, output_path: str, method: str = "auto") -> bool:
"""Convert markdown content to PDF"""
def get_backend_info(self) -> Dict[str, Any]:
"""Get information about available backends"""
def convert_with_weasyprint(self, markdown_content: str, output_path: str) -> bool:
"""Convert using WeasyPrint backend"""
def convert_with_pandoc(self, markdown_content: str, output_path: str) -> bool:
"""Convert using Pandoc backend"""
```
## Best Practices
1. **Choose the right backend** for your use case:
- **WeasyPrint** for web-style documents and custom CSS
- **Pandoc** for academic papers and complex formatting
- **Auto** for general use and development
2. **Optimize images** before embedding:
- Use appropriate formats (JPEG for photos, PNG for graphics)
- Compress images to reduce file size
- Set reasonable maximum widths
3. **Design responsive layouts**:
- Use relative units (%, em) instead of absolute (px)
- Test with different page sizes
- Consider print-specific CSS
4. **Test your styling**:
- Start with default styling and incrementally customize
- Test with sample content before production use
- Validate CSS syntax
5. **Handle errors gracefully**:
- Implement fallback backends
- Provide meaningful error messages
- Log conversion attempts for debugging
6. **Performance optimization**:
- Cache converted content when possible
- Process large batches with appropriate worker counts
- Monitor memory usage with large documents
## Conclusion
The enhanced markdown conversion feature provides professional-quality PDF generation with flexible styling options and multiple backend support. It seamlessly integrates with RAG-Anything's document processing pipeline while offering standalone functionality for markdown-to-PDF conversion needs.


@@ -0,0 +1,550 @@
#!/usr/bin/env python
"""
Batch Processing Example for RAG-Anything
This example demonstrates how to use the batch processing capabilities
to process multiple documents in parallel for improved throughput.
Features demonstrated:
- Basic batch processing with BatchParser
- Asynchronous batch processing
- Integration with RAG-Anything
- Error handling and progress tracking
- File filtering and directory processing
"""
import asyncio
import logging
from pathlib import Path
import tempfile
import time
# Add project root directory to Python path
import sys
sys.path.append(str(Path(__file__).parent.parent))
from raganything import RAGAnything, RAGAnythingConfig
from raganything.batch_parser import BatchParser
def create_sample_documents():
"""Create sample documents for batch processing testing"""
temp_dir = Path(tempfile.mkdtemp())
sample_files = []
# Create various document types
documents = {
"document1.txt": "This is a simple text document for testing batch processing.",
"document2.txt": "Another text document with different content.",
"document3.md": """# Markdown Document
## Introduction
This is a markdown document for testing.
### Features
- Markdown formatting
- Code blocks
- Lists
```python
def example():
return "Hello from markdown"
```
""",
"report.txt": """Business Report
Executive Summary:
This report demonstrates batch processing capabilities.
Key Findings:
1. Parallel processing improves throughput
2. Progress tracking enhances user experience
3. Error handling ensures reliability
Conclusion:
Batch processing is essential for large-scale document processing.
""",
"notes.md": """# Meeting Notes
## Date: 2024-01-15
### Attendees
- Alice Johnson
- Bob Smith
- Carol Williams
### Discussion Topics
1. **Batch Processing Implementation**
- Parallel document processing
- Progress tracking
- Error handling strategies
2. **Performance Metrics**
- Target: 100 documents/hour
- Memory usage: < 4GB
- Success rate: > 95%
### Action Items
- [ ] Implement batch processing
- [ ] Add progress bars
- [ ] Test with large document sets
- [ ] Optimize memory usage
### Next Steps
Continue development and testing of batch processing features.
"""
}
# Create files
for filename, content in documents.items():
file_path = temp_dir / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
sample_files.append(str(file_path))
return sample_files, temp_dir
def demonstrate_basic_batch_processing():
"""Demonstrate basic batch processing functionality"""
print("\n" + "=" * 60)
print("BASIC BATCH PROCESSING DEMONSTRATION")
print("=" * 60)
# Create sample documents
sample_files, temp_dir = create_sample_documents()
try:
print(f"Created {len(sample_files)} sample documents in: {temp_dir}")
for file_path in sample_files:
print(f" - {Path(file_path).name}")
# Create batch parser
batch_parser = BatchParser(
parser_type="mineru",
max_workers=3,
show_progress=True,
timeout_per_file=60,
skip_installation_check=True # Skip installation check for demo
)
print(f"\nBatch parser configured:")
print(f" - Parser type: mineru")
print(f" - Max workers: 3")
print(f" - Progress tracking: enabled")
print(f" - Timeout per file: 60 seconds")
# Check supported extensions
supported_extensions = batch_parser.get_supported_extensions()
print(f" - Supported extensions: {supported_extensions}")
# Filter files to supported types
supported_files = batch_parser.filter_supported_files(sample_files)
print(f"\nFile filtering results:")
print(f" - Total files: {len(sample_files)}")
print(f" - Supported files: {len(supported_files)}")
# Process batch
output_dir = temp_dir / "batch_output"
print(f"\nStarting batch processing...")
print(f"Output directory: {output_dir}")
start_time = time.time()
result = batch_parser.process_batch(
file_paths=supported_files,
output_dir=str(output_dir),
parse_method="auto",
recursive=False
)
processing_time = time.time() - start_time
# Display results
print("\n" + "-" * 40)
print("BATCH PROCESSING RESULTS")
print("-" * 40)
print(result.summary())
print(f"Total processing time: {processing_time:.2f} seconds")
print(f"Success rate: {result.success_rate:.1f}%")
if result.successful_files:
print(f"\nSuccessfully processed files:")
for file_path in result.successful_files:
print(f"{Path(file_path).name}")
if result.failed_files:
print(f"\nFailed files:")
for file_path in result.failed_files:
error = result.errors.get(file_path, "Unknown error")
print(f"{Path(file_path).name}: {error}")
return result
except Exception as e:
print(f"❌ Batch processing demonstration failed: {str(e)}")
return None
async def demonstrate_async_batch_processing():
"""Demonstrate asynchronous batch processing"""
print("\n" + "=" * 60)
print("ASYNCHRONOUS BATCH PROCESSING DEMONSTRATION")
print("=" * 60)
# Create sample documents
sample_files, temp_dir = create_sample_documents()
try:
print(f"Processing {len(sample_files)} documents asynchronously...")
# Create batch parser
batch_parser = BatchParser(
parser_type="mineru",
max_workers=2,
show_progress=True,
skip_installation_check=True
)
# Process batch asynchronously
output_dir = temp_dir / "async_output"
start_time = time.time()
result = await batch_parser.process_batch_async(
file_paths=sample_files,
output_dir=str(output_dir),
parse_method="auto",
recursive=False
)
processing_time = time.time() - start_time
# Display results
print("\n" + "-" * 40)
print("ASYNC BATCH PROCESSING RESULTS")
print("-" * 40)
print(result.summary())
print(f"Async processing time: {processing_time:.2f} seconds")
print(f"Success rate: {result.success_rate:.1f}%")
return result
except Exception as e:
print(f"❌ Async batch processing demonstration failed: {str(e)}")
return None
async def demonstrate_rag_integration():
"""Demonstrate batch processing integration with RAG-Anything"""
print("\n" + "=" * 60)
print("RAG-ANYTHING BATCH INTEGRATION DEMONSTRATION")
print("=" * 60)
# Create sample documents
sample_files, temp_dir = create_sample_documents()
try:
# Initialize RAG-Anything with temporary storage
config = RAGAnythingConfig(
working_dir=str(temp_dir / "rag_storage"),
enable_image_processing=True,
enable_table_processing=True,
enable_equation_processing=True,
max_concurrent_files=2
)
rag = RAGAnything(config=config)
print("RAG-Anything initialized with batch processing capabilities")
# Show available batch methods
batch_methods = [method for method in dir(rag) if 'batch' in method.lower()]
print(f"Available batch methods: {batch_methods}")
# Demonstrate batch processing with RAG integration
print(f"\nProcessing {len(sample_files)} documents with RAG integration...")
# Use the RAG-integrated batch processing
try:
# Process documents in batch
result = rag.process_documents_batch(
file_paths=sample_files,
output_dir=str(temp_dir / "rag_batch_output"),
max_workers=2,
show_progress=True
)
print("\n" + "-" * 40)
print("RAG BATCH PROCESSING RESULTS")
print("-" * 40)
print(result.summary())
print(f"Success rate: {result.success_rate:.1f}%")
# Demonstrate batch processing with full RAG integration
print(f"\nProcessing documents with full RAG integration...")
rag_result = await rag.process_documents_with_rag_batch(
file_paths=sample_files[:2], # Process subset for demo
output_dir=str(temp_dir / "rag_full_output"),
max_workers=1,
show_progress=True
)
print("\n" + "-" * 40)
print("FULL RAG INTEGRATION RESULTS")
print("-" * 40)
print(f"Parse result: {rag_result['parse_result'].summary()}")
print(f"RAG processing time: {rag_result['total_processing_time']:.2f} seconds")
print(f"Successfully processed with RAG: {rag_result['successful_rag_files']}")
print(f"Failed RAG processing: {rag_result['failed_rag_files']}")
return rag_result
except Exception as e:
print(f"⚠️ RAG integration demo completed with limitations: {str(e)}")
print("Note: This is expected in environments without full API configuration")
return None
except Exception as e:
print(f"❌ RAG integration demonstration failed: {str(e)}")
return None
def demonstrate_directory_processing():
"""Demonstrate processing entire directories"""
print("\n" + "=" * 60)
print("DIRECTORY PROCESSING DEMONSTRATION")
print("=" * 60)
# Create a directory structure with nested files
temp_dir = Path(tempfile.mkdtemp())
# Create main directory files
main_files = {
"overview.txt": "Main directory overview document",
"readme.md": "# Project README\n\nThis is the main project documentation."
}
# Create subdirectory
sub_dir = temp_dir / "subdirectory"
sub_dir.mkdir()
sub_files = {
"details.txt": "Detailed information in subdirectory",
"notes.md": "# Notes\n\nAdditional notes and information."
}
# Write all files
all_files = []
for filename, content in main_files.items():
file_path = temp_dir / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
all_files.append(str(file_path))
for filename, content in sub_files.items():
file_path = sub_dir / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
all_files.append(str(file_path))
try:
print(f"Created directory structure:")
print(f" Main directory: {temp_dir}")
print(f" Files in main: {list(main_files.keys())}")
print(f" Subdirectory: {sub_dir}")
print(f" Files in sub: {list(sub_files.keys())}")
# Create batch parser
batch_parser = BatchParser(
parser_type="mineru",
max_workers=2,
show_progress=True,
skip_installation_check=True
)
# Process entire directory recursively
print(f"\nProcessing entire directory recursively...")
result = batch_parser.process_batch(
file_paths=[str(temp_dir)], # Pass directory path
output_dir=str(temp_dir / "directory_output"),
parse_method="auto",
recursive=True # Include subdirectories
)
print("\n" + "-" * 40)
print("DIRECTORY PROCESSING RESULTS")
print("-" * 40)
print(result.summary())
print(f"Total files found and processed: {result.total_files}")
print(f"Success rate: {result.success_rate:.1f}%")
if result.successful_files:
print(f"\nSuccessfully processed:")
for file_path in result.successful_files:
relative_path = Path(file_path).relative_to(temp_dir)
print(f"{relative_path}")
return result
except Exception as e:
print(f"❌ Directory processing demonstration failed: {str(e)}")
return None
def demonstrate_error_handling():
"""Demonstrate error handling and recovery"""
print("\n" + "=" * 60)
print("ERROR HANDLING DEMONSTRATION")
print("=" * 60)
temp_dir = Path(tempfile.mkdtemp())
# Create files with various issues
files_with_issues = {
"valid_file.txt": "This is a valid file that should process successfully.",
"empty_file.txt": "", # Empty file
"large_file.txt": "x" * 1000000, # Large file (1MB of 'x')
}
created_files = []
for filename, content in files_with_issues.items():
file_path = temp_dir / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
created_files.append(str(file_path))
# Add a non-existent file to the list
created_files.append(str(temp_dir / "non_existent_file.txt"))
try:
print(f"Testing error handling with {len(created_files)} files:")
for file_path in created_files:
name = Path(file_path).name
exists = Path(file_path).exists()
size = Path(file_path).stat().st_size if exists else 0
print(f" - {name}: {'exists' if exists else 'missing'}, {size} bytes")
# Create batch parser with short timeout for demonstration
batch_parser = BatchParser(
parser_type="mineru",
max_workers=2,
show_progress=True,
timeout_per_file=30, # Short timeout for demo
skip_installation_check=True
)
# Process files and handle errors
result = batch_parser.process_batch(
file_paths=created_files,
output_dir=str(temp_dir / "error_test_output"),
parse_method="auto"
)
print("\n" + "-" * 40)
print("ERROR HANDLING RESULTS")
print("-" * 40)
print(result.summary())
if result.successful_files:
print(f"\nSuccessful files:")
for file_path in result.successful_files:
print(f"{Path(file_path).name}")
if result.failed_files:
print(f"\nFailed files with error details:")
for file_path in result.failed_files:
error = result.errors.get(file_path, "Unknown error")
print(f"{Path(file_path).name}: {error}")
# Demonstrate retry logic
if result.failed_files:
print(f"\nDemonstrating retry logic for {len(result.failed_files)} failed files...")
# Retry only the failed files
retry_result = batch_parser.process_batch(
file_paths=result.failed_files,
output_dir=str(temp_dir / "retry_output"),
parse_method="auto"
)
print(f"Retry results: {retry_result.summary()}")
return result
except Exception as e:
print(f"❌ Error handling demonstration failed: {str(e)}")
return None
async def main():
"""Main demonstration function"""
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
print("RAG-Anything Batch Processing Demonstration")
print("=" * 70)
print("This example demonstrates various batch processing capabilities:")
print(" - Basic batch processing with progress tracking")
print(" - Asynchronous processing for improved performance")
print(" - Integration with RAG-Anything pipeline")
print(" - Directory processing with recursive file discovery")
print(" - Comprehensive error handling and recovery")
results = {}
# Run demonstrations
print("\n🚀 Starting demonstrations...")
# Basic batch processing
results['basic'] = demonstrate_basic_batch_processing()
# Asynchronous processing
results['async'] = await demonstrate_async_batch_processing()
# RAG integration
results['rag'] = await demonstrate_rag_integration()
# Directory processing
results['directory'] = demonstrate_directory_processing()
# Error handling
results['error_handling'] = demonstrate_error_handling()
# Summary
print("\n" + "=" * 70)
print("DEMONSTRATION SUMMARY")
print("=" * 70)
for demo_name, result in results.items():
if result:
if hasattr(result, 'success_rate'):
print(f"{demo_name.upper()}: {result.success_rate:.1f}% success rate")
else:
print(f"{demo_name.upper()}: Completed successfully")
else:
print(f"{demo_name.upper()}: Failed or had limitations")
print("\n📊 Key Features Demonstrated:")
print(" - Parallel document processing with configurable worker counts")
print(" - Real-time progress tracking with tqdm progress bars")
print(" - Comprehensive error handling and reporting")
print(" - File filtering based on supported document types")
print(" - Directory processing with recursive file discovery")
print(" - Asynchronous processing for improved performance")
print(" - Integration with RAG-Anything document pipeline")
print(" - Retry logic for failed documents")
print(" - Detailed processing statistics and timing")
print("\n💡 Best Practices Highlighted:")
print(" - Use appropriate worker counts for your system")
print(" - Enable progress tracking for long-running operations")
print(" - Handle errors gracefully with retry mechanisms")
print(" - Filter files to supported types before processing")
print(" - Set reasonable timeouts for document processing")
print(" - Use skip_installation_check for environments with conflicts")
if __name__ == "__main__":
asyncio.run(main())

File diff suppressed because it is too large


@@ -1,17 +1,36 @@
"""
Batch processing functionality for RAGAnything
Contains methods for processing multiple files in batches
Contains methods for processing multiple documents in batch mode
"""
import asyncio
from typing import Optional, List
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, TYPE_CHECKING
import time
from .batch_parser import BatchParser, BatchProcessingResult
if TYPE_CHECKING:
from .config import RAGAnythingConfig
class BatchMixin:
"""BatchMixin class containing batch processing functionality for RAGAnything"""
# Type hints for mixin attributes (will be available when mixed into RAGAnything)
config: "RAGAnythingConfig"
logger: logging.Logger
# Type hints for methods that will be available from other mixins
async def _ensure_lightrag_initialized(self) -> None: ...
async def process_document_complete(self, file_path: str, **kwargs) -> None: ...
# ==========================================
# ORIGINAL BATCH PROCESSING METHOD (RESTORED)
# ==========================================
async def process_folder_complete(
self,
folder_path: str,
@@ -25,151 +44,343 @@ class BatchMixin:
max_workers: int = None,
):
"""
Process all files in a folder in batch
Process all supported files in a folder
Args:
folder_path: Path to the folder to process
output_dir: Parser output directory (defaults to config.parser_output_dir)
parse_method: Parse method (defaults to config.parse_method)
display_stats: Whether to display content statistics for each file (defaults to False for batch processing)
split_by_character: Optional character to split text by
split_by_character_only: If True, split only by the specified character
file_extensions: List of file extensions to process (defaults to config.supported_file_extensions)
recursive: Whether to recursively process subfolders (defaults to config.recursive_folder_processing)
max_workers: Maximum number of concurrent workers (defaults to config.max_concurrent_files)
folder_path: Path to the folder containing files to process
output_dir: Directory for parsed outputs (optional)
parse_method: Parsing method to use (optional)
display_stats: Whether to display statistics (optional)
split_by_character: Character to split by (optional)
split_by_character_only: Whether to split only by character (optional)
file_extensions: List of file extensions to process (optional)
recursive: Whether to process folders recursively (optional)
max_workers: Maximum number of workers for concurrent processing (optional)
"""
# Ensure LightRAG is initialized
await self._ensure_lightrag_initialized()
# Use config defaults if not provided
if output_dir is None:
output_dir = self.config.parser_output_dir
if parse_method is None:
parse_method = self.config.parse_method
if display_stats is None:
display_stats = False # Default to False for batch processing
display_stats = True
if file_extensions is None:
file_extensions = self.config.supported_file_extensions
if recursive is None:
recursive = self.config.recursive_folder_processing
if max_workers is None:
max_workers = self.config.max_concurrent_files
if file_extensions is None:
file_extensions = self.config.supported_file_extensions
folder_path = Path(folder_path)
if not folder_path.exists() or not folder_path.is_dir():
raise ValueError(
f"Folder does not exist or is not a valid directory: {folder_path}"
)
await self._ensure_lightrag_initialized()
# Convert file extensions to set for faster lookup
target_extensions = set(ext.lower().strip() for ext in file_extensions)
# Get all files in the folder
folder_path_obj = Path(folder_path)
if not folder_path_obj.exists():
raise FileNotFoundError(f"Folder not found: {folder_path}")
# Log the extensions being used
self.logger.info(
f"Processing files with extensions: {sorted(target_extensions)}"
)
# Collect all files to process
# Collect files based on supported extensions
files_to_process = []
if recursive:
# Recursively traverse all subfolders
for file_path in folder_path.rglob("*"):
if (
file_path.is_file()
and file_path.suffix.lower() in target_extensions
):
files_to_process.append(file_path)
else:
# Process only current folder
for file_path in folder_path.glob("*"):
if (
file_path.is_file()
and file_path.suffix.lower() in target_extensions
):
files_to_process.append(file_path)
for file_ext in file_extensions:
if recursive:
pattern = f"**/*{file_ext}"
else:
pattern = f"*{file_ext}"
files_to_process.extend(folder_path_obj.glob(pattern))
if not files_to_process:
self.logger.info(f"No files to process found in {folder_path}")
self.logger.warning(f"No supported files found in {folder_path}")
return
self.logger.info(f"Found {len(files_to_process)} files to process")
self.logger.info("File type distribution:")
self.logger.info(
f"Found {len(files_to_process)} files to process in {folder_path}"
)
# Count file types
file_type_count = {}
for file_path in files_to_process:
ext = file_path.suffix.lower()
file_type_count[ext] = file_type_count.get(ext, 0) + 1
# Create output directory if it doesn't exist
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
for ext, count in sorted(file_type_count.items()):
self.logger.info(f" {ext}: {count} files")
# Create progress tracking
processed_count = 0
failed_files = []
# Use semaphore to control concurrency
# Process files with controlled concurrency
semaphore = asyncio.Semaphore(max_workers)
tasks = []
async def process_single_file(file_path: Path, index: int) -> None:
"""Process a single file"""
async def process_single_file(file_path: Path):
async with semaphore:
nonlocal processed_count
try:
self.logger.info(
f"[{index}/{len(files_to_process)}] Processing: {file_path}"
)
# Create separate output directory for each file
file_output_dir = Path(output_dir) / file_path.stem
file_output_dir.mkdir(parents=True, exist_ok=True)
# Process file
await self.process_document_complete(
file_path=str(file_path),
output_dir=str(file_output_dir),
str(file_path),
output_dir=output_dir,
parse_method=parse_method,
display_stats=display_stats,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
)
processed_count += 1
self.logger.info(
f"[{index}/{len(files_to_process)}] Successfully processed: {file_path}"
)
return True, str(file_path), None
except Exception as e:
self.logger.error(
f"[{index}/{len(files_to_process)}] Failed to process: {file_path}"
)
self.logger.error(f"Error: {str(e)}")
failed_files.append((file_path, str(e)))
self.logger.error(f"Failed to process {file_path}: {str(e)}")
return False, str(file_path), str(e)
# Create all processing tasks
tasks = []
for index, file_path in enumerate(files_to_process, 1):
task = process_single_file(file_path, index)
# Create tasks for all files
for file_path in files_to_process:
task = asyncio.create_task(process_single_file(file_path))
tasks.append(task)
# Wait for all tasks to complete
await asyncio.gather(*tasks, return_exceptions=True)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Output processing statistics
self.logger.info("\n===== Batch Processing Complete =====")
self.logger.info(f"Total files: {len(files_to_process)}")
self.logger.info(f"Successfully processed: {processed_count}")
self.logger.info(f"Failed: {len(failed_files)}")
# Process results
successful_files = []
failed_files = []
for result in results:
if isinstance(result, Exception):
failed_files.append(("unknown", str(result)))
else:
success, file_path, error = result
if success:
successful_files.append(file_path)
else:
failed_files.append((file_path, error))
if failed_files:
self.logger.info("\nFailed files:")
for file_path, error in failed_files:
self.logger.info(f" - {file_path}: {error}")
# Display statistics if requested
if display_stats:
self.logger.info("Processing complete!")
self.logger.info(f" Successful: {len(successful_files)} files")
self.logger.info(f" Failed: {len(failed_files)} files")
if failed_files:
self.logger.warning("Failed files:")
for file_path, error in failed_files:
self.logger.warning(f" - {file_path}: {error}")
# ==========================================
# NEW ENHANCED BATCH PROCESSING METHODS
# ==========================================
def process_documents_batch(
self,
file_paths: List[str],
output_dir: Optional[str] = None,
parse_method: Optional[str] = None,
max_workers: Optional[int] = None,
recursive: Optional[bool] = None,
show_progress: bool = True,
**kwargs,
) -> BatchProcessingResult:
"""
Process multiple documents in batch using the new BatchParser
Args:
file_paths: List of file paths or directories to process
output_dir: Output directory for parsed files
parse_method: Parsing method to use
max_workers: Maximum number of workers for parallel processing
recursive: Whether to process directories recursively
show_progress: Whether to show progress bar
**kwargs: Additional arguments passed to the parser
Returns:
BatchProcessingResult: Results of the batch processing
"""
# Use config defaults if not specified
if output_dir is None:
output_dir = self.config.parser_output_dir
if parse_method is None:
parse_method = self.config.parse_method
if max_workers is None:
max_workers = self.config.max_concurrent_files
if recursive is None:
recursive = self.config.recursive_folder_processing
# Create batch parser
batch_parser = BatchParser(
parser_type=self.config.parser,
max_workers=max_workers,
show_progress=show_progress,
skip_installation_check=True, # Skip installation check for better UX
)
# Process batch
return batch_parser.process_batch(
file_paths=file_paths,
output_dir=output_dir,
parse_method=parse_method,
recursive=recursive,
**kwargs,
)
async def process_documents_batch_async(
self,
file_paths: List[str],
output_dir: Optional[str] = None,
parse_method: Optional[str] = None,
max_workers: Optional[int] = None,
recursive: Optional[bool] = None,
show_progress: bool = True,
**kwargs,
) -> BatchProcessingResult:
"""
Asynchronously process multiple documents in batch
Args:
file_paths: List of file paths or directories to process
output_dir: Output directory for parsed files
parse_method: Parsing method to use
max_workers: Maximum number of workers for parallel processing
recursive: Whether to process directories recursively
show_progress: Whether to show progress bar
**kwargs: Additional arguments passed to the parser
Returns:
BatchProcessingResult: Results of the batch processing
"""
# Use config defaults if not specified
if output_dir is None:
output_dir = self.config.parser_output_dir
if parse_method is None:
parse_method = self.config.parse_method
if max_workers is None:
max_workers = self.config.max_concurrent_files
if recursive is None:
recursive = self.config.recursive_folder_processing
# Create batch parser
batch_parser = BatchParser(
parser_type=self.config.parser,
max_workers=max_workers,
show_progress=show_progress,
skip_installation_check=True, # Skip installation check for better UX
)
# Process batch asynchronously
return await batch_parser.process_batch_async(
file_paths=file_paths,
output_dir=output_dir,
parse_method=parse_method,
recursive=recursive,
**kwargs,
)
def get_supported_file_extensions(self) -> List[str]:
"""Get list of supported file extensions for batch processing"""
batch_parser = BatchParser(parser_type=self.config.parser)
return batch_parser.get_supported_extensions()
def filter_supported_files(
self, file_paths: List[str], recursive: Optional[bool] = None
) -> List[str]:
"""
Filter file paths to only include supported file types
Args:
file_paths: List of file paths to filter
recursive: Whether to process directories recursively
Returns:
List of supported file paths
"""
if recursive is None:
recursive = self.config.recursive_folder_processing
batch_parser = BatchParser(parser_type=self.config.parser)
return batch_parser.filter_supported_files(file_paths, recursive)
async def process_documents_with_rag_batch(
self,
file_paths: List[str],
output_dir: Optional[str] = None,
parse_method: Optional[str] = None,
max_workers: Optional[int] = None,
recursive: Optional[bool] = None,
show_progress: bool = True,
**kwargs,
) -> Dict[str, Any]:
"""
Process documents in batch and then add them to RAG
This method combines document parsing and RAG insertion:
1. First, parse all documents using batch processing
2. Then, process each successfully parsed document with RAG
Args:
file_paths: List of file paths or directories to process
output_dir: Output directory for parsed files
parse_method: Parsing method to use
max_workers: Maximum number of workers for parallel processing
recursive: Whether to process directories recursively
show_progress: Whether to show progress bar
**kwargs: Additional arguments passed to the parser
Returns:
Dict containing both parse results and RAG processing results
"""
start_time = time.time()
# Use config defaults if not specified
if output_dir is None:
output_dir = self.config.parser_output_dir
if parse_method is None:
parse_method = self.config.parse_method
if max_workers is None:
max_workers = self.config.max_concurrent_files
if recursive is None:
recursive = self.config.recursive_folder_processing
self.logger.info("Starting batch processing with RAG integration")
# Step 1: Parse documents in batch
parse_result = self.process_documents_batch(
file_paths=file_paths,
output_dir=output_dir,
parse_method=parse_method,
max_workers=max_workers,
recursive=recursive,
show_progress=show_progress,
**kwargs,
)
# Step 2: Process with RAG
# Initialize RAG system
await self._ensure_lightrag_initialized()
# Then, process each successful file with RAG
rag_results = {}
if parse_result.successful_files:
self.logger.info(
f"Processing {len(parse_result.successful_files)} files with RAG"
)
# Process files with RAG (this could be parallelized in the future)
for file_path in parse_result.successful_files:
try:
# Process the successfully parsed file with RAG
await self.process_document_complete(
file_path,
output_dir=output_dir,
parse_method=parse_method,
**kwargs,
)
# Get some statistics about the processed content
# This would require additional tracking in the RAG system
rag_results[file_path] = {"status": "success", "processed": True}
except Exception as e:
self.logger.error(
f"Failed to process {file_path} with RAG: {str(e)}"
)
rag_results[file_path] = {
"status": "failed",
"error": str(e),
"processed": False,
}
processing_time = time.time() - start_time
return {
"total": len(files_to_process),
"success": processed_count,
"failed": len(failed_files),
"failed_files": failed_files,
"parse_result": parse_result,
"rag_results": rag_results,
"total_processing_time": processing_time,
"successful_rag_files": len(
[r for r in rag_results.values() if r["processed"]]
),
"failed_rag_files": len(
[r for r in rag_results.values() if not r["processed"]]
),
}

raganything/batch_parser.py

@@ -0,0 +1,430 @@
"""
Batch and Parallel Document Parsing
This module provides functionality for processing multiple documents in parallel,
with progress reporting and error handling.
"""
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import time
from tqdm import tqdm
from .parser import MineruParser, DoclingParser
@dataclass
class BatchProcessingResult:
"""Result of batch processing operation"""
successful_files: List[str]
failed_files: List[str]
total_files: int
processing_time: float
errors: Dict[str, str]
output_dir: str
@property
def success_rate(self) -> float:
"""Calculate success rate as percentage"""
if self.total_files == 0:
return 0.0
return (len(self.successful_files) / self.total_files) * 100
def summary(self) -> str:
"""Generate a summary of the batch processing results"""
return (
f"Batch Processing Summary:\n"
f" Total files: {self.total_files}\n"
f" Successful: {len(self.successful_files)} ({self.success_rate:.1f}%)\n"
f" Failed: {len(self.failed_files)}\n"
f" Processing time: {self.processing_time:.2f} seconds\n"
f" Output directory: {self.output_dir}"
)
class BatchParser:
"""
Batch document parser with parallel processing capabilities
Supports processing multiple documents concurrently with progress tracking
and comprehensive error handling.
"""
def __init__(
self,
parser_type: str = "mineru",
max_workers: int = 4,
show_progress: bool = True,
timeout_per_file: int = 300,
skip_installation_check: bool = False,
):
"""
Initialize batch parser
Args:
parser_type: Type of parser to use ("mineru" or "docling")
max_workers: Maximum number of parallel workers
show_progress: Whether to show progress bars
timeout_per_file: Timeout in seconds for each file
skip_installation_check: Skip parser installation check (useful for testing)
"""
self.parser_type = parser_type
self.max_workers = max_workers
self.show_progress = show_progress
self.timeout_per_file = timeout_per_file
self.logger = logging.getLogger(__name__)
# Initialize parser
if parser_type == "mineru":
self.parser = MineruParser()
elif parser_type == "docling":
self.parser = DoclingParser()
else:
raise ValueError(f"Unsupported parser type: {parser_type}")
# Check parser installation (optional)
if not skip_installation_check:
if not self.parser.check_installation():
self.logger.warning(
f"{parser_type.title()} parser installation check failed. "
f"This may be due to package conflicts. "
f"Use skip_installation_check=True to bypass this check."
)
# Don't raise an error, just warn - the parser might still work
def get_supported_extensions(self) -> List[str]:
"""Get list of supported file extensions"""
return list(
self.parser.OFFICE_FORMATS
| self.parser.IMAGE_FORMATS
| self.parser.TEXT_FORMATS
| {".pdf"}
)
def filter_supported_files(
self, file_paths: List[str], recursive: bool = True
) -> List[str]:
"""
Filter file paths to only include supported file types
Args:
file_paths: List of file paths or directories
recursive: Whether to search directories recursively
Returns:
List of supported file paths
"""
supported_extensions = set(self.get_supported_extensions())
supported_files = []
for path_str in file_paths:
path = Path(path_str)
if path.is_file():
if path.suffix.lower() in supported_extensions:
supported_files.append(str(path))
else:
self.logger.warning(f"Unsupported file type: {path}")
elif path.is_dir():
if recursive:
# Recursively find all files
for file_path in path.rglob("*"):
if (
file_path.is_file()
and file_path.suffix.lower() in supported_extensions
):
supported_files.append(str(file_path))
else:
# Only files in the directory (not subdirectories)
for file_path in path.glob("*"):
if (
file_path.is_file()
and file_path.suffix.lower() in supported_extensions
):
supported_files.append(str(file_path))
else:
self.logger.warning(f"Path does not exist: {path}")
return supported_files
def process_single_file(
self, file_path: str, output_dir: str, parse_method: str = "auto", **kwargs
) -> Tuple[bool, str, Optional[str]]:
"""
Process a single file
Args:
file_path: Path to the file to process
output_dir: Output directory
parse_method: Parsing method
**kwargs: Additional parser arguments
Returns:
Tuple of (success, file_path, error_message)
"""
try:
start_time = time.time()
# Create file-specific output directory
file_name = Path(file_path).stem
file_output_dir = Path(output_dir) / file_name
file_output_dir.mkdir(parents=True, exist_ok=True)
# Parse the document
content_list = self.parser.parse_document(
file_path=file_path,
output_dir=str(file_output_dir),
method=parse_method,
**kwargs,
)
processing_time = time.time() - start_time
self.logger.info(
f"Successfully processed {file_path} "
f"({len(content_list)} content blocks, {processing_time:.2f}s)"
)
return True, file_path, None
except Exception as e:
error_msg = f"Failed to process {file_path}: {str(e)}"
self.logger.error(error_msg)
return False, file_path, error_msg
def process_batch(
self,
file_paths: List[str],
output_dir: str,
parse_method: str = "auto",
recursive: bool = True,
**kwargs,
) -> BatchProcessingResult:
"""
Process multiple files in parallel
Args:
file_paths: List of file paths or directories to process
output_dir: Base output directory
parse_method: Parsing method for all files
recursive: Whether to search directories recursively
**kwargs: Additional parser arguments
Returns:
BatchProcessingResult with processing statistics
"""
start_time = time.time()
# Filter to supported files
supported_files = self.filter_supported_files(file_paths, recursive)
if not supported_files:
self.logger.warning("No supported files found to process")
return BatchProcessingResult(
successful_files=[],
failed_files=[],
total_files=0,
processing_time=0.0,
errors={},
output_dir=output_dir,
)
self.logger.info(f"Found {len(supported_files)} files to process")
# Create output directory
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Process files in parallel
successful_files = []
failed_files = []
errors = {}
# Create progress bar if requested
pbar = None
if self.show_progress:
pbar = tqdm(
total=len(supported_files),
desc=f"Processing files ({self.parser_type})",
unit="file",
)
try:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all tasks
future_to_file = {
executor.submit(
self.process_single_file,
file_path,
output_dir,
parse_method,
**kwargs,
): file_path
for file_path in supported_files
}
# Process completed tasks
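# Note: as_completed() applies timeout_per_file to the entire iteration
# (measured from the call), not to each file individually, so a large
# batch can raise TimeoutError even when every individual file is fast.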
for future in as_completed(
future_to_file, timeout=self.timeout_per_file
):
success, file_path, error_msg = future.result()
if success:
successful_files.append(file_path)
else:
failed_files.append(file_path)
errors[file_path] = error_msg
if pbar:
pbar.update(1)
except Exception as e:
self.logger.error(f"Batch processing failed: {str(e)}")
# Mark remaining files as failed
for future in future_to_file:
if not future.done():
file_path = future_to_file[future]
failed_files.append(file_path)
errors[file_path] = f"Processing interrupted: {str(e)}"
if pbar:
pbar.update(1)
finally:
if pbar:
pbar.close()
processing_time = time.time() - start_time
# Create result
result = BatchProcessingResult(
successful_files=successful_files,
failed_files=failed_files,
total_files=len(supported_files),
processing_time=processing_time,
errors=errors,
output_dir=output_dir,
)
# Log summary
self.logger.info(result.summary())
return result
async def process_batch_async(
self,
file_paths: List[str],
output_dir: str,
parse_method: str = "auto",
recursive: bool = True,
**kwargs,
) -> BatchProcessingResult:
"""
Async version of batch processing
Args:
file_paths: List of file paths or directories to process
output_dir: Base output directory
parse_method: Parsing method for all files
recursive: Whether to search directories recursively
**kwargs: Additional parser arguments
Returns:
BatchProcessingResult with processing statistics
"""
# Run the sync version in a thread pool.
# functools.partial is required because run_in_executor() does not
# forward keyword arguments to the target callable.
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None,
partial(
self.process_batch,
file_paths,
output_dir,
parse_method,
recursive,
**kwargs,
),
)
def main():
"""Command-line interface for batch parsing"""
import argparse
parser = argparse.ArgumentParser(description="Batch document parsing")
parser.add_argument("paths", nargs="+", help="File paths or directories to process")
parser.add_argument("--output", "-o", required=True, help="Output directory")
parser.add_argument(
"--parser",
choices=["mineru", "docling"],
default="mineru",
help="Parser to use",
)
parser.add_argument(
"--method",
choices=["auto", "txt", "ocr"],
default="auto",
help="Parsing method",
)
parser.add_argument(
"--workers", type=int, default=4, help="Number of parallel workers"
)
parser.add_argument(
"--no-progress", action="store_true", help="Disable progress bar"
)
parser.add_argument(
"--no-recursive",
dest="recursive",
action="store_false",
default=True,
help="Do not search directories recursively (recursive is the default)",
)
parser.add_argument(
"--timeout", type=int, default=300, help="Timeout per file (seconds)"
)
args = parser.parse_args()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
try:
# Create batch parser
batch_parser = BatchParser(
parser_type=args.parser,
max_workers=args.workers,
show_progress=not args.no_progress,
timeout_per_file=args.timeout,
)
# Process files
result = batch_parser.process_batch(
file_paths=args.paths,
output_dir=args.output,
parse_method=args.method,
recursive=args.recursive,
)
# Print summary
print("\n" + result.summary())
# Exit with error code if any files failed
if result.failed_files:
return 1
return 0
except Exception as e:
print(f"Error: {str(e)}")
return 1
if __name__ == "__main__":
exit(main())

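Because `process_batch()` returns both the failed paths and their error messages, a caller can retry just the failures, for example with a smaller worker pool to rule out resource contention. A minimal sketch:

```python
from raganything.batch_parser import BatchParser

batch_parser = BatchParser(parser_type="mineru", max_workers=4)
result = batch_parser.process_batch(
    file_paths=["docs/"],
    output_dir="./batch_output",
    parse_method="auto",
)

if result.failed_files:
    for path in result.failed_files:
        print(f"{path}: {result.errors[path]}")
    # Retry the failures sequentially
    retry_parser = BatchParser(parser_type="mineru", max_workers=1)
    retry_result = retry_parser.process_batch(
        file_paths=result.failed_files,
        output_dir="./batch_output",
        parse_method="auto",
    )
    print(retry_result.summary())
```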
View File

@@ -0,0 +1,534 @@
"""
Enhanced Markdown to PDF Conversion
This module provides improved Markdown to PDF conversion with:
- Better formatting and styling
- Image support
- Table support
- Code syntax highlighting
- Custom templates
- Multiple output formats
"""
import os
import logging
from pathlib import Path
from typing import Dict, Any, Optional
from dataclasses import dataclass
import tempfile
import subprocess
try:
import markdown
MARKDOWN_AVAILABLE = True
except ImportError:
MARKDOWN_AVAILABLE = False
try:
from weasyprint import HTML
WEASYPRINT_AVAILABLE = True
except ImportError:
WEASYPRINT_AVAILABLE = False
try:
# Check if pandoc module exists (not used directly, just for detection)
import importlib.util
spec = importlib.util.find_spec("pandoc")
PANDOC_AVAILABLE = spec is not None
except ImportError:
PANDOC_AVAILABLE = False
@dataclass
class MarkdownConfig:
"""Configuration for Markdown to PDF conversion"""
# Styling options
css_file: Optional[str] = None
template_file: Optional[str] = None
page_size: str = "A4"
margin: str = "1in"
font_size: str = "12pt"
line_height: str = "1.5"
# Content options
include_toc: bool = True
syntax_highlighting: bool = True
image_max_width: str = "100%"
table_style: str = "border-collapse: collapse; width: 100%;"
# Output options
output_format: str = "pdf" # pdf, html, docx
output_dir: Optional[str] = None
# Advanced options
custom_css: Optional[str] = None
metadata: Optional[Dict[str, str]] = None
class EnhancedMarkdownConverter:
"""
Enhanced Markdown to PDF converter with multiple backends
Supports multiple conversion methods:
- WeasyPrint (recommended for HTML/CSS styling)
- Pandoc (recommended for complex documents)
- ReportLab (fallback, basic styling)
"""
def __init__(self, config: Optional[MarkdownConfig] = None):
"""
Initialize the converter
Args:
config: Configuration for conversion
"""
self.config = config or MarkdownConfig()
self.logger = logging.getLogger(__name__)
# Check available backends
self.available_backends = self._check_backends()
self.logger.info(f"Available backends: {list(self.available_backends.keys())}")
def _check_backends(self) -> Dict[str, bool]:
"""Check which conversion backends are available"""
backends = {
"weasyprint": WEASYPRINT_AVAILABLE,
"pandoc": PANDOC_AVAILABLE,
"markdown": MARKDOWN_AVAILABLE,
}
# Check if pandoc is installed on system
try:
subprocess.run(["pandoc", "--version"], capture_output=True, check=True)
backends["pandoc_system"] = True
except (subprocess.CalledProcessError, FileNotFoundError):
backends["pandoc_system"] = False
return backends
def _get_default_css(self) -> str:
"""Get default CSS styling"""
return """
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
line-height: 1.6;
color: #333;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
h1, h2, h3, h4, h5, h6 {
color: #2c3e50;
margin-top: 1.5em;
margin-bottom: 0.5em;
}
h1 { font-size: 2em; border-bottom: 2px solid #3498db; padding-bottom: 0.3em; }
h2 { font-size: 1.5em; border-bottom: 1px solid #bdc3c7; padding-bottom: 0.2em; }
h3 { font-size: 1.3em; }
h4 { font-size: 1.1em; }
p { margin-bottom: 1em; }
code {
background-color: #f8f9fa;
padding: 2px 4px;
border-radius: 3px;
font-family: 'Courier New', monospace;
font-size: 0.9em;
}
pre {
background-color: #f8f9fa;
padding: 15px;
border-radius: 5px;
overflow-x: auto;
border-left: 4px solid #3498db;
}
pre code {
background-color: transparent;
padding: 0;
}
blockquote {
border-left: 4px solid #3498db;
margin: 0;
padding-left: 20px;
color: #7f8c8d;
}
table {
border-collapse: collapse;
width: 100%;
margin: 1em 0;
}
th, td {
border: 1px solid #ddd;
padding: 8px 12px;
text-align: left;
}
th {
background-color: #f2f2f2;
font-weight: bold;
}
img {
max-width: 100%;
height: auto;
display: block;
margin: 1em auto;
}
ul, ol {
margin-bottom: 1em;
}
li {
margin-bottom: 0.5em;
}
a {
color: #3498db;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
.toc {
background-color: #f8f9fa;
padding: 15px;
border-radius: 5px;
margin-bottom: 2em;
}
.toc ul {
list-style-type: none;
padding-left: 0;
}
.toc li {
margin-bottom: 0.3em;
}
.toc a {
color: #2c3e50;
}
"""
def _process_markdown_content(self, content: str) -> str:
"""Process Markdown content with extensions"""
if not MARKDOWN_AVAILABLE:
raise RuntimeError(
"Markdown library not available. Install with: pip install markdown"
)
# Configure Markdown extensions
extensions = [
"markdown.extensions.tables",
"markdown.extensions.fenced_code",
"markdown.extensions.codehilite",
"markdown.extensions.toc",
"markdown.extensions.attr_list",
"markdown.extensions.def_list",
"markdown.extensions.footnotes",
]
extension_configs = {
"codehilite": {
"css_class": "highlight",
"use_pygments": True,
},
"toc": {
"title": "Table of Contents",
"permalink": True,
},
}
# Convert Markdown to HTML
md = markdown.Markdown(
extensions=extensions, extension_configs=extension_configs
)
html_content = md.convert(content)
# Add CSS styling
css = self.config.custom_css or self._get_default_css()
# Create complete HTML document
html_doc = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Converted Document</title>
<style>
{css}
</style>
</head>
<body>
{html_content}
</body>
</html>
"""
return html_doc
def convert_with_weasyprint(self, markdown_content: str, output_path: str) -> bool:
"""Convert using WeasyPrint (best for styling)"""
if not WEASYPRINT_AVAILABLE:
raise RuntimeError(
"WeasyPrint not available. Install with: pip install weasyprint"
)
try:
# Process Markdown to HTML
html_content = self._process_markdown_content(markdown_content)
# Convert HTML to PDF
html = HTML(string=html_content)
html.write_pdf(output_path)
self.logger.info(
f"Successfully converted to PDF using WeasyPrint: {output_path}"
)
return True
except Exception as e:
self.logger.error(f"WeasyPrint conversion failed: {str(e)}")
return False
def convert_with_pandoc(
self, markdown_content: str, output_path: str, use_system_pandoc: bool = False
) -> bool:
"""Convert using Pandoc (best for complex documents)"""
if (
not self.available_backends.get("pandoc_system", False)
and not use_system_pandoc
):
raise RuntimeError(
"Pandoc not available. Install from: https://pandoc.org/installing.html"
)
temp_md_path = None
try:
import subprocess
# Create temporary markdown file
with tempfile.NamedTemporaryFile(
mode="w", suffix=".md", delete=False
) as temp_file:
temp_file.write(markdown_content)
temp_md_path = temp_file.name
# Build pandoc command with wkhtmltopdf engine
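# wkhtmltopdf must be installed separately and be on PATH; if it is
# missing, pandoc exits with an error and this method returns False.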
cmd = [
"pandoc",
temp_md_path,
"-o",
output_path,
"--pdf-engine=wkhtmltopdf",
"--standalone",
"--toc",
"--number-sections",
]
# Run pandoc
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
self.logger.info(
f"Successfully converted to PDF using Pandoc: {output_path}"
)
return True
else:
self.logger.error(f"Pandoc conversion failed: {result.stderr}")
return False
except Exception as e:
self.logger.error(f"Pandoc conversion failed: {str(e)}")
return False
finally:
if temp_md_path and os.path.exists(temp_md_path):
try:
os.unlink(temp_md_path)
except OSError as e:
self.logger.error(
f"Failed to clean up temp file {temp_md_path}: {str(e)}"
)
def convert_markdown_to_pdf(
self, markdown_content: str, output_path: str, method: str = "auto"
) -> bool:
"""
Convert markdown content to PDF
Args:
markdown_content: Markdown content to convert
output_path: Output PDF file path
method: Conversion method ("auto", "weasyprint", "pandoc", "pandoc_system")
Returns:
True if conversion successful, False otherwise
"""
if method == "auto":
method = self._get_recommended_backend()
try:
if method == "weasyprint":
return self.convert_with_weasyprint(markdown_content, output_path)
elif method == "pandoc":
return self.convert_with_pandoc(markdown_content, output_path)
elif method == "pandoc_system":
return self.convert_with_pandoc(
markdown_content, output_path, use_system_pandoc=True
)
else:
raise ValueError(f"Unknown conversion method: {method}")
except Exception as e:
self.logger.error(f"{method.title()} conversion failed: {str(e)}")
return False
def convert_file_to_pdf(
self, input_path: str, output_path: Optional[str] = None, method: str = "auto"
) -> bool:
"""
Convert Markdown file to PDF
Args:
input_path: Input Markdown file path
output_path: Output PDF file path (optional)
method: Conversion method
Returns:
bool: True if conversion successful
"""
input_path_obj = Path(input_path)
if not input_path_obj.exists():
raise FileNotFoundError(f"Input file not found: {input_path}")
# Read markdown content
try:
with open(input_path_obj, "r", encoding="utf-8") as f:
markdown_content = f.read()
except UnicodeDecodeError:
# Try with different encodings
for encoding in ["gbk", "latin-1", "cp1252"]:
try:
with open(input_path_obj, "r", encoding=encoding) as f:
markdown_content = f.read()
break
except UnicodeDecodeError:
continue
else:
raise RuntimeError(
f"Could not decode file {input_path} with any supported encoding"
)
# Determine output path
if output_path is None:
output_path = str(input_path_obj.with_suffix(".pdf"))
return self.convert_markdown_to_pdf(markdown_content, output_path, method)
def get_backend_info(self) -> Dict[str, Any]:
"""Get information about available backends"""
return {
"available_backends": self.available_backends,
"recommended_backend": self._get_recommended_backend(),
"config": {
"page_size": self.config.page_size,
"margin": self.config.margin,
"font_size": self.config.font_size,
"include_toc": self.config.include_toc,
"syntax_highlighting": self.config.syntax_highlighting,
},
}
def _get_recommended_backend(self) -> str:
"""Get recommended backend based on availability"""
if self.available_backends.get("pandoc_system", False):
return "pandoc"
elif self.available_backends.get("weasyprint", False):
return "weasyprint"
else:
return "none"
def main():
"""Command-line interface for enhanced markdown conversion"""
import argparse
parser = argparse.ArgumentParser(description="Enhanced Markdown to PDF conversion")
parser.add_argument("input", nargs="?", help="Input markdown file")
parser.add_argument("--output", "-o", help="Output PDF file")
parser.add_argument(
"--method",
choices=["auto", "weasyprint", "pandoc", "pandoc_system"],
default="auto",
help="Conversion method",
)
parser.add_argument("--css", help="Custom CSS file")
parser.add_argument("--info", action="store_true", help="Show backend information")
args = parser.parse_args()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Create converter
config = MarkdownConfig()
if args.css:
config.css_file = args.css
converter = EnhancedMarkdownConverter(config)
# Show backend info if requested
if args.info:
info = converter.get_backend_info()
print("Backend Information:")
for backend, available in info["available_backends"].items():
status = "" if available else ""
print(f" {status} {backend}")
print(f"Recommended backend: {info['recommended_backend']}")
return 0
# Check if input file is provided
if not args.input:
parser.error("Input file is required when not using --info")
# Convert file
try:
success = converter.convert_file_to_pdf(
input_path=args.input, output_path=args.output, method=args.method
)
if success:
print(f"✅ Successfully converted {args.input} to PDF")
return 0
else:
print("❌ Conversion failed")
return 1
except Exception as e:
print(f"❌ Error: {str(e)}")
return 1
if __name__ == "__main__":
exit(main())

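The converter above can also be driven programmatically rather than through its CLI. A minimal sketch; the import path `raganything.enhanced_markdown` is an assumption here, since the file name is not shown in this hunk:

```python
from raganything.enhanced_markdown import (  # assumed module path
    EnhancedMarkdownConverter,
    MarkdownConfig,
)

config = MarkdownConfig(include_toc=True, syntax_highlighting=True)
converter = EnhancedMarkdownConverter(config)

# Inspect which backends this environment actually provides
info = converter.get_backend_info()
print("recommended backend:", info["recommended_backend"])

# "auto" picks system pandoc if installed, otherwise WeasyPrint
ok = converter.convert_file_to_pdf(
    input_path="README.md",
    output_path="README.pdf",
    method="auto",
)
print("converted" if ok else "conversion failed")
```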
View File

@@ -1,10 +1,10 @@
huggingface_hub
# LightRAG packages
lightrag-hku
# MinerU 2.0 packages (replaces magic-pdf)
mineru[core]
# Progress bars for batch processing
tqdm
# Note: Optional dependencies are now defined in setup.py extras_require:
# - [image]: Pillow>=10.0.0 (for BMP, TIFF, GIF, WebP format conversion)
# - [text]: reportlab>=4.0.0 (for TXT, MD to PDF conversion)

View File

@@ -64,6 +64,11 @@ extras_require = {
"text": ["reportlab>=4.0.0"], # For text file to PDF conversion (TXT, MD)
"office": [], # Office document processing requires LibreOffice (external program)
"all": ["Pillow>=10.0.0", "reportlab>=4.0.0"], # All optional features
"markdown": [
"markdown>=3.4.0",
"weasyprint>=60.0",
"pygments>=2.10.0",
], # Enhanced markdown conversion
}
setuptools.setup(
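The new `markdown` extra above only declares Python packages; WeasyPrint also depends on native libraries, and the Pandoc path needs the `pandoc` binary. A minimal sketch for checking that the declared packages are importable before constructing the converter, with the helper name chosen for illustration:

```python
import importlib.util

def markdown_extra_available() -> bool:
    """Return True if the packages from the 'markdown' extra can be imported."""
    return all(
        importlib.util.find_spec(name) is not None
        for name in ("markdown", "weasyprint", "pygments")
    )

if __name__ == "__main__":
    print("markdown extra available:", markdown_extra_available())
```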