Use Server-Sent Events in the Couch DB adapter (#4427)
* Use SSE instead of chunked HTTP response. Co-authored-by: Joshi <simplyrender@gmail.com>
This commit is contained in:
@@ -38,6 +38,8 @@ class CouchObjectProvider {
|
||||
this.objectQueue = {};
|
||||
this.observers = {};
|
||||
this.batchIds = [];
|
||||
this.onEventMessage = this.onEventMessage.bind(this);
|
||||
this.onEventError = this.onEventError.bind(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -50,7 +52,7 @@ class CouchObjectProvider {
|
||||
// eslint-disable-next-line no-undef
|
||||
const sharedWorkerURL = `${this.openmct.getAssetPath()}${__OPENMCT_ROOT_RELATIVE__}couchDBChangesFeed.js`;
|
||||
|
||||
sharedWorker = new SharedWorker(sharedWorkerURL);
|
||||
sharedWorker = new SharedWorker(sharedWorkerURL, 'CouchDB SSE Shared Worker');
|
||||
sharedWorker.port.onmessage = provider.onSharedWorkerMessage.bind(this);
|
||||
sharedWorker.port.onmessageerror = provider.onSharedWorkerMessageError.bind(this);
|
||||
sharedWorker.port.start();
|
||||
@@ -70,6 +72,13 @@ class CouchObjectProvider {
|
||||
console.log('Error', event);
|
||||
}
|
||||
|
||||
isSynchronizedObject(object) {
|
||||
return (object && object.type
|
||||
&& this.openmct.objects.SYNCHRONIZED_OBJECT_TYPES
|
||||
&& this.openmct.objects.SYNCHRONIZED_OBJECT_TYPES.includes(object.type));
|
||||
|
||||
}
|
||||
|
||||
onSharedWorkerMessage(event) {
|
||||
if (event.data.type === 'connection') {
|
||||
this.changesFeedSharedWorkerConnectionId = event.data.connectionId;
|
||||
@@ -86,7 +95,9 @@ class CouchObjectProvider {
|
||||
if (observersForObject) {
|
||||
observersForObject.forEach(async (observer) => {
|
||||
const updatedObject = await this.get(objectChanges.identifier);
|
||||
observer(updatedObject);
|
||||
if (this.isSynchronizedObject(updatedObject)) {
|
||||
observer(updatedObject);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -390,38 +401,16 @@ class CouchObjectProvider {
|
||||
* @private
|
||||
*/
|
||||
observeObjectChanges() {
|
||||
|
||||
let filter = {selector: {}};
|
||||
|
||||
if (this.openmct.objects.SYNCHRONIZED_OBJECT_TYPES.length > 1) {
|
||||
filter.selector.$or = this.openmct.objects.SYNCHRONIZED_OBJECT_TYPES
|
||||
.map(type => {
|
||||
return {
|
||||
'model': {
|
||||
type
|
||||
}
|
||||
};
|
||||
});
|
||||
} else {
|
||||
filter.selector.model = {
|
||||
type: this.openmct.objects.SYNCHRONIZED_OBJECT_TYPES[0]
|
||||
};
|
||||
}
|
||||
|
||||
// feed=continuous maintains an indefinitely open connection with a keep-alive of HEARTBEAT milliseconds until this client closes the connection
|
||||
// style=main_only returns only the current winning revision of the document
|
||||
let url = `${this.url}/_changes?feed=continuous&style=main_only&heartbeat=${HEARTBEAT}`;
|
||||
|
||||
let body = {};
|
||||
if (filter) {
|
||||
url = `${url}&filter=_selector`;
|
||||
body = JSON.stringify(filter);
|
||||
}
|
||||
const sseChangesPath = `${this.url}/_changes`;
|
||||
const sseURL = new URL(sseChangesPath);
|
||||
sseURL.searchParams.append('feed', 'eventsource');
|
||||
sseURL.searchParams.append('style', 'main_only');
|
||||
sseURL.searchParams.append('heartbeat', HEARTBEAT);
|
||||
|
||||
if (typeof SharedWorker === 'undefined') {
|
||||
this.fetchChanges(url, body);
|
||||
this.fetchChanges(sseURL.toString());
|
||||
} else {
|
||||
this.initiateSharedWorkerFetchChanges(url, body);
|
||||
this.initiateSharedWorkerFetchChanges(sseURL.toString());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -429,7 +418,7 @@ class CouchObjectProvider {
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
initiateSharedWorkerFetchChanges(url, body) {
|
||||
initiateSharedWorkerFetchChanges(url) {
|
||||
if (!this.changesFeedSharedWorker) {
|
||||
this.changesFeedSharedWorker = this.startSharedWorker();
|
||||
|
||||
@@ -443,17 +432,40 @@ class CouchObjectProvider {
|
||||
|
||||
this.changesFeedSharedWorker.port.postMessage({
|
||||
request: 'changes',
|
||||
body,
|
||||
url
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fetchChanges(url, body) {
|
||||
const controller = new AbortController();
|
||||
const signal = controller.signal;
|
||||
onEventError(error) {
|
||||
console.error('Error on feed', error);
|
||||
if (Object.keys(this.observers).length > 0) {
|
||||
this.observeObjectChanges();
|
||||
}
|
||||
}
|
||||
|
||||
let error = false;
|
||||
onEventMessage(event) {
|
||||
const object = JSON.parse(event.data);
|
||||
object.identifier = {
|
||||
namespace: this.namespace,
|
||||
key: object.id
|
||||
};
|
||||
let keyString = this.openmct.objects.makeKeyString(object.identifier);
|
||||
let observersForObject = this.observers[keyString];
|
||||
|
||||
if (observersForObject) {
|
||||
observersForObject.forEach(async (observer) => {
|
||||
const updatedObject = await this.get(object.identifier);
|
||||
if (this.isSynchronizedObject(updatedObject)) {
|
||||
observer(updatedObject);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fetchChanges(url) {
|
||||
const controller = new AbortController();
|
||||
let couchEventSource;
|
||||
|
||||
if (this.isObservingObjectChanges()) {
|
||||
this.stopObservingObjectChanges();
|
||||
@@ -461,66 +473,18 @@ class CouchObjectProvider {
|
||||
|
||||
this.stopObservingObjectChanges = () => {
|
||||
controller.abort();
|
||||
couchEventSource.removeEventListener('message', this.onEventMessage);
|
||||
delete this.stopObservingObjectChanges;
|
||||
};
|
||||
|
||||
const response = await fetch(url, {
|
||||
method: 'POST',
|
||||
signal,
|
||||
headers: {
|
||||
"Content-Type": 'application/json'
|
||||
},
|
||||
body
|
||||
});
|
||||
console.debug('⇿ Opening CouchDB change feed connection ⇿');
|
||||
|
||||
let reader;
|
||||
couchEventSource = new EventSource(url);
|
||||
couchEventSource.onerror = this.onEventError;
|
||||
|
||||
if (response.body === undefined) {
|
||||
error = true;
|
||||
} else {
|
||||
reader = response.body.getReader();
|
||||
}
|
||||
|
||||
while (!error) {
|
||||
const {done, value} = await reader.read();
|
||||
//done is true when we lose connection with the provider
|
||||
if (done) {
|
||||
error = true;
|
||||
}
|
||||
|
||||
if (value) {
|
||||
let chunk = new Uint8Array(value.length);
|
||||
chunk.set(value, 0);
|
||||
const decodedChunk = new TextDecoder("utf-8").decode(chunk).split('\n');
|
||||
if (decodedChunk.length && decodedChunk[decodedChunk.length - 1] === '') {
|
||||
decodedChunk.forEach((doc, index) => {
|
||||
try {
|
||||
const object = JSON.parse(doc);
|
||||
object.identifier = {
|
||||
namespace: this.namespace,
|
||||
key: object.id
|
||||
};
|
||||
let keyString = this.openmct.objects.makeKeyString(object.identifier);
|
||||
let observersForObject = this.observers[keyString];
|
||||
|
||||
if (observersForObject) {
|
||||
observersForObject.forEach(async (observer) => {
|
||||
const updatedObject = await this.get(object.identifier);
|
||||
observer(updatedObject);
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
//do nothing;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (error && Object.keys(this.observers).length > 0) {
|
||||
this.observeObjectChanges();
|
||||
}
|
||||
// start listening for events
|
||||
couchEventSource.addEventListener('message', this.onEventMessage);
|
||||
console.debug('⇿ Opened connection ⇿');
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user