diff --git a/example/generator/src/generatorWorker.js b/example/generator/src/generatorWorker.js index 091297e185..bbf1851346 100644 --- a/example/generator/src/generatorWorker.js +++ b/example/generator/src/generatorWorker.js @@ -52,6 +52,7 @@ function onSubscribe(message) { var data = message.data; + // Keep var start = Date.now(); var step = 1000 / data.dataRateInHz; var nextStep = start - (start % step) + step; diff --git a/platform/features/table/src/TelemetryCollection.js b/platform/features/table/src/TelemetryCollection.js index d5816b37b6..185c1549af 100644 --- a/platform/features/table/src/TelemetryCollection.js +++ b/platform/features/table/src/TelemetryCollection.js @@ -23,24 +23,23 @@ define( [ 'lodash', - 'eventEmitter' + 'EventEmitter' ], - function (_, eventEmitter) { + function (_, EventEmitter) { function TelemetryCollection() { - eventEmitter.call(this, arguments); + EventEmitter.call(this, arguments); this.telemetry = []; - this.forwardBuffer = []; + this.highBuffer = []; this.sortField = undefined; this.lastBounds = {}; - this.lastStartIndex = 0; - this.lastEndIndex = 0; _.bindAll(this,[ + 'addOne', 'iteratee' ]); } - TelemetryCollection.prototype = Object.create(eventEmitter.prototype); + TelemetryCollection.prototype = Object.create(EventEmitter.prototype); TelemetryCollection.prototype.iteratee = function (element) { return _.get(element, this.sortField); @@ -53,7 +52,7 @@ define( * collection is cleared and repopulated. * @param bounds */ - TelemetryCollection.prototype.tick = function (bounds) { + TelemetryCollection.prototype.bounds = function (bounds) { var startChanged = this.lastBounds.start !== bounds.start; var endChanged = this.lastBounds.end !== bounds.end; var startIndex = 0; @@ -65,71 +64,106 @@ define( var testValue = _.set({}, this.sortField, bounds.start); // Calculate the new index of the first element within the bounds startIndex = _.sortedIndex(this.telemetry, testValue, this.sortField); - discarded = this.telemetry.slice(this.lastStartIndex, startIndex + 1); + discarded = this.telemetry.splice(0, startIndex); } if (endChanged) { var testValue = _.set({}, this.sortField, bounds.end); // Calculate the new index of the last element in bounds - endIndex = _.sortedLastIndex(this.telemetry, testValue, this.sortField); - added = this.telemetry.slice(this.lastEndIndex, endIndex + 1); + endIndex = _.sortedLastIndex(this.highBuffer, testValue, this.sortField); + added = this.highBuffer.splice(0, endIndex); + this.telemetry = this.telemetry.concat(added); } - if (discarded.length > 0){ + if (discarded && discarded.length > 0){ this.emit('discarded', discarded); } - if (added.length > 0){ + if (added && added.length > 0) { this.emit('added', added); } this.lastBounds = bounds; }; - /*collection.on('added'); - collection.on('discarded');*/ - TelemetryCollection.prototype.inBounds = function (element) { var noBoundsDefined = !this.lastBounds || (!this.lastBounds.start && !this.lastBounds.end); - var withinBounds = _.get(element, this.sortField) >= this.lastBounds.start && + var withinBounds = + _.get(element, this.sortField) >= this.lastBounds.start && _.get(element, this.sortField) <= this.lastBounds.end; return noBoundsDefined || withinBounds; }; - //Todo: addAll for initial historical data - TelemetryCollection.prototype.add = function (element) { - // Going to check for duplicates. Bound the search problem to - // elements around the given time. Use sortedIndex because it - // employs a binary search which is O(log n). Can use binary search - // based on time stamp because the array is guaranteed ordered due - // to sorted insertion. - + /** + * @private + * @param element + */ + TelemetryCollection.prototype.addOne = function (element) { var isDuplicate = false; - var startIx = _.sortedIndex(this.telemetry, element, this.sortField); + var boundsDefined = this.lastBounds && (this.lastBounds.start && this.lastBounds.end); + var array; - if (startIx !== this.telemetry.length) { - var endIx = _.sortedLastIndex(this.telemetry, element, this.sortField); + // Insert into either in-bounds array, or the out of bounds high buffer. + // Data in the high buffer will be re-evaluated for possible insertion on next tick - // Create an array of potential dupes, based on having the - // same time stamp - var potentialDupes = this.telemetry.slice(startIx, endIx + 1); - // Search potential dupes for exact dupe - isDuplicate = _.findIndex(potentialDupes, _.isEqual.bind(undefined, element)) > -1; + if (boundsDefined) { + var boundsHigh = _.get(element, this.sortField) > this.lastBounds.end; + var boundsLow = _.get(element, this.sortField) < this.lastBounds.start; + + if (!boundsHigh && !boundsLow) { + array = this.telemetry; + } else if (boundsHigh) { + array = this.highBuffer; + } + } else { + array = this.highBuffer; } - if (!isDuplicate) { - this.telemetry.splice(startIx, 0, element); + // If out of bounds low, disregard data + if (!boundsLow) { + // Going to check for duplicates. Bound the search problem to + // elements around the given time. Use sortedIndex because it + // employs a binary search which is O(log n). Can use binary search + // based on time stamp because the array is guaranteed ordered due + // to sorted insertion. - if (this.inBounds(element)) { - // If new element is within bounds, then the index within the - // master of the last element in bounds has just increased by one. - this.lastEndIndex++; - //If the new element is within bounds, add it immediately - this.emit('added', [element]); + var startIx = _.sortedIndex(array, element, this.sortField); + + if (startIx !== array.length) { + var endIx = _.sortedLastIndex(array, element, this.sortField); + + // Create an array of potential dupes, based on having the + // same time stamp + var potentialDupes = array.slice(startIx, endIx + 1); + // Search potential dupes for exact dupe + isDuplicate = _.findIndex(potentialDupes, _.isEqual.bind(undefined, element)) > -1; } + + if (!isDuplicate) { + array.splice(startIx, 0, element); + + //Return true if it was added and in bounds + return array === this.telemetry; + } + } + return false; + }; + + TelemetryCollection.prototype.addAll = function (elements) { + var added = elements.filter(this.addOne); + this.emit('added', added); + }; + + //Todo: addAll for initial historical data + TelemetryCollection.prototype.add = function (element) { + if (this.addOne(element)){ + this.emit('added', [element]); + return true; + } else { + return false; } }; TelemetryCollection.prototype.clear = function () { - this.telemetry = undefined; + this.telemetry = []; }; TelemetryCollection.prototype.sort = function (sortField){ diff --git a/platform/features/table/src/controllers/MCTTableController.js b/platform/features/table/src/controllers/MCTTableController.js index 7ae11eae0c..af942f8b55 100644 --- a/platform/features/table/src/controllers/MCTTableController.js +++ b/platform/features/table/src/controllers/MCTTableController.js @@ -31,32 +31,32 @@ define( //Bind all class functions to 'this' _.bindAll(this, [ - 'destroyConductorListeners', - 'changeTimeSystem', - 'scrollToBottom', - 'addRow', - 'removeRows', - 'onScroll', - 'firstVisible', - 'lastVisible', - 'setVisibleRows', - 'setHeaders', - 'setElementSizes', + 'addRows', 'binarySearch', - 'insertSorted', - 'sortComparator', - 'sortRows', 'buildLargestRow', - 'resize', - 'filterAndSort', - 'setRows', - 'filterRows', - 'scrollToRow', - 'setTimeOfInterestRow', - 'changeTimeOfInterest', 'changeBounds', + 'changeTimeOfInterest', + 'changeTimeSystem', + 'destroyConductorListeners', + 'digest', + 'filterAndSort', + 'filterRows', + 'firstVisible', + 'insertSorted', + 'lastVisible', 'onRowClick', - 'digest' + 'onScroll', + 'removeRows', + 'resize', + 'scrollToBottom', + 'scrollToRow', + 'setElementSizes', + 'setHeaders', + 'setRows', + 'setTimeOfInterestRow', + 'setVisibleRows', + 'sortComparator', + 'sortRows' ]); this.scrollable.on('scroll', this.onScroll); @@ -125,7 +125,7 @@ define( /* * Listen for rows added individually (eg. for real-time tables) */ - $scope.$on('add:row', this.addRow); + $scope.$on('add:rows', this.addRows); $scope.$on('remove:rows', this.removeRows); /** @@ -199,16 +199,13 @@ define( * `add:row` broadcast event. * @private */ - MCTTableController.prototype.addRow = function (event, rowIndex) { - var row = this.$scope.rows[rowIndex]; - + MCTTableController.prototype.addRows = function (event, rows) { //Does the row pass the current filter? - if (this.filterRows([row]).length === 1) { - //Insert the row into the correct position in the array - this.insertSorted(this.$scope.displayRows, row); + if (this.filterRows(rows).length > 0) { + rows.forEach(this.insertSorted.bind(this, this.$scope.displayRows)); //Resize the columns , then update the rows visible in the table - this.resize([this.$scope.sizingRow, row]) + this.resize([this.$scope.sizingRow].concat(rows)) .then(this.setVisibleRows) .then(function () { if (this.$scope.autoScroll) { @@ -220,7 +217,6 @@ define( if (toi !== -1) { this.setTimeOfInterestRow(toi); } - } }; diff --git a/platform/features/table/src/controllers/TelemetryTableController.js b/platform/features/table/src/controllers/TelemetryTableController.js index bd98ad6976..d4414b1004 100644 --- a/platform/features/table/src/controllers/TelemetryTableController.js +++ b/platform/features/table/src/controllers/TelemetryTableController.js @@ -79,7 +79,9 @@ define( 'getHistoricalData', 'subscribeToNewData', 'changeBounds', - 'setScroll' + 'setScroll', + 'addRowsToTable', + 'removeRowsFromTable', ]); this.getData(); @@ -88,6 +90,9 @@ define( this.openmct.conductor.on('follow', this.setScroll); this.setScroll(this.openmct.conductor.follow()); + this.telemetry.on('added', this.addRowsToTable); + this.telemetry.on('discarded', this.removeRowsFromTable); + this.$scope.$on("$destroy", this.destroy); } @@ -102,16 +107,19 @@ define( */ TelemetryTableController.prototype.sortByTimeSystem = function (timeSystem) { var scope = this.$scope; + var sortColumn = undefined; scope.defaultSort = undefined; + if (timeSystem) { this.table.columns.forEach(function (column) { if (column.metadata.key === timeSystem.metadata.key) { - scope.defaultSort = column.getTitle(); + sortColumn = column; } }); - this.$scope.rows = _.sortBy(this.$scope.rows, function (row) { - return row[this.$scope.defaultSort]; - }); + if (sortColumn) { + scope.defaultSort = sortColumn.getTitle(); + this.telemetry.sort(sortColumn.getTitle() + '.value'); + } } }; @@ -138,31 +146,23 @@ define( this.openmct.conductor.on('bounds', this.changeBounds); }; + TelemetryTableController.prototype.addRowsToTable = function (rows) { + this.$scope.$broadcast('add:rows', rows); + }; + + TelemetryTableController.prototype.removeRowsFromTable = function (rows) { + this.$scope.$broadcast('remove:rows', rows); + }; + TelemetryTableController.prototype.changeBounds = function (bounds) { //console.log('bounds.end: ' + bounds.end); var follow = this.openmct.conductor.follow(); var isTick = follow && bounds.start !== this.lastBounds.start && bounds.end !== this.lastBounds.end; - var isDeltaChange = follow && - !isTick && - (bounds.start !== this.lastBounds.start || - bounds.end !== this.lastBounds.end); - - var discarded = this.telemetry.bounds(bounds); - - if (discarded.length > 0){ - this.$scope.$broadcast('remove:rows', discarded); - } if (isTick){ - // Treat it as a realtime tick - // Drop old data that falls outside of bounds - //this.tick(bounds); - } else if (isDeltaChange){ - // No idea... - // Historical query for bounds, then tick on - this.getData(); + this.telemetry.bounds(bounds); } else { // Is fixed bounds change this.getData(); @@ -212,19 +212,23 @@ define( var allColumns = telemetryApi.commonValuesForHints(metadatas, []); this.table.populateColumns(allColumns); - this.timeColumns = telemetryApi.commonValuesForHints(metadatas, ['x']).map(function (metadatum) { + + var domainColumns = telemetryApi.commonValuesForHints(metadatas, ['x']); + this.timeColumns = domainColumns.map(function (metadatum) { return metadatum.name; }); - // For now, use first time field for time conductor - this.telemetry.sort(this.timeColumns[0] + '.value'); - this.filterColumns(); var timeSystem = this.openmct.conductor.timeSystem(); if (timeSystem) { this.sortByTimeSystem(timeSystem); } + if (!this.telemetry.sortColumn && domainColumns.length > 0) { + this.telemetry.sort(domainColumns[0].name + '.value'); + } + + } return objects; }; @@ -245,6 +249,7 @@ define( return new Promise(function (resolve, reject){ function finishProcessing(){ + telemetryCollection.addAll(rowData); scope.rows = telemetryCollection.telemetry; scope.loading = false; resolve(scope.rows); @@ -258,11 +263,9 @@ define( finishProcessing(); } } else { - historicalData.slice(index, index + this.batchSize) - .forEach(function (datum) { - telemetryCollection.add(this.table.getRowValues( - limitEvaluator, datum)); - }.bind(this)); + rowData = rowData.concat(historicalData.slice(index, index + this.batchSize) + .map(this.table.getRowValues.bind(this.table, limitEvaluator))); + this.timeoutHandle = this.$timeout(processData.bind( this, historicalData, @@ -300,6 +303,7 @@ define( }.bind(this)); }; + /** * @private * @param objects @@ -348,6 +352,9 @@ define( var scope = this.$scope; var newObject = this.newObject; + this.telemetry.clear(); + this.telemetry.bounds(this.openmct.conductor.bounds()); + this.$scope.loading = true; function error(e) { @@ -384,7 +391,7 @@ define( getDomainObjects() .then(filterForTelemetry) .then(this.loadColumns) - .then(this.subscribeToNewData) + //.then(this.subscribeToNewData) .then(this.getHistoricalData) .catch(error) }; diff --git a/platform/features/table/test/controllers/MCTTableControllerSpec.js b/platform/features/table/test/controllers/MCTTableControllerSpec.js index 61e4a2eece..970ca72047 100644 --- a/platform/features/table/test/controllers/MCTTableControllerSpec.js +++ b/platform/features/table/test/controllers/MCTTableControllerSpec.js @@ -465,20 +465,20 @@ define( mockScope.displayRows = controller.sortRows(testRows.slice(0)); mockScope.rows.push(row4); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(mockScope.displayRows[0].col2.text).toEqual('xyz'); mockScope.rows.push(row5); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(mockScope.displayRows[4].col2.text).toEqual('aaa'); mockScope.rows.push(row6); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(mockScope.displayRows[2].col2.text).toEqual('ggg'); //Add a duplicate row mockScope.rows.push(row6); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(mockScope.displayRows[2].col2.text).toEqual('ggg'); expect(mockScope.displayRows[3].col2.text).toEqual('ggg'); }); @@ -494,12 +494,12 @@ define( mockScope.displayRows = controller.filterRows(testRows); mockScope.rows.push(row5); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(mockScope.displayRows.length).toBe(2); expect(mockScope.displayRows[1].col2.text).toEqual('aaa'); mockScope.rows.push(row6); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(mockScope.displayRows.length).toBe(2); //Row was not added because does not match filter }); @@ -513,11 +513,11 @@ define( mockScope.displayRows = testRows.slice(0); mockScope.rows.push(row5); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(mockScope.displayRows[3].col2.text).toEqual('aaa'); mockScope.rows.push(row6); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(mockScope.displayRows[4].col2.text).toEqual('ggg'); }); @@ -536,7 +536,7 @@ define( mockScope.displayRows = testRows.slice(0); mockScope.rows.push(row7); - controller.addRow(undefined, mockScope.rows.length - 1); + controller.addRows(undefined, mockScope.rows.length - 1); expect(controller.$scope.sizingRow.col2).toEqual({text: 'some longer string'}); });