Merge branch 'HKUDS:main' into main

Author: Shorthills AI
Date: 2025-07-28 10:23:59 +05:30
Committed via GitHub
3 changed files with 1150 additions and 285 deletions


@@ -1,7 +1,7 @@
from .raganything import RAGAnything as RAGAnything
from .config import RAGAnythingConfig as RAGAnythingConfig
__version__ = "1.2.4"
__version__ = "1.2.5"
__author__ = "Zirui Guo"
__url__ = "https://github.com/HKUDS/RAG-Anything"


@@ -439,19 +439,27 @@ class BaseModalProcessor:
logger.error(f"Error getting context for item {item_info}: {e}")
return ""
async def generate_description_only(
self,
modal_content,
content_type: str,
item_info: Dict[str, Any] = None,
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""
Generate text description and entity info only, without entity relation extraction.
Used for batch processing stage 1.
Args:
modal_content: Modal content to process
content_type: Type of modal content
item_info: Item information for context extraction
entity_name: Optional predefined entity name
Returns:
Tuple of (description, entity_info)
"""
# Subclasses must implement this method
raise NotImplementedError("Subclasses must implement this method")
async def _create_entity_and_chunk(
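This split is the heart of the change: generate_description_only performs only stage 1 (LLM description and entity info, with no graph writes), while process_multimodal_content remains the single-item path that does everything. A minimal sketch of how the two calls relate, assuming an already-constructed processor (the processor, item, and file name here are hypothetical):

async def demo_two_stage(processor, item):
    # Stage 1 only: returns (description, entity_info); nothing is written to the graph
    description, entity_info = await processor.generate_description_only(
        modal_content=item,
        content_type="table",
        item_info={"page_idx": 0, "index": 0},
    )
    # Single-item path: full processing, including entity/relation extraction
    chunk_text, entity = await processor.process_multimodal_content(
        modal_content=item,
        content_type="table",
        file_path="example.pdf",
    )
    return description, chunk_text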
@@ -799,20 +807,28 @@ class ImageModalProcessor(BaseModalProcessor):
logger.error(f"Failed to encode image {image_path}: {e}")
return ""
async def generate_description_only(
self,
modal_content,
content_type: str,
item_info: Dict[str, Any] = None,
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""
Generate image description and entity info only, without entity relation extraction.
Used for batch processing stage 1.
Args:
modal_content: Image content to process
content_type: Type of modal content ("image")
item_info: Item information for context extraction
entity_name: Optional predefined entity name
Returns:
Tuple of (enhanced_caption, entity_info)
"""
try:
# Parse image content (reuse existing logic)
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
@@ -825,6 +841,17 @@ class ImageModalProcessor(BaseModalProcessor):
captions = content_data.get("img_caption", [])
footnotes = content_data.get("img_footnote", [])
# Validate image path
if not image_path:
raise ValueError(
f"No image path provided in modal_content: {modal_content}"
)
# Convert to Path object and check if it exists
image_path_obj = Path(image_path)
if not image_path_obj.exists():
raise FileNotFoundError(f"Image file not found: {image_path}")
# Extract context for current item
context = ""
if item_info:
@@ -853,12 +880,7 @@ class ImageModalProcessor(BaseModalProcessor):
footnotes=footnotes if footnotes else "None",
)
# Encode image to base64
image_base64 = self._encode_image_to_base64(image_path)
if not image_base64:
raise RuntimeError(f"Failed to encode image to base64: {image_path}")
@@ -870,10 +892,54 @@ class ImageModalProcessor(BaseModalProcessor):
system_prompt=PROMPTS["IMAGE_ANALYSIS_SYSTEM"],
)
# Parse response (reuse existing logic)
enhanced_caption, entity_info = self._parse_response(response, entity_name)
return enhanced_caption, entity_info
except Exception as e:
logger.error(f"Error generating image description: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"image_{compute_mdhash_id(str(modal_content))}",
"entity_type": "image",
"summary": f"Image content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
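The `_encode_image_to_base64` helper used above lies outside this diff's hunks; judging from the `if not image_base64` check, it returns an empty string on failure. A plausible sketch under that assumption:

import base64
from pathlib import Path

def encode_image_to_base64(image_path: str) -> str:
    # Read the image bytes and return them base64-encoded; "" signals failure,
    # matching the RuntimeError check in the caller above.
    try:
        return base64.b64encode(Path(image_path).read_bytes()).decode("utf-8")
    except OSError:
        return ""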
async def process_multimodal_content(
self,
modal_content,
content_type: str,
file_path: str = "manual_creation",
entity_name: str = None,
item_info: Dict[str, Any] = None,
batch_mode: bool = False,
doc_id: str = None,
chunk_order_index: int = 0,
) -> Tuple[str, Dict[str, Any]]:
"""Process image content with context support"""
try:
# Generate description and entity info
enhanced_caption, entity_info = await self.generate_description_only(
modal_content, content_type, item_info, entity_name
)
# Build complete image content
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
except json.JSONDecodeError:
content_data = {"description": modal_content}
else:
content_data = modal_content
image_path = content_data.get("img_path", "")
captions = content_data.get("img_caption", [])
footnotes = content_data.get("img_footnote", [])
modal_chunk = PROMPTS["image_chunk"].format(
image_path=image_path,
captions=", ".join(captions) if captions else "None",
@@ -944,6 +1010,96 @@ class ImageModalProcessor(BaseModalProcessor):
class TableModalProcessor(BaseModalProcessor):
"""Processor specialized for table content"""
async def generate_description_only(
self,
modal_content,
content_type: str,
item_info: Dict[str, Any] = None,
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""
Generate table description and entity info only, without entity relation extraction.
Used for batch processing stage 1.
Args:
modal_content: Table content to process
content_type: Type of modal content ("table")
item_info: Item information for context extraction
entity_name: Optional predefined entity name
Returns:
Tuple of (enhanced_caption, entity_info)
"""
try:
# Parse table content (reuse existing logic)
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
except json.JSONDecodeError:
content_data = {"table_body": modal_content}
else:
content_data = modal_content
table_img_path = content_data.get("img_path")
table_caption = content_data.get("table_caption", [])
table_body = content_data.get("table_body", "")
table_footnote = content_data.get("table_footnote", [])
# Extract context for current item
context = ""
if item_info:
context = self._get_context_for_item(item_info)
# Build table analysis prompt with context
if context:
table_prompt = PROMPTS.get(
"table_prompt_with_context", PROMPTS["table_prompt"]
).format(
context=context,
entity_name=entity_name
if entity_name
else "descriptive name for this table",
table_img_path=table_img_path,
table_caption=table_caption if table_caption else "None",
table_body=table_body,
table_footnote=table_footnote if table_footnote else "None",
)
else:
table_prompt = PROMPTS["table_prompt"].format(
entity_name=entity_name
if entity_name
else "descriptive name for this table",
table_img_path=table_img_path,
table_caption=table_caption if table_caption else "None",
table_body=table_body,
table_footnote=table_footnote if table_footnote else "None",
)
# Call LLM for table analysis
response = await self.modal_caption_func(
table_prompt,
system_prompt=PROMPTS["TABLE_ANALYSIS_SYSTEM"],
)
# Parse response (reuse existing logic)
enhanced_caption, entity_info = self._parse_table_response(
response, entity_name
)
return enhanced_caption, entity_info
except Exception as e:
logger.error(f"Error generating table description: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"table_{compute_mdhash_id(str(modal_content))}",
"entity_type": "table",
"summary": f"Table content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
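Each processor ends with the same except-branch shape; if one wanted to factor it out (purely hypothetical, not part of this commit), a shared helper could look like:

from lightrag.utils import compute_mdhash_id

def build_fallback_entity(modal_content, content_type: str, entity_name: str = None) -> dict:
    # Mirrors the fallback dict built in each processor's except branch
    content_str = str(modal_content)
    return {
        "entity_name": entity_name
        or f"{content_type}_{compute_mdhash_id(content_str)}",
        "entity_type": content_type,
        "summary": f"{content_type.capitalize()} content: {content_str[:100]}",
    }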
async def process_multimodal_content(
self,
modal_content,
@@ -956,76 +1112,55 @@ class TableModalProcessor(BaseModalProcessor):
chunk_order_index: int = 0,
) -> Tuple[str, Dict[str, Any]]:
"""Process table content with context support"""
try:
# Generate description and entity info
enhanced_caption, entity_info = await self.generate_description_only(
modal_content, content_type, item_info, entity_name
)
# Parse table content for building complete chunk
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
except json.JSONDecodeError:
content_data = {"table_body": modal_content}
else:
content_data = modal_content
table_img_path = content_data.get("img_path")
table_caption = content_data.get("table_caption", [])
table_body = content_data.get("table_body", "")
table_footnote = content_data.get("table_footnote", [])
# Build complete table content
modal_chunk = PROMPTS["table_chunk"].format(
table_img_path=table_img_path,
table_caption=", ".join(table_caption) if table_caption else "None",
table_body=table_body,
table_footnote=", ".join(table_footnote) if table_footnote else "None",
enhanced_caption=enhanced_caption,
)
return await self._create_entity_and_chunk(
modal_chunk, entity_info, file_path, batch_mode, doc_id, chunk_order_index
)
except Exception as e:
logger.error(f"Error processing table content: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"table_{compute_mdhash_id(str(modal_content))}",
"entity_type": "table",
"summary": f"Table content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
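`_parse_table_response` (below) turns the raw LLM reply into the (enhanced_caption, entity_info) pair. Its body is not part of this diff; a sketch of the kind of tolerant parsing such a method typically needs, assuming the prompt asks the model for a JSON object:

import json
import re

def parse_modal_response(response: str, entity_name: str = None) -> tuple:
    # Pull out the first JSON object in the reply, if any; fall back to raw text
    match = re.search(r"\{.*\}", response, re.DOTALL)
    try:
        data = json.loads(match.group(0)) if match else {}
    except json.JSONDecodeError:
        data = {}
    caption = data.get("detailed_description", response)
    entity_info = {
        "entity_name": entity_name or data.get("entity_name", "unnamed_table"),
        "entity_type": "table",
        "summary": data.get("summary", caption[:100]),
    }
    return caption, entity_info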
def _parse_table_response(
self, response: str, entity_name: str = None
@@ -1069,6 +1204,90 @@ class TableModalProcessor(BaseModalProcessor):
class EquationModalProcessor(BaseModalProcessor):
"""Processor specialized for equation content"""
async def generate_description_only(
self,
modal_content,
content_type: str,
item_info: Dict[str, Any] = None,
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""
Generate equation description and entity info only, without entity relation extraction.
Used for batch processing stage 1.
Args:
modal_content: Equation content to process
content_type: Type of modal content ("equation")
item_info: Item information for context extraction
entity_name: Optional predefined entity name
Returns:
Tuple of (enhanced_caption, entity_info)
"""
try:
# Parse equation content (reuse existing logic)
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
except json.JSONDecodeError:
content_data = {"equation": modal_content}
else:
content_data = modal_content
equation_text = content_data.get("text")
equation_format = content_data.get("text_format", "")
# Extract context for current item
context = ""
if item_info:
context = self._get_context_for_item(item_info)
# Build equation analysis prompt with context
if context:
equation_prompt = PROMPTS.get(
"equation_prompt_with_context", PROMPTS["equation_prompt"]
).format(
context=context,
equation_text=equation_text,
equation_format=equation_format,
entity_name=entity_name
if entity_name
else "descriptive name for this equation",
)
else:
equation_prompt = PROMPTS["equation_prompt"].format(
equation_text=equation_text,
equation_format=equation_format,
entity_name=entity_name
if entity_name
else "descriptive name for this equation",
)
# Call LLM for equation analysis
response = await self.modal_caption_func(
equation_prompt,
system_prompt=PROMPTS["EQUATION_ANALYSIS_SYSTEM"],
)
# Parse response (reuse existing logic)
enhanced_caption, entity_info = self._parse_equation_response(
response, entity_name
)
return enhanced_caption, entity_info
except Exception as e:
logger.error(f"Error generating equation description: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"equation_{compute_mdhash_id(str(modal_content))}",
"entity_type": "equation",
"summary": f"Equation content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
async def process_multimodal_content(
self,
modal_content,
@@ -1081,66 +1300,51 @@ class EquationModalProcessor(BaseModalProcessor):
chunk_order_index: int = 0,
) -> Tuple[str, Dict[str, Any]]:
"""Process equation content with context support"""
try:
# Generate description and entity info
enhanced_caption, entity_info = await self.generate_description_only(
modal_content, content_type, item_info, entity_name
)
# Parse equation content for building complete chunk
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
except json.JSONDecodeError:
content_data = {"equation": modal_content}
else:
content_data = modal_content
equation_text = content_data.get("text")
equation_format = content_data.get("text_format", "")
# Build complete equation content
modal_chunk = PROMPTS["equation_chunk"].format(
equation_text=equation_text,
equation_format=equation_format,
enhanced_caption=enhanced_caption,
)
return await self._create_entity_and_chunk(
modal_chunk,
entity_info,
file_path,
batch_mode,
doc_id,
chunk_order_index,
)
except Exception as e:
logger.error(f"Error processing equation content: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"equation_{compute_mdhash_id(str(modal_content))}",
"entity_type": "equation",
"summary": f"Equation content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
def _parse_equation_response(
self, response: str, entity_name: str = None
@@ -1184,6 +1388,80 @@ class EquationModalProcessor(BaseModalProcessor):
class GenericModalProcessor(BaseModalProcessor):
"""Generic processor for other types of modal content"""
async def generate_description_only(
self,
modal_content,
content_type: str,
item_info: Dict[str, Any] = None,
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""
Generate generic modal description and entity info only, without entity relation extraction.
Used for batch processing stage 1.
Args:
modal_content: Generic modal content to process
content_type: Type of modal content
item_info: Item information for context extraction
entity_name: Optional predefined entity name
Returns:
Tuple of (enhanced_caption, entity_info)
"""
try:
# Extract context for current item
context = ""
if item_info:
context = self._get_context_for_item(item_info)
# Build generic analysis prompt with context
if context:
generic_prompt = PROMPTS.get(
"generic_prompt_with_context", PROMPTS["generic_prompt"]
).format(
context=context,
content_type=content_type,
entity_name=entity_name
if entity_name
else f"descriptive name for this {content_type}",
content=str(modal_content),
)
else:
generic_prompt = PROMPTS["generic_prompt"].format(
content_type=content_type,
entity_name=entity_name
if entity_name
else f"descriptive name for this {content_type}",
content=str(modal_content),
)
# Call LLM for generic analysis
response = await self.modal_caption_func(
generic_prompt,
system_prompt=PROMPTS["GENERIC_ANALYSIS_SYSTEM"].format(
content_type=content_type
),
)
# Parse response (reuse existing logic)
enhanced_caption, entity_info = self._parse_generic_response(
response, entity_name, content_type
)
return enhanced_caption, entity_info
except Exception as e:
logger.error(f"Error generating {content_type} description: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"{content_type}_{compute_mdhash_id(str(modal_content))}",
"entity_type": content_type,
"summary": f"{content_type} content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
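Note the PROMPTS.get("generic_prompt_with_context", PROMPTS["generic_prompt"]) pattern used throughout: if a context-aware variant of a template is not defined, the base template is used instead. A tiny standalone illustration (the template text here is made up):

PROMPTS = {"generic_prompt": "Provide a detailed description of this {content_type}:\n{content}"}
template = PROMPTS.get("generic_prompt_with_context", PROMPTS["generic_prompt"])
print(template.format(content_type="chart", content="quarterly revenue by region"))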
async def process_multimodal_content(
self,
modal_content,
@@ -1196,56 +1474,39 @@ class GenericModalProcessor(BaseModalProcessor):
chunk_order_index: int = 0,
) -> Tuple[str, Dict[str, Any]]:
"""Process generic modal content with context support"""
logger.debug(f"Begin Analysis of {content_type}: {modal_content}")
# Extract context for current item
context = ""
if item_info:
context = self._get_context_for_item(item_info)
# Build generic analysis prompt with context
if context:
generic_prompt = PROMPTS.get(
"generic_prompt_with_context", PROMPTS["generic_prompt"]
).format(
context=context,
content_type=content_type,
entity_name=entity_name
if entity_name
else f"descriptive name for this {content_type}",
content=str(modal_content),
)
else:
generic_prompt = PROMPTS["generic_prompt"].format(
content_type=content_type,
entity_name=entity_name
if entity_name
else f"descriptive name for this {content_type}",
content=str(modal_content),
try:
# Generate description and entity info
enhanced_caption, entity_info = await self.generate_description_only(
modal_content, content_type, item_info, entity_name
)
response = await self.modal_caption_func(
generic_prompt,
system_prompt=PROMPTS["GENERIC_ANALYSIS_SYSTEM"].format(
content_type=content_type
),
)
# Build complete content
modal_chunk = PROMPTS["generic_chunk"].format(
content_type=content_type.title(),
content=str(modal_content),
enhanced_caption=enhanced_caption,
)
# Parse response
enhanced_caption, entity_info = self._parse_generic_response(
response, entity_name, content_type
)
return await self._create_entity_and_chunk(
modal_chunk,
entity_info,
file_path,
batch_mode,
doc_id,
chunk_order_index,
)
# Build complete content
modal_chunk = PROMPTS["generic_chunk"].format(
content_type=content_type.title(),
content=str(modal_content),
enhanced_caption=enhanced_caption,
)
return await self._create_entity_and_chunk(
modal_chunk, entity_info, file_path, batch_mode, doc_id, chunk_order_index
)
except Exception as e:
logger.error(f"Error processing {content_type} content: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"{content_type}_{compute_mdhash_id(str(modal_content))}",
"entity_type": content_type,
"summary": f"{content_type} content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
def _parse_generic_response(
self, response: str, entity_name: str = None, content_type: str = "content"


@@ -8,7 +8,7 @@ import os
import time
import hashlib
import json
from typing import Dict, List, Any, Tuple
from pathlib import Path
from raganything.parser import MineruParser, DoclingParser
from raganything.utils import (
@@ -16,6 +16,8 @@ from raganything.utils import (
insert_text_content,
get_processor_for_type,
)
import asyncio
from lightrag.utils import compute_mdhash_id
class ProcessorMixin:
@@ -441,38 +443,74 @@ class ProcessorMixin:
self.logger.debug("No multimodal content to process")
return
# Check multimodal processing status - handle LightRAG's early "PROCESSED" marking
try:
existing_doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
if existing_doc_status:
# Check if multimodal content is already processed
multimodal_processed = existing_doc_status.get(
"multimodal_processed", False
)
if multimodal_processed:
self.logger.info(
f"Document {doc_id} multimodal content is already processed"
)
return
# Even if status is "PROCESSED" (text processing done),
# we still need to process multimodal content if not yet done
doc_status = existing_doc_status.get("status", "")
if doc_status == "PROCESSED" and not multimodal_processed:
self.logger.info(
f"Document {doc_id} text processing is complete, but multimodal content still needs processing"
)
# Continue with multimodal processing
elif doc_status == "PROCESSED" and multimodal_processed:
self.logger.info(
f"Document {doc_id} is fully processed (text + multimodal)"
)
return
except Exception as e:
self.logger.debug(f"Error checking document status for {doc_id}: {e}")
# Continue with processing if cache check fails
# Use ProcessorMixin's own batch processing that can handle multiple content types
self.logger.info("Starting multimodal content processing...")
try:
await self._process_multimodal_content_batch_type_aware(
multimodal_items=multimodal_items, file_path=file_path, doc_id=doc_id
)
# Mark multimodal content as processed and update final status
await self._mark_multimodal_processing_complete(doc_id)
self.logger.info("Multimodal content processing complete")
except Exception as e:
self.logger.error(f"Error in multimodal processing: {e}")
# Fallback to individual processing if batch processing fails
self.logger.warning("Falling back to individual multimodal processing")
await self._process_multimodal_content_individual(
multimodal_items, file_path, doc_id
)
# Mark multimodal content as processed even after fallback
await self._mark_multimodal_processing_complete(doc_id)
async def _process_multimodal_content_individual(
self, multimodal_items: List[Dict[str, Any]], file_path: str, doc_id: str
):
"""
Process multimodal content individually (fallback method)
Args:
multimodal_items: List of multimodal items
file_path: File path (for reference)
doc_id: Document ID for proper chunk association
"""
file_name = os.path.basename(file_path)
# Collect all chunk results for batch processing (similar to text content processing)
@@ -480,7 +518,6 @@ class ProcessorMixin:
multimodal_chunk_ids = []
# Get current text chunks count to set proper order indexes for multimodal chunks
existing_doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
existing_chunks_count = (
existing_doc_status.get("chunks_count", 0) if existing_doc_status else 0
@@ -517,8 +554,7 @@ class ProcessorMixin:
batch_mode=True,
doc_id=doc_id, # Pass doc_id for proper association
chunk_order_index=existing_chunks_count
+ i,  # Proper order index
)
# Collect chunk results for batch processing
@@ -542,32 +578,29 @@ class ProcessorMixin:
self.logger.debug("Exception details:", exc_info=True)
continue
# Update doc_status to include multimodal chunks in the standard chunks_list
if multimodal_chunk_ids:
try:
# Get current document status
current_doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
if current_doc_status:
existing_chunks_list = current_doc_status.get("chunks_list", [])
existing_chunks_count = current_doc_status.get("chunks_count", 0)
# Add multimodal chunks to the standard chunks_list
updated_chunks_list = existing_chunks_list + multimodal_chunk_ids
updated_chunks_count = existing_chunks_count + len(
multimodal_chunk_ids
)
# Update document status with integrated chunk list
await self.lightrag.doc_status.upsert(
{
doc_id: {
**current_doc_status,  # Keep existing fields
"chunks_list": updated_chunks_list,  # Integrated chunks list
"chunks_count": updated_chunks_count,  # Updated total count
"updated_at": time.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
}
}
@@ -577,7 +610,7 @@ class ProcessorMixin:
await self.lightrag.doc_status.index_done_callback()
self.logger.info(
f"Updated doc_status with {len(multimodal_chunk_ids)} multimodal chunks"
f"Updated doc_status with {len(multimodal_chunk_ids)} multimodal chunks integrated into chunks_list"
)
except Exception as e:
@@ -613,7 +646,628 @@ class ProcessorMixin:
await self.lightrag._insert_done()
self.logger.info("Multimodal content processing complete")
self.logger.info("Individual multimodal content processing complete")
# Mark multimodal content as processed
await self._mark_multimodal_processing_complete(doc_id)
async def _process_multimodal_content_batch_type_aware(
self, multimodal_items: List[Dict[str, Any]], file_path: str, doc_id: str
):
"""
Type-aware batch processing that selects the correct processor for each content type,
so documents mixing images, tables, equations, and other modalities are handled properly.
Args:
multimodal_items: List of multimodal items with different types
file_path: File path for citation
doc_id: Document ID for proper association
"""
if not multimodal_items:
self.logger.debug("No multimodal content to process")
return
# Get existing chunks count for proper order indexing
try:
existing_doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
existing_chunks_count = (
existing_doc_status.get("chunks_count", 0) if existing_doc_status else 0
)
except Exception:
existing_chunks_count = 0
# Use LightRAG's concurrency control
semaphore = asyncio.Semaphore(getattr(self.lightrag, "max_parallel_insert", 2))
# Stage 1: Concurrent generation of descriptions using correct processors for each type
async def process_single_item_with_correct_processor(
item: Dict[str, Any], index: int
):
"""Process single item using the correct processor for its type"""
async with semaphore:
try:
content_type = item.get("type", "unknown")
# Select the correct processor based on content type
processor = get_processor_for_type(
self.modal_processors, content_type
)
if not processor:
self.logger.warning(
f"No processor found for type: {content_type}"
)
return None
item_info = {
"page_idx": item.get("page_idx", 0),
"index": index,
"type": content_type,
}
# Call the correct processor's description generation method
(
description,
entity_info,
) = await processor.generate_description_only(
modal_content=item,
content_type=content_type,
item_info=item_info,
entity_name=None, # Let LLM auto-generate
)
return {
"index": index,
"content_type": content_type,
"description": description,
"entity_info": entity_info,
"original_item": item,
"item_info": item_info,
"chunk_order_index": existing_chunks_count + index,
"processor": processor, # Keep reference to the processor used
}
except Exception as e:
self.logger.error(
f"Error generating description for {content_type} item {index}: {e}"
)
return None
# Process all items concurrently with correct processors
tasks = [
asyncio.create_task(process_single_item_with_correct_processor(item, i))
for i, item in enumerate(multimodal_items)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter successful results
multimodal_data_list = []
for result in results:
if isinstance(result, Exception):
self.logger.error(f"Task failed: {result}")
continue
if result is not None:
multimodal_data_list.append(result)
if not multimodal_data_list:
self.logger.warning("No valid multimodal descriptions generated")
return
self.logger.info(
f"Generated descriptions for {len(multimodal_data_list)}/{len(multimodal_items)} multimodal items using correct processors"
)
# Stage 2: Convert to LightRAG chunks format
lightrag_chunks = self._convert_to_lightrag_chunks_type_aware(
multimodal_data_list, file_path, doc_id
)
# Stage 3: Store chunks to LightRAG storage
await self._store_chunks_to_lightrag_storage_type_aware(lightrag_chunks)
# Stage 3.5: Store multimodal main entities to entities_vdb
await self._store_multimodal_main_entities(
multimodal_data_list, lightrag_chunks, file_path
)
# Track chunk IDs for doc_status update
chunk_ids = list(lightrag_chunks.keys())
# Stage 4: Use LightRAG's batch entity relation extraction
chunk_results = await self._batch_extract_entities_lightrag_style_type_aware(
lightrag_chunks
)
# Stage 5: Add belongs_to relations (multimodal-specific)
enhanced_chunk_results = await self._batch_add_belongs_to_relations_type_aware(
chunk_results, multimodal_data_list
)
# Stage 6: Use LightRAG's batch merge
await self._batch_merge_lightrag_style_type_aware(
enhanced_chunk_results, file_path
)
# Stage 7: Update doc_status with integrated chunks_list
await self._update_doc_status_with_chunks_type_aware(doc_id, chunk_ids)
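Stage 1 above bounds LLM concurrency with a semaphore and fans work out with asyncio.gather. The same pattern in isolation, stripped of the processor details:

import asyncio

async def bounded_gather(items, worker, limit: int = 2):
    # Run worker(item, index) for every item, at most `limit` at a time;
    # return_exceptions=True keeps one failure from cancelling the rest.
    sem = asyncio.Semaphore(limit)
    async def run_one(item, index):
        async with sem:
            return await worker(item, index)
    tasks = [asyncio.create_task(run_one(it, i)) for i, it in enumerate(items)]
    return await asyncio.gather(*tasks, return_exceptions=True)

Failed slots come back as exception objects, which is why the caller above filters results with isinstance(result, Exception).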
def _convert_to_lightrag_chunks_type_aware(
self, multimodal_data_list: List[Dict[str, Any]], file_path: str, doc_id: str
) -> Dict[str, Any]:
"""Convert multimodal data to LightRAG standard chunks format"""
chunks = {}
for data in multimodal_data_list:
description = data["description"]
entity_info = data["entity_info"]
chunk_order_index = data["chunk_order_index"]
content_type = data["content_type"]
original_item = data["original_item"]
# Apply the appropriate chunk template based on content type
formatted_chunk_content = self._apply_chunk_template(
content_type, original_item, description
)
# Generate chunk_id
chunk_id = compute_mdhash_id(formatted_chunk_content, prefix="chunk-")
# Calculate tokens
tokens = len(self.lightrag.tokenizer.encode(formatted_chunk_content))
# Build LightRAG standard chunk format
chunks[chunk_id] = {
"content": formatted_chunk_content, # Now uses the templated content
"tokens": tokens,
"full_doc_id": doc_id,
"chunk_order_index": chunk_order_index,
"file_path": os.path.basename(file_path),
"llm_cache_list": [], # LightRAG will populate this field
# Multimodal-specific metadata
"is_multimodal": True,
"modal_entity_name": entity_info["entity_name"],
"original_type": data["content_type"],
"page_idx": data["item_info"].get("page_idx", 0),
}
self.logger.debug(
f"Converted {len(chunks)} multimodal items to multimodal chunks format"
)
return chunks
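Chunk IDs here are content-addressed: compute_mdhash_id derives the ID from the templated chunk text itself, which is what lets later stages (_store_multimodal_main_entities, _batch_add_belongs_to_relations_type_aware) re-derive the same ID by re-applying the same template. A sketch in the spirit of lightrag's helper (assuming MD5, as the name suggests):

import hashlib

def mdhash_id(content: str, prefix: str = "") -> str:
    # Identical content always maps to the same id
    return prefix + hashlib.md5(content.encode("utf-8")).hexdigest()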
def _apply_chunk_template(
self, content_type: str, original_item: Dict[str, Any], description: str
) -> str:
"""
Apply the appropriate chunk template based on content type
Args:
content_type: Type of content (image, table, equation, generic)
original_item: Original multimodal item data
description: Enhanced description generated by the processor
Returns:
Formatted chunk content using the appropriate template
"""
from raganything.prompt import PROMPTS
try:
if content_type == "image":
image_path = original_item.get("img_path", "")
captions = original_item.get("img_caption", [])
footnotes = original_item.get("img_footnote", [])
return PROMPTS["image_chunk"].format(
image_path=image_path,
captions=", ".join(captions) if captions else "None",
footnotes=", ".join(footnotes) if footnotes else "None",
enhanced_caption=description,
)
elif content_type == "table":
table_img_path = original_item.get("img_path", "")
table_caption = original_item.get("table_caption", [])
table_body = original_item.get("table_body", "")
table_footnote = original_item.get("table_footnote", [])
return PROMPTS["table_chunk"].format(
table_img_path=table_img_path,
table_caption=", ".join(table_caption) if table_caption else "None",
table_body=table_body,
table_footnote=", ".join(table_footnote)
if table_footnote
else "None",
enhanced_caption=description,
)
elif content_type == "equation":
equation_text = original_item.get("text", "")
equation_format = original_item.get("text_format", "")
return PROMPTS["equation_chunk"].format(
equation_text=equation_text,
equation_format=equation_format,
enhanced_caption=description,
)
else: # generic or unknown types
content = str(original_item.get("content", original_item))
return PROMPTS["generic_chunk"].format(
content_type=content_type.title(),
content=content,
enhanced_caption=description,
)
except Exception as e:
self.logger.warning(
f"Error applying chunk template for {content_type}: {e}"
)
# Fallback to just the description if template fails
return description
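Hypothetical usage of the dispatcher (values made up; shown as it would be called from inside a mixin method):

# inside a ProcessorMixin method:
item = {"text": "E = mc^2", "text_format": "latex"}
chunk_text = self._apply_chunk_template("equation", item, "Mass-energy equivalence.")
# -> text following PROMPTS["equation_chunk"], with the generated description
#    substituted into the enhanced_caption slot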
async def _store_chunks_to_lightrag_storage_type_aware(
self, chunks: Dict[str, Any]
):
"""Store chunks to storage"""
try:
# Store in text_chunks storage (required for extract_entities)
await self.lightrag.text_chunks.upsert(chunks)
# Store in chunks vector database for retrieval
await self.lightrag.chunks_vdb.upsert(chunks)
self.logger.debug(f"Stored {len(chunks)} multimodal chunks to storage")
except Exception as e:
self.logger.error(f"Error storing chunks to storage: {e}")
raise
async def _store_multimodal_main_entities(
self,
multimodal_data_list: List[Dict[str, Any]],
lightrag_chunks: Dict[str, Any],
file_path: str,
):
"""
Store multimodal main entities to entities_vdb.
This ensures that entities like "TableName (table)" are properly indexed.
Args:
multimodal_data_list: List of processed multimodal data with entity info
lightrag_chunks: Chunks in LightRAG format (already formatted with templates)
file_path: Source file path recorded on each stored entity
"""
if not multimodal_data_list:
return
# Create entities_vdb entries for all multimodal main entities
entities_to_store = {}
for data in multimodal_data_list:
entity_info = data["entity_info"]
entity_name = entity_info["entity_name"]
description = data["description"]
content_type = data["content_type"]
original_item = data["original_item"]
# Apply the same chunk template to get the formatted content
formatted_chunk_content = self._apply_chunk_template(
content_type, original_item, description
)
# Generate chunk_id using the formatted content (same as in _convert_to_lightrag_chunks_type_aware)
chunk_id = compute_mdhash_id(formatted_chunk_content, prefix="chunk-")
# Generate entity_id using LightRAG's standard format
entity_id = compute_mdhash_id(entity_name, prefix="ent-")
# Create entity data in LightRAG format
entity_data = {
"entity_name": entity_name,
"entity_type": entity_info.get("entity_type", content_type),
"content": entity_info.get("summary", description),
"source_id": chunk_id,
"file_path": os.path.basename(file_path),
}
entities_to_store[entity_id] = entity_data
if entities_to_store:
try:
# Store entities in knowledge graph
for entity_id, entity_data in entities_to_store.items():
entity_name = entity_data["entity_name"]
# Create node data for knowledge graph
node_data = {
"entity_id": entity_name,
"entity_type": entity_data["entity_type"],
"description": entity_data["content"],
"source_id": entity_data["source_id"],
"file_path": entity_data["file_path"],
"created_at": int(time.time()),
}
# Store in knowledge graph
await self.lightrag.chunk_entity_relation_graph.upsert_node(
entity_name, node_data
)
# Store in entities_vdb
await self.lightrag.entities_vdb.upsert(entities_to_store)
await self.lightrag.entities_vdb.index_done_callback()
self.logger.debug(
f"Stored {len(entities_to_store)} multimodal main entities to knowledge graph and entities_vdb"
)
except Exception as e:
self.logger.error(f"Error storing multimodal main entities: {e}")
raise
async def _batch_extract_entities_lightrag_style_type_aware(
self, lightrag_chunks: Dict[str, Any]
) -> List[Tuple]:
"""Use LightRAG's extract_entities for batch entity relation extraction"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
from lightrag.operate import extract_entities
# Get pipeline status (consistent with LightRAG)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
# Directly use LightRAG's extract_entities
chunk_results = await extract_entities(
chunks=lightrag_chunks,
global_config=self.lightrag.__dict__,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.lightrag.llm_response_cache,
text_chunks_storage=self.lightrag.text_chunks,
)
self.logger.info(
f"Extracted entities from {len(lightrag_chunks)} multimodal chunks"
)
return chunk_results
async def _batch_add_belongs_to_relations_type_aware(
self, chunk_results: List[Tuple], multimodal_data_list: List[Dict[str, Any]]
) -> List[Tuple]:
"""Add belongs_to relations for multimodal entities"""
# Create mapping from chunk_id to modal_entity_name
chunk_to_modal_entity = {}
chunk_to_file_path = {}
for data in multimodal_data_list:
description = data["description"]
content_type = data["content_type"]
original_item = data["original_item"]
# Use the same template formatting as in _convert_to_lightrag_chunks_type_aware
formatted_chunk_content = self._apply_chunk_template(
content_type, original_item, description
)
chunk_id = compute_mdhash_id(formatted_chunk_content, prefix="chunk-")
chunk_to_modal_entity[chunk_id] = data["entity_info"]["entity_name"]
chunk_to_file_path[chunk_id] = data.get("file_path", "multimodal_content")
enhanced_chunk_results = []
belongs_to_count = 0
for maybe_nodes, maybe_edges in chunk_results:
# Find corresponding modal_entity_name for this chunk
chunk_id = None
for nodes_dict in maybe_nodes.values():
if nodes_dict:
chunk_id = nodes_dict[0].get("source_id")
break
if chunk_id and chunk_id in chunk_to_modal_entity:
modal_entity_name = chunk_to_modal_entity[chunk_id]
file_path = chunk_to_file_path.get(chunk_id, "multimodal_content")
# Add belongs_to relations for all extracted entities
for entity_name in maybe_nodes.keys():
if entity_name != modal_entity_name: # Avoid self-relation
belongs_to_relation = {
"src_id": entity_name,
"tgt_id": modal_entity_name,
"description": f"Entity {entity_name} belongs to {modal_entity_name}",
"keywords": "belongs_to,part_of,contained_in",
"source_id": chunk_id,
"weight": 10.0,
"file_path": file_path,
}
# Add to maybe_edges
edge_key = (entity_name, modal_entity_name)
if edge_key not in maybe_edges:
maybe_edges[edge_key] = []
maybe_edges[edge_key].append(belongs_to_relation)
belongs_to_count += 1
enhanced_chunk_results.append((maybe_nodes, maybe_edges))
self.logger.info(
f"Added {belongs_to_count} belongs_to relations for multimodal entities"
)
return enhanced_chunk_results
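A toy illustration of the wiring above, with hypothetical names: one extracted entity whose source chunk maps to a modal entity gets a belongs_to edge:

maybe_nodes = {"Revenue": [{"entity_name": "Revenue", "source_id": "chunk-abc"}]}
maybe_edges = {}
chunk_to_modal_entity = {"chunk-abc": "Q3 Results (table)"}
modal_entity = chunk_to_modal_entity["chunk-abc"]
edge_key = ("Revenue", modal_entity)
maybe_edges.setdefault(edge_key, []).append(
    {
        "src_id": "Revenue",
        "tgt_id": modal_entity,
        "keywords": "belongs_to,part_of,contained_in",
        "source_id": "chunk-abc",
        "weight": 10.0,
    }
)

The high fixed weight (10.0) is presumably meant to keep containment edges strong relative to ordinary extracted relations.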
async def _batch_merge_lightrag_style_type_aware(
self, enhanced_chunk_results: List[Tuple], file_path: str
):
"""Use LightRAG's merge_nodes_and_edges for batch merge"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
from lightrag.operate import merge_nodes_and_edges
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
await merge_nodes_and_edges(
chunk_results=enhanced_chunk_results,
knowledge_graph_inst=self.lightrag.chunk_entity_relation_graph,
entity_vdb=self.lightrag.entities_vdb,
relationships_vdb=self.lightrag.relationships_vdb,
global_config=self.lightrag.__dict__,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.lightrag.llm_response_cache,
current_file_number=1,
total_files=1,
file_path=os.path.basename(file_path),
)
await self.lightrag._insert_done()
async def _update_doc_status_with_chunks_type_aware(
self, doc_id: str, chunk_ids: List[str]
):
"""Update document status with multimodal chunks"""
try:
# Get current document status
current_doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
if current_doc_status:
existing_chunks_list = current_doc_status.get("chunks_list", [])
existing_chunks_count = current_doc_status.get("chunks_count", 0)
# Add multimodal chunks to the standard chunks_list
updated_chunks_list = existing_chunks_list + chunk_ids
updated_chunks_count = existing_chunks_count + len(chunk_ids)
# Update document status with integrated chunk list
await self.lightrag.doc_status.upsert(
{
doc_id: {
**current_doc_status, # Keep existing fields
"chunks_list": updated_chunks_list, # Integrated chunks list
"chunks_count": updated_chunks_count, # Updated total count
"updated_at": time.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
}
}
)
# Ensure doc_status update is persisted to disk
await self.lightrag.doc_status.index_done_callback()
self.logger.info(
f"Updated doc_status: added {len(chunk_ids)} multimodal chunks to standard chunks_list "
f"(total chunks: {updated_chunks_count})"
)
except Exception as e:
self.logger.warning(
f"Error updating doc_status with multimodal chunks: {e}"
)
async def _mark_multimodal_processing_complete(self, doc_id: str):
"""Mark multimodal content processing as complete in the document status."""
try:
current_doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
if current_doc_status:
await self.lightrag.doc_status.upsert(
{
doc_id: {
**current_doc_status,
"multimodal_processed": True,
"updated_at": time.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
}
}
)
await self.lightrag.doc_status.index_done_callback()
self.logger.debug(
f"Marked multimodal content processing as complete for document {doc_id}"
)
except Exception as e:
self.logger.warning(
f"Error marking multimodal processing as complete for document {doc_id}: {e}"
)
async def is_document_fully_processed(self, doc_id: str) -> bool:
"""
Check if a document is fully processed (both text and multimodal content).
Args:
doc_id: Document ID to check
Returns:
bool: True if both text and multimodal content are processed
"""
try:
doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
if not doc_status:
return False
text_processed = doc_status.get("status") == "PROCESSED"
multimodal_processed = doc_status.get("multimodal_processed", False)
return text_processed and multimodal_processed
except Exception as e:
self.logger.error(
f"Error checking document processing status for {doc_id}: {e}"
)
return False
async def get_document_processing_status(self, doc_id: str) -> Dict[str, Any]:
"""
Get detailed processing status for a document.
Args:
doc_id: Document ID to check
Returns:
Dict with processing status details
"""
try:
doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
if not doc_status:
return {
"exists": False,
"text_processed": False,
"multimodal_processed": False,
"fully_processed": False,
"chunks_count": 0,
}
text_processed = doc_status.get("status") == "PROCESSED"
multimodal_processed = doc_status.get("multimodal_processed", False)
fully_processed = text_processed and multimodal_processed
return {
"exists": True,
"text_processed": text_processed,
"multimodal_processed": multimodal_processed,
"fully_processed": fully_processed,
"chunks_count": doc_status.get("chunks_count", 0),
"chunks_list": doc_status.get("chunks_list", []),
"status": doc_status.get("status", ""),
"updated_at": doc_status.get("updated_at", ""),
"raw_status": doc_status,
}
except Exception as e:
self.logger.error(
f"Error getting document processing status for {doc_id}: {e}"
)
return {
"exists": False,
"error": str(e),
"text_processed": False,
"multimodal_processed": False,
"fully_processed": False,
"chunks_count": 0,
}
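Hypothetical usage of the two status helpers, from an async context (assuming RAGAnything mixes in ProcessorMixin, so `rag` exposes both methods):

status = await rag.get_document_processing_status(doc_id)
if status["exists"] and not status["fully_processed"]:
    print(f"text={status['text_processed']} multimodal={status['multimodal_processed']} chunks={status['chunks_count']}")
if await rag.is_document_fully_processed(doc_id):
    print("nothing left to do")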
async def process_document_complete(
self,
@@ -689,37 +1343,12 @@ class ProcessorMixin:
if multimodal_items:
await self._process_multimodal_content(multimodal_items, file_path, doc_id)
else:
# If no multimodal content, mark multimodal processing as complete
# This ensures the document status properly reflects completion of all processing
await self._mark_multimodal_processing_complete(doc_id)
self.logger.debug(
f"No multimodal content found in document {doc_id}, marked multimodal processing as complete"
)
self.logger.info(f"Document {file_path} processing complete!")
@@ -817,36 +1446,11 @@ class ProcessorMixin:
if multimodal_items:
await self._process_multimodal_content(multimodal_items, file_path, doc_id)
else:
# If no multimodal content, mark multimodal processing as complete
# This ensures the document status properly reflects completion of all processing
await self._mark_multimodal_processing_complete(doc_id)
self.logger.debug(
f"No multimodal content found in document {doc_id}, marked multimodal processing as complete"
)
self.logger.info(f"Content list insertion complete for: {file_path}")