Merge pull request #62 from HKUDS/insert_content_list

Direct Content List Insertion
zrguo
2025-07-23 19:04:53 +08:00
committed by GitHub
5 changed files with 903 additions and 0 deletions

README.md (176 additions)

@@ -79,6 +79,7 @@ Users can query documents containing **interleaved text**, **visual diagrams**,
- **🧠 Specialized Content Analysis** - Dedicated processors for images, tables, mathematical equations, and heterogeneous content types
- **🔗 Multimodal Knowledge Graph** - Automatic entity extraction and cross-modal relationship discovery for enhanced understanding
- **⚡ Adaptive Processing Modes** - Flexible MinerU-based parsing or direct multimodal content injection workflows
- **📋 Direct Content List Insertion** - Bypass document parsing by directly inserting pre-parsed content lists from external sources
- **🎯 Hybrid Intelligent Retrieval** - Advanced search capabilities spanning textual and multimodal content with contextual understanding
</div>
@@ -702,6 +703,181 @@ if __name__ == "__main__":
asyncio.run(load_existing_lightrag())
```
#### 7. Direct Content List Insertion
For scenarios where you already have a pre-parsed content list (e.g., from external parsers or previous processing), you can directly insert it into RAGAnything without document parsing:
```python
import asyncio
from raganything import RAGAnything, RAGAnythingConfig
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc


async def insert_content_list_example():
    # Set up API configuration
    api_key = "your-api-key"
    base_url = "your-base-url"  # Optional

    # Create RAGAnything configuration
    config = RAGAnythingConfig(
        working_dir="./rag_storage",
        enable_image_processing=True,
        enable_table_processing=True,
        enable_equation_processing=True,
    )

    # Define model functions
    def llm_model_func(prompt, system_prompt=None, history_messages=[], **kwargs):
        return openai_complete_if_cache(
            "gpt-4o-mini",
            prompt,
            system_prompt=system_prompt,
            history_messages=history_messages,
            api_key=api_key,
            base_url=base_url,
            **kwargs,
        )

    def vision_model_func(
        prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs
    ):
        if image_data:
            # Build the message list explicitly so no None entry is sent to the API
            messages = []
            if system_prompt:
                messages.append({"role": "system", "content": system_prompt})
            messages.append(
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{image_data}"
                            },
                        },
                    ],
                }
            )
            return openai_complete_if_cache(
                "gpt-4o",
                "",
                system_prompt=None,
                history_messages=[],
                messages=messages,
                api_key=api_key,
                base_url=base_url,
                **kwargs,
            )
        return llm_model_func(prompt, system_prompt, history_messages, **kwargs)

    embedding_func = EmbeddingFunc(
        embedding_dim=3072,
        max_token_size=8192,
        func=lambda texts: openai_embed(
            texts,
            model="text-embedding-3-large",
            api_key=api_key,
            base_url=base_url,
        ),
    )

    # Initialize RAGAnything
    rag = RAGAnything(
        config=config,
        llm_model_func=llm_model_func,
        vision_model_func=vision_model_func,
        embedding_func=embedding_func,
    )

    # Example: pre-parsed content list from an external source
    content_list = [
        {
            "type": "text",
            "text": "This is the introduction section of our research paper.",
            "page_idx": 0,  # Page number where this content appears
        },
        {
            "type": "image",
            "img_path": "/absolute/path/to/figure1.jpg",  # IMPORTANT: use an absolute path
            "img_caption": ["Figure 1: System Architecture"],
            "img_footnote": ["Source: Authors' original design"],
            "page_idx": 1,  # Page number where this image appears
        },
        {
            "type": "table",
            "table_body": "| Method | Accuracy | F1-Score |\n|--------|----------|----------|\n| Ours | 95.2% | 0.94 |\n| Baseline | 87.3% | 0.85 |",
            "table_caption": ["Table 1: Performance Comparison"],
            "table_footnote": ["Results on test dataset"],
            "page_idx": 2,  # Page number where this table appears
        },
        {
            "type": "equation",
            "latex": "P(d|q) = \\frac{P(q|d) \\cdot P(d)}{P(q)}",
            "text": "Document relevance probability formula",
            "page_idx": 3,  # Page number where this equation appears
        },
        {
            "type": "text",
            "text": "In conclusion, our method demonstrates superior performance across all metrics.",
            "page_idx": 4,  # Page number where this content appears
        },
    ]

    # Insert the content list directly
    await rag.insert_content_list(
        content_list=content_list,
        file_path="research_paper.pdf",  # Reference file name for citation
        split_by_character=None,  # Optional text splitting
        split_by_character_only=False,  # Optional text splitting mode
        doc_id=None,  # Optional custom document ID (auto-generated if not provided)
        display_stats=True,  # Show content statistics
    )

    # Query the inserted content
    result = await rag.aquery(
        "What are the key findings and performance metrics mentioned in the research?",
        mode="hybrid",
    )
    print("Query result:", result)

    # You can also insert multiple content lists with different document IDs
    another_content_list = [
        {
            "type": "text",
            "text": "This is content from another document.",
            "page_idx": 0,  # Page number where this content appears
        },
        {
            "type": "table",
            "table_body": "| Feature | Value |\n|---------|-------|\n| Speed | Fast |\n| Accuracy | High |",
            "table_caption": ["Feature Comparison"],
            "page_idx": 1,  # Page number where this table appears
        },
    ]

    await rag.insert_content_list(
        content_list=another_content_list,
        file_path="another_document.pdf",
        doc_id="custom-doc-id-123",  # Custom document ID
    )


if __name__ == "__main__":
    asyncio.run(insert_content_list_example())
```
**Content List Format:**
The `content_list` should follow the standard format: each item is a dictionary in one of the following forms:
- **Text content**: `{"type": "text", "text": "content text", "page_idx": 0}`
- **Image content**: `{"type": "image", "img_path": "/absolute/path/to/image.jpg", "img_caption": ["caption"], "img_footnote": ["note"], "page_idx": 1}`
- **Table content**: `{"type": "table", "table_body": "markdown table", "table_caption": ["caption"], "table_footnote": ["note"], "page_idx": 2}`
- **Equation content**: `{"type": "equation", "latex": "LaTeX formula", "text": "description", "page_idx": 3}`
- **Generic content**: `{"type": "custom_type", "content": "any content", "page_idx": 4}`
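The generic form is useful for content that has no dedicated processor. A minimal sketch (the `"code_block"` type label below is illustrative, not a reserved name):

```python
# A minimal content list mixing a standard text item with a generic item.
# Any type without a specialized processor can be passed through the generic form.
minimal_content_list = [
    {"type": "text", "text": "Plain paragraph content.", "page_idx": 0},
    {"type": "code_block", "content": "print('hello world')", "page_idx": 0},  # generic item
]
```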
**Important Notes:**
- **`img_path`**: Must be an absolute path to the image file (e.g., `/home/user/images/chart.jpg` or `C:\Users\user\images\chart.jpg`); a path-normalization sketch follows this list
- **`page_idx`**: Represents the page number where the content appears in the original document (0-based indexing)
- **Content ordering**: Items are processed in the order they appear in the list
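If your source data stores relative image paths, a small normalization pass avoids the absolute-path pitfall. A sketch, assuming all images live under a known base directory (`normalize_img_paths` is a hypothetical helper, not part of the RAGAnything API):

```python
from pathlib import Path

def normalize_img_paths(content_list, base_dir):
    """Resolve relative img_path entries against base_dir (illustrative helper)."""
    for item in content_list:
        if item.get("type") == "image" and "img_path" in item:
            # Joining with an already-absolute path leaves it unchanged
            item["img_path"] = str((Path(base_dir) / item["img_path"]).resolve())
    return content_list
```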
This method is particularly useful when:
- You have content from external parsers (non-MinerU/Docling)
- You want to process programmatically generated content
- You need to insert content from multiple sources into a single knowledge base
- You have cached parsing results that you want to reuse
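For the caching case, reuse can be as simple as loading a previously saved content list from disk. A minimal sketch (the cache file path is hypothetical):

```python
import json

async def insert_cached_content(rag):
    # Load a content list saved from an earlier parsing run (path is hypothetical)
    with open("./cache/research_paper.content_list.json", encoding="utf-8") as f:
        cached_content_list = json.load(f)
    await rag.insert_content_list(
        content_list=cached_content_list,
        file_path="research_paper.pdf",  # keep the original name for citation
    )
```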
---
## 🛠️ Examples


@@ -75,6 +75,7 @@
- **🧠 Multimodal Content Analysis Engine** - Deploys specialized processors for images, tables, formulas, and general text content, ensuring accurate parsing of every content type
- **🔗 Knowledge-Graph-Based Indexing** - Automated entity extraction and relationship construction, building a cross-modal semantic connection network
- **⚡ Flexible Processing Architecture** - Supports both MinerU-based intelligent parsing and direct multimodal content insertion, covering different application scenarios
- **📋 Direct Content List Insertion** - Skips document parsing and directly inserts pre-parsed content lists from external sources, supporting integration of multiple data sources
- **🎯 Cross-Modal Retrieval Mechanism** - Intelligent retrieval across text and multimodal content, providing precise information localization and matching
</div>
@@ -699,6 +700,181 @@ if __name__ == "__main__":
asyncio.run(load_existing_lightrag())
```
#### 7. Direct Content List Insertion
When you already have a pre-parsed content list (e.g., from an external parser or a previous processing run), you can insert it into RAGAnything directly, without document parsing:
```python
import asyncio
from raganything import RAGAnything, RAGAnythingConfig
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc


async def insert_content_list_example():
    # Set up API configuration
    api_key = "your-api-key"
    base_url = "your-base-url"  # Optional

    # Create RAGAnything configuration
    config = RAGAnythingConfig(
        working_dir="./rag_storage",
        enable_image_processing=True,
        enable_table_processing=True,
        enable_equation_processing=True,
    )

    # Define model functions
    def llm_model_func(prompt, system_prompt=None, history_messages=[], **kwargs):
        return openai_complete_if_cache(
            "gpt-4o-mini",
            prompt,
            system_prompt=system_prompt,
            history_messages=history_messages,
            api_key=api_key,
            base_url=base_url,
            **kwargs,
        )

    def vision_model_func(
        prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs
    ):
        if image_data:
            # Build the message list explicitly so no None entry is sent to the API
            messages = []
            if system_prompt:
                messages.append({"role": "system", "content": system_prompt})
            messages.append(
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{image_data}"
                            },
                        },
                    ],
                }
            )
            return openai_complete_if_cache(
                "gpt-4o",
                "",
                system_prompt=None,
                history_messages=[],
                messages=messages,
                api_key=api_key,
                base_url=base_url,
                **kwargs,
            )
        return llm_model_func(prompt, system_prompt, history_messages, **kwargs)

    embedding_func = EmbeddingFunc(
        embedding_dim=3072,
        max_token_size=8192,
        func=lambda texts: openai_embed(
            texts,
            model="text-embedding-3-large",
            api_key=api_key,
            base_url=base_url,
        ),
    )

    # Initialize RAGAnything
    rag = RAGAnything(
        config=config,
        llm_model_func=llm_model_func,
        vision_model_func=vision_model_func,
        embedding_func=embedding_func,
    )

    # Example: pre-parsed content list from an external source
    content_list = [
        {
            "type": "text",
            "text": "This is the introduction section of our research paper.",
            "page_idx": 0,  # Page number where this content appears
        },
        {
            "type": "image",
            "img_path": "/absolute/path/to/figure1.jpg",  # IMPORTANT: use an absolute path
            "img_caption": ["Figure 1: System Architecture"],
            "img_footnote": ["Source: Authors' original design"],
            "page_idx": 1,  # Page number where this image appears
        },
        {
            "type": "table",
            "table_body": "| Method | Accuracy | F1-Score |\n|--------|----------|----------|\n| Ours | 95.2% | 0.94 |\n| Baseline | 87.3% | 0.85 |",
            "table_caption": ["Table 1: Performance Comparison"],
            "table_footnote": ["Results on test dataset"],
            "page_idx": 2,  # Page number where this table appears
        },
        {
            "type": "equation",
            "latex": "P(d|q) = \\frac{P(q|d) \\cdot P(d)}{P(q)}",
            "text": "Document relevance probability formula",
            "page_idx": 3,  # Page number where this equation appears
        },
        {
            "type": "text",
            "text": "In conclusion, our method demonstrates superior performance across all metrics.",
            "page_idx": 4,  # Page number where this content appears
        },
    ]

    # Insert the content list directly
    await rag.insert_content_list(
        content_list=content_list,
        file_path="research_paper.pdf",  # Reference file name for citation
        split_by_character=None,  # Optional text splitting
        split_by_character_only=False,  # Optional text splitting mode
        doc_id=None,  # Optional custom document ID (auto-generated if not provided)
        display_stats=True,  # Show content statistics
    )

    # Query the inserted content
    result = await rag.aquery(
        "What are the key findings and performance metrics mentioned in the research?",
        mode="hybrid",
    )
    print("Query result:", result)

    # You can also insert multiple content lists with different document IDs
    another_content_list = [
        {
            "type": "text",
            "text": "This is content from another document.",
            "page_idx": 0,  # Page number where this content appears
        },
        {
            "type": "table",
            "table_body": "| Feature | Value |\n|---------|-------|\n| Speed | Fast |\n| Accuracy | High |",
            "table_caption": ["Feature Comparison"],
            "page_idx": 1,  # Page number where this table appears
        },
    ]

    await rag.insert_content_list(
        content_list=another_content_list,
        file_path="another_document.pdf",
        doc_id="custom-doc-id-123",  # Custom document ID
    )


if __name__ == "__main__":
    asyncio.run(insert_content_list_example())
```
**Content List Format:**
The `content_list` should follow the standard format: each item is a dictionary in one of the following forms:
- **Text content**: `{"type": "text", "text": "content text", "page_idx": 0}`
- **Image content**: `{"type": "image", "img_path": "/absolute/path/to/image.jpg", "img_caption": ["caption"], "img_footnote": ["note"], "page_idx": 1}`
- **Table content**: `{"type": "table", "table_body": "markdown table", "table_caption": ["caption"], "table_footnote": ["note"], "page_idx": 2}`
- **Equation content**: `{"type": "equation", "latex": "LaTeX formula", "text": "description", "page_idx": 3}`
- **Generic content**: `{"type": "custom_type", "content": "any content", "page_idx": 4}`
**Important Notes:**
- **`img_path`**: Must be an absolute path to the image file (e.g., `/home/user/images/chart.jpg` or `C:\Users\user\images\chart.jpg`)
- **`page_idx`**: The page number where the content appears in the original document (0-based indexing)
- **Content ordering**: Items are processed in the order they appear in the list
This method is particularly useful when:
- You have content from external parsers (non-MinerU/Docling)
- You want to process programmatically generated content
- You need to insert content from multiple sources into a single knowledge base
- You have cached parsing results that you want to reuse
---
## 🛠️ Examples


@@ -0,0 +1,419 @@
#!/usr/bin/env python
"""
Example script demonstrating direct content list insertion with RAGAnything
This example shows how to:
1. Create a simple content list with different content types
2. Insert a content list directly, without document parsing, using the insert_content_list() method
3. Perform pure text queries using the aquery() method
4. Perform multimodal queries with specific multimodal content using the aquery_with_multimodal() method
5. Handle different types of multimodal content in the inserted knowledge base
"""
import os
import argparse
import asyncio
import logging
import logging.config
from pathlib import Path
# Add project root directory to Python path
import sys
sys.path.append(str(Path(__file__).parent.parent))
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug
from raganything import RAGAnything, RAGAnythingConfig
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=False)
def configure_logging():
"""Configure logging for the application"""
# Get log directory path from environment variable or use current directory
log_dir = os.getenv("LOG_DIR", os.getcwd())
log_file_path = os.path.abspath(
os.path.join(log_dir, "insert_content_list_example.log")
)
print(f"\nInsert Content List example log file: {log_file_path}\n")
    os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
# Get log file max size and backup count from environment variables
log_max_bytes = int(os.getenv("LOG_MAX_BYTES", 10485760)) # Default 10MB
log_backup_count = int(os.getenv("LOG_BACKUP_COUNT", 5)) # Default 5 backups
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(levelname)s: %(message)s",
},
"detailed": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
},
},
"handlers": {
"console": {
"formatter": "default",
"class": "logging.StreamHandler",
"stream": "ext://sys.stderr",
},
"file": {
"formatter": "detailed",
"class": "logging.handlers.RotatingFileHandler",
"filename": log_file_path,
"maxBytes": log_max_bytes,
"backupCount": log_backup_count,
"encoding": "utf-8",
},
},
"loggers": {
"lightrag": {
"handlers": ["console", "file"],
"level": "INFO",
"propagate": False,
},
},
}
)
# Set the logger level to INFO
logger.setLevel(logging.INFO)
# Enable verbose debug if needed
set_verbose_debug(os.getenv("VERBOSE", "false").lower() == "true")
def create_sample_content_list():
"""
Create a simple content list for testing insert_content_list functionality
Returns:
List[Dict]: Sample content list with various content types
Note:
- img_path should be absolute path to the image file
- page_idx represents the page number where the content appears (0-based)
"""
content_list = [
# Introduction text
{
"type": "text",
"text": "Welcome to the RAGAnything System Documentation. This guide covers the advanced multimodal document processing capabilities and features of our comprehensive RAG system.",
"page_idx": 0, # Page number where this content appears
},
# System architecture image
{
"type": "image",
"img_path": "/absolute/path/to/system_architecture.jpg", # IMPORTANT: Use absolute path to image file
"img_caption": ["Figure 1: RAGAnything System Architecture"],
"img_footnote": [
"The architecture shows the complete pipeline from document parsing to multimodal query processing"
],
"page_idx": 1, # Page number where this image appears
},
# Performance comparison table
{
"type": "table",
"table_body": """| System | Accuracy | Processing Speed | Memory Usage |
|--------|----------|------------------|--------------|
| RAGAnything | 95.2% | 120ms | 2.1GB |
| Traditional RAG | 87.3% | 180ms | 3.2GB |
| Baseline System | 82.1% | 220ms | 4.1GB |
| Simple Retrieval | 76.5% | 95ms | 1.8GB |""",
"table_caption": [
"Table 1: Performance Comparison of Different RAG Systems"
],
"table_footnote": [
"All tests conducted on the same hardware with identical test datasets"
],
"page_idx": 2, # Page number where this table appears
},
# Mathematical formula
{
"type": "equation",
"latex": "Relevance(d, q) = \\sum_{i=1}^{n} w_i \\cdot sim(t_i^d, t_i^q) \\cdot \\alpha_i",
"text": "Document relevance scoring formula where w_i are term weights, sim() is similarity function, and α_i are modality importance factors",
"page_idx": 3, # Page number where this equation appears
},
# Feature description
{
"type": "text",
"text": "The system supports multiple content modalities including text, images, tables, and mathematical equations. Each modality is processed using specialized processors optimized for that content type.",
"page_idx": 4, # Page number where this content appears
},
# Technical specifications table
{
"type": "table",
"table_body": """| Feature | Specification |
|---------|---------------|
| Supported Formats | PDF, DOCX, PPTX, XLSX, Images |
| Max Document Size | 100MB |
| Concurrent Processing | Up to 8 documents |
| Query Response Time | <200ms average |
| Knowledge Graph Nodes | Up to 1M entities |""",
"table_caption": ["Table 2: Technical Specifications"],
"table_footnote": [
"Specifications may vary based on hardware configuration"
],
"page_idx": 5, # Page number where this table appears
},
# Conclusion
{
"type": "text",
"text": "RAGAnything represents a significant advancement in multimodal document processing, providing comprehensive solutions for complex knowledge extraction and retrieval tasks.",
"page_idx": 6, # Page number where this content appears
},
]
return content_list
async def demo_insert_content_list(
api_key: str,
base_url: str = None,
working_dir: str = None,
):
"""
Demonstrate content list insertion and querying with RAGAnything
Args:
api_key: OpenAI API key
base_url: Optional base URL for API
working_dir: Working directory for RAG storage
"""
try:
# Create RAGAnything configuration
config = RAGAnythingConfig(
working_dir=working_dir or "./rag_storage",
enable_image_processing=True,
enable_table_processing=True,
enable_equation_processing=True,
display_content_stats=True, # Show content statistics
)
# Define LLM model function
def llm_model_func(prompt, system_prompt=None, history_messages=[], **kwargs):
return openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
)
# Define vision model function for image processing
        def vision_model_func(
            prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs
        ):
            if image_data:
                # Build the message list explicitly so no None entry is sent to the API
                messages = []
                if system_prompt:
                    messages.append({"role": "system", "content": system_prompt})
                messages.append(
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{image_data}"
                                },
                            },
                        ],
                    }
                )
                return openai_complete_if_cache(
                    "gpt-4o",
                    "",
                    system_prompt=None,
                    history_messages=[],
                    messages=messages,
                    api_key=api_key,
                    base_url=base_url,
                    **kwargs,
                )
            return llm_model_func(prompt, system_prompt, history_messages, **kwargs)
# Define embedding function
embedding_func = EmbeddingFunc(
embedding_dim=3072,
max_token_size=8192,
func=lambda texts: openai_embed(
texts,
model="text-embedding-3-large",
api_key=api_key,
base_url=base_url,
),
)
# Initialize RAGAnything
rag = RAGAnything(
config=config,
llm_model_func=llm_model_func,
vision_model_func=vision_model_func,
embedding_func=embedding_func,
)
# Create sample content list
logger.info("Creating sample content list...")
content_list = create_sample_content_list()
logger.info(f"Created content list with {len(content_list)} items")
# Insert content list directly
logger.info("\nInserting content list into RAGAnything...")
await rag.insert_content_list(
content_list=content_list,
file_path="raganything_documentation.pdf", # Reference file name for citation
split_by_character=None, # Optional text splitting
split_by_character_only=False, # Optional text splitting mode
doc_id="demo-doc-001", # Custom document ID
display_stats=True, # Show content statistics
)
logger.info("Content list insertion completed!")
# Example queries - demonstrating different query approaches
logger.info("\nQuerying inserted content:")
# 1. Pure text queries using aquery()
text_queries = [
"What is RAGAnything and what are its main features?",
"How does RAGAnything compare to traditional RAG systems?",
"What are the technical specifications of the system?",
]
for query in text_queries:
logger.info(f"\n[Text Query]: {query}")
result = await rag.aquery(query, mode="hybrid")
logger.info(f"Answer: {result}")
# 2. Multimodal query with specific multimodal content using aquery_with_multimodal()
logger.info(
"\n[Multimodal Query]: Analyzing new performance data against existing benchmarks"
)
multimodal_result = await rag.aquery_with_multimodal(
"Compare this new performance data with the existing benchmark results in the documentation",
multimodal_content=[
{
"type": "table",
"table_data": """Method,Accuracy,Speed,Memory
New_Approach,97.1%,110ms,1.9GB
Enhanced_RAG,91.4%,140ms,2.5GB""",
"table_caption": "Latest experimental results",
}
],
mode="hybrid",
)
logger.info(f"Answer: {multimodal_result}")
# 3. Another multimodal query with equation content
logger.info("\n[Multimodal Query]: Mathematical formula analysis")
equation_result = await rag.aquery_with_multimodal(
"How does this similarity formula relate to the relevance scoring mentioned in the documentation?",
multimodal_content=[
{
"type": "equation",
"latex": "sim(a, b) = \\frac{a \\cdot b}{||a|| \\times ||b||} + \\beta \\cdot context\\_weight",
"equation_caption": "Enhanced cosine similarity with context weighting",
}
],
mode="hybrid",
)
logger.info(f"Answer: {equation_result}")
# 4. Insert another content list with different document ID
logger.info("\nInserting additional content list...")
additional_content = [
{
"type": "text",
"text": "This is additional documentation about advanced features and configuration options.",
"page_idx": 0, # Page number where this content appears
},
{
"type": "table",
"table_body": """| Configuration | Default Value | Range |
|---------------|---------------|-------|
| Chunk Size | 512 tokens | 128-2048 |
| Context Window | 4096 tokens | 1024-8192 |
| Batch Size | 32 | 1-128 |""",
"table_caption": ["Advanced Configuration Parameters"],
"page_idx": 1, # Page number where this table appears
},
]
await rag.insert_content_list(
content_list=additional_content,
file_path="advanced_configuration.pdf",
doc_id="demo-doc-002", # Different document ID
)
# Query combined knowledge base
logger.info("\n[Combined Query]: What configuration options are available?")
combined_result = await rag.aquery(
"What configuration options are available and what are their default values?",
mode="hybrid",
)
logger.info(f"Answer: {combined_result}")
except Exception as e:
logger.error(f"Error in content list insertion demo: {str(e)}")
import traceback
logger.error(traceback.format_exc())
def main():
"""Main function to run the example"""
parser = argparse.ArgumentParser(description="Insert Content List Example")
parser.add_argument(
"--working_dir", "-w", default="./rag_storage", help="Working directory path"
)
parser.add_argument(
"--api-key",
default=os.getenv("LLM_BINDING_API_KEY"),
help="OpenAI API key (defaults to LLM_BINDING_API_KEY env var)",
)
parser.add_argument(
"--base-url",
default=os.getenv("LLM_BINDING_HOST"),
help="Optional base URL for API",
)
args = parser.parse_args()
# Check if API key is provided
if not args.api_key:
logger.error("Error: OpenAI API key is required")
logger.error("Set api key environment variable or use --api-key option")
return
# Run the demo
asyncio.run(
demo_insert_content_list(
args.api_key,
args.base_url,
args.working_dir,
)
)
if __name__ == "__main__":
# Configure logging first
configure_logging()
print("RAGAnything Insert Content List Example")
print("=" * 45)
print("Demonstrating direct content list insertion without document parsing")
print("=" * 45)
main()


@@ -25,6 +25,10 @@ from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug
from raganything import RAGAnything, RAGAnythingConfig
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=False)
def configure_logging():
"""Configure logging for the application"""


@@ -722,3 +722,131 @@ class ProcessorMixin:
)
self.logger.info(f"Document {file_path} processing complete!")
async def insert_content_list(
self,
content_list: List[Dict[str, Any]],
file_path: str = "unknown_document",
split_by_character: str | None = None,
split_by_character_only: bool = False,
doc_id: str | None = None,
        display_stats: bool | None = None,
):
"""
Insert content list directly without document parsing
Args:
content_list: Pre-parsed content list containing text and multimodal items.
Each item should be a dictionary with the following structure:
- Text: {"type": "text", "text": "content", "page_idx": 0}
- Image: {"type": "image", "img_path": "/absolute/path/to/image.jpg",
"img_caption": ["caption"], "img_footnote": ["note"], "page_idx": 1}
- Table: {"type": "table", "table_body": "markdown table",
"table_caption": ["caption"], "table_footnote": ["note"], "page_idx": 2}
- Equation: {"type": "equation", "latex": "LaTeX formula",
"text": "description", "page_idx": 3}
- Generic: {"type": "custom_type", "content": "any content", "page_idx": 4}
file_path: Reference file path/name for citation (defaults to "unknown_document")
split_by_character: Optional character to split the text by
split_by_character_only: If True, split only by the specified character
doc_id: Optional document ID, if not provided will be generated from content
display_stats: Whether to display content statistics (defaults to config.display_content_stats)
Note:
- img_path must be an absolute path to the image file
- page_idx represents the page number where the content appears (0-based indexing)
- Items are processed in the order they appear in the list
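        Example:
            # Illustrative sketch; assumes `rag` is an initialized RAGAnything instance
            await rag.insert_content_list(
                [{"type": "text", "text": "Hello world", "page_idx": 0}],
                file_path="example.pdf",
            )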
"""
# Ensure LightRAG is initialized
await self._ensure_lightrag_initialized()
# Use config defaults if not provided
if display_stats is None:
display_stats = self.config.display_content_stats
self.logger.info(
f"Starting direct content list insertion for: {file_path} ({len(content_list)} items)"
)
# Generate doc_id based on content if not provided
if doc_id is None:
doc_id = self._generate_content_based_doc_id(content_list)
# Display content statistics if requested
if display_stats:
self.logger.info("\nContent Information:")
self.logger.info(f"* Total blocks in content_list: {len(content_list)}")
# Count elements by type
block_types: Dict[str, int] = {}
for block in content_list:
if isinstance(block, dict):
block_type = block.get("type", "unknown")
if isinstance(block_type, str):
block_types[block_type] = block_types.get(block_type, 0) + 1
self.logger.info("* Content block types:")
for block_type, count in block_types.items():
self.logger.info(f" - {block_type}: {count}")
# Step 1: Separate text and multimodal content
text_content, multimodal_items = separate_content(content_list)
# Step 1.5: Set content source for context extraction in multimodal processing
if hasattr(self, "set_content_source_for_context") and multimodal_items:
self.logger.info(
"Setting content source for context-aware multimodal processing..."
)
self.set_content_source_for_context(
content_list, self.config.content_format
)
# Step 2: Insert pure text content with all parameters
if text_content.strip():
file_name = os.path.basename(file_path)
await insert_text_content(
self.lightrag,
text_content,
file_paths=file_name,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=doc_id,
)
# Step 3: Process multimodal content (using specialized processors)
if multimodal_items:
await self._process_multimodal_content(multimodal_items, file_path, doc_id)
else:
# If no multimodal content, mark as processed to avoid future checks
try:
existing_doc_status = await self.lightrag.doc_status.get_by_id(doc_id)
if existing_doc_status and not existing_doc_status.get(
"multimodal_processed", False
):
existing_multimodal_chunks = existing_doc_status.get(
"multimodal_chunks_list", []
)
await self.lightrag.doc_status.upsert(
{
doc_id: {
**existing_doc_status,
"multimodal_chunks_list": existing_multimodal_chunks,
"multimodal_chunks_count": len(
existing_multimodal_chunks
),
"multimodal_processed": True,
"updated_at": time.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
}
}
)
await self.lightrag.doc_status.index_done_callback()
self.logger.debug(
f"Marked document {doc_id[:8]}... as having no multimodal content"
)
except Exception as e:
self.logger.debug(
f"Error updating doc_status for no multimodal content: {e}"
)
self.logger.info(f"Content list insertion complete for: {file_path}")