Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions lib/LRUCacheProxy.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var LRUCache = require('lru-cache');
var LRUCache = require('./lru-cache-extensions');
var cluster = require('cluster');
var uuid = require('node-uuid');
var pending = {};
Expand All @@ -24,12 +24,20 @@ function send(namespace, cmd) {
var LRUCacheProxy = function LRUCacheProxy(options) {
if (!(this instanceof LRUCacheProxy)) {
if(cluster.isMaster) {

var lru = new LRUCache(options);
var lruGet = LRUCache.prototype.get;

lru.get = function(key, callback) {
callback(lruGet.apply(lru, arguments));
};
var methodCtor = function( baseMethod ) {
return function( ) {
var cb;
if ( typeof ( cb = arguments[ arguments.length - 1 ] ) !== "function" ) {
cb = undefined; }
var value = baseMethod.apply( this, arguments );
cb && cb( value );
return value; }; }

for( var p in { "get": 1, "count": 1, "complete": 1} ) {
lru[ p ] = methodCtor( LRUCache.prototype[ p ] ); }

return lru;
}
Expand All @@ -43,21 +51,44 @@ var LRUCacheProxy = function LRUCacheProxy(options) {
};

LRUCacheProxy.prototype.get = function(key, callback) {
var failSafe = setTimeout(function() { failSafe = undefined; callback(undefined); }, 100);
var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100);

send(this.namespace, 'get', key, function(result) {
if(!failSafe) return;

clearTimeout(failSafe);

callback(result.value);
callback&&callback(result.value);
});
}

LRUCacheProxy.prototype.set = function(key, value) {
send(this.namespace, 'set', key, value, function() {});
}


LRUCacheProxy.prototype.complete = function( key, seq, total, partialData, callback ) {
var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100);
send(this.namespace, 'complete', key, seq, total, partialData, function(result) {
if(!failSafe) return;

clearTimeout(failSafe);

callback&&callback(result.value);
});
}

LRUCacheProxy.prototype.count = function( key, inc, callback ) {
var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100);
send(this.namespace, 'count', key, inc, function(result) {
if(!failSafe) return;

clearTimeout(failSafe);

callback&&callback(result.value);
});
}

Object.defineProperty(LRUCacheProxy.prototype, 'pendingMessages', { get : function () { return Object.keys(pending).length; }, enumerable : true });

module.exports = LRUCacheProxy;
104 changes: 60 additions & 44 deletions lib/lru-cache-cluster.js
Original file line number Diff line number Diff line change
@@ -1,57 +1,73 @@
var cluster = require('cluster');
var LRUCache = require('lru-cache');
var caches = {};
var caches = {}, messages = 0, ts;

