From 1dd06065eafe93973b2adae3d7d3e1a8410963b8 Mon Sep 17 00:00:00 2001 From: Dennis Snell Date: Sun, 18 Feb 2018 22:53:27 -0500 Subject: [PATCH] Add comments and refactor style While working on tracking down a nasty synchronization bug I could keep all the data flows and puzzle pieces together in my mind, so I wrote some of them down as best as I was understanding them. Further I refactored some code because I find the terser styles afforded by ES2015+ syntax to confuse me less, especially when following around closures with `bind` and `call` and `apply` --- src/simperium/channel.js | 343 +++++++++++++++++++++++------------ src/simperium/util/change.js | 63 +++++-- 2 files changed, 281 insertions(+), 125 deletions(-) diff --git a/src/simperium/channel.js b/src/simperium/channel.js index d2bb379..7175b27 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -12,26 +12,33 @@ const CODE_INVALID_VERSION = 405; const CODE_EMPTY_RESPONSE = 412; const CODE_INVALID_DIFF = 440; -var operation = { +const operation = { MODIFY: 'M', REMOVE: '-' }; -var internal = {}; +const internal = {}; internal.updateChangeVersion = function( cv ) { return this.store.setChangeVersion( cv ); }; -// Called when receive a change from the network. Attempt to apply the change -// to the ghost object and notify. -internal.changeObject = function( id, change ) { +internal.changeObject = +/** + * Called when receive a change from the network. + * Attempt to apply the change + * to the ghost object and notify. + */ +function changeObject( id, change ) { // pull out the object from the store and apply the change delta - var applyChange = internal.performChange.bind( this, change ); - - this.networkQueue.queueFor( id ).add( function( done ) { - return applyChange().then( done, done ); - } ); + this + .networkQueue + .queueFor( id ) + .add( + keepRunningQueue => internal + .performChange.call( this, change ) + .then( keepRunningQueue, keepRunningQueue ) + ); }; internal.buildModifyChange = function( id, object, ghost ) { @@ -47,6 +54,7 @@ internal.buildModifyChange = function( id, object, ghost ) { } if ( empty ) return this.emit( 'unmodified', id, object, ghost ); + payload.v && payload.v.content && console.log(payload.v.content) // if the change v is an empty object, do not send, notify? this.localQueue.queue( payload ); @@ -71,57 +79,96 @@ internal.removeAndSend = function( id, object ) { return this.store.get( id ).then( remove ); }; -// We've receive a full object from the network. Update the local instance and -// notify of the new object version -internal.updateObjectVersion = function( id, version, data, original, patch, acknowledged ) { - var notify, - changes, - change, - patch, - localModifications, - remoteModifications, - transformed, - update; +internal.updateObjectVersion = +/** + * Updates the ghost store with a received Revision of + * an Entity including its new version and its changes + * + * After updating the ghost store we will also emit + * and announce that the update occurred. + * + * @param {string} id Entity id + * @param {number} version new version after change + * @param {object} data updated data after change + * @param {object} original data from start version before change + * @param {object} patch diff-match-patch diff of received change + * @param {boolean} isAcknowledged + * @returns {Promise<*>} + */ +function updateObjectVersion( id, version, data, original, patch, isAcknowledged ) { + // no matter what happens, we need to store the new + // revision of the Entity in the ghost store + const updateGhost = this.store.put( id, version, data ); + + // If it's already been acknowledged then we can simply + // update the ghost store and update the acknowledgement + if ( isAcknowledged ) { + return updateGhost.then( + internal.updateAcknowledged.bind( this, isAcknowledged ) + ); + } + // If it's not an ack, it's a change initiated on a different client // we need to provide a way for the current client to respond to // a potential conflict if it has modifications that have not been synced - if ( !acknowledged ) { - changes = this.localQueue.dequeueChangesFor( id ); - localModifications = change_util.compressChanges( changes, original ); - remoteModifications = patch; - transformed = change_util.transform( localModifications, remoteModifications, original ); - update = data; - - // apply the transformed patch and emit the update - if ( transformed ) { - patch = transformed; - update = jsondiff.apply_object_diff( data, transformed ); - // queue up the new change - change = change_util.modify( id, version, patch ); - this.localQueue.queue( change ); - } - - notify = this.emit.bind( this, 'update', id, update, original, patch, this.bucket.isIndexing ); - } else { - notify = internal.updateAcknowledged.bind( this, acknowledged ); + /** @type Array all sent and queued local changes */ + const localChanges = this.localQueue.dequeueChangesFor( id ); + const rebasedPatch = change_util.transform( + // combine all local changes into a single patch + change_util.compressChanges( localChanges, original ), + patch, + original + ); + + // choose the final form of the updated Entity + const nextData = rebasedPatch + ? jsondiff.apply_object_diff( data, rebasedPatch ) + : data; + + // of course, if we did have to rebase our sent and queued + // changes then we assume they will fail when they arrive at + // the server because a new base version has appeared on that + // end. we'll resubmit a new patch which incorporates those + // changes which we don't know yet if they were successful + if ( rebasedPatch ) { + this.localQueue.queue( + change_util.modify( id, version, rebasedPatch ) + ); } - return this.store.put( id, version, data ).then( notify ); + return updateGhost.then( this.emit.bind( + this, + 'update', + id, nextData, original, patch, this.bucket.isIndexing + ) ); }; -internal.removeObject = function( id, acknowledged ) { - var notify; - if ( !acknowledged ) { - notify = this.emit.bind( this, 'remove', id ); - } else { - notify = internal.updateAcknowledged.bind( this, acknowledged ); - } +internal.removeObject = +/** + * Removes an Entity from the Ghost store + * + * @param {string} id Entity id + * @param {boolean} isAcknowledged + * @returns {Promise<*>} + */ +function removeObject( id, isAcknowledged ) { + // no matter what happens, we have to remove + // the Entity from our ghost store + const removeEntity = this.store.remove( id ); - return this.store.remove( id ).then( notify ); + return isAcknowledged + ? removeEntity.then( internal.updateAcknowledged.bind( this, isAcknowledged ) ) + : removeEntity.then( this.emit.bind( this, 'remove', id ) ); }; -internal.updateAcknowledged = function( change ) { - var id = change.id; +internal.updateAcknowledged = +/** + * Tells the local queue of changes to acknowledge + * that we have confirmed receipt of our change. + */ +function updateAcknowledge( change ) { + const { id } = change; + if ( this.localQueue.sent[id] === change ) { this.localQueue.acknowledge( change ); this.emit( 'acknowledge', id, change ); @@ -129,68 +176,122 @@ internal.updateAcknowledged = function( change ) { }; internal.performChange = function( change ) { - var success = internal.applyChange.bind( this, change ); - return this.store.get( change.id ).then( success ); + return this + .store + .get( change.id ) + .then( + object => internal.applyChange.call(this, change, object) + ); }; -internal.findAcknowledgedChange = function( change ) { - var possibleChange = this.localQueue.sent[change.id]; - if ( possibleChange ) { - if ( ( change.ccids || [] ).indexOf( possibleChange.ccid ) > -1 ) { - return possibleChange; - } +internal.findAcknowledgedChange = +/** + * Takes a changeset and returns indicates whether + * we have + * + * @param {Changeset} changeset possibly containing an acknowledged request + * @returns {boolean} whether or not we have seen and acknowledged this request + */ +function findAcknowledgedChange( changeset ) { + const sentChange = this.localQueue.sent[changeset.id]; + + if ( ! sentChange || ! changeset.hasOwnProperty( 'ccids' ) ) { + return null; } + + return changeset.ccids.indexOf( sentChange.ccid ) > -1 + ? sentChange + : null; }; -internal.requestObjectVersion = function( id, version ) { +internal.requestObjectVersion = +/** + * Requests a copy of a specific entity + * revision from the Simperium server + * + * @param {string} id Entity id + * @param {number} version revision version number + * @returns {Promise} + */ +function requestObjectVersion( id, version ) { return new Promise( resolve => { - this.once( `version.${ id }.${ version }`, data => { - resolve( data ); - } ); + this.once( `version.${ id }.${ version }`, resolve ); this.send( `e:${ id }.${ version }` ); } ); }; -internal.applyChange = function( change, ghost ) { - var acknowledged = internal.findAcknowledgedChange.bind( this )( change ), - error, - emit, - original, - patch, - modified; +internal.applyChange = +/** + * Applies an incoming changeset + * and updates the ghost store + * + * @param {Changeset} changeset + * @param {object} ghost most-recent Revision of Entity in ghost store + * @returns {Promise<*>} + */ +function applyChange( changeset, ghost ) { + const acknowledged = internal + .findAcknowledgedChange + .call( this, changeset ); + // attempt to apply the change // TODO: Handle errors as specified in - // 0:c:[{"ccids": ["0435edf4-3f07-4cc6-bf86-f68e6db8779c"], "id": "9e9a9616-8174-42 - // { ccids: [ '0435edf4-3f07-4cc6-bf86-f68e6db8779c' ], - // id: '9e9a9616-8174-425a-a1b0-9ed5410f1edc', - // clientid: 'node-b9776e96-c068-42ae-893a-03f50833bddb', - // error: 400 } - if ( change.error ) { - error = new Error( `${change.error} - Could not apply change to: ${ghost.key}` ); - error.code = change.error; - error.change = change; + // 0:c:[{ + // "ccids": ["0435edf4-3f07-4cc6-bf86-f68e6db8779c"], + // "id": "9e9a9616-8174-42 + // { ccids: [ '0435edf4-3f07-4cc6-bf86-f68e6db8779c' ], + // id: '9e9a9616-8174-425a-a1b0-9ed5410f1edc', + // clientid: 'node-b9776e96-c068-42ae-893a-03f50833bddb', + // error: 400 + // } + if ( changeset.error ) { + const error = new Error( `${changeset.error} - Could not apply change to: ${ghost.key}` ); + error.code = changeset.error; + error.changeset = changeset; error.ghost = ghost; - internal.handleChangeError.call( this, error, change, acknowledged ); + internal.handleChangeError.call( this, error, changeset, acknowledged ); return; } - emit = this.emit.bind( this, 'change-version', change.cv, change ); + const emit = this.emit.bind( this, 'change-version', changeset.cv, changeset ); - if ( change.o === operation.MODIFY ) { - if ( ghost && ( ghost.version !== change.sv ) ) { - internal.requestObjectVersion.call( this, change.id, change.sv ).then( data => { - internal.applyChange.call( this, change, { version: change.sv, data } ) - } ); - return; + // easy peasy if all we're doing is removing the Entity + if ( changeset.o === operation.REMOVE ) { + return internal + .removeObject + .call( this, changeset.id, acknowledged ) + .then( emit ); + } + + if ( changeset.o === operation.MODIFY ) { + /* + * If we don't have a local copy of the start version + * upon which this change was made then we have to + * request a copy of it from the network and try again + */ + if ( ghost && ( ghost.version !== changeset.sv ) ) { + return internal + .requestObjectVersion + .call( this, changeset.id, changeset.sv ) + .then( data => internal + /* + * When the updated version of the entity returns + * it does _not_ come associated with a version + * number so we have to append it here in order to + * match what we are expecting from the ghost entity + */ + .applyChange + .call( this, changeset, { version: changeset.sv, data } ) + ); } - original = ghost.data; - patch = change.v; - modified = jsondiff.apply_object_diff( original, patch ); - return internal.updateObjectVersion.bind( this )( change.id, change.ev, modified, original, patch, acknowledged ) + const original = ghost.data; + const patch = changeset.v; + const modified = jsondiff.apply_object_diff( original, patch ); + return internal + .updateObjectVersion + .call( this, changeset.id, changeset.ev, modified, original, patch, acknowledged ) .then( emit ); - } else if ( change.o === operation.REMOVE ) { - return internal.removeObject.bind( this )( change.id, acknowledged ).then( emit ); } } @@ -275,7 +376,9 @@ export default function Channel( appid, access_token, bucket, store ) { options = { sync: true }; } + console.time('update') return update.call( bucket, id, object, options, function( err, object ) { + console.timeEnd('update') if ( !err ) bucket.emit( 'update', id, object.data ); if ( !err && options.sync !== false ) bucketEvents.emit( 'update', id, object.data ); if ( callback ) callback.apply( this, arguments ); @@ -490,9 +593,7 @@ NetworkQueue.prototype.queueFor = function( id ) { if ( !queue ) { queue = new Queue(); - queue.on( 'finish', function() { - delete queues[id]; - } ); + queue.on( 'finish',() =>{ delete queues[id]; } ); queues[id] = queue; } @@ -622,46 +723,64 @@ LocalQueue.prototype.processQueue = function( id ) { this.store.get( id ).then( compressAndSend ); } -LocalQueue.prototype.compressAndSend = function( id, ghost ) { - var changes = this.queues[id]; - var change; - var target = ghost.data; - var c; - var type; - - // a change was sent before we could compress and send +LocalQueue.prototype.compressAndSend = +/** + * Compress a sequences of changes into a single patch + * and send combinde change to server for synchronizing + * + * @param {string} id Change id + * @param {object} ghost most-recent copy of Entity in ghost store + */ +function compressAndSend( id, ghost ) { + // if we've already sent this change + // then don't try again yet if ( this.sent[id] ) { this.emit( 'wait', id ); return; } + const changes = this.queues[id]; + + // if we have sent the first change from + // the sequence out already then + // don't try again yet if ( changes.length === 1 ) { - change = changes.shift(); + const change = changes.shift(); this.sent[id] = change; this.emit( 'send', change ); return; } + // if the first change tells us to remove the Entity + // then drop all further changes if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { - change = changes.shift(); + const change = changes.shift(); changes.splice( 0, changes.length - 1 ); this.sent[id] = change; this.emit( 'send', change ); } + // for the rest of th queued changes + // iteratively apply them to the base + // Entity from the ghost store + let target = ghost.data; while ( changes.length > 0 ) { - c = changes.shift(); + const change = changes.shift(); - if ( c.o === change_util.type.REMOVE ) { - changes.unshift( c ); + // leave the removal change in the queue? + if ( change.o === change_util.type.REMOVE ) { + changes.unshift( change ); break; } - target = jsondiff.apply_object_diff( target, c.v ); + target = jsondiff.apply_object_diff( target, change.v ); } - type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY; - change = change_util.buildChange( type, id, target, ghost ); + const type = target === null + ? change_util.type.REMOVE + : change_util.type.MODIFY; + + const change = change_util.buildChange( type, id, target, ghost ); this.sent[id] = change; this.emit( 'send', change ); diff --git a/src/simperium/util/change.js b/src/simperium/util/change.js index d5105cc..0881ede 100644 --- a/src/simperium/util/change.js +++ b/src/simperium/util/change.js @@ -1,6 +1,20 @@ import uuid from 'node-uuid' import jsondiff from '../jsondiff' +/** + * @typedef {object} Change + * @property {string} id id of changed Entity + * @property {string} o type of Entity change - see below operation types + * @property {object} v jsondiff object for each property + * @property {string} ccid client-generated unique id for changeset + */ + +/** + * @typedef {object} Changeset + * @property {string} id id of Entity being changed + * @property {Array} ccids sequence of changes contained in changeset + */ + const changeTypes = { MODIFY: 'M', REMOVE: '-', @@ -42,27 +56,50 @@ function buildChangeFromOrigin( type, id, version, target, origin ) { return changeData; } -function compressChanges( changes, origin ) { - var modified; - +/** + * Combine a sequence of changes into a single change object + * + * We will take a simple approach by starting with the base Entity + * and apply each transformation to it in succession. + * At the end of the iteration we'll take a final diff against + * how that Entity started and that will equal the combined change. + * + * @TODO: Could we more easily rebase the changes without the base Entity? + * + * @param {Array} changes to apply in order + * @param {object} baseEntity start Entity which changes modify + * @returns {?Change} change representing all of the combined input changes + */ +function compressChanges( changes, baseEntity ) { + // no changes is an empty changeset - don't do anything if ( changes.length === 0 ) { return {}; } + // if we only have a single change, that's it! if ( changes.length === 1 ) { return changes[0].v; } - modified = changes.reduce( function( from, change ) { - // deletes when, any changes after a delete are ignored - if ( from === null ) return null; - if ( from.o === changeTypes.REMOVE ) return null; - return apply_object_diff( from, change.v ); - }, origin ); - - if ( modified === null ) return null; - - return object_diff( origin, modified ); + /** + * Otherwise we need to iterate + * + * At the first point in time where we remove the Entity + * we will want to short-circuit the rest of the changes. + * + * @type {?Object} the Entity as it's transformed + */ + const finalEntity = changes.reduce( ( combined, change ) => + ( null !== combined && changeTypes.REMOVE !== combined.o ) + ? apply_object_diff( combined, change.v ) + : null, + baseEntity + ); + + // null indicates that the Entity should be removed + return finalEntity !== null + ? object_diff( baseEntity, finalEntity ) + : null; } function rebase( local_diff, remote_diff, origin ) {