adding DocStatus class in base.py
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TypedDict, Union, Literal, Generic, TypeVar
|
||||
from enum import Enum
|
||||
from typing import Any, TypedDict, Optional, Union, Literal, Generic, TypeVar
|
||||
import os
|
||||
import numpy as np
|
||||
|
||||
from .utils import EmbeddingFunc
|
||||
|
||||
TextChunkSchema = TypedDict(
|
||||
@@ -138,3 +138,52 @@ class BaseGraphStorage(StorageNameSpace):
|
||||
|
||||
async def embed_nodes(self, algorithm: str) -> tuple[np.ndarray, list[str]]:
    """Placeholder for node embedding; always raises.

    Args:
        algorithm: Name of the embedding algorithm. It is not used by this
            implementation.

    Raises:
        NotImplementedError: Always, because node embedding is not used
            in minirag.
    """
    reason = "Node embedding is not used in minirag."
    raise NotImplementedError(reason)
|
||||
|
||||
|
||||
class DocStatus(str, Enum):
    """Processing states a document can be in.

    Members also subclass ``str``, so each one compares equal to its
    lowercase string value (e.g. ``DocStatus.PENDING == "pending"``).
    """

    # Not yet picked up for processing.
    PENDING = "pending"
    # Currently being processed.
    PROCESSING = "processing"
    # Processing finished.
    PROCESSED = "processed"
    # Processing ended with an error.
    FAILED = "failed"
|
||||
|
||||
|
||||
@dataclass
class DocProcessingStatus:
    """Record describing one document and where it is in processing.

    The first six fields are required and must be supplied in declaration
    order when passed positionally; the remaining fields are optional.
    """

    content: str
    """The document's original content."""

    content_summary: str
    """Preview text: the first 100 characters of the document content."""

    content_length: int
    """Total length of the document."""

    status: DocStatus
    """The document's current processing status."""

    created_at: str
    """Creation time of the document, as an ISO format timestamp."""

    updated_at: str
    """Time of the most recent update, as an ISO format timestamp."""

    chunks_count: Optional[int] = None
    """Number of chunks produced by splitting; ``None`` when not set."""

    error: Optional[str] = None
    """Error message when processing failed; ``None`` otherwise."""

    # default_factory gives every instance its own dict instead of a shared one.
    metadata: dict[str, Any] = field(default_factory=dict)
    """Additional free-form metadata."""
|
||||
|
||||
|
||||
class DocStatusStorage(BaseKVStorage):
    """Base class for storages that track document processing status.

    Every method here raises ``NotImplementedError``; presumably concrete
    storage backends override them (confirm against the implementations).
    """

    async def get_status_counts(self) -> dict[str, int]:
        """Return how many documents are in each status.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    async def get_failed_docs(self) -> dict[str, DocProcessingStatus]:
        """Return every document whose processing failed.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
        """Return every document that is still pending.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()
|
||||
|
||||
Reference in New Issue
Block a user