From 8c55a66320c1fcd92e806dcb0dce5519a82979d1 Mon Sep 17 00:00:00 2001 From: Victor Woeltjen Date: Mon, 29 Dec 2014 13:36:53 -0800 Subject: [PATCH] [Telemetry] Add telemetry subscriptions Add ability to subscribe/unsubscribe to streaming telemetry updates. This may be more performant than polling in some circumstances, and appears necessary for good performance of autoflow tabular views, WTD-614. --- .../src/SinewaveTelemetryProvider.js | 43 +++++++++++++++- platform/telemetry/src/TelemetryAggregator.js | 17 +++++++ platform/telemetry/src/TelemetryCapability.js | 49 +++++++++++++++---- 3 files changed, 99 insertions(+), 10 deletions(-) diff --git a/example/generator/src/SinewaveTelemetryProvider.js b/example/generator/src/SinewaveTelemetryProvider.js index d111bafd3d..4bb20a7354 100644 --- a/example/generator/src/SinewaveTelemetryProvider.js +++ b/example/generator/src/SinewaveTelemetryProvider.js @@ -13,6 +13,7 @@ define( * @constructor */ function SinewaveTelemetryProvider($q, $timeout) { + var subscriptions = []; // function matchesSource(request) { @@ -43,8 +44,48 @@ define( }, 0); } + function handleSubscriptions() { + subscriptions.forEach(function (subscription) { + var requests = subscription.requests; + subscription.callback(doPackage( + requests.filter(matchesSource).map(generateData) + )); + }); + } + + function startGenerating() { + $timeout(function () { + handleSubscriptions(); + if (subscriptions.length > 0) { + startGenerating(); + } + }, 1000); + } + + function subscribe(callback, requests) { + var subscription = { + callback: callback, + requests: requests + }; + + function unsubscribe() { + subscriptions = subscriptions.filter(function (s) { + return s !== subscription; + }); + } + + subscriptions.push(subscription); + + if (subscriptions.length === 1) { + startGenerating(); + } + + return unsubscribe; + } + return { - requestTelemetry: requestTelemetry + requestTelemetry: requestTelemetry, + subscribe: subscribe }; } diff --git a/platform/telemetry/src/TelemetryAggregator.js b/platform/telemetry/src/TelemetryAggregator.js index 797fc55c72..45c3a5e036 100644 --- a/platform/telemetry/src/TelemetryAggregator.js +++ b/platform/telemetry/src/TelemetryAggregator.js @@ -38,6 +38,23 @@ define( })).then(mergeResults); } + // Subscribe to updates from all providers + function subscribe(callback, requests) { + var unsubscribes = telemetryProviders.map(function (provider) { + return provider.subscribe(callback, requests); + }); + + // Return an unsubscribe function that invokes unsubscribe + // for all providers. + return function () { + unsubscribes.forEach(function (unsubscribe) { + if (unsubscribe) { + unsubscribe(); + } + }); + }; + } + return { /** * Request telemetry data. diff --git a/platform/telemetry/src/TelemetryCapability.js b/platform/telemetry/src/TelemetryCapability.js index 2cdcd23f02..86f158d0a7 100644 --- a/platform/telemetry/src/TelemetryCapability.js +++ b/platform/telemetry/src/TelemetryCapability.js @@ -16,20 +16,22 @@ define( * @constructor */ function TelemetryCapability($injector, $q, $log, domainObject) { - var telemetryService; + var telemetryService, + subscriptions = [], + unsubscribeFunction; // We could depend on telemetryService directly, but // there isn't a platform implementation of this; function getTelemetryService() { - if (!telemetryService) { + if (telemetryService === undefined) { try { telemetryService = - $q.when($injector.get("telemetryService")); + $injector.get("telemetryService"); } catch (e) { // $injector should throw is telemetryService // is unavailable or unsatisfiable. $log.warn("Telemetry service unavailable"); - telemetryService = $q.reject(e); + telemetryService = null; } } return telemetryService; @@ -83,16 +85,34 @@ define( } // Issue a request to the service - function requestTelemetryFromService(telemetryService) { + function requestTelemetryFromService() { return telemetryService.requestTelemetry([fullRequest]); } // If a telemetryService is not available, // getTelemetryService() should reject, and this should // bubble through subsequent then calls. - return getTelemetryService() - .then(requestTelemetryFromService) - .then(getRelevantResponse); + return getTelemetryService() && + requestTelemetryFromService() + .then(getRelevantResponse); + } + + // Listen for real-time and/or streaming updates + function subscribe(callback, request) { + var fullRequest = buildRequest(request || {}); + + // Unpack the relevant telemetry series + function update(telemetries) { + var source = fullRequest.source, + key = fullRequest.key, + result = ((telemetries || {})[source] || {})[key]; + if (result) { + callback(result); + } + } + + return getTelemetryService() && + telemetryService.subscribe(update, [fullRequest]); } return { @@ -115,7 +135,18 @@ define( // type-level and object-level telemetry // properties return buildRequest({}); - } + }, + + /** + * Subscribe to updates to telemetry data for this domain + * object. + * @param {Function} callback a function to call when new + * data becomes available; the telemetry series + * containing the data will be given as an argument. + * @param {TelemetryRequest} [request] parameters for the + * subscription request + */ + subscribe: subscribe }; }