fix: add waitForIndexReady and loadCollectionWithRetry for milvus

This commit is contained in:
KartDriver
2025-08-12 12:44:57 -05:00
committed by Cheney Zhang
parent 9cbe7870f5
commit 84253a369c
2 changed files with 195 additions and 27 deletions

View File

@@ -115,6 +115,18 @@ export class OpenAIEmbedding extends Embedding {
}
getDimension(): number {
// For custom models, we need to detect the dimension first
const model = this.config.model || 'text-embedding-3-small';
const knownModels = OpenAIEmbedding.getSupportedModels();
// If it's a known model, return its known dimension
if (knownModels[model]) {
return knownModels[model].dimension;
}
// For custom models, return the current dimension
// Note: This may be incorrect until detectDimension() is called
console.warn(`[OpenAIEmbedding] getDimension() called for custom model '${model}' - returning ${this.dimension}. Call detectDimension() first for accurate dimension.`);
return this.dimension;
}

View File

@@ -1,4 +1,4 @@
import { MilvusClient, DataType, MetricType, FunctionType, LoadState } from '@zilliz/milvus2-sdk-node';
import { MilvusClient, DataType, MetricType, FunctionType, LoadState, IndexState } from '@zilliz/milvus2-sdk-node';
import {
VectorDocument,
SearchOptions,
@@ -103,15 +103,19 @@ export class MilvusVectorDatabase implements VectorDatabase {
* Ensure collection is loaded before search/query operations
*/
protected async ensureLoaded(collectionName: string): Promise<void> {
if (!this.client) {
throw new Error('MilvusClient is not initialized. Call ensureInitialized() first.');
}
try {
// Check if collection is loaded
const result = await this.client!.getLoadState({
const result = await this.client.getLoadState({
collection_name: collectionName
});
if (result.state !== LoadState.LoadStateLoaded) {
console.log(`🔄 Loading collection '${collectionName}' to memory...`);
await this.client!.loadCollection({
await this.client.loadCollection({
collection_name: collectionName,
});
}
@@ -121,6 +125,106 @@ export class MilvusVectorDatabase implements VectorDatabase {
}
}
/**
* Wait for an index to be ready before proceeding
* Polls index status with exponential backoff up to 30 seconds
*/
protected async waitForIndexReady(collectionName: string, fieldName: string): Promise<void> {
if (!this.client) {
throw new Error('MilvusClient is not initialized. Call ensureInitialized() first.');
}
const maxWaitTime = 60000; // 60 seconds
const initialInterval = 500; // 500ms
const maxInterval = 5000; // 5 seconds
const backoffMultiplier = 1.5;
let interval = initialInterval;
const startTime = Date.now();
console.log(`⏳ Waiting for index on field '${fieldName}' in collection '${collectionName}' to be ready...`);
while (Date.now() - startTime < maxWaitTime) {
try {
const indexStateResult = await this.client.getIndexState({
collection_name: collectionName,
field_name: fieldName
});
// Debug logging to understand the state value
console.log(`📊 Index state for '${fieldName}': raw=${indexStateResult.state}, type=${typeof indexStateResult.state}, IndexState.Finished=${IndexState.Finished}`);
console.log(`📊 Full response:`, JSON.stringify(indexStateResult));
// Check both numeric and potential string values
// Cast to any to bypass TypeScript checks while debugging
const stateValue = indexStateResult.state as any;
if (stateValue === IndexState.Finished ||
stateValue === 3 ||
stateValue === 'Finished') {
console.log(`✅ Index on field '${fieldName}' is ready!`);
return;
}
if (indexStateResult.state === IndexState.Failed) {
throw new Error(`Index creation failed for field '${fieldName}' in collection '${collectionName}'`);
}
// Wait with exponential backoff
await new Promise(resolve => setTimeout(resolve, interval));
interval = Math.min(interval * backoffMultiplier, maxInterval);
} catch (error) {
console.error(`❌ Error checking index state for field '${fieldName}':`, error);
throw error;
}
}
throw new Error(`Timeout waiting for index on field '${fieldName}' in collection '${collectionName}' to be ready after ${maxWaitTime}ms`);
}
/**
* Load collection with retry logic and exponential backoff
* Retries up to 5 times with exponential backoff
*/
protected async loadCollectionWithRetry(collectionName: string): Promise<void> {
if (!this.client) {
throw new Error('MilvusClient is not initialized. Call ensureInitialized() first.');
}
const maxRetries = 5;
const initialInterval = 1000; // 1 second
const backoffMultiplier = 2;
let attempt = 1;
let interval = initialInterval;
while (attempt <= maxRetries) {
try {
console.log(`🔄 Loading collection '${collectionName}' to memory (attempt ${attempt}/${maxRetries})...`);
await this.client.loadCollection({
collection_name: collectionName,
});
console.log(`✅ Collection '${collectionName}' loaded successfully!`);
return;
} catch (error) {
console.error(`❌ Failed to load collection '${collectionName}' on attempt ${attempt}:`, error);
if (attempt === maxRetries) {
throw new Error(`Failed to load collection '${collectionName}' after ${maxRetries} attempts: ${error}`);
}
// Wait with exponential backoff before retry
console.log(`⏳ Retrying collection load in ${interval}ms...`);
await new Promise(resolve => setTimeout(resolve, interval));
interval *= backoffMultiplier;
attempt++;
}
}
}
async createCollection(collectionName: string, dimension: number, description?: string): Promise<void> {
await this.ensureInitialized();
@@ -182,7 +286,11 @@ export class MilvusVectorDatabase implements VectorDatabase {
fields: schema,
};
await createCollectionWithLimitCheck(this.client!, createCollectionParams);
if (!this.client) {
throw new Error('MilvusClient is not initialized. Call ensureInitialized() first.');
}
await createCollectionWithLimitCheck(this.client, createCollectionParams);
// Create index
const indexParams = {
@@ -192,15 +300,17 @@ export class MilvusVectorDatabase implements VectorDatabase {
metric_type: MetricType.COSINE,
};
await this.client!.createIndex(indexParams);
console.log(`🔧 Creating index for field 'vector' in collection '${collectionName}'...`);
await this.client.createIndex(indexParams);
// Load collection to memory
await this.client!.loadCollection({
collection_name: collectionName,
});
// Wait for index to be ready before loading collection
await this.waitForIndexReady(collectionName, 'vector');
// Load collection to memory with retry logic
await this.loadCollectionWithRetry(collectionName);
// Verify collection is created correctly
await this.client!.describeCollection({
await this.client.describeCollection({
collection_name: collectionName,
});
}
@@ -208,7 +318,11 @@ export class MilvusVectorDatabase implements VectorDatabase {
async dropCollection(collectionName: string): Promise<void> {
await this.ensureInitialized();
await this.client!.dropCollection({
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
await this.client.dropCollection({
collection_name: collectionName,
});
}
@@ -216,7 +330,11 @@ export class MilvusVectorDatabase implements VectorDatabase {
async hasCollection(collectionName: string): Promise<boolean> {
await this.ensureInitialized();
const result = await this.client!.hasCollection({
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
const result = await this.client.hasCollection({
collection_name: collectionName,
});
@@ -226,7 +344,11 @@ export class MilvusVectorDatabase implements VectorDatabase {
async listCollections(): Promise<string[]> {
await this.ensureInitialized();
const result = await this.client!.showCollections();
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
const result = await this.client.showCollections();
// Handle the response format - cast to any to avoid type errors
const collections = (result as any).collection_names || (result as any).collections || [];
return Array.isArray(collections) ? collections : [];
@@ -236,6 +358,10 @@ export class MilvusVectorDatabase implements VectorDatabase {
await this.ensureInitialized();
await this.ensureLoaded(collectionName);
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
console.log('Inserting documents into collection:', collectionName);
const data = documents.map(doc => ({
id: doc.id,
@@ -248,7 +374,7 @@ export class MilvusVectorDatabase implements VectorDatabase {
metadata: JSON.stringify(doc.metadata),
}));
await this.client!.insert({
await this.client.insert({
collection_name: collectionName,
data: data,
});
@@ -258,6 +384,10 @@ export class MilvusVectorDatabase implements VectorDatabase {
await this.ensureInitialized();
await this.ensureLoaded(collectionName);
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
const searchParams: any = {
collection_name: collectionName,
data: [queryVector],
@@ -270,7 +400,7 @@ export class MilvusVectorDatabase implements VectorDatabase {
searchParams.expr = options.filterExpr;
}
const searchResult = await this.client!.search(searchParams);
const searchResult = await this.client.search(searchParams);
if (!searchResult.results || searchResult.results.length === 0) {
return [];
@@ -295,7 +425,11 @@ export class MilvusVectorDatabase implements VectorDatabase {
await this.ensureInitialized();
await this.ensureLoaded(collectionName);
await this.client!.delete({
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
await this.client.delete({
collection_name: collectionName,
filter: `id in [${ids.map(id => `"${id}"`).join(', ')}]`,
});
@@ -305,6 +439,10 @@ export class MilvusVectorDatabase implements VectorDatabase {
await this.ensureInitialized();
await this.ensureLoaded(collectionName);
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
try {
const queryParams: any = {
collection_name: collectionName,
@@ -320,7 +458,7 @@ export class MilvusVectorDatabase implements VectorDatabase {
queryParams.limit = 16384; // Default limit for empty filters
}
const result = await this.client!.query(queryParams);
const result = await this.client.query(queryParams);
if (result.status.error_code !== 'Success') {
throw new Error(`Failed to query Milvus: ${result.status.reason}`);
@@ -414,7 +552,11 @@ export class MilvusVectorDatabase implements VectorDatabase {
functions: functions,
};
await createCollectionWithLimitCheck(this.client!, createCollectionParams);
if (!this.client) {
throw new Error('MilvusClient is not initialized. Call ensureInitialized() first.');
}
await createCollectionWithLimitCheck(this.client, createCollectionParams);
// Create indexes for both vector fields
// Index for dense vector
@@ -424,7 +566,11 @@ export class MilvusVectorDatabase implements VectorDatabase {
index_type: 'AUTOINDEX',
metric_type: MetricType.COSINE,
};
await this.client!.createIndex(denseIndexParams);
console.log(`🔧 Creating dense vector index for field 'vector' in collection '${collectionName}'...`);
await this.client.createIndex(denseIndexParams);
// Wait for dense vector index to be ready
await this.waitForIndexReady(collectionName, 'vector');
// Index for sparse vector
const sparseIndexParams = {
@@ -433,15 +579,17 @@ export class MilvusVectorDatabase implements VectorDatabase {
index_type: 'SPARSE_INVERTED_INDEX',
metric_type: MetricType.BM25,
};
await this.client!.createIndex(sparseIndexParams);
console.log(`🔧 Creating sparse vector index for field 'sparse_vector' in collection '${collectionName}'...`);
await this.client.createIndex(sparseIndexParams);
// Load collection to memory
await this.client!.loadCollection({
collection_name: collectionName,
});
// Wait for sparse vector index to be ready
await this.waitForIndexReady(collectionName, 'sparse_vector');
// Load collection to memory with retry logic
await this.loadCollectionWithRetry(collectionName);
// Verify collection is created correctly
await this.client!.describeCollection({
await this.client.describeCollection({
collection_name: collectionName,
});
}
@@ -450,6 +598,10 @@ export class MilvusVectorDatabase implements VectorDatabase {
await this.ensureInitialized();
await this.ensureLoaded(collectionName);
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
const data = documents.map(doc => ({
id: doc.id,
content: doc.content,
@@ -461,7 +613,7 @@ export class MilvusVectorDatabase implements VectorDatabase {
metadata: JSON.stringify(doc.metadata),
}));
await this.client!.insert({
await this.client.insert({
collection_name: collectionName,
data: data,
});
@@ -471,6 +623,10 @@ export class MilvusVectorDatabase implements VectorDatabase {
await this.ensureInitialized();
await this.ensureLoaded(collectionName);
if (!this.client) {
throw new Error('MilvusClient is not initialized after ensureInitialized().');
}
try {
// Generate OpenAI embedding for the first search request (dense)
console.log(`🔍 Preparing hybrid search for collection: ${collectionName}`);
@@ -534,7 +690,7 @@ export class MilvusVectorDatabase implements VectorDatabase {
expr: searchParams.expr
}, null, 2));
const searchResult = await this.client!.search(searchParams);
const searchResult = await this.client.search(searchParams);
console.log(`🔍 Search executed, processing results...`);