if (cluster.isMaster) {
cluster.on('fork', function(worker) {
worker.on('message', function(msg) {
if (msg.source !== 'lru-cache-cluster') return;

var lru = caches[msg.namespace];

function send(cmd, data){
data.source = 'lru-cache-cluster';
data.cmd = cmd;
data.namespace = lru.namespace;
data.id = msg.id;

worker.send(data);
}

var switcheroo = {
max: function () {},
lengthCalculator: function () {},
length: function () {},
itemCount: function () {},
forEach: function () {},
keys: function () {},
values: function () {},
reset: function () {},
dump: function () {},
dumpLru: function () {},
set: function (args) {
lru.set(args[0], args[1]);

send('setResponse', {});
},
has: function () {},
get: function (args) {
var lru, f;

var send = function(cmd, data, msg){
data.source = 'lru-cache-cluster';
data.cmd = cmd;
data.namespace = lru.namespace;
data.id = msg.id;
worker.send( data ); };

var nop = function () {};
var set = function ( args, msg ) {
lru.set( args[0], args[1] );
send( 'setResponse', {}, msg ); };
var get = function ( args, msg ) {
var value = lru.get(args[0]);
send( 'getResponse', { value: value }, msg ); };
var count = function( args, msg ) {
var value = lru.count(args[0],args[1]);
send( 'getCount', { value: value }, msg ); };
var complete = function( args, msg ) {
var value = lru.complete( args[ 0 ], args[ 1 ], args[ 2 ], args[ 3 ] );
send( 'getComplete', { value: value }, msg ); };
var constructor = function( args, msg ) {
var ns = msg.namespace;
if ( !( lru = caches[ ns ] ) ) {
caches[ ns ] = lru = LRUCache(args[0]); }
lru.namespace = ns;
send('constructorResponse', {}, msg); };

send('getResponse', { value: value });
},
peek: function () {},
pop: function () {},
del: function () {},
constructor: function(args) {
lru = caches[msg.namespace] = LRUCache(args[0]);
lru.namespace = msg.namespace;
var switcheroo = {
max: nop,
lengthCalculator: nop,
length: nop,
itemCount: nop,
forEach: nop,
keys: nop,
values: nop,
reset: nop,
dump: nop,
dumpLru: nop,
set: set, // --
has: nop,
get: get, // --
complete: complete, // --
count: count, // --
peek: nop,
pop: nop,
del: nop,
constructor: constructor // --
};

worker.on('message', function(msg) {

// some basic performace stats
// if ( !( ts ) ) { ts = ( new Date( ) ).valueOf(); }
// if ( !( ( ++messages ) % 1000 ) ) console.log( messages, messages / ( ( new Date( ) ).valueOf() - ts ) );

if (msg.source !== 'lru-cache-cluster') return;

send('constructorResponse', {});
}
};
lru = caches[ msg.namespace ];
( f = msg.cmd ) && ( f = switcheroo[ f ] ) && f( msg.arguments, msg );
f = undefined;

msg.cmd && switcheroo[msg.cmd] && switcheroo[msg.cmd](msg.arguments);
});
});
}
Expand Down
38 changes: 38 additions & 0 deletions lib/lru-cache-extensions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
var LRUCache = require('lru-cache');

LRUCache.prototype.count = function(key, inc){
var lru = this;
var value = lru.get( key );
var it = lru.get( key );
if ( isNaN ( it ) ) {
it = 0;
}
it += inc; lru.set( key, it );
return it;
};

LRUCache.prototype.complete = function( key, seq, total, partialData ) {

var lru = this;
var set = false;
var it = lru.get( key ) || ( ( set = true ) && { c: [], w: 0 } );

++it.w; it.c[ seq ] = partialData;

if ( it.w === total ) {
if ( !set ) {
lru.del( key ); }
it = it.c.join( "" );
}
else if ( set ) {
lru.set( key, it );
it = "";
}
else {
it = "";
}

return it;
};

module.exports = LRUCache;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "lru-cache-cluster",
"description": "Cluster aware LRU cache. Master process maintains cache and uses worker messaging to deliver items machine-local.",
"version": "1.0.0",
"version": "1.0.1",
"main": "lib/lru-cache-cluster.js",
"devDependencies": {
"tap": "~0.4.8"
Expand Down
3 changes: 2 additions & 1 deletion test/clustered.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ cluster.setupMaster({
silent : false
});

cluster.fork();
for( var i = 0, l = require("os").cpus().length >> 2; i < l; i++ ) {
cluster.fork(); }
34 changes: 31 additions & 3 deletions test/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ setTimeout(function(){
}, 250);
}, 1000);
*/

var cluster = require( "cluster" );
var test = require('tap').test;

/*
test("basic", function (t) {
var cache = new LRU({max: 10})
var cache = LRU( { max: 10, namespace: "basic " + ( ( cluster.worker || { } ).workerID || 0 ) } );
cache.set("key", "value")

cache.get('key', function(value) {
Expand All @@ -58,7 +61,7 @@ test("basic", function (t) {
})

test("least recently set", function (t) {
var cache = new LRU(2)
var cache = LRU( { max: 2, namespace: "least recently set " + ( ( cluster.worker || { } ).workerID || 0 ) } )
cache.set("a", "A")
cache.set("b", "B")
cache.set("c", "C")
Expand All @@ -79,7 +82,7 @@ test("least recently set", function (t) {
})

test("lru recently gotten", function (t) {
var cache = new LRU(2)
var cache = LRU( { max: 2, namespace: "lru recently gotten " + ( ( cluster.worker || { } ).workerID || 0 ) } )
cache.set("a", "A")
cache.set("b", "B")
cache.get("a")
Expand All @@ -99,3 +102,28 @@ test("lru recently gotten", function (t) {

setTimeout(function(){ t.end(); }, 1000);
})
*/
test("lru complete test", function (t) {

var dt = (new Date()).valueOf(), dtMax = dt + 1;
var cache = LRU( { max: 10000000, namespace: "complete" } );

var pieces = 12, items = 10000, workers = require("os").cpus().length >> 2;
for( var i = ( cluster.worker || {} ).workerID - 1; i < pieces; i += workers ) {
for ( var j = 0, l = items; j < l; j++ ) {
cache.complete( ( "0000" + j.toString( 16 ) ).slice( -4 ), i, pieces, String( i ) + " ", function( data ) {
dtMax = (new Date()).valueOf();
if ( !data ) return;
cache.count( "++++", 1 );
} );
}
}

setTimeout( function() {
cache.count( "++++", 0, function( value ) {
console.log( ( dtMax - dt ) );
console.log( Math.floor( pieces /* workers */ ) * items / ( dtMax - dt ) * 1000 );
t.equal( value, items ); } ); }, 10000 );

setTimeout(function(){ t.end(); }, 5000);
})