Add finalize_storages

This commit is contained in:
zrguo
2025-07-31 18:34:36 +08:00
parent bc7d5ec0b3
commit d5ff598a9b

View File

@@ -10,6 +10,7 @@ This script integrates:
import os
from typing import Dict, Any, Optional, Callable
import sys
import asyncio
from dataclasses import dataclass, field
from pathlib import Path
@@ -128,6 +129,21 @@ class RAGAnything(QueryMixin, ProcessorMixin, BatchMixin):
)
self.logger.info(f" Max concurrent files: {self.config.max_concurrent_files}")
def __del__(self):
"""Cleanup resources when object is destroyed"""
try:
import asyncio
if asyncio.get_event_loop().is_running():
# If we're in an async context, schedule cleanup
asyncio.create_task(self.finalize_storages())
else:
# Run cleanup synchronously
asyncio.run(self.finalize_storages())
except Exception as e:
# Use print instead of logger since logger might be cleaned up already
print(f"Warning: Failed to finalize RAGAnything storages: {e}")
def _create_context_config(self) -> ContextConfig:
"""Create context configuration from RAGAnything config"""
return ContextConfig(
@@ -268,6 +284,52 @@ class RAGAnything(QueryMixin, ProcessorMixin, BatchMixin):
self.logger.info("LightRAG, parse cache, and multimodal processors initialized")
async def finalize_storages(self):
"""Finalize all storages including parse cache and LightRAG storages
This method should be called when shutting down to properly clean up resources
and persist any cached data. It will finalize both the parse cache and LightRAG's
internal storages.
Example usage:
try:
rag_anything = RAGAnything(...)
await rag_anything.process_file("document.pdf")
# ... other operations ...
finally:
# Always finalize storages to clean up resources
if rag_anything:
await rag_anything.finalize_storages()
Note:
- This method is automatically called in __del__ when the object is destroyed
- Manual calling is recommended in production environments
- All finalization tasks run concurrently for better performance
"""
try:
tasks = []
# Finalize parse cache if it exists
if self.parse_cache is not None:
tasks.append(self.parse_cache.finalize())
self.logger.debug("Scheduled parse cache finalization")
# Finalize LightRAG storages if LightRAG is initialized
if self.lightrag is not None:
tasks.append(self.lightrag.finalize_storages())
self.logger.debug("Scheduled LightRAG storages finalization")
# Run all finalization tasks concurrently
if tasks:
await asyncio.gather(*tasks)
self.logger.info("Successfully finalized all RAGAnything storages")
else:
self.logger.debug("No storages to finalize")
except Exception as e:
self.logger.error(f"Error during storage finalization: {e}")
raise
def check_parser_installation(self) -> bool:
"""
Check if the configured parser is properly installed