[Tables] Support for subscriptions from new Telemetry API

Historical and real-time data flowing

Added formatting, and limits. Support telemetry objects themselves and not just composition of telemetry objects

Apply default time range if none supplied (15 minutes)
This commit is contained in:
Henry
2016-12-06 18:04:47 -08:00
parent 6d5530ba9c
commit 976333d7f7
12 changed files with 159 additions and 592 deletions

View File

@@ -38,14 +38,10 @@ define(
* configuration, and telemetry subscriptions.
* @memberof platform/features/table
* @param $scope
* @param telemetryHandler
* @param telemetryFormatter
* @constructor
*/
function TelemetryTableController(
$scope,
telemetryHandler,
telemetryFormatter,
openmct
) {
var self = this;
@@ -53,9 +49,8 @@ define(
this.$scope = $scope;
this.columns = {}; //Range and Domain columns
this.handle = undefined;
this.telemetryHandler = telemetryHandler;
this.table = new TableConfiguration($scope.domainObject,
telemetryFormatter);
openmct);
this.changeListeners = [];
this.conductor = openmct.conductor;
this.openmct = openmct;
@@ -68,6 +63,9 @@ define(
self.subscribe();
self.registerChangeListeners();
});
this.mutationListener = openmct.objects.observe(this.newObject, "*", function (domainObject){
self.newObject = domainObject;
});
this.destroy = this.destroy.bind(this);
@@ -79,6 +77,8 @@ define(
this.sortByTimeSystem = this.sortByTimeSystem.bind(this);
this.conductor.on('timeSystem', this.sortByTimeSystem);
this.conductor.off('timeSystem', this.sortByTimeSystem);
this.subscriptions = [];
}
/**
@@ -130,29 +130,12 @@ define(
* Release the current subscription (called when scope is destroyed)
*/
TelemetryTableController.prototype.destroy = function () {
if (this.handle) {
this.handle.unsubscribe();
this.handle = undefined;
}
this.subscriptions.forEach(function (subscription) {
subscription()
});
this.mutationListener();
};
/**
* Function for handling realtime data when it is available. This
* will be called by the telemetry framework when new data is
* available.
*
* Method should be overridden by specializing class.
*/
TelemetryTableController.prototype.addRealtimeData = function () {
};
/**
* Function for handling historical data. Will be called by
* telemetry framework when requested historical data is available.
* Should be overridden by specializing class.
*/
TelemetryTableController.prototype.addHistoricalData = function () {
};
/**
Create a new subscription. This can be overridden by children to
@@ -160,94 +143,123 @@ define(
only).
*/
TelemetryTableController.prototype.subscribe = function () {
var self = this;
var telemetryApi = this.openmct.telemetry;
var compositionApi = this.openmct.composition;
var subscriptions = this.subscriptions;
var tableConfiguration = this.table;
var scope = this.$scope;
var maxRows = 100000;
var conductor = this.conductor;
var newObject = this.newObject;
if (this.handle) {
this.handle.unsubscribe();
}
this.$scope.loading = true;
function map(func){
return function (objects) {
return Promise.all(objects.map(func));
function makeTableRows(object, historicalData){
var limitEvaluator = telemetryApi.limitEvaluator(object);
return historicalData.map(tableConfiguration.getRowValues.bind(tableConfiguration, limitEvaluator));
}
function requestData(objects) {
var bounds = conductor.bounds();
return Promise.all(
objects.map(function (object) {
return telemetryApi.request(object, {
start: bounds.start,
end: bounds.end
}).then(
makeTableRows.bind(this, object)
);
})
);
}
function addHistoricalData(historicalData){
scope.rows = Array.prototype.concat.apply([], historicalData);
scope.loading = false;
}
function newData(domainObject, datum) {
scope.rows.push(tableConfiguration.getRowValues(datum, telemetryApi.limitEvaluator(domainObject)));
//Inform table that a new row has been added
if (scope.rows.length > maxRows) {
scope.$broadcast('remove:row', 0);
scope.rows.shift();
}
scope.$broadcast('add:row',
scope.rows.length - 1);
}
function add(object){
return function (objects) {
objects.unshift(object);
return objects;
}
function subscribe(objects) {
objects.forEach(function (object){
subscriptions.push(telemetryApi.subscribe(object, newData.bind(this, object), {}));
});
return objects;
}
function subscribeTo(object) {
return telemetryApi.request(object, {});
function error(e) {
throw e;
}
function error() {
console.log("Unable to subscribe");
function loadColumns(objects) {
var metadatas = objects.map(telemetryApi.getMetadata.bind(telemetryApi));
var allColumns = telemetryApi.commonValuesForHints(metadatas, []);
tableConfiguration.populateColumns(allColumns);
this.timeColumns = telemetryApi.commonValuesForHints(metadatas, ['x']).map(function (metadatum){
return metadatum.name;
});
self.filterColumns();
return Promise.resolve(objects);
}
this.openmct.composition.get(this.newObject)
.load()
.then(add(this.newObject))
.then(map(subscribeTo))
.then(function (telemetry) {
console.log(telemetry.length);
}).catch(error);
this.handle = this.$scope.domainObject && this.telemetryHandler.handle(
this.$scope.domainObject,
this.addRealtimeData.bind(this),
true // Lossless
);
this.handle.request({}).then(this.addHistoricalData.bind(this));
this.setup();
};
TelemetryTableController.prototype.populateColumns = function (telemetryMetadata) {
this.table.populateColumns(telemetryMetadata);
//Identify time columns
telemetryMetadata.forEach(function (metadatum) {
//Push domains first
(metadatum.domains || []).forEach(function (domainMetadata) {
this.timeColumns.push(domainMetadata.name);
}.bind(this));
}.bind(this));
var timeSystem = this.conductor.timeSystem();
if (timeSystem) {
this.sortByTimeSystem(timeSystem);
function filterForTelemetry(objects){
return objects.filter(telemetryApi.canProvideTelemetry.bind(telemetryApi));
}
};
/**
* Setup table columns based on domain object metadata
*/
TelemetryTableController.prototype.setup = function () {
var handle = this.handle,
self = this;
function getDomainObjects() {
return new Promise(function (resolve, reject){
var objects = [newObject];
var composition = compositionApi.get(newObject);
if (handle) {
this.timeColumns = [];
handle.promiseTelemetryObjects().then(function () {
self.$scope.headers = [];
self.$scope.rows = [];
self.populateColumns(handle.getMetadata());
self.filterColumns();
// When table column configuration changes, (due to being
// selected or deselected), filter columns appropriately.
self.changeListeners.push(self.$scope.$watchCollection(
'domainObject.getModel().configuration.table.columns',
self.filterColumns.bind(self)
));
if (composition) {
composition
.load()
.then(function (children) {
return objects.concat(children);
})
.then(resolve)
.catch(reject);
} else {
return resolve(objects);
}
});
}
scope.headers = [];
scope.rows = [];
getDomainObjects()
.then(filterForTelemetry)
.catch(error)
.then(function (objects){
if (objects.length > 0){
return loadColumns(objects)
.then(subscribe)
.then(requestData)
.then(addHistoricalData)
.catch(error);
} else {
scope.loading = false;
}
})
};
/**