From 78f3f8367e14a38c97ffee98489b5d84114ce347 Mon Sep 17 00:00:00 2001 From: Victor Woeltjen Date: Tue, 29 Sep 2015 18:35:55 -0700 Subject: [PATCH] [Search] Vary batch size When indexing for generic search, issue new batches of requests as individual requests finish (instead of waiting for whole batches to finish) varying size to keep the number of outstanding requests below some maximum. nasa/openmctweb#141 --- .../src/services/GenericSearchProvider.js | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/platform/search/src/services/GenericSearchProvider.js b/platform/search/src/services/GenericSearchProvider.js index 3eb31f255a..a8387b143f 100644 --- a/platform/search/src/services/GenericSearchProvider.js +++ b/platform/search/src/services/GenericSearchProvider.js @@ -31,8 +31,8 @@ define( var DEFAULT_MAX_RESULTS = 100, DEFAULT_TIMEOUT = 1000, - FLUSH_SIZE = 24, - FLUSH_INTERVAL = 25, + MAX_CONCURRENT_REQUESTS = 100, + FLUSH_INTERVAL = 180, stopTime; /** @@ -53,10 +53,11 @@ define( function GenericSearchProvider($q, $log, throttle, objectService, workerService, topic, ROOTS) { var indexed = {}, pendingQueries = {}, - toIndex = {}, + toRequest = {}, worker = workerService.run('genericSearchWorker'), mutationTopic = topic("mutation"), indexingStarted = Date.now(), + pendingRequests = 0, scheduleFlush; this.worker = worker; @@ -69,7 +70,7 @@ define( ids.forEach(function (id) { if (!indexed[id]) { indexed[id] = true; - toIndex[id] = true; + toRequest[id] = true; } }); scheduleFlush(); @@ -125,32 +126,36 @@ define( } } - scheduleFlush = throttle(function flush() { - var ids = Object.keys(toIndex).slice(0, FLUSH_SIZE); - - // Don't need to look these up next time - ids.forEach(function (id) { - delete toIndex[id]; + function requestAndIndex(id) { + delete toRequest[id]; + pendingRequests += 1; + objectService.getObjects([id]).then(function (objects) { + if (objects[id]) { + indexItem(objects[id]); + } + }, function () { + $log.warn("Failed to index domain object " + id); + }).then(function () { + pendingRequests -= 1; + scheduleFlush(); }); + } - if (ids.length < 1) { + scheduleFlush = throttle(function flush() { + var batchSize = + Math.max(MAX_CONCURRENT_REQUESTS - pendingRequests, 0); + + if (Object.keys(toRequest).length + pendingRequests < 1) { $log.info([ 'GenericSearch finished indexing after ', ((Date.now() - indexingStarted) / 1000).toFixed(2), ' seconds.' ].join('')); - return; + } else { + Object.keys(toRequest) + .slice(0, batchSize) + .forEach(requestAndIndex); } - - objectService.getObjects(ids).then(function (objects) { - ids.map(function (id) { - return objects[id]; - }).filter(function (object) { - return object; - }).forEach(indexItem); - - scheduleFlush(); - }); }, FLUSH_INTERVAL); worker.onmessage = handleResponse;