Incremental flow rework (#1696)

* Rework update output structure

* Semver

* Fix unit test

* Update frequency in incremental

---------

Co-authored-by: Alonso Guevara <alonsog@microsoft.com>
Authored by Nathan Evans on 2025-02-13 16:22:32 -08:00; committed by GitHub
parent 5ef2399a6f
commit 35b639399b
13 changed files with 153 additions and 137 deletions

View File

@@ -0,0 +1,4 @@
{
"type": "minor",
"description": "Rework the update output storage structure."
}

View File

@@ -26,6 +26,7 @@ log = logging.getLogger(__name__)
async def build_index(
config: GraphRagConfig,
method: IndexingMethod = IndexingMethod.Standard,
is_update_run: bool = False,
memory_profile: bool = False,
callbacks: list[WorkflowCallbacks] | None = None,
progress_logger: ProgressLogger | None = None,
@@ -50,8 +51,6 @@ async def build_index(
list[PipelineRunResult]
The list of pipeline run results
"""
is_update_run = bool(config.update_index_output)
pipeline_cache = (
    NoopPipelineCache() if config.cache.type == CacheType.none else None
)
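
Net effect of this hunk: build_index no longer infers an update run from config.update_index_output; callers pass is_update_run explicitly (defaulting to False). A minimal sketch of an update-run invocation under the new signature — load_config and the project path are assumptions, not part of this diff:

import asyncio
from pathlib import Path

from graphrag.api import build_index                  # signature changed in this hunk
from graphrag.config.load_config import load_config   # assumed loader, as used by the CLI below

async def main() -> None:
    config = load_config(Path("./ragtest"))  # hypothetical project root
    # An update run is now requested explicitly rather than inferred from
    # whether update_index_output is present in the config.
    results = await build_index(config=config, is_update_run=True)
    for result in results:
        print(result.workflow, result.errors or "ok")

asyncio.run(main())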

View File

@@ -78,11 +78,13 @@ def index_cli(
if output_dir:
cli_overrides["output.base_dir"] = str(output_dir)
cli_overrides["reporting.base_dir"] = str(output_dir)
cli_overrides["update_index_output.base_dir"] = str(output_dir)
config = load_config(root_dir, config_filepath, cli_overrides)
_run_index(
config=config,
method=method,
is_update_run=False,
verbose=verbose,
memprofile=memprofile,
cache=cache,
@@ -108,21 +110,14 @@ def update_cli(
if output_dir:
cli_overrides["output.base_dir"] = str(output_dir)
cli_overrides["reporting.base_dir"] = str(output_dir)
cli_overrides["update_index_output.base_dir"] = str(output_dir)
config = load_config(root_dir, config_filepath, cli_overrides)
    # Check if update output exists; if not, configure it with default values
if not config.update_index_output:
from graphrag.config.defaults import OUTPUT_TYPE, UPDATE_OUTPUT_BASE_DIR
from graphrag.config.models.output_config import OutputConfig
config.update_index_output = OutputConfig(
type=OUTPUT_TYPE,
base_dir=UPDATE_OUTPUT_BASE_DIR,
)
_run_index(
config=config,
method=method,
is_update_run=True,
verbose=verbose,
memprofile=memprofile,
cache=cache,
@@ -135,6 +130,7 @@ def update_cli(
def _run_index(
config,
method,
is_update_run,
verbose,
memprofile,
cache,
@@ -176,6 +172,7 @@ def _run_index(
api.build_index(
config=config,
method=method,
is_update_run=is_update_run,
memory_profile=memprofile,
progress_logger=progress_logger,
)
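
The CLI mirrors the API change: both index_cli and update_cli now pass is_update_run through _run_index, and a single --output override redirects output, reporting, and update_index_output to the same directory, while the fallback block that patched in a default update_index_output is gone (the config model now supplies it). A sketch of the override pattern, assuming the same load_config helper called above:

from pathlib import Path

from graphrag.config.load_config import load_config  # assumed loader, as called above

def build_cli_overrides(output_dir: str | None) -> dict:
    # One --output flag redirects all three base directories, matching the CLI above.
    overrides: dict = {}
    if output_dir:
        overrides["output.base_dir"] = str(output_dir)
        overrides["reporting.base_dir"] = str(output_dir)
        overrides["update_index_output.base_dir"] = str(output_dir)
    return overrides

config = load_config(Path("."), None, build_cli_overrides("my_output"))  # hypothetical dirs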

View File

@@ -128,12 +128,11 @@ REPORTING_BASE_DIR = "logs"
SNAPSHOTS_GRAPHML = False
SNAPSHOTS_EMBEDDINGS = False
OUTPUT_BASE_DIR = "output"
OUTPUT_DEFAULT_ID = "default_output"
OUTPUT_TYPE = OutputType.file
UPDATE_OUTPUT_BASE_DIR = "update_output"
SUMMARIZE_DESCRIPTIONS_MAX_LENGTH = 500
SUMMARIZE_MODEL_ID = DEFAULT_CHAT_MODEL_ID
UMAP_ENABLED = False
UPDATE_OUTPUT_BASE_DIR = "update_output"
# Graph Pruning
PRUNE_MIN_NODE_FREQ = 2

View File

@@ -91,12 +91,6 @@ output:
type: {defs.OUTPUT_TYPE.value} # [file, blob, cosmosdb]
base_dir: "{defs.OUTPUT_BASE_DIR}"
## only turn this on if running `graphrag index` with custom settings
## we normally use `graphrag update` with the defaults
update_index_output:
# type: {defs.OUTPUT_TYPE.value} # [file, blob, cosmosdb]
# base_dir: "{defs.UPDATE_OUTPUT_BASE_DIR}"
### Workflow settings ###
extract_graph:

View File

@@ -134,20 +134,20 @@ class GraphRagConfig(BaseModel):
(Path(self.root_dir) / output.base_dir).resolve()
)
update_index_output: OutputConfig | None = Field(
update_index_output: OutputConfig = Field(
description="The output configuration for the updated index.",
default=None,
default=OutputConfig(
type=defs.OUTPUT_TYPE,
base_dir=defs.UPDATE_OUTPUT_BASE_DIR,
),
)
"""The output configuration for the updated index."""
def _validate_update_index_output_base_dir(self) -> None:
"""Validate the update index output base directory."""
if (
self.update_index_output
and self.update_index_output.type == defs.OutputType.file
):
if self.update_index_output.type == defs.OutputType.file:
if self.update_index_output.base_dir.strip() == "":
msg = "Update index output base directory is required for file output. Please rerun `graphrag init` and set the update index output configuration."
msg = "update_index_output base directory is required for file output. Please rerun `graphrag init` and set the update_index_output configuration."
raise ValueError(msg)
self.update_index_output.base_dir = str(
(Path(self.root_dir) / self.update_index_output.base_dir).resolve()
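
With update_index_output now defaulting to a populated OutputConfig (file output under update_output/), the init template above no longer needs to emit the block and the validator can assume the field exists. A minimal sketch of the default and of the base_dir resolution the validator performs — the project root here is illustrative:

from pathlib import Path

from graphrag.config import defaults as defs
from graphrag.config.models.output_config import OutputConfig

# The default the model supplies when settings.yaml has no update_index_output block.
update_output = OutputConfig(type=defs.OUTPUT_TYPE, base_dir=defs.UPDATE_OUTPUT_BASE_DIR)
print(update_output.type, update_output.base_dir)  # file output, "update_output"

# For file output the validator then resolves base_dir against the project root,
# mirroring _validate_update_index_output_base_dir above.
root_dir = "."  # hypothetical project root
print((Path(root_dir) / update_output.base_dir).resolve())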

View File

@@ -5,6 +5,7 @@
import json
import logging
import re
import time
import traceback
from collections.abc import AsyncIterable
@@ -31,7 +32,7 @@ from graphrag.logger.null_progress import NullProgressLogger
from graphrag.logger.progress import Progress
from graphrag.storage.factory import StorageFactory
from graphrag.storage.pipeline_storage import PipelineStorage
from graphrag.utils.storage import write_table_to_storage
from graphrag.utils.storage import load_table_from_storage, write_table_to_storage
log = logging.getLogger(__name__)
@@ -66,45 +67,49 @@ async def run_pipeline(
if is_update_run:
progress_logger.info("Running incremental indexing.")
update_storage_config = config.update_index_output.model_dump() # type: ignore
update_index_storage = StorageFactory().create_storage(
storage_type=update_storage_config["type"], # type: ignore
kwargs=update_storage_config,
)
delta_dataset = await get_delta_docs(dataset, storage)
# Fail on empty delta dataset
# warn on empty delta dataset
if delta_dataset.new_inputs.empty:
error_msg = "Incremental Indexing Error: No new documents to process."
raise ValueError(error_msg)
warning_msg = "Incremental indexing found no new documents, exiting."
progress_logger.warning(warning_msg)
else:
update_storage_config = config.update_index_output.model_dump() # type: ignore
update_storage = StorageFactory().create_storage(
storage_type=update_storage_config["type"], # type: ignore
kwargs=update_storage_config,
)
# we use this to store the new subset index, and will merge its content with the previous index
timestamped_storage = update_storage.child(time.strftime("%Y%m%d-%H%M%S"))
delta_storage = timestamped_storage.child("delta")
# copy the previous output to a backup folder, so we can replace it with the update
# we'll read from this later when we merge the old and new indexes
previous_storage = timestamped_storage.child("previous")
await _copy_previous_output(storage, previous_storage)
delta_storage = update_index_storage.child("delta")
# Run the pipeline on the new documents
async for table in _run_pipeline(
pipeline=pipeline,
config=config,
dataset=delta_dataset.new_inputs,
cache=cache,
storage=delta_storage,
callbacks=callback_chain,
logger=progress_logger,
):
yield table
# Run the pipeline on the new documents
tables_dict = {}
async for table in _run_pipeline(
pipeline=pipeline,
config=config,
dataset=delta_dataset.new_inputs,
cache=cache,
storage=delta_storage,
callbacks=callback_chain,
logger=progress_logger,
):
tables_dict[table.workflow] = table.result
progress_logger.success("Finished running workflows on new documents.")
progress_logger.success("Finished running workflows on new documents.")
await update_dataframe_outputs(
dataframe_dict=tables_dict,
storage=storage,
update_storage=update_index_storage,
config=config,
cache=cache,
callbacks=NoopWorkflowCallbacks(),
progress_logger=progress_logger,
)
await update_dataframe_outputs(
previous_storage=previous_storage,
delta_storage=delta_storage,
output_storage=storage,
config=config,
cache=cache,
callbacks=NoopWorkflowCallbacks(),
progress_logger=progress_logger,
)
else:
progress_logger.info("Running standard indexing.")
@@ -172,3 +177,13 @@ async def _dump_stats(stats: PipelineRunStats, storage: PipelineStorage) -> None
await storage.set(
"stats.json", json.dumps(asdict(stats), indent=4, ensure_ascii=False)
)
async def _copy_previous_output(
storage: PipelineStorage,
copy_storage: PipelineStorage,
):
for file in storage.find(re.compile(r"\.parquet$")):
base_name = file[0].replace(".parquet", "")
table = await load_table_from_storage(base_name, storage)
await write_table_to_storage(table, base_name, copy_storage)
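
The core of the rework is the storage layout: each update run creates a timestamped child under update_index_output, writes the freshly indexed subset into a delta/ child, snapshots the prior index into a previous/ child via _copy_previous_output, and then update_dataframe_outputs merges old and new directly into the main output storage. A rough standalone sketch of the snapshot step, using the file storage type and the default base directories from this PR (the factory call mirrors the one above; everything else is an assumption):

import asyncio
import re
import time

from graphrag.storage.factory import StorageFactory
from graphrag.utils.storage import load_table_from_storage, write_table_to_storage

async def snapshot_previous(output_dir: str, update_dir: str) -> None:
    # Copy every parquet table from the live output into
    # <update_dir>/<timestamp>/previous before the merge runs.
    storage = StorageFactory().create_storage(
        storage_type="file", kwargs={"type": "file", "base_dir": output_dir}
    )
    update_storage = StorageFactory().create_storage(
        storage_type="file", kwargs={"type": "file", "base_dir": update_dir}
    )
    previous = update_storage.child(time.strftime("%Y%m%d-%H%M%S")).child("previous")
    for file in storage.find(re.compile(r"\.parquet$")):
        name = file[0].replace(".parquet", "")
        table = await load_table_from_storage(name, storage)
        await write_table_to_storage(table, name, previous)

asyncio.run(snapshot_previous("output", "update_output"))  # default base dirs from this PR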

View File

@@ -65,10 +65,16 @@ def _group_and_resolve_entities(
"description": lambda x: list(x.astype(str)), # Ensure str
# Concatenate nd.array into a single list
"text_unit_ids": lambda x: list(itertools.chain(*x.tolist())),
"degree": "first", # todo: we could probably re-compute this with the entire new graph
"x": "first",
"y": "first",
})
.reset_index()
)
# recompute frequency to include new text units
aggregated["frequency"] = aggregated["text_unit_ids"].apply(len)
# Force the result into a DataFrame
resolved: pd.DataFrame = pd.DataFrame(aggregated)
@@ -82,6 +88,10 @@ def _group_and_resolve_entities(
"type",
"description",
"text_unit_ids",
"frequency",
"degree",
"x",
"y",
],
]
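
This is where the "Update frequency in incremental" bullet from the commit message lands: after old and delta entities are grouped and their text_unit_ids concatenated, frequency is recomputed from the merged list rather than carried over stale from the previous index (degree, x, and y are taken from the first occurrence for now, per the todo). A toy illustration of the recomputation — the grouping keys and columns are illustrative, not the module's full schema:

import itertools

import pandas as pd

# The same entity seen in the previous index (two text units) and in the delta (one).
entities = pd.DataFrame({
    "title": ["ACME", "ACME"],
    "type": ["ORG", "ORG"],
    "description": ["old description", "newer description"],
    "text_unit_ids": [["t1", "t2"], ["t3"]],
})

aggregated = (
    entities.groupby(["title", "type"], sort=False)
    .agg({
        "description": lambda x: list(x.astype(str)),
        "text_unit_ids": lambda x: list(itertools.chain(*x.tolist())),
    })
    .reset_index()
)
# Recompute frequency from the merged text-unit list, as the hunk above does.
aggregated["frequency"] = aggregated["text_unit_ids"].apply(len)
print(aggregated[["title", "frequency"]])  # ACME -> 3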

View File

@@ -80,9 +80,9 @@ async def get_delta_docs(
async def update_dataframe_outputs(
dataframe_dict: dict[str, pd.DataFrame],
storage: PipelineStorage,
update_storage: PipelineStorage,
previous_storage: PipelineStorage,
delta_storage: PipelineStorage,
output_storage: PipelineStorage,
config: GraphRagConfig,
cache: PipelineCache,
callbacks: WorkflowCallbacks,
@@ -92,52 +92,53 @@ async def update_dataframe_outputs(
Parameters
----------
dataframe_dict : dict[str, pd.DataFrame]
The dictionary of dataframes.
storage : PipelineStorage
The storage used to store the dataframes.
previous_storage : PipelineStorage
The storage used to store the dataframes in the original run.
delta_storage : PipelineStorage
The storage used to store the subset of new dataframes in the update run.
output_storage : PipelineStorage
The storage used to store the updated dataframes (the final incremental output).
"""
progress_logger.info("Updating Documents")
final_documents_df = await _concat_dataframes(
"documents", dataframe_dict, storage, update_storage
"documents", previous_storage, delta_storage, output_storage
)
# Update entities and merge them
progress_logger.info("Updating Entities")
merged_entities_df, entity_id_mapping = await _update_entities(
dataframe_dict, storage, update_storage, config, cache, callbacks
previous_storage, delta_storage, output_storage, config, cache, callbacks
)
# Update relationships with the entities id mapping
progress_logger.info("Updating Relationships")
merged_relationships_df = await _update_relationships(
dataframe_dict, storage, update_storage
previous_storage, delta_storage, output_storage
)
# Update and merge final text units
progress_logger.info("Updating Text Units")
merged_text_units = await _update_text_units(
dataframe_dict, storage, update_storage, entity_id_mapping
previous_storage, delta_storage, output_storage, entity_id_mapping
)
# Merge final covariates
if (
await storage_has_table("covariates", storage)
and "covariates" in dataframe_dict
):
if await storage_has_table(
"covariates", previous_storage
) and await storage_has_table("covariates", delta_storage):
progress_logger.info("Updating Covariates")
await _update_covariates(dataframe_dict, storage, update_storage)
await _update_covariates(previous_storage, delta_storage, output_storage)
# Merge final communities
progress_logger.info("Updating Communities")
community_id_mapping = await _update_communities(
dataframe_dict, storage, update_storage
previous_storage, delta_storage, output_storage
)
# Merge community reports
progress_logger.info("Updating Community Reports")
merged_community_reports = await _update_community_reports(
dataframe_dict, storage, update_storage, community_id_mapping
previous_storage, delta_storage, output_storage, community_id_mapping
)
# Generate text embeddings
@@ -152,7 +153,7 @@ async def update_dataframe_outputs(
final_community_reports=merged_community_reports,
callbacks=callbacks,
cache=cache,
storage=update_storage,
storage=output_storage,
text_embed_config=text_embed,
embedded_fields=embedded_fields,
snapshot_embeddings_enabled=config.snapshots.embeddings,
@@ -160,84 +161,85 @@ async def update_dataframe_outputs(
async def _update_community_reports(
dataframe_dict, storage, update_storage, community_id_mapping
previous_storage, delta_storage, output_storage, community_id_mapping
):
"""Update the community reports output."""
old_community_reports = await load_table_from_storage("community_reports", storage)
delta_community_reports = dataframe_dict["community_reports"]
old_community_reports = await load_table_from_storage(
"community_reports", previous_storage
)
delta_community_reports = await load_table_from_storage(
"community_reports", delta_storage
)
merged_community_reports = _update_and_merge_community_reports(
old_community_reports, delta_community_reports, community_id_mapping
)
await write_table_to_storage(
merged_community_reports, "community_reports", update_storage
merged_community_reports, "community_reports", output_storage
)
return merged_community_reports
async def _update_communities(dataframe_dict, storage, update_storage):
async def _update_communities(previous_storage, delta_storage, output_storage):
"""Update the communities output."""
old_communities = await load_table_from_storage("communities", storage)
delta_communities = dataframe_dict["communities"]
old_communities = await load_table_from_storage("communities", previous_storage)
delta_communities = await load_table_from_storage("communities", delta_storage)
merged_communities, community_id_mapping = _update_and_merge_communities(
old_communities, delta_communities
)
await write_table_to_storage(merged_communities, "communities", update_storage)
await write_table_to_storage(merged_communities, "communities", output_storage)
return community_id_mapping
async def _update_covariates(dataframe_dict, storage, update_storage):
async def _update_covariates(previous_storage, delta_storage, output_storage):
"""Update the covariates output."""
old_covariates = await load_table_from_storage("covariates", storage)
delta_covariates = dataframe_dict["covariates"]
old_covariates = await load_table_from_storage("covariates", previous_storage)
delta_covariates = await load_table_from_storage("covariates", delta_storage)
merged_covariates = _merge_covariates(old_covariates, delta_covariates)
await write_table_to_storage(merged_covariates, "covariates", update_storage)
await write_table_to_storage(merged_covariates, "covariates", output_storage)
async def _update_text_units(
dataframe_dict, storage, update_storage, entity_id_mapping
previous_storage, delta_storage, output_storage, entity_id_mapping
):
"""Update the text units output."""
old_text_units = await load_table_from_storage("text_units", storage)
delta_text_units = dataframe_dict["text_units"]
old_text_units = await load_table_from_storage("text_units", previous_storage)
delta_text_units = await load_table_from_storage("text_units", delta_storage)
merged_text_units = _update_and_merge_text_units(
old_text_units, delta_text_units, entity_id_mapping
)
await write_table_to_storage(merged_text_units, "text_units", update_storage)
await write_table_to_storage(merged_text_units, "text_units", output_storage)
return merged_text_units
async def _update_relationships(dataframe_dict, storage, update_storage):
async def _update_relationships(previous_storage, delta_storage, output_storage):
"""Update the relationships output."""
old_relationships = await load_table_from_storage("relationships", storage)
delta_relationships = dataframe_dict["relationships"]
old_relationships = await load_table_from_storage("relationships", previous_storage)
delta_relationships = await load_table_from_storage("relationships", delta_storage)
merged_relationships_df = _update_and_merge_relationships(
old_relationships,
delta_relationships,
)
await write_table_to_storage(
merged_relationships_df, "relationships", update_storage
merged_relationships_df, "relationships", output_storage
)
return merged_relationships_df
async def _update_entities(
dataframe_dict, storage, update_storage, config, cache, callbacks
previous_storage, delta_storage, output_storage, config, cache, callbacks
):
"""Update Final Entities output."""
old_entities = await load_table_from_storage("entities", storage)
delta_entities = dataframe_dict["entities"]
old_entities = await load_table_from_storage("entities", previous_storage)
delta_entities = await load_table_from_storage("entities", delta_storage)
merged_entities_df, entity_id_mapping = _group_and_resolve_entities(
old_entities, delta_entities
@@ -252,30 +254,22 @@ async def _update_entities(
)
# Save the updated entities back to storage
await write_table_to_storage(merged_entities_df, "entities", update_storage)
await write_table_to_storage(merged_entities_df, "entities", output_storage)
return merged_entities_df, entity_id_mapping
async def _concat_dataframes(name, dataframe_dict, storage, update_storage):
"""Concatenate dataframes.
Parameters
----------
name : str
The name of the dataframe to concatenate.
dataframe_dict : dict[str, pd.DataFrame]
The dictionary of dataframes from a pipeline run.
storage : PipelineStorage
The storage used to store the dataframes.
"""
old_df = await load_table_from_storage(name, storage)
delta_df = dataframe_dict[name]
async def _concat_dataframes(name, previous_storage, delta_storage, output_storage):
"""Concatenate dataframes."""
old_df = await load_table_from_storage(name, previous_storage)
delta_df = await load_table_from_storage(name, delta_storage)
# Merge the final documents
final_df = pd.concat([old_df, delta_df], copy=False)
initial_id = old_df["human_readable_id"].max() + 1
delta_df["human_readable_id"] = np.arange(initial_id, initial_id + len(delta_df))
final_df = pd.concat([old_df, delta_df], ignore_index=True, copy=False)
await write_table_to_storage(final_df, name, update_storage)
await write_table_to_storage(final_df, name, output_storage)
return final_df
@@ -329,13 +323,6 @@ def _merge_covariates(
The merged covariates.
"""
# Get the max human readable id from the old covariates and update the delta covariates
old_covariates["human_readable_id"] = old_covariates["human_readable_id"].astype(
int
)
delta_covariates["human_readable_id"] = delta_covariates[
"human_readable_id"
].astype(int)
initial_id = old_covariates["human_readable_id"].max() + 1
delta_covariates["human_readable_id"] = np.arange(
initial_id, initial_id + len(delta_covariates)
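
Both the document concat and the covariate merge now renumber the delta rows' human_readable_id to continue after the previous maximum, so ids stay unique and monotonic across incremental runs instead of restarting at the delta's own numbering. A toy version of that renumbering (column names illustrative):

import numpy as np
import pandas as pd

old_df = pd.DataFrame({"title": ["a", "b"], "human_readable_id": [1, 2]})
delta_df = pd.DataFrame({"title": ["c", "d"], "human_readable_id": [1, 2]})  # delta restarts at 1

# Continue numbering after the previous max, then concatenate with a fresh index.
initial_id = old_df["human_readable_id"].max() + 1
delta_df["human_readable_id"] = np.arange(initial_id, initial_id + len(delta_df))
final_df = pd.concat([old_df, delta_df], ignore_index=True, copy=False)
print(final_df["human_readable_id"].tolist())  # [1, 2, 3, 4]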

View File

@@ -133,7 +133,9 @@ class BlobPipelineStorage(PipelineStorage):
if file_filter is None:
return True
return all(re.match(value, item[key]) for key, value in file_filter.items())
return all(
re.search(value, item[key]) for key, value in file_filter.items()
)
try:
container_client = self._blob_service_client.get_container_client(
@@ -145,7 +147,7 @@ class BlobPipelineStorage(PipelineStorage):
num_total = len(list(all_blobs))
num_filtered = 0
for blob in all_blobs:
match = file_pattern.match(blob.name)
match = file_pattern.search(blob.name)
if match and blob.name.startswith(base_dir):
group = match.groupdict()
if item_filter(group):
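
The same one-line fix is applied to all three storage backends (blob here, CosmosDB and file below): re.match only matches at the start of a string, so suffix-anchored patterns and the nested, timestamped paths introduced by the new layout never matched, while re.search scans the whole name. A quick illustration:

import re

pattern = re.compile(r"entities\.parquet$")  # e.g. a suffix-anchored table pattern

# re.match succeeds only at position 0, so a timestamped or nested prefix breaks it;
# re.search scans the whole string.
name = "20250213-162232/previous/entities.parquet"
print(bool(pattern.match(name)))   # False
print(bool(pattern.search(name)))  # True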

View File

@@ -146,7 +146,8 @@ class CosmosDBPipelineStorage(PipelineStorage):
if file_filter is None:
return True
return all(
re.match(value, item.get(key, "")) for key, value in file_filter.items()
re.search(value, item.get(key, ""))
for key, value in file_filter.items()
)
try:
@@ -171,7 +172,7 @@ class CosmosDBPipelineStorage(PipelineStorage):
return
num_filtered = 0
for item in items:
match = file_pattern.match(item["id"])
match = file_pattern.search(item["id"])
if match:
group = match.groupdict()
if item_filter(group):

View File

@@ -51,7 +51,9 @@ class FilePipelineStorage(PipelineStorage):
def item_filter(item: dict[str, Any]) -> bool:
if file_filter is None:
return True
return all(re.match(value, item[key]) for key, value in file_filter.items())
return all(
re.search(value, item[key]) for key, value in file_filter.items()
)
search_path = Path(self._root_dir) / (base_dir or "")
log.info("search %s for files matching %s", search_path, file_pattern.pattern)
@@ -60,7 +62,7 @@ class FilePipelineStorage(PipelineStorage):
num_total = len(all_files)
num_filtered = 0
for file in all_files:
match = file_pattern.match(f"{file}")
match = file_pattern.search(f"{file}")
if match:
group = match.groupdict()
if item_filter(group):

View File

@@ -75,7 +75,13 @@ DEFAULT_GRAPHRAG_CONFIG_SETTINGS = {
"container_name": None,
"storage_account_blob_url": None,
},
"update_index_output": None,
"update_index_output": {
"type": defs.OUTPUT_TYPE,
"base_dir": defs.UPDATE_OUTPUT_BASE_DIR,
"connection_string": None,
"container_name": None,
"storage_account_blob_url": None,
},
"cache": {
"type": defs.CACHE_TYPE,
"base_dir": defs.CACHE_BASE_DIR,