[Tables] Refactoring for consolidation of historical and real-time tables

Added batch processing of large historical queries. #1077
This commit is contained in:
Henry
2016-12-16 16:34:41 -08:00
parent 3544caf4be
commit 2a4944d6ee
9 changed files with 365 additions and 177 deletions

View File

@@ -42,43 +42,47 @@ define(
*/
function TelemetryTableController(
$scope,
$timeout,
openmct
) {
var self = this;
this.$scope = $scope;
this.$timeout = $timeout;
this.openmct = openmct;
this.batchSize = 1000;
/*
* Initialization block
*/
this.columns = {}; //Range and Domain columns
this.handle = undefined;
this.deregisterListeners = [];
this.subscriptions = [];
this.timeColumns = [];
$scope.rows = [];
this.table = new TableConfiguration($scope.domainObject,
openmct);
this.changeListeners = [];
this.conductor = openmct.conductor;
this.openmct = openmct;
this.newObject = objectUtils.toNewFormat($scope.domainObject.getModel(), $scope.domainObject.getId());
this.lastBounds = this.openmct.conductor.bounds();
this.requestTime = 0;
$scope.rows = [];
/*
* Create a new format object from legacy object, and replace it
* when it changes
*/
this.newObject = objectUtils.toNewFormat($scope.domainObject.getModel(),
$scope.domainObject.getId());
// Subscribe to telemetry when a domain object becomes available
this.$scope.$watch('domainObject', function () {
self.subscribe();
self.registerChangeListeners();
});
this.mutationListener = openmct.objects.observe(this.newObject, "*", function (domainObject){
self.newObject = domainObject;
});
_.bindAll(this, [
'destroy',
'sortByTimeSystem',
'loadColumns',
'getHistoricalData',
'subscribeToNewData',
'changeBounds'
]);
this.destroy = this.destroy.bind(this);
this.getData();
this.registerChangeListeners();
// Unsubscribe when the plot is destroyed
this.$scope.$on("$destroy", this.destroy);
this.timeColumns = [];
this.sortByTimeSystem = this.sortByTimeSystem.bind(this);
this.conductor.on('timeSystem', this.sortByTimeSystem);
this.conductor.off('timeSystem', this.sortByTimeSystem);
this.subscriptions = [];
}
/**
@@ -91,133 +95,254 @@ define(
scope.defaultSort = undefined;
if (timeSystem) {
this.table.columns.forEach(function (column) {
if (column.domainMetadata && column.domainMetadata.key === timeSystem.metadata.key) {
if (column.metadata.key === timeSystem.metadata.key) {
scope.defaultSort = column.getTitle();
}
});
this.$scope.rows = _.sortBy(this.$scope.rows, function (row) {
return row[this.$scope.defaultSort];
});
}
};
TelemetryTableController.prototype.unregisterChangeListeners = function () {
this.changeListeners.forEach(function (listener) {
return listener && listener();
});
this.changeListeners = [];
};
/**
* Defer registration of change listeners until domain object is
* available in order to avoid race conditions
* Attach listeners to domain object to respond to changes due to
* composition, etc.
* @private
*/
TelemetryTableController.prototype.registerChangeListeners = function () {
var self = this;
this.unregisterChangeListeners();
this.deregisterListeners.forEach(function (deregister){
deregister();
});
this.deregisterListeners = [];
// When composition changes, re-subscribe to the various
// telemetry subscriptions
this.changeListeners.push(this.$scope.$watchCollection(
'domainObject.getModel().composition',
function (newVal, oldVal) {
if (newVal !== oldVal) {
self.subscribe();
}
})
this.deregisterListeners.push(
this.openmct.objects.observe(this.newObject, "*",
function (domainObject){
this.newObject = domainObject;
this.getData();
}.bind(this)
)
);
this.openmct.conductor.on('timeSystem', this.sortByTimeSystem);
this.openmct.conductor.on('bounds', this.changeBounds);
};
TelemetryTableController.prototype.tick = function (bounds) {
// Can't do ticking until we change how data is handled
// Pass raw values to table, with format function
/*if (this.$scope.defaultSort) {
this.$scope.rows.filter(function (row){
return row[]
})
}*/
};
TelemetryTableController.prototype.changeBounds = function (bounds) {
var follow = this.openmct.conductor.follow();
var isTick = follow &&
bounds.start !== this.lastBounds.start &&
bounds.end !== this.lastBounds.end;
var isDeltaChange = follow &&
!isTick &&
(bounds.start !== this.lastBounds.start ||
bounds.end !== this.lastBounds.end);
if (isTick){
// Treat it as a realtime tick
// Drop old data that falls outside of bounds
this.tick(bounds);
} else if (isDeltaChange){
// No idea...
// Historical query for bounds, then tick on
this.getData();
} else {
// Is fixed bounds change
this.getData();
}
this.lastBounds = bounds;
};
/**
* Release the current subscription (called when scope is destroyed)
*/
TelemetryTableController.prototype.destroy = function () {
this.openmct.conductor.off('timeSystem', this.sortByTimeSystem);
this.openmct.conductor.off('bounds', this.changeBounds);
this.subscriptions.forEach(function (subscription) {
subscription()
subscription();
});
this.mutationListener();
this.deregisterListeners.forEach(function (deregister){
deregister();
});
this.subscriptions = [];
this.deregisterListeners = [];
if (this.timeoutHandle) {
this.$timeout.cancel(this.timeoutHandle);
}
// In case controller instance lingers around (currently there is a
// temporary memory leak with PlotController), clean up scope as it
// can be extremely large.
this.$scope = null;
this.table = null;
};
/**
* @private
* @param objects
* @returns {*}
*/
TelemetryTableController.prototype.loadColumns = function (objects) {
var telemetryApi = this.openmct.telemetry;
if (objects.length > 0) {
var metadatas = objects.map(telemetryApi.getMetadata.bind(telemetryApi));
var allColumns = telemetryApi.commonValuesForHints(metadatas, []);
this.table.populateColumns(allColumns);
this.timeColumns = telemetryApi.commonValuesForHints(metadatas, ['x']).map(function (metadatum) {
return metadatum.name;
});
this.filterColumns();
var timeSystem = this.openmct.conductor.timeSystem();
if (timeSystem) {
this.sortByTimeSystem(timeSystem);
}
}
return objects;
};
/**
Create a new subscription. This can be overridden by children to
change default behaviour (which is to retrieve historical telemetry
only).
* @private
* @param objects The domain objects to request telemetry for
* @returns {*|{configFile}|app|boolean|Route|Object}
*/
TelemetryTableController.prototype.subscribe = function () {
var self = this;
TelemetryTableController.prototype.getHistoricalData = function (objects) {
var openmct = this.openmct;
var bounds = openmct.conductor.bounds();
var scope = this.$scope;
var processedObjects = 0;
var requestTime = this.lastRequestTime = Date.now();
return new Promise(function (resolve, reject){
console.log('Created promise');
function finishProcessing(tableRows){
scope.rows = tableRows;
scope.loading = false;
console.log('Resolved promise');
resolve(tableRows);
}
function processData(historicalData, index, rowData, limitEvaluator){
console.log("Processing batch");
if (index >= historicalData.length) {
processedObjects++;
if (processedObjects === objects.length) {
finishProcessing(rowData);
}
} else {
rowData = rowData.concat(historicalData.slice(index, index + this.batchSize)
.map(this.table.getRowValues.bind(this.table, limitEvaluator)));
this.timeoutHandle = this.$timeout(processData.bind(
this,
historicalData,
index + this.batchSize,
rowData,
limitEvaluator
));
}
}
function makeTableRows(object, historicalData) {
// Only process one request at a time
if (requestTime === this.lastRequestTime) {
console.log('Processing request');
var limitEvaluator = openmct.telemetry.limitEvaluator(object);
processData.call(this, historicalData, 0, [], limitEvaluator);
} else {
console.log('Ignoring returned data because of staleness');
resolve([]);
}
}
function requestData (object) {
return openmct.telemetry.request(object, {
start: bounds.start,
end: bounds.end
}).then(makeTableRows.bind(this, object))
.catch(reject);
}
this.$timeout.cancel(this.timeoutHandle);
if (objects.length > 0){
objects.forEach(requestData.bind(this));
} else {
scope.loading = false;
console.log('Resolved promise');
resolve([]);
}
}.bind(this));
};
/**
* @private
* @param objects
* @returns {*}
*/
TelemetryTableController.prototype.subscribeToNewData = function (objects) {
var telemetryApi = this.openmct.telemetry;
//Set table max length to avoid unbounded growth.
var maxRows = 100000;
this.subscriptions.forEach(function (subscription) {
subscription();
});
this.subscriptions = [];
function newData(domainObject, datum) {
this.$scope.rows.push(this.table.getRowValues(
telemetryApi.limitEvaluator(domainObject), datum));
//Inform table that a new row has been added
if (this.$scope.rows.length > maxRows) {
this.$scope.$broadcast('remove:row', 0);
this.$scope.rows.shift();
}
this.$scope.$broadcast('add:row',
this.$scope.rows.length - 1);
}
objects.forEach(function (object){
this.subscriptions.push(
telemetryApi.subscribe(object, newData.bind(this, object), {}));
console.log('subscribed');
}.bind(this));
return objects;
};
TelemetryTableController.prototype.getData = function () {
var telemetryApi = this.openmct.telemetry;
var compositionApi = this.openmct.composition;
var subscriptions = this.subscriptions;
var tableConfiguration = this.table;
var scope = this.$scope;
var maxRows = 100000;
var conductor = this.conductor;
var newObject = this.newObject;
this.$scope.loading = true;
function makeTableRows(object, historicalData){
var limitEvaluator = telemetryApi.limitEvaluator(object);
return historicalData.map(tableConfiguration.getRowValues.bind(tableConfiguration, limitEvaluator));
}
function requestData(objects) {
var bounds = conductor.bounds();
return Promise.all(
objects.map(function (object) {
return telemetryApi.request(object, {
start: bounds.start,
end: bounds.end
}).then(
makeTableRows.bind(this, object)
);
})
);
}
function addHistoricalData(historicalData){
scope.rows = Array.prototype.concat.apply([], historicalData);
scope.loading = false;
}
function newData(domainObject, datum) {
scope.rows.push(tableConfiguration.getRowValues(datum, telemetryApi.limitEvaluator(domainObject)));
//Inform table that a new row has been added
if (scope.rows.length > maxRows) {
scope.$broadcast('remove:row', 0);
scope.rows.shift();
}
scope.$broadcast('add:row',
scope.rows.length - 1);
}
function subscribe(objects) {
objects.forEach(function (object){
subscriptions.push(telemetryApi.subscribe(object, newData.bind(this, object), {}));
});
return objects;
}
function error(e) {
throw e;
}
function loadColumns(objects) {
var metadatas = objects.map(telemetryApi.getMetadata.bind(telemetryApi));
var allColumns = telemetryApi.commonValuesForHints(metadatas, []);
tableConfiguration.populateColumns(allColumns);
this.timeColumns = telemetryApi.commonValuesForHints(metadatas, ['x']).map(function (metadatum){
return metadatum.name;
});
self.filterColumns();
return Promise.resolve(objects);
scope.loading = false;
console.error(e);
}
function filterForTelemetry(objects){
@@ -248,18 +373,10 @@ define(
getDomainObjects()
.then(filterForTelemetry)
.then(this.loadColumns)
//.then(this.subscribeToNewData)
.then(this.getHistoricalData)
.catch(error)
.then(function (objects){
if (objects.length > 0){
return loadColumns(objects)
.then(subscribe)
.then(requestData)
.then(addHistoricalData)
.catch(error);
} else {
scope.loading = false;
}
})
};
/**