From 84253a369c9a0f0a0678b3823fafef032c9e8d74 Mon Sep 17 00:00:00 2001 From: KartDriver Date: Tue, 12 Aug 2025 12:44:57 -0500 Subject: [PATCH] fix: add waitForIndexReady and loadCollectionWithRetry for milvus --- .../core/src/embedding/openai-embedding.ts | 12 + packages/core/src/vectordb/milvus-vectordb.ts | 210 +++++++++++++++--- 2 files changed, 195 insertions(+), 27 deletions(-) diff --git a/packages/core/src/embedding/openai-embedding.ts b/packages/core/src/embedding/openai-embedding.ts index 6defeaf..b0a8663 100644 --- a/packages/core/src/embedding/openai-embedding.ts +++ b/packages/core/src/embedding/openai-embedding.ts @@ -115,6 +115,18 @@ export class OpenAIEmbedding extends Embedding { } getDimension(): number { + // For custom models, we need to detect the dimension first + const model = this.config.model || 'text-embedding-3-small'; + const knownModels = OpenAIEmbedding.getSupportedModels(); + + // If it's a known model, return its known dimension + if (knownModels[model]) { + return knownModels[model].dimension; + } + + // For custom models, return the current dimension + // Note: This may be incorrect until detectDimension() is called + console.warn(`[OpenAIEmbedding] getDimension() called for custom model '${model}' - returning ${this.dimension}. 
Call detectDimension() first for accurate dimension.`); return this.dimension; } diff --git a/packages/core/src/vectordb/milvus-vectordb.ts b/packages/core/src/vectordb/milvus-vectordb.ts index 6a69d5b..d489b21 100644 --- a/packages/core/src/vectordb/milvus-vectordb.ts +++ b/packages/core/src/vectordb/milvus-vectordb.ts @@ -1,4 +1,4 @@ -import { MilvusClient, DataType, MetricType, FunctionType, LoadState } from '@zilliz/milvus2-sdk-node'; +import { MilvusClient, DataType, MetricType, FunctionType, LoadState, IndexState } from '@zilliz/milvus2-sdk-node'; import { VectorDocument, SearchOptions, @@ -103,15 +103,19 @@ export class MilvusVectorDatabase implements VectorDatabase { * Ensure collection is loaded before search/query operations */ protected async ensureLoaded(collectionName: string): Promise { + if (!this.client) { + throw new Error('MilvusClient is not initialized. Call ensureInitialized() first.'); + } + try { // Check if collection is loaded - const result = await this.client!.getLoadState({ + const result = await this.client.getLoadState({ collection_name: collectionName }); if (result.state !== LoadState.LoadStateLoaded) { console.log(`🔄 Loading collection '${collectionName}' to memory...`); - await this.client!.loadCollection({ + await this.client.loadCollection({ collection_name: collectionName, }); } @@ -121,6 +125,106 @@ export class MilvusVectorDatabase implements VectorDatabase { } } + /** + * Wait for an index to be ready before proceeding + * Polls index status with exponential backoff up to 60 seconds + */ + protected async waitForIndexReady(collectionName: string, fieldName: string): Promise { + if (!this.client) { + throw new Error('MilvusClient is not initialized. 
Call ensureInitialized() first.'); + } + + const maxWaitTime = 60000; // 60 seconds + const initialInterval = 500; // 500ms + const maxInterval = 5000; // 5 seconds + const backoffMultiplier = 1.5; + + let interval = initialInterval; + const startTime = Date.now(); + + console.log(`⏳ Waiting for index on field '${fieldName}' in collection '${collectionName}' to be ready...`); + + while (Date.now() - startTime < maxWaitTime) { + try { + const indexStateResult = await this.client.getIndexState({ + collection_name: collectionName, + field_name: fieldName + }); + + // Debug logging to understand the state value + console.log(`📊 Index state for '${fieldName}': raw=${indexStateResult.state}, type=${typeof indexStateResult.state}, IndexState.Finished=${IndexState.Finished}`); + console.log(`📊 Full response:`, JSON.stringify(indexStateResult)); + + // Check both numeric and potential string values + // Cast to any to bypass TypeScript checks while debugging + const stateValue = indexStateResult.state as any; + if (stateValue === IndexState.Finished || + stateValue === 3 || + stateValue === 'Finished') { + console.log(`✅ Index on field '${fieldName}' is ready!`); + return; + } + + if (indexStateResult.state === IndexState.Failed) { + throw new Error(`Index creation failed for field '${fieldName}' in collection '${collectionName}'`); + } + + // Wait with exponential backoff + await new Promise(resolve => setTimeout(resolve, interval)); + interval = Math.min(interval * backoffMultiplier, maxInterval); + + } catch (error) { + console.error(`❌ Error checking index state for field '${fieldName}':`, error); + throw error; + } + } + + throw new Error(`Timeout waiting for index on field '${fieldName}' in collection '${collectionName}' to be ready after ${maxWaitTime}ms`); + } + + /** + * Load collection with retry logic and exponential backoff + * Retries up to 5 times with exponential backoff + */ + protected async loadCollectionWithRetry(collectionName: string): Promise { + if 
(!this.client) { + throw new Error('MilvusClient is not initialized. Call ensureInitialized() first.'); + } + + const maxRetries = 5; + const initialInterval = 1000; // 1 second + const backoffMultiplier = 2; + + let attempt = 1; + let interval = initialInterval; + + while (attempt <= maxRetries) { + try { + console.log(`🔄 Loading collection '${collectionName}' to memory (attempt ${attempt}/${maxRetries})...`); + + await this.client.loadCollection({ + collection_name: collectionName, + }); + + console.log(`✅ Collection '${collectionName}' loaded successfully!`); + return; + + } catch (error) { + console.error(`❌ Failed to load collection '${collectionName}' on attempt ${attempt}:`, error); + + if (attempt === maxRetries) { + throw new Error(`Failed to load collection '${collectionName}' after ${maxRetries} attempts: ${error}`); + } + + // Wait with exponential backoff before retry + console.log(`⏳ Retrying collection load in ${interval}ms...`); + await new Promise(resolve => setTimeout(resolve, interval)); + interval *= backoffMultiplier; + attempt++; + } + } + } + async createCollection(collectionName: string, dimension: number, description?: string): Promise { await this.ensureInitialized(); @@ -182,7 +286,11 @@ export class MilvusVectorDatabase implements VectorDatabase { fields: schema, }; - await createCollectionWithLimitCheck(this.client!, createCollectionParams); + if (!this.client) { + throw new Error('MilvusClient is not initialized. 
Call ensureInitialized() first.'); + } + + await createCollectionWithLimitCheck(this.client, createCollectionParams); // Create index const indexParams = { @@ -192,15 +300,17 @@ export class MilvusVectorDatabase implements VectorDatabase { metric_type: MetricType.COSINE, }; - await this.client!.createIndex(indexParams); + console.log(`🔧 Creating index for field 'vector' in collection '${collectionName}'...`); + await this.client.createIndex(indexParams); - // Load collection to memory - await this.client!.loadCollection({ - collection_name: collectionName, - }); + // Wait for index to be ready before loading collection + await this.waitForIndexReady(collectionName, 'vector'); + + // Load collection to memory with retry logic + await this.loadCollectionWithRetry(collectionName); // Verify collection is created correctly - await this.client!.describeCollection({ + await this.client.describeCollection({ collection_name: collectionName, }); } @@ -208,7 +318,11 @@ export class MilvusVectorDatabase implements VectorDatabase { async dropCollection(collectionName: string): Promise { await this.ensureInitialized(); - await this.client!.dropCollection({ + if (!this.client) { + throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + + await this.client.dropCollection({ collection_name: collectionName, }); } @@ -216,7 +330,11 @@ export class MilvusVectorDatabase implements VectorDatabase { async hasCollection(collectionName: string): Promise { await this.ensureInitialized(); - const result = await this.client!.hasCollection({ + if (!this.client) { + throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + + const result = await this.client.hasCollection({ collection_name: collectionName, }); @@ -226,7 +344,11 @@ export class MilvusVectorDatabase implements VectorDatabase { async listCollections(): Promise { await this.ensureInitialized(); - const result = await this.client!.showCollections(); + if (!this.client) { + 
throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + + const result = await this.client.showCollections(); // Handle the response format - cast to any to avoid type errors const collections = (result as any).collection_names || (result as any).collections || []; return Array.isArray(collections) ? collections : []; @@ -236,6 +358,10 @@ export class MilvusVectorDatabase implements VectorDatabase { await this.ensureInitialized(); await this.ensureLoaded(collectionName); + if (!this.client) { + throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + console.log('Inserting documents into collection:', collectionName); const data = documents.map(doc => ({ id: doc.id, @@ -248,7 +374,7 @@ export class MilvusVectorDatabase implements VectorDatabase { metadata: JSON.stringify(doc.metadata), })); - await this.client!.insert({ + await this.client.insert({ collection_name: collectionName, data: data, }); @@ -258,6 +384,10 @@ export class MilvusVectorDatabase implements VectorDatabase { await this.ensureInitialized(); await this.ensureLoaded(collectionName); + if (!this.client) { + throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + const searchParams: any = { collection_name: collectionName, data: [queryVector], @@ -270,7 +400,7 @@ export class MilvusVectorDatabase implements VectorDatabase { searchParams.expr = options.filterExpr; } - const searchResult = await this.client!.search(searchParams); + const searchResult = await this.client.search(searchParams); if (!searchResult.results || searchResult.results.length === 0) { return []; @@ -295,7 +425,11 @@ export class MilvusVectorDatabase implements VectorDatabase { await this.ensureInitialized(); await this.ensureLoaded(collectionName); - await this.client!.delete({ + if (!this.client) { + throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + + await this.client.delete({ collection_name: collectionName, 
filter: `id in [${ids.map(id => `"${id}"`).join(', ')}]`, }); @@ -305,6 +439,10 @@ export class MilvusVectorDatabase implements VectorDatabase { await this.ensureInitialized(); await this.ensureLoaded(collectionName); + if (!this.client) { + throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + try { const queryParams: any = { collection_name: collectionName, @@ -320,7 +458,7 @@ export class MilvusVectorDatabase implements VectorDatabase { queryParams.limit = 16384; // Default limit for empty filters } - const result = await this.client!.query(queryParams); + const result = await this.client.query(queryParams); if (result.status.error_code !== 'Success') { throw new Error(`Failed to query Milvus: ${result.status.reason}`); @@ -414,7 +552,11 @@ export class MilvusVectorDatabase implements VectorDatabase { functions: functions, }; - await createCollectionWithLimitCheck(this.client!, createCollectionParams); + if (!this.client) { + throw new Error('MilvusClient is not initialized. 
Call ensureInitialized() first.'); + } + + await createCollectionWithLimitCheck(this.client, createCollectionParams); // Create indexes for both vector fields // Index for dense vector @@ -424,7 +566,11 @@ export class MilvusVectorDatabase implements VectorDatabase { index_type: 'AUTOINDEX', metric_type: MetricType.COSINE, }; - await this.client!.createIndex(denseIndexParams); + console.log(`🔧 Creating dense vector index for field 'vector' in collection '${collectionName}'...`); + await this.client.createIndex(denseIndexParams); + + // Wait for dense vector index to be ready + await this.waitForIndexReady(collectionName, 'vector'); // Index for sparse vector const sparseIndexParams = { @@ -433,15 +579,17 @@ export class MilvusVectorDatabase implements VectorDatabase { index_type: 'SPARSE_INVERTED_INDEX', metric_type: MetricType.BM25, }; - await this.client!.createIndex(sparseIndexParams); + console.log(`🔧 Creating sparse vector index for field 'sparse_vector' in collection '${collectionName}'...`); + await this.client.createIndex(sparseIndexParams); - // Load collection to memory - await this.client!.loadCollection({ - collection_name: collectionName, - }); + // Wait for sparse vector index to be ready + await this.waitForIndexReady(collectionName, 'sparse_vector'); + + // Load collection to memory with retry logic + await this.loadCollectionWithRetry(collectionName); // Verify collection is created correctly - await this.client!.describeCollection({ + await this.client.describeCollection({ collection_name: collectionName, }); } @@ -450,6 +598,10 @@ export class MilvusVectorDatabase implements VectorDatabase { await this.ensureInitialized(); await this.ensureLoaded(collectionName); + if (!this.client) { + throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + const data = documents.map(doc => ({ id: doc.id, content: doc.content, @@ -461,7 +613,7 @@ export class MilvusVectorDatabase implements VectorDatabase { metadata: 
JSON.stringify(doc.metadata), })); - await this.client!.insert({ + await this.client.insert({ collection_name: collectionName, data: data, }); @@ -471,6 +623,10 @@ export class MilvusVectorDatabase implements VectorDatabase { await this.ensureInitialized(); await this.ensureLoaded(collectionName); + if (!this.client) { + throw new Error('MilvusClient is not initialized after ensureInitialized().'); + } + try { // Generate OpenAI embedding for the first search request (dense) console.log(`🔍 Preparing hybrid search for collection: ${collectionName}`); @@ -534,7 +690,7 @@ export class MilvusVectorDatabase implements VectorDatabase { expr: searchParams.expr }, null, 2)); - const searchResult = await this.client!.search(searchParams); + const searchResult = await this.client.search(searchParams); console.log(`🔍 Search executed, processing results...`);