diff --git a/lib/LRUCacheProxy.js b/lib/LRUCacheProxy.js index 26680ce..0c87f3f 100644 --- a/lib/LRUCacheProxy.js +++ b/lib/LRUCacheProxy.js @@ -1,4 +1,4 @@ -var LRUCache = require('lru-cache'); +var LRUCache = require('./lru-cache-extensions'); var cluster = require('cluster'); var uuid = require('node-uuid'); var pending = {}; @@ -24,12 +24,20 @@ function send(namespace, cmd) { var LRUCacheProxy = function LRUCacheProxy(options) { if (!(this instanceof LRUCacheProxy)) { if(cluster.isMaster) { + var lru = new LRUCache(options); - var lruGet = LRUCache.prototype.get; - lru.get = function(key, callback) { - callback(lruGet.apply(lru, arguments)); - }; + var methodCtor = function( baseMethod ) { + return function( ) { + var cb; + if ( typeof ( cb = arguments[ arguments.length - 1 ] ) !== "function" ) { + cb = undefined; } + var value = baseMethod.apply( this, arguments ); + cb && cb( value ); + return value; }; } + + for( var p in { "get": 1, "count": 1, "complete": 1} ) { + lru[ p ] = methodCtor( LRUCache.prototype[ p ] ); } return lru; } @@ -43,14 +51,14 @@ var LRUCacheProxy = function LRUCacheProxy(options) { }; LRUCacheProxy.prototype.get = function(key, callback) { - var failSafe = setTimeout(function() { failSafe = undefined; callback(undefined); }, 100); + var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100); send(this.namespace, 'get', key, function(result) { if(!failSafe) return; clearTimeout(failSafe); - callback(result.value); + callback&&callback(result.value); }); } @@ -58,6 +66,29 @@ LRUCacheProxy.prototype.set = function(key, value) { send(this.namespace, 'set', key, value, function() {}); } + +LRUCacheProxy.prototype.complete = function( key, seq, total, partialData, callback ) { + var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100); + send(this.namespace, 'complete', key, seq, total, partialData, function(result) { + if(!failSafe) return; + + clearTimeout(failSafe); + + callback&&callback(result.value); + }); +} + +LRUCacheProxy.prototype.count = function( key, inc, callback ) { + var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100); + send(this.namespace, 'count', key, inc, function(result) { + if(!failSafe) return; + + clearTimeout(failSafe); + + callback&&callback(result.value); + }); +} + Object.defineProperty(LRUCacheProxy.prototype, 'pendingMessages', { get : function () { return Object.keys(pending).length; }, enumerable : true }); module.exports = LRUCacheProxy; diff --git a/lib/lru-cache-cluster.js b/lib/lru-cache-cluster.js index e23b1d0..9a3d752 100644 --- a/lib/lru-cache-cluster.js +++ b/lib/lru-cache-cluster.js @@ -1,57 +1,73 @@ var cluster = require('cluster'); var LRUCache = require('lru-cache'); -var caches = {}; +var caches = {}, messages = 0, ts; if (cluster.isMaster) { cluster.on('fork', function(worker) { - worker.on('message', function(msg) { - if (msg.source !== 'lru-cache-cluster') return; - var lru = caches[msg.namespace]; - - function send(cmd, data){ - data.source = 'lru-cache-cluster'; - data.cmd = cmd; - data.namespace = lru.namespace; - data.id = msg.id; - - worker.send(data); - } - - var switcheroo = { - max: function () {}, - lengthCalculator: function () {}, - length: function () {}, - itemCount: function () {}, - forEach: function () {}, - keys: function () {}, - values: function () {}, - reset: function () {}, - dump: function () {}, - dumpLru: function () {}, - set: function (args) { - lru.set(args[0], args[1]); - - send('setResponse', {}); - }, - has: function () {}, - get: function (args) { + var lru, f; + + var send = function(cmd, data, msg){ + data.source = 'lru-cache-cluster'; + data.cmd = cmd; + data.namespace = lru.namespace; + data.id = msg.id; + worker.send( data ); }; + + var nop = function () {}; + var set = function ( args, msg ) { + lru.set( args[0], args[1] ); + send( 'setResponse', {}, msg ); }; + var get = function ( args, msg ) { var value = lru.get(args[0]); + send( 'getResponse', { value: value }, msg ); }; + var count = function( args, msg ) { + var value = lru.count(args[0],args[1]); + send( 'getCount', { value: value }, msg ); }; + var complete = function( args, msg ) { + var value = lru.complete( args[ 0 ], args[ 1 ], args[ 2 ], args[ 3 ] ); + send( 'getComplete', { value: value }, msg ); }; + var constructor = function( args, msg ) { + var ns = msg.namespace; + if ( !( lru = caches[ ns ] ) ) { + caches[ ns ] = lru = LRUCache(args[0]); } + lru.namespace = ns; + send('constructorResponse', {}, msg); }; - send('getResponse', { value: value }); - }, - peek: function () {}, - pop: function () {}, - del: function () {}, - constructor: function(args) { - lru = caches[msg.namespace] = LRUCache(args[0]); - lru.namespace = msg.namespace; + var switcheroo = { + max: nop, + lengthCalculator: nop, + length: nop, + itemCount: nop, + forEach: nop, + keys: nop, + values: nop, + reset: nop, + dump: nop, + dumpLru: nop, + set: set, // -- + has: nop, + get: get, // -- + complete: complete, // -- + count: count, // -- + peek: nop, + pop: nop, + del: nop, + constructor: constructor // -- + }; + + worker.on('message', function(msg) { + + // some basic performace stats + // if ( !( ts ) ) { ts = ( new Date( ) ).valueOf(); } + // if ( !( ( ++messages ) % 1000 ) ) console.log( messages, messages / ( ( new Date( ) ).valueOf() - ts ) ); + + if (msg.source !== 'lru-cache-cluster') return; - send('constructorResponse', {}); - } - }; + lru = caches[ msg.namespace ]; + ( f = msg.cmd ) && ( f = switcheroo[ f ] ) && f( msg.arguments, msg ); + f = undefined; - msg.cmd && switcheroo[msg.cmd] && switcheroo[msg.cmd](msg.arguments); }); }); } diff --git a/lib/lru-cache-extensions.js b/lib/lru-cache-extensions.js new file mode 100644 index 0000000..0b5b519 --- /dev/null +++ b/lib/lru-cache-extensions.js @@ -0,0 +1,38 @@ +var LRUCache = require('lru-cache'); + +LRUCache.prototype.count = function(key, inc){ + var lru = this; + var value = lru.get( key ); + var it = lru.get( key ); + if ( isNaN ( it ) ) { + it = 0; + } + it += inc; lru.set( key, it ); + return it; +}; + +LRUCache.prototype.complete = function( key, seq, total, partialData ) { + + var lru = this; + var set = false; + var it = lru.get( key ) || ( ( set = true ) && { c: [], w: 0 } ); + + ++it.w; it.c[ seq ] = partialData; + + if ( it.w === total ) { + if ( !set ) { + lru.del( key ); } + it = it.c.join( "" ); + } + else if ( set ) { + lru.set( key, it ); + it = ""; + } + else { + it = ""; + } + + return it; +}; + +module.exports = LRUCache; diff --git a/package.json b/package.json index d1c56bc..9667982 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lru-cache-cluster", "description": "Cluster aware LRU cache. Master process maintains cache and uses worker messaging to deliver items machine-local.", - "version": "1.0.0", + "version": "1.0.1", "main": "lib/lru-cache-cluster.js", "devDependencies": { "tap": "~0.4.8" diff --git a/test/clustered.js b/test/clustered.js index 60b24f8..1bafd80 100644 --- a/test/clustered.js +++ b/test/clustered.js @@ -7,4 +7,5 @@ cluster.setupMaster({ silent : false }); -cluster.fork(); +for( var i = 0, l = require("os").cpus().length >> 2; i < l; i++ ) { + cluster.fork(); } diff --git a/test/worker.js b/test/worker.js index c2835e8..847dff6 100644 --- a/test/worker.js +++ b/test/worker.js @@ -40,10 +40,13 @@ setTimeout(function(){ }, 250); }, 1000); */ + +var cluster = require( "cluster" ); var test = require('tap').test; +/* test("basic", function (t) { - var cache = new LRU({max: 10}) + var cache = LRU( { max: 10, namespace: "basic " + ( ( cluster.worker || { } ).workerID || 0 ) } ); cache.set("key", "value") cache.get('key', function(value) { @@ -58,7 +61,7 @@ test("basic", function (t) { }) test("least recently set", function (t) { - var cache = new LRU(2) + var cache = LRU( { max: 2, namespace: "least recently set " + ( ( cluster.worker || { } ).workerID || 0 ) } ) cache.set("a", "A") cache.set("b", "B") cache.set("c", "C") @@ -79,7 +82,7 @@ test("least recently set", function (t) { }) test("lru recently gotten", function (t) { - var cache = new LRU(2) + var cache = LRU( { max: 2, namespace: "lru recently gotten " + ( ( cluster.worker || { } ).workerID || 0 ) } ) cache.set("a", "A") cache.set("b", "B") cache.get("a") @@ -99,3 +102,28 @@ test("lru recently gotten", function (t) { setTimeout(function(){ t.end(); }, 1000); }) +*/ +test("lru complete test", function (t) { + + var dt = (new Date()).valueOf(), dtMax = dt + 1; + var cache = LRU( { max: 10000000, namespace: "complete" } ); + + var pieces = 12, items = 10000, workers = require("os").cpus().length >> 2; + for( var i = ( cluster.worker || {} ).workerID - 1; i < pieces; i += workers ) { + for ( var j = 0, l = items; j < l; j++ ) { + cache.complete( ( "0000" + j.toString( 16 ) ).slice( -4 ), i, pieces, String( i ) + " ", function( data ) { + dtMax = (new Date()).valueOf(); + if ( !data ) return; + cache.count( "++++", 1 ); + } ); + } + } + + setTimeout( function() { + cache.count( "++++", 0, function( value ) { + console.log( ( dtMax - dt ) ); + console.log( Math.floor( pieces /* workers */ ) * items / ( dtMax - dt ) * 1000 ); + t.equal( value, items ); } ); }, 10000 ); + + setTimeout(function(){ t.end(); }, 5000); +})