Merge remote-tracking branch 'origin/open751' into open-master
This commit is contained in:
71
platform/telemetry/src/TelemetryQueue.js
Normal file
71
platform/telemetry/src/TelemetryQueue.js
Normal file
@@ -0,0 +1,71 @@
|
||||
/*global define*/
|
||||
|
||||
define(
|
||||
[],
|
||||
function () {
|
||||
"use strict";
|
||||
|
||||
/**
|
||||
* Supports TelemetrySubscription. Provides a simple data structure
|
||||
* (with a pool-like interface) that aggregates key-value pairs into
|
||||
* a queued series of large objects, ensuring that no value is
|
||||
* overwritten (but consolidated non-overlapping keys into single
|
||||
* objects.)
|
||||
* @constructor
|
||||
*/
|
||||
function TelemetryQueue() {
|
||||
var queue = [];
|
||||
|
||||
// Look up an object in the queue that does not have a value
|
||||
// assigned to this key (or, add a new one)
|
||||
function getFreeObject(key) {
|
||||
var index = 0, object;
|
||||
|
||||
// Look for an existing queue position where we can store
|
||||
// a value to this key without overwriting an existing value.
|
||||
for (index = 0; index < queue.length; index += 1) {
|
||||
if (queue[index][key] === undefined) {
|
||||
return queue[index];
|
||||
}
|
||||
}
|
||||
|
||||
// If we made it through the loop, values have been assigned
|
||||
// to that key in all queued containers, so we need to queue
|
||||
// up a new container for key-value pairs.
|
||||
object = {};
|
||||
queue.push(object);
|
||||
return object;
|
||||
}
|
||||
|
||||
return {
|
||||
/**
|
||||
* Check if any value groups remain in this pool.
|
||||
* @return {boolean} true if value groups remain
|
||||
*/
|
||||
isEmpty: function () {
|
||||
return queue.length < 1;
|
||||
},
|
||||
/**
|
||||
* Retrieve the next value group from this pool.
|
||||
* This gives an object containing key-value pairs,
|
||||
* where keys and values correspond to the arguments
|
||||
* given to previous put functions.
|
||||
* @return {object} key-value pairs
|
||||
*/
|
||||
poll: function () {
|
||||
return queue.shift();
|
||||
},
|
||||
/**
|
||||
* Put a key-value pair into the pool.
|
||||
* @param {string} key the key to store the value under
|
||||
* @param {*} value the value to store
|
||||
*/
|
||||
put: function (key, value) {
|
||||
getFreeObject(key)[key] = value;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return TelemetryQueue;
|
||||
}
|
||||
);
|
||||
@@ -32,18 +32,23 @@ define(
|
||||
* associated telemetry data is of interest
|
||||
* @param {Function} callback a function to invoke
|
||||
* when new data has become available.
|
||||
* @param {boolean} lossless flag to indicate whether the
|
||||
* callback should be notified for all values
|
||||
* (otherwise, multiple values in quick succession
|
||||
* will call back with only the latest value.)
|
||||
* @returns {TelemetrySubscription} the subscription,
|
||||
* which will provide access to latest values.
|
||||
*
|
||||
* @method
|
||||
* @memberof TelemetrySubscriber
|
||||
*/
|
||||
subscribe: function (domainObject, callback) {
|
||||
subscribe: function (domainObject, callback, lossless) {
|
||||
return new TelemetrySubscription(
|
||||
$q,
|
||||
$timeout,
|
||||
domainObject,
|
||||
callback
|
||||
callback,
|
||||
lossless
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
/*global define*/
|
||||
|
||||
define(
|
||||
[],
|
||||
function () {
|
||||
['./TelemetryQueue', './TelemetryTable'],
|
||||
function (TelemetryQueue, TelemetryTable) {
|
||||
"use strict";
|
||||
|
||||
|
||||
@@ -25,11 +25,17 @@ define(
|
||||
* associated telemetry data is of interest
|
||||
* @param {Function} callback a function to invoke
|
||||
* when new data has become available.
|
||||
* @param {boolean} lossless true if callback should be invoked
|
||||
* once with every data point available; otherwise, multiple
|
||||
* data events in a short period of time will only invoke
|
||||
* the callback once, with access to the latest data
|
||||
*/
|
||||
function TelemetrySubscription($q, $timeout, domainObject, callback) {
|
||||
function TelemetrySubscription($q, $timeout, domainObject, callback, lossless) {
|
||||
var unsubscribePromise,
|
||||
latestValues = {},
|
||||
telemetryObjects = [],
|
||||
pool = lossless ? new TelemetryQueue() : new TelemetryTable(),
|
||||
metadatas,
|
||||
updatePending;
|
||||
|
||||
// Look up domain objects which have telemetry capabilities.
|
||||
@@ -55,10 +61,22 @@ define(
|
||||
});
|
||||
}
|
||||
|
||||
function updateValuesFromPool() {
|
||||
var values = pool.poll();
|
||||
Object.keys(values).forEach(function (k) {
|
||||
latestValues[k] = values[k];
|
||||
});
|
||||
}
|
||||
|
||||
// Invoke the observer callback to notify that new streaming
|
||||
// data has become available.
|
||||
function fireCallback() {
|
||||
callback();
|
||||
// Play back from queue if we are lossless
|
||||
while (!pool.isEmpty()) {
|
||||
updateValuesFromPool();
|
||||
callback();
|
||||
}
|
||||
|
||||
// Clear the pending flag so that future updates will
|
||||
// schedule this callback.
|
||||
updatePending = false;
|
||||
@@ -79,10 +97,10 @@ define(
|
||||
|
||||
// Update the latest-value table
|
||||
if (count > 0) {
|
||||
latestValues[domainObject.getId()] = {
|
||||
pool.put(domainObject.getId(), {
|
||||
domain: telemetry.getDomainValue(count - 1),
|
||||
range: telemetry.getRangeValue(count - 1)
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,6 +114,14 @@ define(
|
||||
});
|
||||
}
|
||||
|
||||
// Look up metadata associated with an object's telemetry
|
||||
function lookupMetadata(domainObject) {
|
||||
var telemetryCapability =
|
||||
domainObject.getCapability("telemetry");
|
||||
return telemetryCapability &&
|
||||
telemetryCapability.getMetadata();
|
||||
}
|
||||
|
||||
// Prepare subscriptions to all relevant telemetry-providing
|
||||
// domain objects.
|
||||
function subscribeAll(domainObjects) {
|
||||
@@ -108,6 +134,7 @@ define(
|
||||
// to return a non-Promise to simplify usage elsewhere.
|
||||
function cacheObjectReferences(objects) {
|
||||
telemetryObjects = objects;
|
||||
metadatas = objects.map(lookupMetadata);
|
||||
return objects;
|
||||
}
|
||||
|
||||
@@ -189,6 +216,21 @@ define(
|
||||
*/
|
||||
getTelemetryObjects: function () {
|
||||
return telemetryObjects;
|
||||
},
|
||||
/**
|
||||
* Get all telemetry metadata associated with
|
||||
* telemetry-providing domain objects managed by
|
||||
* this controller.
|
||||
*
|
||||
* This will ordered in the
|
||||
* same manner as `getTelemetryObjects()` or
|
||||
* `getResponse()`; that is, the metadata at a
|
||||
* given index will correspond to the telemetry-providing
|
||||
* domain object at the same index.
|
||||
* @returns {Array} an array of metadata objects
|
||||
*/
|
||||
getMetadata: function () {
|
||||
return metadatas;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
53
platform/telemetry/src/TelemetryTable.js
Normal file
53
platform/telemetry/src/TelemetryTable.js
Normal file
@@ -0,0 +1,53 @@
|
||||
/*global define*/
|
||||
|
||||
define(
|
||||
[],
|
||||
function () {
|
||||
"use strict";
|
||||
|
||||
/**
|
||||
* Supports TelemetrySubscription. Provides a simple data structure
|
||||
* (with a pool-like interface) that aggregates key-value pairs into
|
||||
* one large object, overwriting new values as necessary. Stands
|
||||
* in contrast to the TelemetryQueue, which will avoid overwriting
|
||||
* values.
|
||||
* @constructor
|
||||
*/
|
||||
function TelemetryTable() {
|
||||
var table;
|
||||
|
||||
return {
|
||||
/**
|
||||
* Check if any value groups remain in this pool.
|
||||
* @return {boolean} true if value groups remain
|
||||
*/
|
||||
isEmpty: function () {
|
||||
return !table;
|
||||
},
|
||||
/**
|
||||
* Retrieve the next value group from this pool.
|
||||
* This gives an object containing key-value pairs,
|
||||
* where keys and values correspond to the arguments
|
||||
* given to previous put functions.
|
||||
* @return {object} key-value pairs
|
||||
*/
|
||||
poll: function () {
|
||||
var t = table;
|
||||
table = undefined;
|
||||
return t;
|
||||
},
|
||||
/**
|
||||
* Put a key-value pair into the pool.
|
||||
* @param {string} key the key to store the value under
|
||||
* @param {*} value the value to store
|
||||
*/
|
||||
put: function (key, value) {
|
||||
table = table || {};
|
||||
table[key] = value;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return TelemetryTable;
|
||||
}
|
||||
);
|
||||
55
platform/telemetry/test/TelemetryQueueSpec.js
Normal file
55
platform/telemetry/test/TelemetryQueueSpec.js
Normal file
@@ -0,0 +1,55 @@
|
||||
/*global define,Promise,describe,it,expect,beforeEach,waitsFor,jasmine*/
|
||||
|
||||
define(
|
||||
["../src/TelemetryQueue"],
|
||||
function (TelemetryQueue) {
|
||||
"use strict";
|
||||
|
||||
describe("The telemetry queue", function () {
|
||||
var queue;
|
||||
|
||||
beforeEach(function () {
|
||||
// put, isEmpty, dequeue
|
||||
queue = new TelemetryQueue();
|
||||
});
|
||||
|
||||
it("stores elements by key", function () {
|
||||
queue.put("a", { someKey: "some value" });
|
||||
expect(queue.poll())
|
||||
.toEqual({ a: { someKey: "some value" }});
|
||||
});
|
||||
|
||||
it("merges non-overlapping keys", function () {
|
||||
queue.put("a", { someKey: "some value" });
|
||||
queue.put("b", 42);
|
||||
expect(queue.poll())
|
||||
.toEqual({ a: { someKey: "some value" }, b: 42 });
|
||||
});
|
||||
|
||||
it("adds new objects for repeated keys", function () {
|
||||
queue.put("a", { someKey: "some value" });
|
||||
queue.put("a", { someKey: "some other value" });
|
||||
queue.put("b", 42);
|
||||
expect(queue.poll())
|
||||
.toEqual({ a: { someKey: "some value" }, b: 42 });
|
||||
expect(queue.poll())
|
||||
.toEqual({ a: { someKey: "some other value" } });
|
||||
});
|
||||
|
||||
it("reports emptiness", function () {
|
||||
expect(queue.isEmpty()).toBeTruthy();
|
||||
queue.put("a", { someKey: "some value" });
|
||||
queue.put("a", { someKey: "some other value" });
|
||||
queue.put("b", 42);
|
||||
expect(queue.isEmpty()).toBeFalsy();
|
||||
queue.poll();
|
||||
expect(queue.isEmpty()).toBeFalsy();
|
||||
queue.poll();
|
||||
expect(queue.isEmpty()).toBeTruthy();
|
||||
});
|
||||
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
);
|
||||
@@ -13,6 +13,7 @@ define(
|
||||
mockTelemetry,
|
||||
mockUnsubscribe,
|
||||
mockSeries,
|
||||
testMetadata,
|
||||
subscription;
|
||||
|
||||
function mockPromise(value) {
|
||||
@@ -24,6 +25,8 @@ define(
|
||||
}
|
||||
|
||||
beforeEach(function () {
|
||||
testMetadata = { someKey: "some value" };
|
||||
|
||||
mockQ = jasmine.createSpyObj("$q", ["when", "all"]);
|
||||
mockTimeout = jasmine.createSpy("$timeout");
|
||||
mockDomainObject = jasmine.createSpyObj(
|
||||
@@ -33,7 +36,7 @@ define(
|
||||
mockCallback = jasmine.createSpy("callback");
|
||||
mockTelemetry = jasmine.createSpyObj(
|
||||
"telemetry",
|
||||
["subscribe"]
|
||||
["subscribe", "getMetadata"]
|
||||
);
|
||||
mockUnsubscribe = jasmine.createSpy("unsubscribe");
|
||||
mockSeries = jasmine.createSpyObj(
|
||||
@@ -48,6 +51,7 @@ define(
|
||||
mockDomainObject.getId.andReturn('test-id');
|
||||
|
||||
mockTelemetry.subscribe.andReturn(mockUnsubscribe);
|
||||
mockTelemetry.getMetadata.andReturn(testMetadata);
|
||||
|
||||
mockSeries.getPointCount.andReturn(42);
|
||||
mockSeries.getDomainValue.andReturn(123456);
|
||||
@@ -120,6 +124,52 @@ define(
|
||||
// Should have no objects
|
||||
expect(subscription.getTelemetryObjects()).toEqual([]);
|
||||
});
|
||||
|
||||
// This test case corresponds to plot usage of
|
||||
// telemetrySubscription, where failure to callback
|
||||
// once-per-update results in loss of data, WTD-784
|
||||
it("fires one event per update if requested", function () {
|
||||
var i, domains = [], ranges = [], lastCall;
|
||||
|
||||
// Clear out the subscription from beforeEach
|
||||
subscription.unsubscribe();
|
||||
// Create a subscription which does not drop events
|
||||
subscription = new TelemetrySubscription(
|
||||
mockQ,
|
||||
mockTimeout,
|
||||
mockDomainObject,
|
||||
mockCallback,
|
||||
true // Don't drop updates!
|
||||
);
|
||||
|
||||
// Snapshot getDomainValue, getRangeValue at time of callback
|
||||
mockCallback.andCallFake(function () {
|
||||
domains.push(subscription.getDomainValue(mockDomainObject));
|
||||
ranges.push(subscription.getRangeValue(mockDomainObject));
|
||||
});
|
||||
|
||||
// Send 100 updates
|
||||
for (i = 0; i < 100; i += 1) {
|
||||
// Return different values to verify later
|
||||
mockSeries.getDomainValue.andReturn(i);
|
||||
mockSeries.getRangeValue.andReturn(i * 2);
|
||||
mockTelemetry.subscribe.mostRecentCall.args[0](mockSeries);
|
||||
}
|
||||
|
||||
// Fire all timeouts that get scheduled
|
||||
while (mockTimeout.mostRecentCall !== lastCall) {
|
||||
lastCall = mockTimeout.mostRecentCall;
|
||||
lastCall.args[0]();
|
||||
}
|
||||
|
||||
// Should have only triggered the
|
||||
expect(mockCallback.calls.length).toEqual(100);
|
||||
});
|
||||
|
||||
it("provides domain object metadata", function () {
|
||||
expect(subscription.getMetadata()[0])
|
||||
.toEqual(testMetadata);
|
||||
});
|
||||
});
|
||||
}
|
||||
);
|
||||
53
platform/telemetry/test/TelemetryTableSpec.js
Normal file
53
platform/telemetry/test/TelemetryTableSpec.js
Normal file
@@ -0,0 +1,53 @@
|
||||
/*global define,Promise,describe,it,expect,beforeEach,waitsFor,jasmine*/
|
||||
|
||||
define(
|
||||
["../src/TelemetryTable"],
|
||||
function (TelemetryTable) {
|
||||
"use strict";
|
||||
|
||||
describe("The telemetry table", function () {
|
||||
var queue;
|
||||
|
||||
beforeEach(function () {
|
||||
// put, isEmpty, dequeue
|
||||
queue = new TelemetryTable();
|
||||
});
|
||||
|
||||
it("stores elements by key", function () {
|
||||
queue.put("a", { someKey: "some value" });
|
||||
expect(queue.poll())
|
||||
.toEqual({ a: { someKey: "some value" }});
|
||||
});
|
||||
|
||||
it("merges non-overlapping keys", function () {
|
||||
queue.put("a", { someKey: "some value" });
|
||||
queue.put("b", 42);
|
||||
expect(queue.poll())
|
||||
.toEqual({ a: { someKey: "some value" }, b: 42 });
|
||||
});
|
||||
|
||||
it("overwrites repeated keys", function () {
|
||||
queue.put("a", { someKey: "some value" });
|
||||
queue.put("a", { someKey: "some other value" });
|
||||
queue.put("b", 42);
|
||||
expect(queue.poll())
|
||||
.toEqual({ a: { someKey: "some other value" }, b: 42 });
|
||||
expect(queue.poll())
|
||||
.toBeUndefined();
|
||||
});
|
||||
|
||||
it("reports emptiness", function () {
|
||||
expect(queue.isEmpty()).toBeTruthy();
|
||||
queue.put("a", { someKey: "some value" });
|
||||
queue.put("a", { someKey: "some other value" });
|
||||
queue.put("b", 42);
|
||||
expect(queue.isEmpty()).toBeFalsy();
|
||||
queue.poll();
|
||||
expect(queue.isEmpty()).toBeTruthy();
|
||||
});
|
||||
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
);
|
||||
@@ -3,6 +3,8 @@
|
||||
"TelemetryCapability",
|
||||
"TelemetryController",
|
||||
"TelemetryFormatter",
|
||||
"TelemetryQueue",
|
||||
"TelemetrySubscriber",
|
||||
"TelemetrySubscription"
|
||||
"TelemetrySubscription",
|
||||
"TelemetryTable"
|
||||
]
|
||||
Reference in New Issue
Block a user