RAGAnything

zrguo
2025-06-07 01:29:42 +08:00
parent 01bd14b78f
commit 9641d55a6d
18 changed files with 2787 additions and 0 deletions

61
.github/ISSUE_TEMPLATE/bug_report.yml vendored Normal file

@@ -0,0 +1,61 @@
name: Bug Report
description: File a bug report
title: "[Bug]:"
labels: ["bug", "triage"]
body:
- type: checkboxes
id: existingcheck
attributes:
label: Do you need to file an issue?
description: Please help us manage our time by avoiding duplicates and common bugs with the steps below.
options:
- label: I have searched the existing issues and this bug is not already filed.
- label: I believe this is a legitimate bug, not just a question or feature request.
- type: textarea
id: description
attributes:
label: Describe the bug
description: A clear and concise description of what the bug is.
placeholder: What went wrong?
- type: textarea
id: reproduce
attributes:
label: Steps to reproduce
description: Steps to reproduce the behavior.
placeholder: How can we replicate the issue?
- type: textarea
id: expected_behavior
attributes:
label: Expected Behavior
description: A clear and concise description of what you expected to happen.
placeholder: What should have happened?
- type: textarea
id: configused
attributes:
label: LightRAG Config Used
description: The LightRAG configuration used for the run.
placeholder: The settings content or LightRAG configuration
value: |
# Paste your config here
- type: textarea
id: screenshotslogs
attributes:
label: Logs and screenshots
description: If applicable, add screenshots and logs to help explain your problem.
placeholder: Add logs and screenshots here
- type: textarea
id: additional_information
attributes:
label: Additional Information
description: |
- LightRAG Version: e.g., v0.1.1
- Operating System: e.g., Windows 10, Ubuntu 20.04
- Python Version: e.g., 3.8
- Related Issues: e.g., #1
- Any other relevant information.
value: |
- LightRAG Version:
- Operating System:
- Python Version:
- Related Issues:

1
.github/ISSUE_TEMPLATE/config.yml vendored Normal file

@@ -0,0 +1 @@
blank_issues_enabled: false


@@ -0,0 +1,26 @@
name: Feature Request
description: File a feature request
labels: ["enhancement"]
title: "[Feature Request]:"
body:
- type: checkboxes
id: existingcheck
attributes:
label: Do you need to file a feature request?
description: Please help us manage our time by avoiding duplicates and common feature requests with the steps below.
options:
- label: I have searched the existing feature requests and this feature request is not already filed.
- label: I believe this is a legitimate feature request, not just a question or bug.
- type: textarea
id: feature_request_description
attributes:
label: Feature Request Description
description: A clear and concise description of the feature you would like.
placeholder: What would this feature add or improve?
- type: textarea
id: additional_context
attributes:
label: Additional Context
description: Add any other context or screenshots about the feature request here.
placeholder: Any additional information

26
.github/ISSUE_TEMPLATE/question.yml vendored Normal file

@@ -0,0 +1,26 @@
name: Question
description: Ask a general question
labels: ["question"]
title: "[Question]:"
body:
- type: checkboxes
id: existingcheck
attributes:
label: Do you need to ask a question?
description: Please help us manage our time by avoiding duplicates and common questions with the steps below.
options:
- label: I have searched the existing questions and discussions and this question is not already answered.
- label: I believe this is a legitimate question, not just a bug or feature request.
- type: textarea
id: question
attributes:
label: Your Question
description: A clear and concise description of your question.
placeholder: What is your question?
- type: textarea
id: context
attributes:
label: Additional Context
description: Provide any additional context or details that might help us understand your question better.
placeholder: Add any relevant information here

11
.github/dependabot.yml vendored Normal file

@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
version: 2
updates:
- package-ecosystem: "pip" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"

32
.github/pull_request_template.md vendored Normal file

@@ -0,0 +1,32 @@
<!--
Thanks for contributing to RAGAnything!
Please ensure your pull request is ready for review before submitting.
About this template
This template helps contributors provide a clear and concise description of their changes. Feel free to adjust it as needed.
-->
## Description
[Briefly describe the changes made in this pull request.]
## Related Issues
[Reference any related issues or tasks addressed by this pull request.]
## Changes Made
[List the specific changes made in this pull request.]
## Checklist
- [ ] Changes tested locally
- [ ] Code reviewed
- [ ] Documentation updated (if necessary)
- [ ] Unit tests added (if applicable)
## Additional Notes
[Add any additional notes or context for the reviewer(s).]

47
.github/workflows/docker-publish.yml vendored Normal file

@@ -0,0 +1,47 @@
name: Build and Push Docker Image
on:
release:
types: [published]
workflow_dispatch:
permissions:
contents: read
packages: write
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata for Docker
id: meta
uses: docker/metadata-action@v5
with:
images: ghcr.io/${{ github.repository }}
tags: |
type=semver,pattern={{version}}
type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/amd64,linux/arm64
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
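As a sketch of what this tag configuration yields: publishing a release tagged v1.2.3 from the default branch would push the image under two tags (owner and name are lowercased by metadata-action; HKUDS/RAGAnything is the repository named elsewhere in this commit):

ghcr.io/hkuds/raganything:1.2.3
ghcr.io/hkuds/raganything:latest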

30
.github/workflows/linting.yaml vendored Normal file

@@ -0,0 +1,30 @@
name: Linting and Formatting
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
lint-and-format:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pre-commit
- name: Run pre-commit
run: pre-commit run --all-files --show-diff-on-failure

72
.gitignore vendored Normal file

@@ -0,0 +1,72 @@
# Python-related files
__pycache__/
*.py[cod]
*.egg-info/
.eggs/
*.tgz
*.tar.gz
*.ini
# Virtual Environment
.venv/
env/
venv/
*.env*
.env_example
# Build / Distribution
dist/
build/
site/
# Logs / Reports
*.log
*.log.*
*.logfire
*.coverage/
log/
# Caches
.cache/
.mypy_cache/
.pytest_cache/
.ruff_cache/
.gradio/
.history/
temp/
# IDE / Editor Files
.idea/
.vscode/
.vscode/settings.json
# Framework-specific files
local_neo4jWorkDir/
neo4jWorkDir/
# Data & Storage
inputs/
rag_storage/
examples/input/
examples/output/
output*/
# Miscellaneous
.DS_Store
TODO.md
ignore_this.txt
*.ignore.*
# Project-specific files
dickens*/
book.txt
LightRAG.pdf
download_models_hf.py
lightrag-dev/
gui/
# unit-test files
test_*
# Cline files
memory-bank/

28
.pre-commit-config.yaml Normal file

@@ -0,0 +1,28 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
hooks:
- id: trailing-whitespace
exclude: ^lightrag/api/webui/
- id: end-of-file-fixer
exclude: ^lightrag/api/webui/
- id: requirements-txt-fixer
exclude: ^lightrag/api/webui/
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.4
hooks:
- id: ruff-format
exclude: ^lightrag/api/webui/
- id: ruff
args: [--fix, --ignore=E402]
exclude: ^lightrag/api/webui/
- repo: https://github.com/mgedmin/check-manifest
rev: "0.49"
hooks:
- id: check-manifest
stages: [manual]
exclude: ^lightrag/api/webui/

166
env.example Normal file

@@ -0,0 +1,166 @@
### This is a sample .env file
### Server Configuration
HOST=0.0.0.0
PORT=9621
WEBUI_TITLE='My Graph KB'
WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System"
OLLAMA_EMULATING_MODEL_TAG=latest
# WORKERS=2
# CORS_ORIGINS=http://localhost:3000,http://localhost:8080
### Login Configuration
# AUTH_ACCOUNTS='admin:admin123,user1:pass456'
# TOKEN_SECRET=Your-Key-For-LightRAG-API-Server
# TOKEN_EXPIRE_HOURS=48
# GUEST_TOKEN_EXPIRE_HOURS=24
# JWT_ALGORITHM=HS256
### API-Key to access LightRAG Server API
# LIGHTRAG_API_KEY=your-secure-api-key-here
# WHITELIST_PATHS=/health,/api/*
### Optional SSL Configuration
# SSL=true
# SSL_CERTFILE=/path/to/cert.pem
# SSL_KEYFILE=/path/to/key.pem
### Directory Configuration (defaults to current working directory)
### Should not be set if deployed via Docker (set by the Dockerfile instead of .env)
### Default values are ./inputs and ./rag_storage
# INPUT_DIR=<absolute_path_for_doc_input_dir>
# WORKING_DIR=<absolute_path_for_working_dir>
### Max nodes returned from graph retrieval
# MAX_GRAPH_NODES=1000
### Logging level
# LOG_LEVEL=INFO
# VERBOSE=False
# LOG_MAX_BYTES=10485760
# LOG_BACKUP_COUNT=5
### Logfile location (defaults to current working directory)
# LOG_DIR=/path/to/log/directory
### Settings for RAG query
# HISTORY_TURNS=3
# COSINE_THRESHOLD=0.2
# TOP_K=60
# MAX_TOKEN_TEXT_CHUNK=4000
# MAX_TOKEN_RELATION_DESC=4000
# MAX_TOKEN_ENTITY_DESC=4000
### Entity and relation summarization configuration
### Language: English, Chinese, French, German ...
SUMMARY_LANGUAGE=English
### Number of duplicated entities/edges that triggers LLM re-summary on merge (at least 3 is recommended)
# FORCE_LLM_SUMMARY_ON_MERGE=6
### Max tokens for entity/relation descriptions after merge
# MAX_TOKEN_SUMMARY=500
### Number of documents processed in parallel (less than MAX_ASYNC/2 is recommended)
# MAX_PARALLEL_INSERT=2
### Chunk size for document splitting, 500~1500 is recommended
# CHUNK_SIZE=1200
# CHUNK_OVERLAP_SIZE=100
### LLM Configuration
ENABLE_LLM_CACHE=true
ENABLE_LLM_CACHE_FOR_EXTRACT=true
### Timeout in seconds for LLM; None for no timeout
TIMEOUT=240
### Some models like o1-mini require temperature to be set to 1
TEMPERATURE=0
### Max concurrent LLM requests
MAX_ASYNC=4
### MAX_TOKENS: max tokens sent to LLM for entity/relation summaries (less than the model's context size)
### MAX_TOKENS: also set as the num_ctx option for Ollama by the API server
MAX_TOKENS=32768
### LLM Binding type: openai, ollama, lollms, azure_openai
LLM_BINDING=openai
LLM_MODEL=gpt-4o
LLM_BINDING_HOST=https://api.openai.com/v1
LLM_BINDING_API_KEY=your_api_key
### Optional for Azure
# AZURE_OPENAI_API_VERSION=2024-08-01-preview
# AZURE_OPENAI_DEPLOYMENT=gpt-4o
### Embedding Configuration
### Embedding Binding type: openai, ollama, lollms, azure_openai
EMBEDDING_BINDING=ollama
EMBEDDING_MODEL=bge-m3:latest
EMBEDDING_DIM=1024
EMBEDDING_BINDING_API_KEY=your_api_key
# If the embedding service is deployed within the same Docker stack, use host.docker.internal instead of localhost
EMBEDDING_BINDING_HOST=http://localhost:11434
### Number of chunks sent to the embedding model in a single request
# EMBEDDING_BATCH_NUM=32
### Max concurrent embedding requests
# EMBEDDING_FUNC_MAX_ASYNC=16
### Maximum tokens sent to the embedding model for each chunk (no longer in use?)
# MAX_EMBED_TOKENS=8192
### Optional for Azure
# AZURE_EMBEDDING_DEPLOYMENT=text-embedding-3-large
# AZURE_EMBEDDING_API_VERSION=2023-05-15
### Data storage selection
# LIGHTRAG_KV_STORAGE=PGKVStorage
# LIGHTRAG_VECTOR_STORAGE=PGVectorStorage
# LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
# LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
### TiDB Configuration (Deprecated)
# TIDB_HOST=localhost
# TIDB_PORT=4000
# TIDB_USER=your_username
# TIDB_PASSWORD='your_password'
# TIDB_DATABASE=your_database
### Separates data from different LightRAG instances (deprecated)
# TIDB_WORKSPACE=default
### PostgreSQL Configuration
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=your_username
POSTGRES_PASSWORD='your_password'
POSTGRES_DATABASE=your_database
POSTGRES_MAX_CONNECTIONS=12
### Separates data from different LightRAG instances (deprecated)
# POSTGRES_WORKSPACE=default
### Neo4j Configuration
NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD='your_password'
### Independent AGE Configuration (not for AGE embedded in PostgreSQL)
# AGE_POSTGRES_DB=
# AGE_POSTGRES_USER=
# AGE_POSTGRES_PASSWORD=
# AGE_POSTGRES_HOST=
# AGE_POSTGRES_PORT=8529
# AGE graph name (applies to PostgreSQL and independent AGE)
### AGE_GRAPH_NAME is deprecated
# AGE_GRAPH_NAME=lightrag
### MongoDB Configuration
MONGO_URI=mongodb://root:root@localhost:27017/
MONGO_DATABASE=LightRAG
### Separates data from different LightRAG instances (deprecated)
# MONGODB_GRAPH=false
### Milvus Configuration
MILVUS_URI=http://localhost:19530
MILVUS_DB_NAME=lightrag
# MILVUS_USER=root
# MILVUS_PASSWORD=your_password
# MILVUS_TOKEN=your_token
### Qdrant
QDRANT_URL=http://localhost:16333
# QDRANT_API_KEY=your-api-key
### Redis
REDIS_URI=redis://localhost:6379
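As an aside for anyone consuming these settings outside the server, a minimal sketch of reading a few of them in Python (assumes the python-dotenv package; variable names come from this env.example):

import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory

llm_binding = os.getenv("LLM_BINDING", "openai")
llm_model = os.getenv("LLM_MODEL", "gpt-4o")
max_async = int(os.getenv("MAX_ASYNC", "4"))
timeout = os.getenv("TIMEOUT")  # unset/empty means no timeout
timeout = int(timeout) if timeout else None

print(f"Using {llm_binding}/{llm_model} with MAX_ASYNC={max_async}")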

224
modalprocessors_example.py Normal file

@@ -0,0 +1,224 @@
"""
Example of directly using modal processors
This example demonstrates how to use LightRAG's modal processors directly without going through MinerU.
"""
import asyncio
import argparse
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag import LightRAG
from lightrag.modalprocessors import (
ImageModalProcessor,
TableModalProcessor,
EquationModalProcessor,
)
WORKING_DIR = "./rag_storage"
def get_llm_model_func(api_key: str, base_url: str = None):
return (
lambda prompt,
system_prompt=None,
history_messages=[],
**kwargs: openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
)
)
def get_vision_model_func(api_key: str, base_url: str = None):
return (
lambda prompt,
system_prompt=None,
history_messages=[],
image_data=None,
**kwargs: openai_complete_if_cache(
"gpt-4o",
"",
system_prompt=None,
history_messages=[],
            messages=[
                # Drop the None placeholder when no system prompt is given
                # (the original list could contain a bare None, which the API
                # would reject); this branch only runs when image_data is set,
                # so the image message is unconditional here
                msg
                for msg in (
                    {"role": "system", "content": system_prompt}
                    if system_prompt
                    else None,
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{image_data}"
                                },
                            },
                        ],
                    },
                )
                if msg is not None
            ],
api_key=api_key,
base_url=base_url,
**kwargs,
)
if image_data
else openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
)
)
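# Note: the callable returned above takes an optional image_data argument. When
# image_data is supplied it sends an OpenAI-style multimodal message to the
# vision model ("gpt-4o"); otherwise it falls back to the plain text model
# ("gpt-4o-mini"). This matches the modal_caption_func signature expected by
# ImageModalProcessor.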
async def process_image_example(lightrag: LightRAG, vision_model_func):
"""Example of processing an image"""
# Create image processor
image_processor = ImageModalProcessor(
lightrag=lightrag, modal_caption_func=vision_model_func
)
# Prepare image content
image_content = {
"img_path": "image.jpg",
"img_caption": ["Example image caption"],
"img_footnote": ["Example image footnote"],
}
# Process image
description, entity_info = await image_processor.process_multimodal_content(
modal_content=image_content,
content_type="image",
file_path="image_example.jpg",
entity_name="Example Image",
)
print("Image Processing Results:")
print(f"Description: {description}")
print(f"Entity Info: {entity_info}")
async def process_table_example(lightrag: LightRAG, llm_model_func):
"""Example of processing a table"""
# Create table processor
table_processor = TableModalProcessor(
lightrag=lightrag, modal_caption_func=llm_model_func
)
# Prepare table content
table_content = {
"table_body": """
| Name | Age | Occupation |
|------|-----|------------|
| John | 25 | Engineer |
| Mary | 30 | Designer |
""",
"table_caption": ["Employee Information Table"],
"table_footnote": ["Data updated as of 2024"],
}
# Process table
description, entity_info = await table_processor.process_multimodal_content(
modal_content=table_content,
content_type="table",
file_path="table_example.md",
entity_name="Employee Table",
)
print("\nTable Processing Results:")
print(f"Description: {description}")
print(f"Entity Info: {entity_info}")
async def process_equation_example(lightrag: LightRAG, llm_model_func):
"""Example of processing a mathematical equation"""
# Create equation processor
equation_processor = EquationModalProcessor(
lightrag=lightrag, modal_caption_func=llm_model_func
)
# Prepare equation content
equation_content = {"text": "E = mc^2", "text_format": "LaTeX"}
# Process equation
description, entity_info = await equation_processor.process_multimodal_content(
modal_content=equation_content,
content_type="equation",
file_path="equation_example.txt",
entity_name="Mass-Energy Equivalence",
)
print("\nEquation Processing Results:")
print(f"Description: {description}")
print(f"Entity Info: {entity_info}")
async def initialize_rag(
    api_key: str, base_url: str = None, working_dir: str = WORKING_DIR
):
    rag = LightRAG(
        working_dir=working_dir,
embedding_func=lambda texts: openai_embed(
texts,
model="text-embedding-3-large",
api_key=api_key,
base_url=base_url,
),
llm_model_func=lambda prompt,
system_prompt=None,
history_messages=[],
**kwargs: openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
return rag
def main():
"""Main function to run the example"""
parser = argparse.ArgumentParser(description="Modal Processors Example")
parser.add_argument("--api-key", required=True, help="OpenAI API key")
parser.add_argument("--base-url", help="Optional base URL for API")
parser.add_argument(
"--working-dir", "-w", default=WORKING_DIR, help="Working directory path"
)
args = parser.parse_args()
# Run examples
    asyncio.run(main_async(args.api_key, args.base_url, args.working_dir))
async def main_async(
    api_key: str, base_url: str = None, working_dir: str = WORKING_DIR
):
    # Initialize LightRAG (honors the --working-dir flag, which was
    # previously parsed but ignored)
    lightrag = await initialize_rag(api_key, base_url, working_dir)
# Get model functions
llm_model_func = get_llm_model_func(api_key, base_url)
vision_model_func = get_vision_model_func(api_key, base_url)
# Run examples
await process_image_example(lightrag, vision_model_func)
await process_table_example(lightrag, llm_model_func)
await process_equation_example(lightrag, llm_model_func)
if __name__ == "__main__":
main()
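A usage sketch for this script, with flags as defined by the argparse setup above (the key is a placeholder):

python modalprocessors_example.py --api-key YOUR_OPENAI_API_KEY --working-dir ./rag_storage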

5
raganything/__init__.py Normal file

@@ -0,0 +1,5 @@
from .raganything import RAGAnything as RAGAnything
__version__ = "0.0.1"
__author__ = "Zirui Guo"
__url__ = "https://github.com/HKUDS/RAGAnything"
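A minimal import sketch for this package (the model functions are placeholder callables; the constructor parameters come from raganything/raganything.py below):

from raganything import RAGAnything

rag = RAGAnything(
    llm_model_func=my_llm_func,  # placeholder
    vision_model_func=my_vision_func,  # placeholder
    embedding_func=my_embed_func,  # placeholder
    working_dir="./rag_storage",
)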


@@ -0,0 +1,513 @@
# type: ignore
"""
MinerU Document Parser Utility
This module provides functionality for parsing PDF, image, and office documents using the MinerU library,
converting the parsing results into markdown and JSON formats
"""
from __future__ import annotations
__all__ = ["MineruParser"]
import os
import json
import argparse
from pathlib import Path
from typing import (
Dict,
List,
Optional,
Union,
Tuple,
Any,
TypeVar,
cast,
TYPE_CHECKING,
ClassVar,
)
# Type stubs for magic_pdf
FileBasedDataWriter = Any
FileBasedDataReader = Any
PymuDocDataset = Any
InferResult = Any
PipeResult = Any
SupportedPdfParseMethod = Any
doc_analyze = Any
read_local_office = Any
read_local_images = Any
if TYPE_CHECKING:
from magic_pdf.data.data_reader_writer import (
FileBasedDataWriter,
FileBasedDataReader,
)
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.data.read_api import read_local_office, read_local_images
else:
# MinerU imports
from magic_pdf.data.data_reader_writer import (
FileBasedDataWriter,
FileBasedDataReader,
)
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.data.read_api import read_local_office, read_local_images
T = TypeVar("T")
class MineruParser:
"""
MinerU document parsing utility class
Supports parsing PDF, image and office documents (like Word, PPT, etc.),
converting the content into structured data and generating markdown and JSON output
"""
__slots__: ClassVar[Tuple[str, ...]] = ()
def __init__(self) -> None:
"""Initialize MineruParser"""
pass
@staticmethod
def safe_write(
writer: Any,
content: Union[str, bytes, Dict[str, Any], List[Any]],
filename: str,
) -> None:
"""
Safely write content to a file, ensuring the filename is valid
Args:
writer: The writer object to use
content: The content to write
filename: The filename to write to
"""
# Ensure the filename isn't too long
if len(filename) > 200: # Most filesystems have limits around 255 characters
# Truncate the filename while keeping the extension
base, ext = os.path.splitext(filename)
filename = base[:190] + ext # Leave room for the extension and some margin
# Handle specific content types
if isinstance(content, str):
# Ensure str content is encoded to bytes if required
try:
writer.write(content, filename)
except TypeError:
# If the writer expects bytes, convert string to bytes
writer.write(content.encode("utf-8"), filename)
else:
# For dict/list content, always encode as JSON string first
if isinstance(content, (dict, list)):
try:
writer.write(
json.dumps(content, ensure_ascii=False, indent=4), filename
)
except TypeError:
# If the writer expects bytes, convert JSON string to bytes
writer.write(
json.dumps(content, ensure_ascii=False, indent=4).encode(
"utf-8"
),
filename,
)
else:
# Regular content (assumed to be bytes or compatible)
writer.write(content, filename)
@staticmethod
def parse_pdf(
pdf_path: Union[str, Path],
output_dir: Optional[str] = None,
use_ocr: bool = False,
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse PDF document
Args:
pdf_path: Path to the PDF file
output_dir: Output directory path
use_ocr: Whether to force OCR parsing
Returns:
Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)
"""
try:
# Convert to Path object for easier handling
pdf_path = Path(pdf_path)
name_without_suff = pdf_path.stem
# Prepare output directories - ensure file name is in path
if output_dir:
base_output_dir = Path(output_dir)
local_md_dir = base_output_dir / name_without_suff
else:
local_md_dir = pdf_path.parent / name_without_suff
local_image_dir = local_md_dir / "images"
image_dir = local_image_dir.name
# Create directories
os.makedirs(local_image_dir, exist_ok=True)
os.makedirs(local_md_dir, exist_ok=True)
# Initialize writers and reader
image_writer = FileBasedDataWriter(str(local_image_dir)) # type: ignore
md_writer = FileBasedDataWriter(str(local_md_dir)) # type: ignore
reader = FileBasedDataReader("") # type: ignore
# Read PDF bytes
pdf_bytes = reader.read(str(pdf_path)) # type: ignore
# Create dataset instance
ds = PymuDocDataset(pdf_bytes) # type: ignore
# Process based on PDF type and user preference
if use_ocr or ds.classify() == SupportedPdfParseMethod.OCR: # type: ignore
infer_result = ds.apply(doc_analyze, ocr=True) # type: ignore
pipe_result = infer_result.pipe_ocr_mode(image_writer) # type: ignore
else:
infer_result = ds.apply(doc_analyze, ocr=False) # type: ignore
pipe_result = infer_result.pipe_txt_mode(image_writer) # type: ignore
# Draw visualizations
try:
infer_result.draw_model(
os.path.join(local_md_dir, f"{name_without_suff}_model.pdf")
) # type: ignore
pipe_result.draw_layout(
os.path.join(local_md_dir, f"{name_without_suff}_layout.pdf")
) # type: ignore
pipe_result.draw_span(
os.path.join(local_md_dir, f"{name_without_suff}_spans.pdf")
) # type: ignore
except Exception as e:
print(f"Warning: Failed to draw visualizations: {str(e)}")
# Get data using API methods
md_content = pipe_result.get_markdown(image_dir) # type: ignore
content_list = pipe_result.get_content_list(image_dir) # type: ignore
# Save files using dump methods (consistent with API)
pipe_result.dump_md(md_writer, f"{name_without_suff}.md", image_dir) # type: ignore
pipe_result.dump_content_list(
md_writer, f"{name_without_suff}_content_list.json", image_dir
) # type: ignore
pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
# Save model result - convert JSON string to bytes before writing
model_inference_result = infer_result.get_infer_res() # type: ignore
json_str = json.dumps(model_inference_result, ensure_ascii=False, indent=4)
try:
# Try to write to a file manually to avoid FileBasedDataWriter issues
model_file_path = os.path.join(
local_md_dir, f"{name_without_suff}_model.json"
)
with open(model_file_path, "w", encoding="utf-8") as f:
f.write(json_str)
except Exception as e:
print(
f"Warning: Failed to save model result using file write: {str(e)}"
)
try:
# If direct file write fails, try using the writer with bytes encoding
md_writer.write(
json_str.encode("utf-8"), f"{name_without_suff}_model.json"
) # type: ignore
except Exception as e2:
print(
f"Warning: Failed to save model result using writer: {str(e2)}"
)
return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
except Exception as e:
print(f"Error in parse_pdf: {str(e)}")
raise
@staticmethod
def parse_office_doc(
doc_path: Union[str, Path], output_dir: Optional[str] = None
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse office document (Word, PPT, etc.)
Args:
doc_path: Path to the document file
output_dir: Output directory path
Returns:
Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)
"""
try:
# Convert to Path object for easier handling
doc_path = Path(doc_path)
name_without_suff = doc_path.stem
# Prepare output directories - ensure file name is in path
if output_dir:
base_output_dir = Path(output_dir)
local_md_dir = base_output_dir / name_without_suff
else:
local_md_dir = doc_path.parent / name_without_suff
local_image_dir = local_md_dir / "images"
image_dir = local_image_dir.name
# Create directories
os.makedirs(local_image_dir, exist_ok=True)
os.makedirs(local_md_dir, exist_ok=True)
# Initialize writers
image_writer = FileBasedDataWriter(str(local_image_dir)) # type: ignore
md_writer = FileBasedDataWriter(str(local_md_dir)) # type: ignore
# Read office document
ds = read_local_office(str(doc_path))[0] # type: ignore
# Apply chain of operations according to API documentation
# This follows the pattern shown in MS-Office example in the API docs
ds.apply(doc_analyze, ocr=True).pipe_txt_mode(image_writer).dump_md(
md_writer, f"{name_without_suff}.md", image_dir
) # type: ignore
            # Re-run the analysis to obtain the content data
infer_result = ds.apply(doc_analyze, ocr=True) # type: ignore
pipe_result = infer_result.pipe_txt_mode(image_writer) # type: ignore
# Get data for return values and additional outputs
md_content = pipe_result.get_markdown(image_dir) # type: ignore
content_list = pipe_result.get_content_list(image_dir) # type: ignore
# Save additional output files
pipe_result.dump_content_list(
md_writer, f"{name_without_suff}_content_list.json", image_dir
) # type: ignore
pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
# Save model result - convert JSON string to bytes before writing
model_inference_result = infer_result.get_infer_res() # type: ignore
json_str = json.dumps(model_inference_result, ensure_ascii=False, indent=4)
try:
# Try to write to a file manually to avoid FileBasedDataWriter issues
model_file_path = os.path.join(
local_md_dir, f"{name_without_suff}_model.json"
)
with open(model_file_path, "w", encoding="utf-8") as f:
f.write(json_str)
except Exception as e:
print(
f"Warning: Failed to save model result using file write: {str(e)}"
)
try:
# If direct file write fails, try using the writer with bytes encoding
md_writer.write(
json_str.encode("utf-8"), f"{name_without_suff}_model.json"
) # type: ignore
except Exception as e2:
print(
f"Warning: Failed to save model result using writer: {str(e2)}"
)
return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
except Exception as e:
print(f"Error in parse_office_doc: {str(e)}")
raise
@staticmethod
def parse_image(
image_path: Union[str, Path], output_dir: Optional[str] = None
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse image document
Args:
image_path: Path to the image file
output_dir: Output directory path
Returns:
Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)
"""
try:
# Convert to Path object for easier handling
image_path = Path(image_path)
name_without_suff = image_path.stem
# Prepare output directories - ensure file name is in path
if output_dir:
base_output_dir = Path(output_dir)
local_md_dir = base_output_dir / name_without_suff
else:
local_md_dir = image_path.parent / name_without_suff
local_image_dir = local_md_dir / "images"
image_dir = local_image_dir.name
# Create directories
os.makedirs(local_image_dir, exist_ok=True)
os.makedirs(local_md_dir, exist_ok=True)
# Initialize writers
image_writer = FileBasedDataWriter(str(local_image_dir)) # type: ignore
md_writer = FileBasedDataWriter(str(local_md_dir)) # type: ignore
# Read image
ds = read_local_images(str(image_path))[0] # type: ignore
# Apply chain of operations according to API documentation
# This follows the pattern shown in Image example in the API docs
ds.apply(doc_analyze, ocr=True).pipe_ocr_mode(image_writer).dump_md(
md_writer, f"{name_without_suff}.md", image_dir
) # type: ignore
            # Re-run the analysis to obtain the content data
infer_result = ds.apply(doc_analyze, ocr=True) # type: ignore
pipe_result = infer_result.pipe_ocr_mode(image_writer) # type: ignore
# Get data for return values and additional outputs
md_content = pipe_result.get_markdown(image_dir) # type: ignore
content_list = pipe_result.get_content_list(image_dir) # type: ignore
# Save additional output files
pipe_result.dump_content_list(
md_writer, f"{name_without_suff}_content_list.json", image_dir
) # type: ignore
pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
# Save model result - convert JSON string to bytes before writing
model_inference_result = infer_result.get_infer_res() # type: ignore
json_str = json.dumps(model_inference_result, ensure_ascii=False, indent=4)
try:
# Try to write to a file manually to avoid FileBasedDataWriter issues
model_file_path = os.path.join(
local_md_dir, f"{name_without_suff}_model.json"
)
with open(model_file_path, "w", encoding="utf-8") as f:
f.write(json_str)
except Exception as e:
print(
f"Warning: Failed to save model result using file write: {str(e)}"
)
try:
# If direct file write fails, try using the writer with bytes encoding
md_writer.write(
json_str.encode("utf-8"), f"{name_without_suff}_model.json"
) # type: ignore
except Exception as e2:
print(
f"Warning: Failed to save model result using writer: {str(e2)}"
)
return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
except Exception as e:
print(f"Error in parse_image: {str(e)}")
raise
@staticmethod
def parse_document(
file_path: Union[str, Path],
parse_method: str = "auto",
output_dir: Optional[str] = None,
save_results: bool = True,
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse document using MinerU based on file extension
Args:
file_path: Path to the file to be parsed
parse_method: Parsing method, supports "auto", "ocr", "txt", default is "auto"
output_dir: Output directory path, if None, use the directory of the input file
save_results: Whether to save parsing results to files
Returns:
Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)
"""
# Convert to Path object
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"File does not exist: {file_path}")
# Get file extension
ext = file_path.suffix.lower()
# Choose appropriate parser based on file type
if ext in [".pdf"]:
return MineruParser.parse_pdf(
file_path, output_dir, use_ocr=(parse_method == "ocr")
)
elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"]:
return MineruParser.parse_image(file_path, output_dir)
elif ext in [".doc", ".docx", ".ppt", ".pptx"]:
return MineruParser.parse_office_doc(file_path, output_dir)
else:
# For unsupported file types, default to PDF parsing
print(
f"Warning: Unsupported file extension '{ext}', trying generic PDF parser"
)
return MineruParser.parse_pdf(
file_path, output_dir, use_ocr=(parse_method == "ocr")
)
def main():
"""
Main function to run the MinerU parser from command line
"""
parser = argparse.ArgumentParser(description="Parse documents using MinerU")
parser.add_argument("file_path", help="Path to the document to parse")
parser.add_argument("--output", "-o", help="Output directory path")
parser.add_argument(
"--method",
"-m",
choices=["auto", "ocr", "txt"],
default="auto",
help="Parsing method (auto, ocr, txt)",
)
parser.add_argument(
"--stats", action="store_true", help="Display content statistics"
)
args = parser.parse_args()
try:
# Parse the document
content_list, md_content = MineruParser.parse_document(
file_path=args.file_path, parse_method=args.method, output_dir=args.output
)
# Display statistics if requested
if args.stats:
print("\nDocument Statistics:")
print(f"Total content blocks: {len(content_list)}")
# Count different types of content
content_types = {}
for item in content_list:
content_type = item.get("type", "unknown")
content_types[content_type] = content_types.get(content_type, 0) + 1
print("\nContent Type Distribution:")
for content_type, count in content_types.items():
print(f"- {content_type}: {count}")
except Exception as e:
print(f"Error: {str(e)}")
return 1
return 0
if __name__ == "__main__":
exit(main())
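A minimal usage sketch for the parser above (the file path is a placeholder; the import path follows the one used elsewhere in this commit):

from lightrag.mineru_parser import MineruParser

# parse_document dispatches to the PDF/image/office parser by file extension
# and returns (content_list, markdown_text)
content_list, md_content = MineruParser.parse_document(
    "example.pdf",  # placeholder path
    parse_method="auto",
    output_dir="./output",
)
print(f"Extracted {len(content_list)} content blocks")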


@@ -0,0 +1,699 @@
"""
Specialized processors for different modalities
Includes:
- ImageModalProcessor: Specialized processor for image content
- TableModalProcessor: Specialized processor for table content
- EquationModalProcessor: Specialized processor for equation content
- GenericModalProcessor: Processor for other modal content
"""
import re
import json
import time
import asyncio
import base64
from typing import Dict, Any, Tuple, cast
from pathlib import Path
from lightrag.base import StorageNameSpace
from lightrag.utils import (
logger,
compute_mdhash_id,
)
from lightrag.lightrag import LightRAG
from dataclasses import asdict
from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock
class BaseModalProcessor:
"""Base class for modal processors"""
def __init__(self, lightrag: LightRAG, modal_caption_func):
"""Initialize base processor
Args:
lightrag: LightRAG instance
modal_caption_func: Function for generating descriptions
"""
self.lightrag = lightrag
self.modal_caption_func = modal_caption_func
# Use LightRAG's storage instances
self.text_chunks_db = lightrag.text_chunks
self.chunks_vdb = lightrag.chunks_vdb
self.entities_vdb = lightrag.entities_vdb
self.relationships_vdb = lightrag.relationships_vdb
self.knowledge_graph_inst = lightrag.chunk_entity_relation_graph
# Use LightRAG's configuration and functions
self.embedding_func = lightrag.embedding_func
self.llm_model_func = lightrag.llm_model_func
self.global_config = asdict(lightrag)
self.hashing_kv = lightrag.llm_response_cache
self.tokenizer = lightrag.tokenizer
async def process_multimodal_content(
self,
modal_content,
content_type: str,
file_path: str = "manual_creation",
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""Process multimodal content"""
# Subclasses need to implement specific processing logic
raise NotImplementedError("Subclasses must implement this method")
async def _create_entity_and_chunk(
self, modal_chunk: str, entity_info: Dict[str, Any], file_path: str
) -> Tuple[str, Dict[str, Any]]:
"""Create entity and text chunk"""
# Create chunk
chunk_id = compute_mdhash_id(str(modal_chunk), prefix="chunk-")
tokens = len(self.tokenizer.encode(modal_chunk))
chunk_data = {
"tokens": tokens,
"content": modal_chunk,
"chunk_order_index": 0,
"full_doc_id": chunk_id,
"file_path": file_path,
}
# Store chunk
await self.text_chunks_db.upsert({chunk_id: chunk_data})
# Create entity node
node_data = {
"entity_id": entity_info["entity_name"],
"entity_type": entity_info["entity_type"],
"description": entity_info["summary"],
"source_id": chunk_id,
"file_path": file_path,
"created_at": int(time.time()),
}
await self.knowledge_graph_inst.upsert_node(
entity_info["entity_name"], node_data
)
# Insert entity into vector database
entity_vdb_data = {
compute_mdhash_id(entity_info["entity_name"], prefix="ent-"): {
"entity_name": entity_info["entity_name"],
"entity_type": entity_info["entity_type"],
"content": f"{entity_info['entity_name']}\n{entity_info['summary']}",
"source_id": chunk_id,
"file_path": file_path,
}
}
await self.entities_vdb.upsert(entity_vdb_data)
# Process entity and relationship extraction
await self._process_chunk_for_extraction(chunk_id, entity_info["entity_name"])
# Ensure all storage updates are complete
await self._insert_done()
return entity_info["summary"], {
"entity_name": entity_info["entity_name"],
"entity_type": entity_info["entity_type"],
"description": entity_info["summary"],
"chunk_id": chunk_id,
}
async def _process_chunk_for_extraction(
self, chunk_id: str, modal_entity_name: str
):
"""Process chunk for entity and relationship extraction"""
chunk_data = await self.text_chunks_db.get_by_id(chunk_id)
if not chunk_data:
logger.error(f"Chunk {chunk_id} not found")
return
# Create text chunk for vector database
chunk_vdb_data = {
chunk_id: {
"content": chunk_data["content"],
"full_doc_id": chunk_id,
"tokens": chunk_data["tokens"],
"chunk_order_index": chunk_data["chunk_order_index"],
"file_path": chunk_data["file_path"],
}
}
await self.chunks_vdb.upsert(chunk_vdb_data)
# Trigger extraction process
from lightrag.operate import extract_entities, merge_nodes_and_edges
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
# Prepare chunk for extraction
chunks = {chunk_id: chunk_data}
# Extract entities and relationships
chunk_results = await extract_entities(
chunks=chunks,
global_config=self.global_config,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.hashing_kv,
)
# Add "belongs_to" relationships for all extracted entities
for maybe_nodes, _ in chunk_results:
for entity_name in maybe_nodes.keys():
if entity_name != modal_entity_name: # Skip self-relationship
# Create belongs_to relationship
relation_data = {
"description": f"Entity {entity_name} belongs to {modal_entity_name}",
"keywords": "belongs_to,part_of,contained_in",
"source_id": chunk_id,
"weight": 10.0,
"file_path": chunk_data.get("file_path", "manual_creation"),
}
await self.knowledge_graph_inst.upsert_edge(
entity_name, modal_entity_name, relation_data
)
relation_id = compute_mdhash_id(
entity_name + modal_entity_name, prefix="rel-"
)
relation_vdb_data = {
relation_id: {
"src_id": entity_name,
"tgt_id": modal_entity_name,
"keywords": relation_data["keywords"],
"content": f"{relation_data['keywords']}\t{entity_name}\n{modal_entity_name}\n{relation_data['description']}",
"source_id": chunk_id,
"file_path": chunk_data.get("file_path", "manual_creation"),
}
}
await self.relationships_vdb.upsert(relation_vdb_data)
await merge_nodes_and_edges(
chunk_results=chunk_results,
knowledge_graph_inst=self.knowledge_graph_inst,
entity_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
global_config=self.global_config,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.hashing_kv,
)
async def _insert_done(self) -> None:
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [
self.text_chunks_db,
self.chunks_vdb,
self.entities_vdb,
self.relationships_vdb,
self.knowledge_graph_inst,
]
]
)
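# Summary of the base-class flow shared by the concrete processors below:
# 1. The model-generated analysis becomes a text chunk (id: compute_mdhash_id
#    of the chunk text with a "chunk-" prefix).
# 2. One "anchor" entity representing the modal object is upserted into the
#    knowledge graph and the entity vector store.
# 3. extract_entities() mines further entities/relations from the chunk; each
#    extracted entity gets a weighted "belongs_to" edge back to the anchor.
# 4. merge_nodes_and_edges() and index_done_callback() persist everything.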
class ImageModalProcessor(BaseModalProcessor):
"""Processor specialized for image content"""
def __init__(self, lightrag: LightRAG, modal_caption_func):
"""Initialize image processor
Args:
lightrag: LightRAG instance
modal_caption_func: Function for generating descriptions (supporting image understanding)
"""
super().__init__(lightrag, modal_caption_func)
def _encode_image_to_base64(self, image_path: str) -> str:
"""Encode image to base64"""
try:
with open(image_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode("utf-8")
return encoded_string
except Exception as e:
logger.error(f"Failed to encode image {image_path}: {e}")
return ""
async def process_multimodal_content(
self,
modal_content,
content_type: str,
file_path: str = "manual_creation",
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""Process image content"""
try:
# Parse image content
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
except json.JSONDecodeError:
content_data = {"description": modal_content}
else:
content_data = modal_content
image_path = content_data.get("img_path")
captions = content_data.get("img_caption", [])
footnotes = content_data.get("img_footnote", [])
# Build detailed visual analysis prompt
vision_prompt = f"""Please analyze this image in detail and provide a JSON response with the following structure:
{{
"detailed_description": "A comprehensive and detailed visual description of the image following these guidelines:
- Describe the overall composition and layout
- Identify all objects, people, text, and visual elements
- Explain relationships between elements
- Note colors, lighting, and visual style
- Describe any actions or activities shown
- Include technical details if relevant (charts, diagrams, etc.)
- Always use specific names instead of pronouns",
"entity_info": {{
"entity_name": "{entity_name if entity_name else 'unique descriptive name for this image'}",
"entity_type": "image",
"summary": "concise summary of the image content and its significance (max 100 words)"
}}
}}
Additional context:
- Image Path: {image_path}
- Captions: {captions if captions else 'None'}
- Footnotes: {footnotes if footnotes else 'None'}
Focus on providing accurate, detailed visual analysis that would be useful for knowledge retrieval."""
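            # Illustrative shape of the JSON response the prompt above is
            # designed to elicit (values here are hypothetical):
            # {
            #   "detailed_description": "A bar chart comparing quarterly revenue ...",
            #   "entity_info": {
            #     "entity_name": "Quarterly Revenue Chart",
            #     "entity_type": "image",
            #     "summary": "Bar chart showing revenue growth across four quarters."
            #   }
            # }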
# If image path exists, try to encode image
image_base64 = ""
if image_path and Path(image_path).exists():
image_base64 = self._encode_image_to_base64(image_path)
# Call vision model
if image_base64:
# Use real image for analysis
response = await self.modal_caption_func(
vision_prompt,
image_data=image_base64,
system_prompt="You are an expert image analyst. Provide detailed, accurate descriptions.",
)
else:
# Analyze based on existing text information
text_prompt = f"""Based on the following image information, provide analysis:
Image Path: {image_path}
Captions: {captions}
Footnotes: {footnotes}
{vision_prompt}"""
response = await self.modal_caption_func(
text_prompt,
system_prompt="You are an expert image analyst. Provide detailed analysis based on available information.",
)
# Parse response
enhanced_caption, entity_info = self._parse_response(response, entity_name)
# Build complete image content
modal_chunk = f"""
Image Content Analysis:
Image Path: {image_path}
Captions: {', '.join(captions) if captions else 'None'}
Footnotes: {', '.join(footnotes) if footnotes else 'None'}
Visual Analysis: {enhanced_caption}"""
return await self._create_entity_and_chunk(
modal_chunk, entity_info, file_path
)
except Exception as e:
logger.error(f"Error processing image content: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"image_{compute_mdhash_id(str(modal_content))}",
"entity_type": "image",
"summary": f"Image content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
def _parse_response(
self, response: str, entity_name: str = None
) -> Tuple[str, Dict[str, Any]]:
"""Parse model response"""
try:
response_data = json.loads(
re.search(r"\{.*\}", response, re.DOTALL).group(0)
)
description = response_data.get("detailed_description", "")
entity_data = response_data.get("entity_info", {})
if not description or not entity_data:
raise ValueError("Missing required fields in response")
if not all(
key in entity_data for key in ["entity_name", "entity_type", "summary"]
):
raise ValueError("Missing required fields in entity_info")
entity_data["entity_name"] = (
entity_data["entity_name"] + f" ({entity_data['entity_type']})"
)
if entity_name:
entity_data["entity_name"] = entity_name
return description, entity_data
except (json.JSONDecodeError, AttributeError, ValueError) as e:
logger.error(f"Error parsing image analysis response: {e}")
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"image_{compute_mdhash_id(response)}",
"entity_type": "image",
"summary": response[:100] + "..." if len(response) > 100 else response,
}
return response, fallback_entity
class TableModalProcessor(BaseModalProcessor):
"""Processor specialized for table content"""
async def process_multimodal_content(
self,
modal_content,
content_type: str,
file_path: str = "manual_creation",
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""Process table content"""
# Parse table content
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
except json.JSONDecodeError:
content_data = {"table_body": modal_content}
else:
content_data = modal_content
table_img_path = content_data.get("img_path")
table_caption = content_data.get("table_caption", [])
table_body = content_data.get("table_body", "")
table_footnote = content_data.get("table_footnote", [])
# Build table analysis prompt
table_prompt = f"""Please analyze this table content and provide a JSON response with the following structure:
{{
"detailed_description": "A comprehensive analysis of the table including:
- Table structure and organization
- Column headers and their meanings
- Key data points and patterns
- Statistical insights and trends
- Relationships between data elements
- Significance of the data presented
Always use specific names and values instead of general references.",
"entity_info": {{
"entity_name": "{entity_name if entity_name else 'descriptive name for this table'}",
"entity_type": "table",
"summary": "concise summary of the table's purpose and key findings (max 100 words)"
}}
}}
Table Information:
Image Path: {table_img_path}
Caption: {table_caption if table_caption else 'None'}
Body: {table_body}
Footnotes: {table_footnote if table_footnote else 'None'}
Focus on extracting meaningful insights and relationships from the tabular data."""
response = await self.modal_caption_func(
table_prompt,
system_prompt="You are an expert data analyst. Provide detailed table analysis with specific insights.",
)
# Parse response
enhanced_caption, entity_info = self._parse_table_response(
response, entity_name
)
# TODO: Add Retry Mechanism
# Build complete table content
modal_chunk = f"""Table Analysis:
Image Path: {table_img_path}
Caption: {', '.join(table_caption) if table_caption else 'None'}
Structure: {table_body}
Footnotes: {', '.join(table_footnote) if table_footnote else 'None'}
Analysis: {enhanced_caption}"""
return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path)
def _parse_table_response(
self, response: str, entity_name: str = None
) -> Tuple[str, Dict[str, Any]]:
"""Parse table analysis response"""
try:
response_data = json.loads(
re.search(r"\{.*\}", response, re.DOTALL).group(0)
)
description = response_data.get("detailed_description", "")
entity_data = response_data.get("entity_info", {})
if not description or not entity_data:
raise ValueError("Missing required fields in response")
if not all(
key in entity_data for key in ["entity_name", "entity_type", "summary"]
):
raise ValueError("Missing required fields in entity_info")
entity_data["entity_name"] = (
entity_data["entity_name"] + f" ({entity_data['entity_type']})"
)
if entity_name:
entity_data["entity_name"] = entity_name
return description, entity_data
except (json.JSONDecodeError, AttributeError, ValueError) as e:
logger.error(f"Error parsing table analysis response: {e}")
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"table_{compute_mdhash_id(response)}",
"entity_type": "table",
"summary": response[:100] + "..." if len(response) > 100 else response,
}
return response, fallback_entity
class EquationModalProcessor(BaseModalProcessor):
"""Processor specialized for equation content"""
async def process_multimodal_content(
self,
modal_content,
content_type: str,
file_path: str = "manual_creation",
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""Process equation content"""
# Parse equation content
if isinstance(modal_content, str):
try:
content_data = json.loads(modal_content)
except json.JSONDecodeError:
content_data = {"equation": modal_content}
else:
content_data = modal_content
equation_text = content_data.get("text")
equation_format = content_data.get("text_format", "")
# Build equation analysis prompt
equation_prompt = f"""Please analyze this mathematical equation and provide a JSON response with the following structure:
{{
"detailed_description": "A comprehensive analysis of the equation including:
- Mathematical meaning and interpretation
- Variables and their definitions
- Mathematical operations and functions used
- Application domain and context
- Physical or theoretical significance
- Relationship to other mathematical concepts
- Practical applications or use cases
Always use specific mathematical terminology.",
"entity_info": {{
"entity_name": "{entity_name if entity_name else 'descriptive name for this equation'}",
"entity_type": "equation",
"summary": "concise summary of the equation's purpose and significance (max 100 words)"
}}
}}
Equation Information:
Equation: {equation_text}
Format: {equation_format}
Focus on providing mathematical insights and explaining the equation's significance."""
response = await self.modal_caption_func(
equation_prompt,
system_prompt="You are an expert mathematician. Provide detailed mathematical analysis.",
)
# Parse response
enhanced_caption, entity_info = self._parse_equation_response(
response, entity_name
)
# Build complete equation content
modal_chunk = f"""Mathematical Equation Analysis:
Equation: {equation_text}
Format: {equation_format}
Mathematical Analysis: {enhanced_caption}"""
return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path)
def _parse_equation_response(
self, response: str, entity_name: str = None
) -> Tuple[str, Dict[str, Any]]:
"""Parse equation analysis response"""
try:
response_data = json.loads(
re.search(r"\{.*\}", response, re.DOTALL).group(0)
)
description = response_data.get("detailed_description", "")
entity_data = response_data.get("entity_info", {})
if not description or not entity_data:
raise ValueError("Missing required fields in response")
if not all(
key in entity_data for key in ["entity_name", "entity_type", "summary"]
):
raise ValueError("Missing required fields in entity_info")
entity_data["entity_name"] = (
entity_data["entity_name"] + f" ({entity_data['entity_type']})"
)
if entity_name:
entity_data["entity_name"] = entity_name
return description, entity_data
except (json.JSONDecodeError, AttributeError, ValueError) as e:
logger.error(f"Error parsing equation analysis response: {e}")
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"equation_{compute_mdhash_id(response)}",
"entity_type": "equation",
"summary": response[:100] + "..." if len(response) > 100 else response,
}
return response, fallback_entity
class GenericModalProcessor(BaseModalProcessor):
"""Generic processor for other types of modal content"""
async def process_multimodal_content(
self,
modal_content,
content_type: str,
file_path: str = "manual_creation",
entity_name: str = None,
) -> Tuple[str, Dict[str, Any]]:
"""Process generic modal content"""
# Build generic analysis prompt
generic_prompt = f"""Please analyze this {content_type} content and provide a JSON response with the following structure:
{{
"detailed_description": "A comprehensive analysis of the content including:
- Content structure and organization
- Key information and elements
- Relationships between components
- Context and significance
- Relevant details for knowledge retrieval
Always use specific terminology appropriate for {content_type} content.",
"entity_info": {{
"entity_name": "{entity_name if entity_name else f'descriptive name for this {content_type}'}",
"entity_type": "{content_type}",
"summary": "concise summary of the content's purpose and key points (max 100 words)"
}}
}}
Content: {str(modal_content)}
Focus on extracting meaningful information that would be useful for knowledge retrieval."""
response = await self.modal_caption_func(
generic_prompt,
system_prompt=f"You are an expert content analyst specializing in {content_type} content.",
)
# Parse response
enhanced_caption, entity_info = self._parse_generic_response(
response, entity_name, content_type
)
# Build complete content
modal_chunk = f"""{content_type.title()} Content Analysis:
Content: {str(modal_content)}
Analysis: {enhanced_caption}"""
return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path)
def _parse_generic_response(
self, response: str, entity_name: str = None, content_type: str = "content"
) -> Tuple[str, Dict[str, Any]]:
"""Parse generic analysis response"""
try:
response_data = json.loads(
re.search(r"\{.*\}", response, re.DOTALL).group(0)
)
description = response_data.get("detailed_description", "")
entity_data = response_data.get("entity_info", {})
if not description or not entity_data:
raise ValueError("Missing required fields in response")
if not all(
key in entity_data for key in ["entity_name", "entity_type", "summary"]
):
raise ValueError("Missing required fields in entity_info")
entity_data["entity_name"] = (
entity_data["entity_name"] + f" ({entity_data['entity_type']})"
)
if entity_name:
entity_data["entity_name"] = entity_name
return description, entity_data
except (json.JSONDecodeError, AttributeError, ValueError) as e:
logger.error(f"Error parsing generic analysis response: {e}")
fallback_entity = {
"entity_name": entity_name
if entity_name
else f"{content_type}_{compute_mdhash_id(response)}",
"entity_type": content_type,
"summary": response[:100] + "..." if len(response) > 100 else response,
}
return response, fallback_entity
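The example script earlier exercises the image, table, and equation processors; a parallel sketch for GenericModalProcessor (to be run inside an async function, with lightrag and llm_model_func prepared as in modalprocessors_example.py; content values are placeholders):

generic_processor = GenericModalProcessor(
    lightrag=lightrag, modal_caption_func=llm_model_func
)
description, entity_info = await generic_processor.process_multimodal_content(
    modal_content={"text": "Step 1: load data. Step 2: train model."},  # placeholder
    content_type="algorithm",  # free-form label, interpolated into the prompt
    file_path="algorithm_example.md",
    entity_name="Training Procedure",
)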

686
raganything/raganything.py Normal file

@@ -0,0 +1,686 @@
"""
Complete MinerU parsing + multimodal content insertion pipeline
This script integrates:
1. MinerU document parsing
2. LightRAG insertion of pure text content
3. Specialized processing of multimodal content (using dedicated processors)
"""
import os
import asyncio
import logging
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional, Callable
import sys
# Add project root directory to Python path
sys.path.insert(0, str(Path(__file__).parent.parent))
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc, setup_logger
# Import parser and multimodal processors
from lightrag.mineru_parser import MineruParser
# Import specialized processors
from lightrag.modalprocessors import (
ImageModalProcessor,
TableModalProcessor,
EquationModalProcessor,
GenericModalProcessor,
)
class RAGAnything:
"""Multimodal Document Processing Pipeline - Complete document parsing and insertion pipeline"""
def __init__(
self,
lightrag: Optional[LightRAG] = None,
llm_model_func: Optional[Callable] = None,
vision_model_func: Optional[Callable] = None,
embedding_func: Optional[Callable] = None,
working_dir: str = "./rag_storage",
embedding_dim: int = 3072,
max_token_size: int = 8192,
):
"""
Initialize Multimodal Document Processing Pipeline
Args:
lightrag: Optional pre-initialized LightRAG instance
llm_model_func: LLM model function for text analysis
vision_model_func: Vision model function for image analysis
embedding_func: Embedding function for text vectorization
working_dir: Working directory for storage (used when creating new RAG)
embedding_dim: Embedding dimension (used when creating new RAG)
max_token_size: Maximum token size for embeddings (used when creating new RAG)
"""
self.working_dir = working_dir
self.llm_model_func = llm_model_func
self.vision_model_func = vision_model_func
self.embedding_func = embedding_func
self.embedding_dim = embedding_dim
self.max_token_size = max_token_size
# Set up logging
setup_logger("RAGAnything")
self.logger = logging.getLogger("RAGAnything")
# Create working directory if needed
if not os.path.exists(working_dir):
os.makedirs(working_dir)
# Use provided LightRAG or mark for later initialization
self.lightrag = lightrag
self.modal_processors = {}
# If LightRAG is provided, initialize processors immediately
if self.lightrag is not None:
self._initialize_processors()
def _initialize_processors(self):
"""Initialize multimodal processors with appropriate model functions"""
if self.lightrag is None:
raise ValueError(
"LightRAG instance must be initialized before creating processors"
)
# Create different multimodal processors
self.modal_processors = {
"image": ImageModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.vision_model_func or self.llm_model_func,
),
"table": TableModalProcessor(
lightrag=self.lightrag, modal_caption_func=self.llm_model_func
),
"equation": EquationModalProcessor(
lightrag=self.lightrag, modal_caption_func=self.llm_model_func
),
"generic": GenericModalProcessor(
lightrag=self.lightrag, modal_caption_func=self.llm_model_func
),
}
self.logger.info("Multimodal processors initialized")
self.logger.info(f"Available processors: {list(self.modal_processors.keys())}")
async def _ensure_lightrag_initialized(self):
"""Ensure LightRAG instance is initialized, create if necessary"""
if self.lightrag is not None:
return
# Validate required functions
if self.llm_model_func is None:
raise ValueError(
"llm_model_func must be provided when LightRAG is not pre-initialized"
)
if self.embedding_func is None:
raise ValueError(
"embedding_func must be provided when LightRAG is not pre-initialized"
)
from lightrag.kg.shared_storage import initialize_pipeline_status
# Create LightRAG instance with provided functions
self.lightrag = LightRAG(
working_dir=self.working_dir,
llm_model_func=self.llm_model_func,
embedding_func=EmbeddingFunc(
embedding_dim=self.embedding_dim,
max_token_size=self.max_token_size,
func=self.embedding_func,
),
)
await self.lightrag.initialize_storages()
await initialize_pipeline_status()
# Initialize processors after LightRAG is ready
self._initialize_processors()
self.logger.info("LightRAG and multimodal processors initialized")
def parse_document(
self,
file_path: str,
output_dir: str = "./output",
parse_method: str = "auto",
display_stats: bool = True,
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse document using MinerU
Args:
file_path: Path to the file to parse
output_dir: Output directory
parse_method: Parse method ("auto", "ocr", "txt")
display_stats: Whether to display content statistics
Returns:
(content_list, md_content): Content list and markdown text
"""
self.logger.info(f"Starting document parsing: {file_path}")
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
# Choose appropriate parsing method based on file extension
ext = file_path.suffix.lower()
try:
if ext in [".pdf"]:
self.logger.info(
f"Detected PDF file, using PDF parser (OCR={parse_method == 'ocr'})..."
)
content_list, md_content = MineruParser.parse_pdf(
file_path, output_dir, use_ocr=(parse_method == "ocr")
)
elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"]:
self.logger.info("Detected image file, using image parser...")
content_list, md_content = MineruParser.parse_image(
file_path, output_dir
)
elif ext in [".doc", ".docx", ".ppt", ".pptx"]:
self.logger.info("Detected Office document, using Office parser...")
content_list, md_content = MineruParser.parse_office_doc(
file_path, output_dir
)
else:
# For other or unknown formats, use generic parser
self.logger.info(
f"Using generic parser for {ext} file (method={parse_method})..."
)
content_list, md_content = MineruParser.parse_document(
file_path, parse_method=parse_method, output_dir=output_dir
)
except Exception as e:
self.logger.error(f"Error during parsing with specific parser: {str(e)}")
self.logger.warning("Falling back to generic parser...")
# If specific parser fails, fall back to generic parser
content_list, md_content = MineruParser.parse_document(
file_path, parse_method=parse_method, output_dir=output_dir
)
self.logger.info(
f"Parsing complete! Extracted {len(content_list)} content blocks"
)
self.logger.info(f"Markdown text length: {len(md_content)} characters")
# Display content statistics if requested
if display_stats:
self.logger.info("\nContent Information:")
self.logger.info(f"* Total blocks in content_list: {len(content_list)}")
self.logger.info(f"* Markdown content length: {len(md_content)} characters")
# Count elements by type
block_types: Dict[str, int] = {}
for block in content_list:
if isinstance(block, dict):
block_type = block.get("type", "unknown")
if isinstance(block_type, str):
block_types[block_type] = block_types.get(block_type, 0) + 1
self.logger.info("* Content block types:")
for block_type, count in block_types.items():
self.logger.info(f" - {block_type}: {count}")
return content_list, md_content
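    # For orientation (illustrative; exact keys depend on the MinerU version), a
    # content_list entry typically looks like one of:
    #
    #     {"type": "text", "text": "1. Introduction ...", "page_idx": 0}
    #     {"type": "image", "img_path": "images/fig1.jpg", "img_caption": "Figure 1 ..."}
    #     {"type": "table", "table_body": "| col A | col B | ...", "table_caption": "Table 2 ..."}
    #
    # _separate_content below routes "text" blocks to LightRAG and everything else
    # to the specialized modal processors.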
def _separate_content(
self, content_list: List[Dict[str, Any]]
) -> Tuple[str, List[Dict[str, Any]]]:
"""
Separate text content and multimodal content
Args:
content_list: Content list from MinerU parsing
Returns:
(text_content, multimodal_items): Pure text content and multimodal items list
"""
text_parts = []
multimodal_items = []
for item in content_list:
content_type = item.get("type", "text")
if content_type == "text":
# Text content
text = item.get("text", "")
if text.strip():
text_parts.append(text)
else:
# Multimodal content (image, table, equation, etc.)
multimodal_items.append(item)
# Merge all text content
text_content = "\n\n".join(text_parts)
self.logger.info("Content separation complete:")
self.logger.info(f" - Text content length: {len(text_content)} characters")
self.logger.info(f" - Multimodal items count: {len(multimodal_items)}")
# Count multimodal types
modal_types = {}
for item in multimodal_items:
modal_type = item.get("type", "unknown")
modal_types[modal_type] = modal_types.get(modal_type, 0) + 1
if modal_types:
self.logger.info(f" - Multimodal type distribution: {modal_types}")
return text_content, multimodal_items
async def _insert_text_content(
self,
input: str | list[str],
split_by_character: str | None = None,
split_by_character_only: bool = False,
ids: str | list[str] | None = None,
file_paths: str | list[str] | None = None,
):
"""
Insert pure text content into LightRAG
Args:
input: Single document string or list of document strings
            split_by_character: if not None, split the input by this character; chunks longer
                than chunk_token_size are split again by token size
            split_by_character_only: if True, split only by the specified character; ignored
                when split_by_character is None
            ids: a single document ID or a list of unique document IDs; if not provided,
                MD5 hash IDs are generated
            file_paths: a single file path or a list of file paths, used for citation
"""
self.logger.info("Starting text content insertion into LightRAG...")
# Use LightRAG's insert method with all parameters
await self.lightrag.ainsert(
input=input,
file_paths=file_paths,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=ids,
)
self.logger.info("Text content insertion complete")
async def _process_multimodal_content(
self, multimodal_items: List[Dict[str, Any]], file_path: str
):
"""
Process multimodal content (using specialized processors)
Args:
multimodal_items: List of multimodal items
file_path: File path (for reference)
"""
if not multimodal_items:
self.logger.debug("No multimodal content to process")
return
self.logger.info("Starting multimodal content processing...")
file_name = os.path.basename(file_path)
for i, item in enumerate(multimodal_items):
try:
content_type = item.get("type", "unknown")
self.logger.info(
f"Processing item {i+1}/{len(multimodal_items)}: {content_type} content"
)
# Select appropriate processor
processor = self._get_processor_for_type(content_type)
if processor:
(
enhanced_caption,
entity_info,
) = await processor.process_multimodal_content(
modal_content=item,
content_type=content_type,
file_path=file_name,
)
self.logger.info(
f"{content_type} processing complete: {entity_info.get('entity_name', 'Unknown')}"
)
else:
self.logger.warning(
f"No suitable processor found for {content_type} type content"
)
except Exception as e:
self.logger.error(f"Error processing multimodal content: {str(e)}")
self.logger.debug("Exception details:", exc_info=True)
continue
self.logger.info("Multimodal content processing complete")
def _get_processor_for_type(self, content_type: str):
"""
Get appropriate processor based on content type
Args:
content_type: Content type
Returns:
Corresponding processor instance
"""
# Direct mapping to corresponding processor
if content_type == "image":
return self.modal_processors.get("image")
elif content_type == "table":
return self.modal_processors.get("table")
elif content_type == "equation":
return self.modal_processors.get("equation")
else:
# For other types, use generic processor
return self.modal_processors.get("generic")
async def process_document_complete(
self,
file_path: str,
output_dir: str = "./output",
parse_method: str = "auto",
display_stats: bool = True,
split_by_character: str | None = None,
split_by_character_only: bool = False,
doc_id: str | None = None,
):
"""
Complete document processing workflow
Args:
file_path: Path to the file to process
output_dir: MinerU output directory
            parse_method: Parse method ("auto", "ocr", or "txt")
            display_stats: Whether to display content statistics
            split_by_character: Optional character to split the text by
            split_by_character_only: If True, split only by the specified character
            doc_id: Optional document ID; if not provided, an MD5 hash ID is generated
"""
# Ensure LightRAG is initialized
await self._ensure_lightrag_initialized()
self.logger.info(f"Starting complete document processing: {file_path}")
# Step 1: Parse document using MinerU
content_list, md_content = self.parse_document(
file_path, output_dir, parse_method, display_stats
)
# Step 2: Separate text and multimodal content
text_content, multimodal_items = self._separate_content(content_list)
# Step 3: Insert pure text content with all parameters
if text_content.strip():
file_name = os.path.basename(file_path)
await self._insert_text_content(
text_content,
file_paths=file_name,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=doc_id,
)
# Step 4: Process multimodal content (using specialized processors)
if multimodal_items:
await self._process_multimodal_content(multimodal_items, file_path)
self.logger.info(f"Document {file_path} processing complete!")
async def process_folder_complete(
self,
folder_path: str,
output_dir: str = "./output",
parse_method: str = "auto",
display_stats: bool = False,
split_by_character: str | None = None,
split_by_character_only: bool = False,
file_extensions: Optional[List[str]] = None,
recursive: bool = True,
max_workers: int = 1,
):
"""
Process all files in a folder in batch
Args:
folder_path: Path to the folder to process
output_dir: MinerU output directory
            parse_method: Parse method ("auto", "ocr", or "txt")
display_stats: Whether to display content statistics for each file (recommended False for batch processing)
split_by_character: Optional character to split text by
split_by_character_only: If True, split only by the specified character
file_extensions: List of file extensions to process, e.g. [".pdf", ".docx"]. If None, process all supported formats
recursive: Whether to recursively process subfolders
            max_workers: Maximum number of concurrent workers
        Returns:
            Dict with processing statistics: total, success, failed, failed_files
        """
# Ensure LightRAG is initialized
await self._ensure_lightrag_initialized()
folder_path = Path(folder_path)
if not folder_path.exists() or not folder_path.is_dir():
raise ValueError(
f"Folder does not exist or is not a valid directory: {folder_path}"
)
# Supported file formats
supported_extensions = {
".pdf",
".jpg",
".jpeg",
".png",
".bmp",
".tiff",
".tif",
".doc",
".docx",
".ppt",
".pptx",
".txt",
".md",
}
# Use specified extensions or all supported formats
if file_extensions:
target_extensions = set(ext.lower() for ext in file_extensions)
# Validate if all are supported formats
unsupported = target_extensions - supported_extensions
if unsupported:
self.logger.warning(
f"The following file formats may not be fully supported: {unsupported}"
)
else:
target_extensions = supported_extensions
# Collect all files to process
files_to_process = []
if recursive:
# Recursively traverse all subfolders
for file_path in folder_path.rglob("*"):
if (
file_path.is_file()
and file_path.suffix.lower() in target_extensions
):
files_to_process.append(file_path)
else:
# Process only current folder
for file_path in folder_path.glob("*"):
if (
file_path.is_file()
and file_path.suffix.lower() in target_extensions
):
files_to_process.append(file_path)
if not files_to_process:
self.logger.info(f"No files to process found in {folder_path}")
return
self.logger.info(f"Found {len(files_to_process)} files to process")
self.logger.info("File type distribution:")
# Count file types
file_type_count = {}
for file_path in files_to_process:
ext = file_path.suffix.lower()
file_type_count[ext] = file_type_count.get(ext, 0) + 1
for ext, count in sorted(file_type_count.items()):
self.logger.info(f" {ext}: {count} files")
# Create progress tracking
processed_count = 0
failed_files = []
# Use semaphore to control concurrency
semaphore = asyncio.Semaphore(max_workers)
async def process_single_file(file_path: Path, index: int) -> None:
"""Process a single file"""
async with semaphore:
nonlocal processed_count
try:
self.logger.info(
f"[{index}/{len(files_to_process)}] Processing: {file_path}"
)
# Create separate output directory for each file
file_output_dir = Path(output_dir) / file_path.stem
file_output_dir.mkdir(parents=True, exist_ok=True)
# Process file
await self.process_document_complete(
file_path=str(file_path),
output_dir=str(file_output_dir),
parse_method=parse_method,
display_stats=display_stats,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
)
processed_count += 1
self.logger.info(
f"[{index}/{len(files_to_process)}] Successfully processed: {file_path}"
)
except Exception as e:
self.logger.error(
f"[{index}/{len(files_to_process)}] Failed to process: {file_path}"
)
self.logger.error(f"Error: {str(e)}")
failed_files.append((file_path, str(e)))
# Create all processing tasks
tasks = []
for index, file_path in enumerate(files_to_process, 1):
task = process_single_file(file_path, index)
tasks.append(task)
# Wait for all tasks to complete
await asyncio.gather(*tasks, return_exceptions=True)
# Output processing statistics
self.logger.info("\n===== Batch Processing Complete =====")
self.logger.info(f"Total files: {len(files_to_process)}")
self.logger.info(f"Successfully processed: {processed_count}")
self.logger.info(f"Failed: {len(failed_files)}")
if failed_files:
self.logger.info("\nFailed files:")
for file_path, error in failed_files:
self.logger.info(f" - {file_path}: {error}")
return {
"total": len(files_to_process),
"success": processed_count,
"failed": len(failed_files),
"failed_files": failed_files,
}
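    # The returned statistics can be inspected directly, e.g. (illustrative):
    #
    #     stats = await rag.process_folder_complete("./docs", max_workers=4)
    #     print(f"{stats['success']}/{stats['total']} processed, {stats['failed']} failed")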
async def query_with_multimodal(self, query: str, mode: str = "hybrid") -> str:
"""
        Query the knowledge base built from both text and multimodal content
        Args:
            query: The question to ask over the indexed content
            mode: LightRAG query mode (e.g., "hybrid", "local", "global", "naive")
        Returns:
            The generated answer string
"""
if self.lightrag is None:
raise ValueError(
"No LightRAG instance available. "
"Please either:\n"
"1. Provide a pre-initialized LightRAG instance when creating RAGAnything, or\n"
"2. Process documents first using process_document_complete() or process_folder_complete() "
"to create and populate the LightRAG instance."
)
result = await self.lightrag.aquery(query, param=QueryParam(mode=mode))
return result
def get_processor_info(self) -> Dict[str, Any]:
"""Get processor information"""
if not self.modal_processors:
return {"status": "Not initialized"}
info = {
"status": "Initialized",
"processors": {},
"models": {
"llm_model": "External function"
if self.llm_model_func
else "Not provided",
"vision_model": "External function"
if self.vision_model_func
else "Not provided",
"embedding_model": "External function"
if self.embedding_func
else "Not provided",
},
}
for proc_type, processor in self.modal_processors.items():
info["processors"][proc_type] = {
"class": processor.__class__.__name__,
"supports": self._get_processor_supports(proc_type),
}
return info
def _get_processor_supports(self, proc_type: str) -> List[str]:
"""Get processor supported features"""
supports_map = {
"image": [
"Image content analysis",
"Visual understanding",
"Image description generation",
"Image entity extraction",
],
"table": [
"Table structure analysis",
"Data statistics",
"Trend identification",
"Table entity extraction",
],
"equation": [
"Mathematical formula parsing",
"Variable identification",
"Formula meaning explanation",
"Formula entity extraction",
],
"generic": [
"General content analysis",
"Structured processing",
"Entity extraction",
],
}
return supports_map.get(proc_type, ["Basic processing"])

154
raganything_example.py Normal file

@@ -0,0 +1,154 @@
#!/usr/bin/env python
"""
Example script demonstrating the integration of MinerU parser with RAGAnything
This example shows how to:
1. Process parsed documents with RAGAnything
2. Perform multimodal queries on the processed documents
3. Handle different types of content (text, images, tables)
"""
import os
import argparse
import asyncio
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.raganything import RAGAnything
async def process_with_rag(
    file_path: str,
    output_dir: str,
    api_key: str,
    base_url: str | None = None,
    working_dir: str | None = None,
):
    """
    Process document with RAGAnything
    Args:
        file_path: Path to the document
        output_dir: Output directory for RAG results
        api_key: OpenAI API key
        base_url: Optional base URL for the OpenAI-compatible API
        working_dir: Working directory for RAG storage
    """
try:
# Initialize RAGAnything
rag = RAGAnything(
working_dir=working_dir,
llm_model_func=lambda prompt,
system_prompt=None,
history_messages=[],
**kwargs: openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
),
            # When image_data is present, build the messages list explicitly; the
            # system message is included only if a system_prompt is given (a None
            # entry in the messages list would be rejected by the API).
            vision_model_func=lambda prompt,
            system_prompt=None,
            history_messages=[],
            image_data=None,
            **kwargs: openai_complete_if_cache(
                "gpt-4o",
                "",
                system_prompt=None,
                history_messages=[],
                messages=(
                    [{"role": "system", "content": system_prompt}]
                    if system_prompt
                    else []
                )
                + [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{image_data}"
                                },
                            },
                        ],
                    }
                ],
                api_key=api_key,
                base_url=base_url,
                **kwargs,
            )
            if image_data
            else openai_complete_if_cache(
                "gpt-4o-mini",
                prompt,
                system_prompt=system_prompt,
                history_messages=history_messages,
                api_key=api_key,
                base_url=base_url,
                **kwargs,
            ),
embedding_func=lambda texts: openai_embed(
texts,
model="text-embedding-3-large",
api_key=api_key,
base_url=base_url,
),
embedding_dim=3072,
max_token_size=8192,
)
# Process document
await rag.process_document_complete(
file_path=file_path, output_dir=output_dir, parse_method="auto"
)
# Example queries
queries = [
"What is the main content of the document?",
"Describe the images and figures in the document",
"Tell me about the experimental results and data tables",
]
print("\nQuerying processed document:")
for query in queries:
print(f"\nQuery: {query}")
result = await rag.query_with_multimodal(query, mode="hybrid")
print(f"Answer: {result}")
except Exception as e:
print(f"Error processing with RAG: {str(e)}")
def main():
"""Main function to run the example"""
parser = argparse.ArgumentParser(description="MinerU RAG Example")
parser.add_argument("file_path", help="Path to the document to process")
parser.add_argument(
"--working_dir", "-w", default="./rag_storage", help="Working directory path"
)
parser.add_argument(
"--output", "-o", default="./output", help="Output directory path"
)
parser.add_argument(
"--api-key", required=True, help="OpenAI API key for RAG processing"
)
parser.add_argument("--base-url", help="Optional base URL for API")
args = parser.parse_args()
# Create output directory if specified
if args.output:
os.makedirs(args.output, exist_ok=True)
# Process with RAG
asyncio.run(
process_with_rag(
args.file_path, args.output, args.api_key, args.base_url, args.working_dir
)
)
if __name__ == "__main__":
main()
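# Example invocation (placeholder paths and key):
#     python raganything_example.py ./docs/sample.pdf --api-key YOUR_API_KEY \
#         --output ./output --working_dir ./rag_storage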

6
requirements.txt Normal file

@@ -0,0 +1,6 @@
# LightRAG packages
lightrag-hku
# MinerU packages
magic-pdf[full]>=1.2.2
huggingface_hub
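# Typical setup: pip install -r requirements.txt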