"""
Complete document parsing + multimodal content insertion pipeline

This module integrates:
1. Document parsing (using configurable parsers)
2. Plain text content insertion into LightRAG
3. Specialized processing for multimodal content (using different processors)
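
Typical usage (a minimal sketch; the model functions are placeholders you supply):

    # inside an async function
    rag = RAGAnything(
        llm_model_func=my_llm_func,
        vision_model_func=my_vision_func,
        embedding_func=my_embedding_func,
    )
    await rag.process_file("document.pdf")
    await rag.finalize_storages()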
"""
import os
from typing import Dict, Any, Optional, Callable
import sys
import asyncio
from dataclasses import dataclass, field
from pathlib import Path
# Add project root directory to Python path
sys.path.insert(0, str(Path(__file__).parent.parent))
from lightrag import LightRAG
from lightrag.utils import logger
from dotenv import load_dotenv
# Load environment variables from .env file
# The OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
# Import configuration and modules
from raganything.config import RAGAnythingConfig
from raganything.query import QueryMixin
from raganything.processor import ProcessorMixin
from raganything.batch import BatchMixin
from raganything.utils import get_processor_supports
from raganything.parser import MineruParser, DoclingParser
# Import specialized processors
from raganything.modalprocessors import (
ImageModalProcessor,
TableModalProcessor,
EquationModalProcessor,
GenericModalProcessor,
ContextExtractor,
ContextConfig,
)
@dataclass
class RAGAnything(QueryMixin, ProcessorMixin, BatchMixin):
"""Multimodal Document Processing Pipeline - Complete document parsing and insertion pipeline"""
# Core Components
# ---
lightrag: Optional[LightRAG] = field(default=None)
"""Optional pre-initialized LightRAG instance."""
llm_model_func: Optional[Callable] = field(default=None)
"""LLM model function for text analysis."""
vision_model_func: Optional[Callable] = field(default=None)
"""Vision model function for image analysis."""
embedding_func: Optional[Callable] = field(default=None)
"""Embedding function for text vectorization."""
config: Optional[RAGAnythingConfig] = field(default=None)
"""Configuration object, if None will create with environment variables."""
# LightRAG Configuration
# ---
lightrag_kwargs: Dict[str, Any] = field(default_factory=dict)
"""Additional keyword arguments for LightRAG initialization when lightrag is not provided.
This allows passing all LightRAG configuration parameters like:
- kv_storage, vector_storage, graph_storage, doc_status_storage
- top_k, chunk_top_k, max_entity_tokens, max_relation_tokens, max_total_tokens
- cosine_threshold, related_chunk_number
- chunk_token_size, chunk_overlap_token_size, tokenizer, tiktoken_model_name
- embedding_batch_num, embedding_func_max_async, embedding_cache_config
- llm_model_name, llm_model_max_token_size, llm_model_max_async, llm_model_kwargs
- rerank_model_func, vector_db_storage_cls_kwargs, enable_llm_cache
- max_parallel_insert, max_graph_nodes, addon_params, etc.
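
    Example (a minimal sketch; parameter values are illustrative, not defaults):
        rag = RAGAnything(
            llm_model_func=my_llm_func,
            embedding_func=my_embedding_func,
            lightrag_kwargs={"chunk_token_size": 1200, "top_k": 60},
        )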
"""
# Internal State
# ---
modal_processors: Dict[str, Any] = field(default_factory=dict, init=False)
"""Dictionary of multimodal processors."""
context_extractor: Optional[ContextExtractor] = field(default=None, init=False)
"""Context extractor for providing surrounding content to modal processors."""
parse_cache: Optional[Any] = field(default=None, init=False)
"""Parse result cache storage using LightRAG KV storage."""
def __post_init__(self):
"""Post-initialization setup following LightRAG pattern"""
# Initialize configuration if not provided
if self.config is None:
self.config = RAGAnythingConfig()
# Set working directory
self.working_dir = self.config.working_dir
# Set up logger (use existing logger, don't configure it)
self.logger = logger
# Set up document parser
self.doc_parser = (
DoclingParser() if self.config.parser == "docling" else MineruParser()
)
# Create working directory if needed
if not os.path.exists(self.working_dir):
os.makedirs(self.working_dir)
self.logger.info(f"Created working directory: {self.working_dir}")
# Log configuration info
self.logger.info("RAGAnything initialized with config:")
self.logger.info(f" Working directory: {self.config.working_dir}")
self.logger.info(f" Parser: {self.config.parser}")
self.logger.info(f" Parse method: {self.config.parse_method}")
self.logger.info(
f" Multimodal processing - Image: {self.config.enable_image_processing}, "
f"Table: {self.config.enable_table_processing}, "
f"Equation: {self.config.enable_equation_processing}"
)
self.logger.info(f" Max concurrent files: {self.config.max_concurrent_files}")
def __del__(self):
"""Cleanup resources when object is destroyed"""
try:
import asyncio
if asyncio.get_event_loop().is_running():
# If we're in an async context, schedule cleanup
asyncio.create_task(self.finalize_storages())
else:
# Run cleanup synchronously
asyncio.run(self.finalize_storages())
except Exception as e:
# Use print instead of logger since logger might be cleaned up already
print(f"Warning: Failed to finalize RAGAnything storages: {e}")
def _create_context_config(self) -> ContextConfig:
"""Create context configuration from RAGAnything config"""
return ContextConfig(
context_window=self.config.context_window,
context_mode=self.config.context_mode,
max_context_tokens=self.config.max_context_tokens,
include_headers=self.config.include_headers,
include_captions=self.config.include_captions,
filter_content_types=self.config.context_filter_content_types,
)
def _create_context_extractor(self) -> ContextExtractor:
"""Create context extractor with tokenizer from LightRAG"""
if self.lightrag is None:
raise ValueError(
"LightRAG must be initialized before creating context extractor"
)
context_config = self._create_context_config()
return ContextExtractor(
config=context_config, tokenizer=self.lightrag.tokenizer
)
def _initialize_processors(self):
"""Initialize multimodal processors with appropriate model functions"""
if self.lightrag is None:
raise ValueError(
"LightRAG instance must be initialized before creating processors"
)
# Create context extractor
self.context_extractor = self._create_context_extractor()
# Create different multimodal processors based on configuration
self.modal_processors = {}
if self.config.enable_image_processing:
self.modal_processors["image"] = ImageModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.vision_model_func or self.llm_model_func,
context_extractor=self.context_extractor,
)
if self.config.enable_table_processing:
self.modal_processors["table"] = TableModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.llm_model_func,
context_extractor=self.context_extractor,
)
if self.config.enable_equation_processing:
self.modal_processors["equation"] = EquationModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.llm_model_func,
context_extractor=self.context_extractor,
)
# Always include generic processor as fallback
self.modal_processors["generic"] = GenericModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.llm_model_func,
context_extractor=self.context_extractor,
)
self.logger.info("Multimodal processors initialized with context support")
self.logger.info(f"Available processors: {list(self.modal_processors.keys())}")
self.logger.info(f"Context configuration: {self._create_context_config()}")
def update_config(self, **kwargs):
"""Update configuration with new values"""
for key, value in kwargs.items():
if hasattr(self.config, key):
setattr(self.config, key, value)
self.logger.debug(f"Updated config: {key} = {value}")
else:
self.logger.warning(f"Unknown config parameter: {key}")
async def _ensure_lightrag_initialized(self):
"""Ensure LightRAG instance is initialized, create if necessary"""
# Check parser installation first
if not self.doc_parser.check_installation():
raise RuntimeError(
f"Parser '{self.config.parser}' is not properly installed. "
"Please install it using pip install or uv pip install."
)
if self.lightrag is not None:
# LightRAG was pre-provided, but we need to ensure it's properly initialized
# and that parse_cache is set up
# Ensure LightRAG storages are initialized
if (
not hasattr(self.lightrag, "_storages_status")
or self.lightrag._storages_status.name != "INITIALIZED"
):
self.logger.info(
"Initializing storages for pre-provided LightRAG instance"
)
await self.lightrag.initialize_storages()
from lightrag.kg.shared_storage import initialize_pipeline_status
await initialize_pipeline_status()
# Initialize parse cache if not already done
if self.parse_cache is None:
self.logger.info(
"Initializing parse cache for pre-provided LightRAG instance"
)
self.parse_cache = self.lightrag.key_string_value_json_storage_cls(
namespace="parse_cache",
workspace=self.lightrag.workspace,
global_config=self.lightrag.__dict__,
embedding_func=self.embedding_func,
)
await self.parse_cache.initialize()
# Initialize processors if not already done
if not self.modal_processors:
self._initialize_processors()
return
# Validate required functions for creating new LightRAG instance
if self.llm_model_func is None:
raise ValueError(
"llm_model_func must be provided when LightRAG is not pre-initialized"
)
if self.embedding_func is None:
raise ValueError(
"embedding_func must be provided when LightRAG is not pre-initialized"
)
from lightrag.kg.shared_storage import initialize_pipeline_status
# Prepare LightRAG initialization parameters
lightrag_params = {
"working_dir": self.working_dir,
"llm_model_func": self.llm_model_func,
"embedding_func": self.embedding_func,
}
# Merge user-provided lightrag_kwargs, which can override defaults
lightrag_params.update(self.lightrag_kwargs)
# Log the parameters being used for initialization (excluding sensitive data)
log_params = {
k: v
for k, v in lightrag_params.items()
if not callable(v)
and k not in ["llm_model_kwargs", "vector_db_storage_cls_kwargs"]
}
self.logger.info(f"Initializing LightRAG with parameters: {log_params}")
# Create LightRAG instance with merged parameters
self.lightrag = LightRAG(**lightrag_params)
await self.lightrag.initialize_storages()
await initialize_pipeline_status()
# Initialize parse cache storage using LightRAG's KV storage
self.parse_cache = self.lightrag.key_string_value_json_storage_cls(
namespace="parse_cache",
workspace=self.lightrag.workspace,
global_config=self.lightrag.__dict__,
embedding_func=self.embedding_func,
)
await self.parse_cache.initialize()
# Initialize processors after LightRAG is ready
self._initialize_processors()
self.logger.info("LightRAG, parse cache, and multimodal processors initialized")
async def finalize_storages(self):
"""Finalize all storages including parse cache and LightRAG storages
This method should be called when shutting down to properly clean up resources
and persist any cached data. It will finalize both the parse cache and LightRAG's
internal storages.
Example usage:
            rag_anything = None
            try:
                rag_anything = RAGAnything(...)
                await rag_anything.process_file("document.pdf")
                # ... other operations ...
            finally:
                # Always finalize storages to clean up resources
                if rag_anything:
                    await rag_anything.finalize_storages()
Note:
- This method is automatically called in __del__ when the object is destroyed
- Manual calling is recommended in production environments
- All finalization tasks run concurrently for better performance
"""
try:
tasks = []
# Finalize parse cache if it exists
if self.parse_cache is not None:
tasks.append(self.parse_cache.finalize())
self.logger.debug("Scheduled parse cache finalization")
# Finalize LightRAG storages if LightRAG is initialized
if self.lightrag is not None:
tasks.append(self.lightrag.finalize_storages())
self.logger.debug("Scheduled LightRAG storages finalization")
# Run all finalization tasks concurrently
if tasks:
await asyncio.gather(*tasks)
self.logger.info("Successfully finalized all RAGAnything storages")
else:
self.logger.debug("No storages to finalize")
except Exception as e:
self.logger.error(f"Error during storage finalization: {e}")
raise
def check_parser_installation(self) -> bool:
"""
Check if the configured parser is properly installed
Returns:
bool: True if the configured parser is properly installed
"""
return self.doc_parser.check_installation()
def get_config_info(self) -> Dict[str, Any]:
"""Get current configuration information"""
config_info = {
"directory": {
"working_dir": self.config.working_dir,
"parser_output_dir": self.config.parser_output_dir,
},
"parsing": {
"parser": self.config.parser,
"parse_method": self.config.parse_method,
"display_content_stats": self.config.display_content_stats,
},
"multimodal_processing": {
"enable_image_processing": self.config.enable_image_processing,
"enable_table_processing": self.config.enable_table_processing,
"enable_equation_processing": self.config.enable_equation_processing,
},
"context_extraction": {
"context_window": self.config.context_window,
"context_mode": self.config.context_mode,
"max_context_tokens": self.config.max_context_tokens,
"include_headers": self.config.include_headers,
"include_captions": self.config.include_captions,
"filter_content_types": self.config.context_filter_content_types,
},
"batch_processing": {
"max_concurrent_files": self.config.max_concurrent_files,
"supported_file_extensions": self.config.supported_file_extensions,
"recursive_folder_processing": self.config.recursive_folder_processing,
},
"logging": {
"note": "Logging fields have been removed - configure logging externally",
},
}
# Add LightRAG configuration if available
if self.lightrag_kwargs:
# Filter out sensitive data and callable objects for display
safe_kwargs = {
k: v
for k, v in self.lightrag_kwargs.items()
if not callable(v)
and k not in ["llm_model_kwargs", "vector_db_storage_cls_kwargs"]
}
config_info["lightrag_config"] = {
"custom_parameters": safe_kwargs,
"note": "LightRAG will be initialized with these additional parameters",
}
else:
config_info["lightrag_config"] = {
"custom_parameters": {},
"note": "Using default LightRAG parameters",
}
return config_info
def set_content_source_for_context(
self, content_source, content_format: str = "auto"
):
"""Set content source for context extraction in all modal processors
Args:
content_source: Source content for context extraction (e.g., MinerU content list)
content_format: Format of content source ("minerU", "text_chunks", "auto")
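
        Example (a minimal sketch; content_list is the parser's content list output):
            rag.set_content_source_for_context(content_list, "minerU")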
"""
if not self.modal_processors:
self.logger.warning(
"Modal processors not initialized. Content source will be set when processors are created."
)
return
for processor_name, processor in self.modal_processors.items():
try:
processor.set_content_source(content_source, content_format)
self.logger.debug(f"Set content source for {processor_name} processor")
except Exception as e:
self.logger.error(
f"Failed to set content source for {processor_name}: {e}"
)
self.logger.info(
f"Content source set for context extraction (format: {content_format})"
)
def update_context_config(self, **context_kwargs):
"""Update context extraction configuration
Args:
**context_kwargs: Context configuration parameters to update
(context_window, context_mode, max_context_tokens, etc.)
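
        Example (illustrative values):
            rag.update_context_config(context_window=2, max_context_tokens=1500)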
"""
# Update the main config
for key, value in context_kwargs.items():
if hasattr(self.config, key):
setattr(self.config, key, value)
self.logger.debug(f"Updated context config: {key} = {value}")
else:
self.logger.warning(f"Unknown context config parameter: {key}")
# Recreate context extractor with new config if processors are initialized
if self.lightrag and self.modal_processors:
try:
self.context_extractor = self._create_context_extractor()
# Update all processors with new context extractor
for processor_name, processor in self.modal_processors.items():
processor.context_extractor = self.context_extractor
self.logger.info(
"Context configuration updated and applied to all processors"
)
self.logger.info(
f"New context configuration: {self._create_context_config()}"
)
except Exception as e:
self.logger.error(f"Failed to update context configuration: {e}")
def get_processor_info(self) -> Dict[str, Any]:
"""Get processor information"""
base_info = {
"mineru_installed": MineruParser.check_installation(MineruParser()),
"config": self.get_config_info(),
"models": {
"llm_model": "External function"
if self.llm_model_func
else "Not provided",
"vision_model": "External function"
if self.vision_model_func
else "Not provided",
"embedding_model": "External function"
if self.embedding_func
else "Not provided",
},
}
if not self.modal_processors:
base_info["status"] = "Not initialized"
base_info["processors"] = {}
else:
base_info["status"] = "Initialized"
base_info["processors"] = {}
for proc_type, processor in self.modal_processors.items():
base_info["processors"][proc_type] = {
"class": processor.__class__.__name__,
"supports": get_processor_supports(proc_type),
"enabled": True,
}
return base_info