WIP
This commit is contained in:
@@ -22,13 +22,16 @@
|
||||
|
||||
import _ from 'lodash';
|
||||
|
||||
/**
|
||||
* Binds class methods
|
||||
*/
|
||||
function methods() {
|
||||
return [
|
||||
'load',
|
||||
'requestHistoricalTelemetry',
|
||||
'initiateSubscriptionTelemetry',
|
||||
'addPage',
|
||||
'processNewTelemetry',
|
||||
'_addPage',
|
||||
'_processNewTelemetry',
|
||||
'hasMorePages',
|
||||
'nextPage',
|
||||
'bounds',
|
||||
@@ -45,8 +48,16 @@ function methods() {
|
||||
];
|
||||
}
|
||||
|
||||
export class TelemetryCollection {
|
||||
/** Class representing a Telemetry Collection. */
|
||||
|
||||
export class TelemetryCollection {
|
||||
/**
|
||||
* Creates a Telemetry Collection
|
||||
*
|
||||
* @param {object} openmct - Openm MCT
|
||||
* @param {object} domainObject - Domain Object to user for telemetry collection
|
||||
* @param {object} options - Any options or args passed in from request/subscribe
|
||||
*/
|
||||
constructor(openmct, domainObject, options) {
|
||||
methods().forEach(method => this[method] = this[method].bind(this));
|
||||
|
||||
@@ -72,7 +83,10 @@ export class TelemetryCollection {
|
||||
remove: []
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* This will load start the requests for historical and realtime data,
|
||||
* as well as setting up initial values and watchers
|
||||
*/
|
||||
load() {
|
||||
if (this.loaded) {
|
||||
throw new Error('Telemetry Collection has already been loaded.');
|
||||
@@ -89,7 +103,10 @@ export class TelemetryCollection {
|
||||
|
||||
this.loaded = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the telemetry collection for historical requests,
|
||||
* this uses the "standardizeRequestOptions" from Telemetry API
|
||||
*/
|
||||
initiateHistoricalRequests() {
|
||||
if (this.arguments.length === 1) {
|
||||
this.arguments.length = 2;
|
||||
@@ -101,8 +118,9 @@ export class TelemetryCollection {
|
||||
|
||||
this.requestHistoricalTelemetry();
|
||||
}
|
||||
// need to work out how requestHistorical and nextPage work
|
||||
// together when provider supports paging
|
||||
/**
|
||||
* If a historical provider exists, then historical requests will be made
|
||||
*/
|
||||
async requestHistoricalTelemetry() {
|
||||
if (!this.historicalProvider) {
|
||||
return;
|
||||
@@ -116,26 +134,42 @@ export class TelemetryCollection {
|
||||
});
|
||||
|
||||
if (Array.isArray(historicalData)) {
|
||||
this.processNewTelemetry(historicalData);
|
||||
this._processNewTelemetry(historicalData);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This uses the built in subscription function from Telemetry API
|
||||
*/
|
||||
initiateSubscriptionTelemetry() {
|
||||
this.unsubscribe = this.openmct.telemetry
|
||||
.subscribe(this.domainObject, (datum) => {
|
||||
this.processNewTelemetry(datum);
|
||||
this._processNewTelemetry(datum);
|
||||
}, this.arguments);
|
||||
}
|
||||
|
||||
// utilized by telemetry provider to add more data as well as
|
||||
// pass in the current state of the telemetry collection (which the telemetry collection will hold)
|
||||
addPage(telemetryData, collectionState) {
|
||||
/**
|
||||
* Utilized by telemetry provider to add more data as well as
|
||||
* pass in the current state of the telemetry collection
|
||||
* (which the telemetry collection will hold)
|
||||
*
|
||||
*
|
||||
* @param {Object[]} telemetryData - array of telemetry data objects
|
||||
* @param {*} [collectionState] - providers can pass a collectionState that
|
||||
* will be used for tracking between collection and provider
|
||||
*/
|
||||
_addPage(telemetryData, collectionState) {
|
||||
this.collectionState = collectionState;
|
||||
this.processNewTelemetry(telemetryData);
|
||||
this._processNewTelemetry(telemetryData);
|
||||
}
|
||||
|
||||
// used to sort any new telemetry (add/page, historical, subscription)
|
||||
processNewTelemetry(telemetryData) {
|
||||
/**
|
||||
* Filter any new telemetry (add/page, historical, subscription) based on
|
||||
* time bounds
|
||||
*
|
||||
* @param {(Object|Object[])} telemetryData - telemetry data object or
|
||||
* array of telemetry data objects
|
||||
*/
|
||||
_processNewTelemetry(telemetryData) {
|
||||
let data = Array.isArray(telemetryData) ? telemetryData : [telemetryData];
|
||||
let parsedValue;
|
||||
let beforeStartOfBounds;
|
||||
@@ -162,8 +196,12 @@ export class TelemetryCollection {
|
||||
}
|
||||
}
|
||||
|
||||
// returns a boolean if there is more telemetry within the time bounds
|
||||
// if the provider supports it
|
||||
/**
|
||||
* returns if there is more telemetry within the time bounds
|
||||
* if the provider supports it
|
||||
*
|
||||
* @returns {boolean}
|
||||
*/
|
||||
hasMorePages() {
|
||||
return this.historicalProvider
|
||||
&& this.historicalProvider.supportsPaging
|
||||
@@ -172,8 +210,10 @@ export class TelemetryCollection {
|
||||
&& this.historicalProvider.hasMorePages(this);
|
||||
}
|
||||
|
||||
// will trigger the next page for the provider if it supports it,
|
||||
// addPage will be passed in as a callback to receive the telemetry and updated state
|
||||
/**
|
||||
* will trigger the next page for the provider if it supports it,
|
||||
* _addPage will be passed in as a callback to receive the telemetry and updated state
|
||||
*/
|
||||
nextPage() {
|
||||
if (
|
||||
!this.historicalProvider
|
||||
@@ -183,12 +223,18 @@ export class TelemetryCollection {
|
||||
throw new Error('Provider does not support paging');
|
||||
}
|
||||
|
||||
this.historicalProvider.nextPage(this.addPage, this.collectionState);
|
||||
this.historicalProvider.nextPage(this._addPage, this.collectionState);
|
||||
}
|
||||
|
||||
// either user changes bounds or incremental tick
|
||||
// when bounds change, data could be added OR removed
|
||||
// here we update the current bounded telemetry and emit the results
|
||||
/**
|
||||
* when the start time, end time, or both have been updated.
|
||||
* data could be added OR removed here we update the current
|
||||
* bounded telemetry and emit the results
|
||||
*
|
||||
* @param {TimeConductorBounds} bounds The newly updated bounds
|
||||
* @param {boolean} [tick] `true` if the bounds update was due to
|
||||
* a "tick" event (ie. was an automatic update), false otherwise.
|
||||
*/
|
||||
bounds(bounds, isTick) {
|
||||
let startChanged = this.lastBounds.start !== bounds.start;
|
||||
let endChanged = this.lastBounds.end !== bounds.end;
|
||||
@@ -236,6 +282,12 @@ export class TelemetryCollection {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the telemetry data of the collection, and re-request
|
||||
* historical telemetry
|
||||
*
|
||||
* @todo handle subscriptions more granually
|
||||
*/
|
||||
reset() {
|
||||
this.boundedTelemetry = [];
|
||||
this.futureBuffer = [];
|
||||
@@ -244,6 +296,13 @@ export class TelemetryCollection {
|
||||
// possible unsubscribe/resubscribe...
|
||||
}
|
||||
|
||||
/**
|
||||
* whenever the time system is updated need to update related values in
|
||||
* the Telemetry Collection and reset the telemetry collection
|
||||
*
|
||||
* @param {TimeSystem} timeSystem - the value of the currently applied
|
||||
* Time System
|
||||
*/
|
||||
timeSystem(timeSystem) {
|
||||
this.timeKey = timeSystem.key;
|
||||
let metadataValue = this.metadata.value(this.timeKey) || { format: this.timeKey };
|
||||
@@ -256,6 +315,12 @@ export class TelemetryCollection {
|
||||
this.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} event - add, remove
|
||||
* @param {requestCallback} callback - callback to be executed when event happens,
|
||||
* should accept an array of added telemetry data
|
||||
* @param {object} [context] - optional context to use
|
||||
*/
|
||||
on(event, callback, context) {
|
||||
if (!this.listeners[event]) {
|
||||
throw new Error('Event not supported by Telemetry Collections: ' + event);
|
||||
@@ -271,7 +336,12 @@ export class TelemetryCollection {
|
||||
});
|
||||
}
|
||||
|
||||
// Unregister TelemetryCollection events.
|
||||
/**
|
||||
* @param {string} event - add, remove
|
||||
* @param {requestCallback} callback - callback to be executed when event happens,
|
||||
* should accept an array of removed
|
||||
* telemetry data
|
||||
*/
|
||||
off(event, callback) {
|
||||
if (!this.listeners[event]) {
|
||||
throw new Error('Event not supported by Telemetry Collections: ' + event);
|
||||
|
||||
Reference in New Issue
Block a user