diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/README b/README deleted file mode 100644 index 0c65e38..0000000 --- a/README +++ /dev/null @@ -1,30 +0,0 @@ -This is a JSON-RPC server and client library for node.js , -the V8 based evented IO framework. - -Firing up an efficient JSON-RPC server becomes extremely simple: - - var rpc = require('jsonrpc'); - - function add(first, second) { - return first + second; - } - rpc.expose('add', add); - - rpc.listen(8000, 'localhost'); - - -And creating a client to speak to that server is easy too: - - var rpc = require('jsonrpc'); - var sys = require('sys'); - - var client = rpc.getClient(8000, 'localhost'); - - client.call('add', [1, 2], function(result) { - sys.puts('1 + 2 = ' + result); - }); - -To learn more, see the examples directory, peruse test/jsonrpc-test.js, or -simply "Use The Source, Luke". - -More documentation and development is on its way. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..20173d6 --- /dev/null +++ b/README.md @@ -0,0 +1,45 @@ +# node-jsonrpc2 + +This is a JSON-RPC server and client library for node.js , +the V8 based evented IO framework. + +## Install + +To install node-jsonrpc2 in the current directory, run: + + npm install jsonrpc2 + +## Usage + +Firing up an efficient JSON-RPC server becomes extremely simple: + +``` javascript +var rpc = require('jsonrpc2'); + +var server = new rpc.Server(); + +function add(args, opt, callback) { + callback(null, args[0] + args[1]); +} +server.expose('add', add); + +server.listen(8000, 'localhost'); +``` + +And creating a client to speak to that server is easy too: + +``` javascript +var rpc = require('jsonrpc2'); +var util = require('util'); + +var client = new rpc.Client(8000, 'localhost'); + +client.call('add', [1, 2], function(err, result) { + util.puts('1 + 2 = ' + result); +}); +``` + +To learn more, see the examples directory, peruse test/jsonrpc-test.js, or +simply "Use The Source, Luke". + +More documentation and development is on its way. diff --git a/examples/client.js b/examples/client.js index d8f4aaa..b0a84f6 100644 --- a/examples/client.js +++ b/examples/client.js @@ -1,32 +1,93 @@ -var sys = require('sys'); +var util = require('util'); var rpc = require('../src/jsonrpc'); -var client = rpc.getClient(8000, 'localhost'); +rpc.Endpoint.trace = function () {}; -client.call('add', [1, 2], function(result) { - sys.puts(' 1 + 2 = ' + result); +var client = new rpc.Client(8088, 'localhost', "myuser", "secret123"); + +client.call('add', [1, 2], function (err, result) { + if (err) { + console.error('RPC Error: '+ err.toString()); + return; + } + console.log(' 1 + 2 = ' + result); }); -client.call('multiply', [199, 2], function(result) { - sys.puts('199 * 2 = ' + result); +client.call('multiply', [199, 2], function (err, result) { + if (err) { + console.error('RPC Error: '+ err.toString()); + return; + } + console.log('199 * 2 = ' + result); }); // Accessing modules is as simple as dot-prefixing. -client.call('math.power', [3, 3], function(result) { - sys.puts(' 3 ^ 3 = ' + result); +client.call('math.power', [3, 3], function (err, result) { + if (err) { + console.error('RPC Error: '+ err.toString()); + return; + } + console.log(' 3 ^ 3 = ' + result); +}); + +// We can handle errors the same way as anywhere else in Node +client.call('add', [1, 1], function (err, result) { + if (err) { + console.error('RPC Error: '+ err.toString()); + return; + } + console.log(' 1 + 1 = ' + result + ', dummy!'); +}); + +// These calls should each take 1.5 seconds to complete +client.call('delayed.add', [1, 1, 1500], function (err, result) { + if (err) { + console.error('RPC Error: '+ err.toString()); + return; + } + console.log(result); }); -// Call simply returns a promise, so we can add callbacks or errbacks at will. -var promise = client.call('add', [1, 1]); -promise.addCallback(function(result) { - sys.puts(' 1 + 1 = ' + result + ', dummy!'); +client.call('delayed.echo', ['Echo.', 1500], function (err, result) { + if (err) { + console.error('RPC Error: '+ err.toString()); + return; + } + console.log(result); }); -/* These calls should each take 1.5 seconds to complete. */ -client.call('delayed.add', [1, 1, 1500], function(result) { - sys.puts(result); +client.stream('listen', [], function (err, connection) { + if (err) { + console.error('RPC Error: '+ err.toString()); + return; + } + var counter = 0; + connection.expose('event', function (params) { + console.log('Streaming #'+counter+': '+params[0]); + counter++; + if (counter > 4) { + connection.end(); + } + }); + console.log('start listening'); }); -client.call('delayed.echo', ['Echo.', 1500], function(result) { - sys.puts(result); -}); \ No newline at end of file +var socketClient = new rpc.Client(8089, 'localhost', "myuser", "secret123"); + +socketClient.connectSocket(function (err, conn) { + var counter = 0; + socketClient.expose('event', function (params) { + console.log('Streaming (socket) #'+counter+': '+params[0]); + counter++; + if (counter > 4) { + conn.end(); + } + }); + + conn.call('listen', [], function (err) { + if (err) { + console.error('RPC Error: '+ err.toString()); + return; + } + }); +}); diff --git a/examples/server.js b/examples/server.js index 08aaa8b..0d20862 100644 --- a/examples/server.js +++ b/examples/server.js @@ -1,46 +1,80 @@ var rpc = require('../src/jsonrpc'); +var events = require('events'); + +var server = new rpc.Server(); + +server.enableAuth("myuser", "secret123"); /* Create two simple functions */ -function add(first, second) { - return first + second; +function add(args, opts, callback) { + callback(null, args[0]+args[1]); } -function multiply(first, second) { - return first * second; +function multiply(args, opts, callback) { + callback(null, args[0]*args[1]); } /* Expose those methods */ -rpc.expose('add', add); -rpc.expose('multiply', multiply); +server.expose('add', add); +server.expose('multiply', multiply); /* We can expose entire modules easily */ var math = { - power: function(first, second) { return Math.pow(first, second); }, - sqrt: function(num) { return Math.sqrt(num); } -} -rpc.exposeModule('math', math); + power: function(args, opts, callback) { + callback(null, Math.pow(args[0], args[1])); + }, + sqrt: function(args, opts, callback) { + callback(null, Math.sqrt(args[0])); + } +}; +server.exposeModule('math', math); -/* Listen on port 8000 */ -rpc.listen(8000, 'localhost'); - -/* By returning a promise, we can delay our response indefinitely, leaving the - request hanging until the promise emits success. */ +/* By using a callback, we can delay our response indefinitely, leaving the + request hanging until the callback emits success. */ var delayed = { - echo: function(data, delay) { - var promise = new process.Promise(); - setTimeout(function() { - promise.emitSuccess(data); - }, delay); - return promise; - }, - - add: function(first, second, delay) { - var promise = new process.Promise(); - setTimeout(function() { - promise.emitSuccess(first + second); - }, delay); - return promise; - } + echo: function(args, opts, callback) { + var data = args[0]; + var delay = args[1]; + setTimeout(function() { + callback(null, data); + }, delay); + }, + + add: function(args, opts, callback) { + var first = args[0]; + var second = args[1]; + var delay = args[2]; + setTimeout(function() { + callback(null, first + second); + }, delay); + } } -rpc.exposeModule('delayed', delayed); \ No newline at end of file +server.exposeModule('delayed', delayed); + +// Create a message bus with random events on it +var firehose = new events.EventEmitter(); +(function emitFirehoseEvent() { + firehose.emit('foobar', {data: 'random '+Math.random()}); + setTimeout(arguments.callee, 200+Math.random()*3000); +})(); + +var listen = function (args, opts, callback) { + function handleFirehoseEvent(event) { + opts.call('event', event.data); + }; + firehose.on('foobar', handleFirehoseEvent); + opts.stream(function () { + console.log('connection ended'); + firehose.removeListener('foobar', handleFirehoseEvent); + }); + callback(null); +}; + +server.expose('listen', listen); + +/* HTTP server on port 8088 */ +server.listen(8088, 'localhost'); + +/* Raw socket server on port 8089 */ +server.listenRaw(8089, 'localhost'); diff --git a/package.json b/package.json new file mode 100644 index 0000000..c28ae8c --- /dev/null +++ b/package.json @@ -0,0 +1,28 @@ +{ + "name": "jsonrpc2", + "version": "0.1.1", + "description": "JSON-RPC server and client library", + "main": "./src/jsonrpc", + "keywords": [ + "json", + "rpc", + "server", + "client" + ], + + "author": "Eric Florenzano (eflorenzano.com)", + + "dependencies": { + "jsonparse": ">=0.0.1" + }, + + "contributors": [ + "Bill Casarin (jb55.com)", + "Stefan Thomas (justmoon.net)" + ], + + "repository": { + "type": "git", + "url": "git://github.com/bitcoinjs/node-jsonrpc2.git" + } +} diff --git a/src/jsonrpc.js b/src/jsonrpc.js index d694b94..f312e19 100644 --- a/src/jsonrpc.js +++ b/src/jsonrpc.js @@ -1,210 +1,433 @@ -var sys = require('sys'); +var util = require('util'); +var net = require('net'); var http = require('http'); +var util = require('util'); +var events = require('events'); +var JsonParser = require('jsonparse'); +var UNAUTHORIZED = "Unauthorized\n"; var METHOD_NOT_ALLOWED = "Method Not Allowed\n"; var INVALID_REQUEST = "Invalid Request\n"; +/** + * Abstract base class for RPC endpoints. + * + * Has the ability to register RPC events and expose RPC methods. + */ +var Endpoint = function () +{ + events.EventEmitter.call(this); + + this.functions = {}; + this.scopes = {}; + this.defaultScope = this; +}; +util.inherits(Endpoint, events.EventEmitter); + +/** + * Output a piece of debug information. + */ +Endpoint.trace = function(direction, message) +{ + console.log(' ' + direction + ' ' + message); +} + +/** + * Define a callable method on this RPC endpoint + */ +Endpoint.prototype.expose = function(name, func, scope) +{ + if ("function" === typeof func) { + Endpoint.trace('***', 'exposing: ' + name); + this.functions[name] = func; + + if (scope) { + this.scopes[name] = scope; + } + } else { + var funcs = []; + var object = func; + for(var funcName in object) { + var funcObj = object[funcName]; + if(typeof(funcObj) == 'function') { + this.functions[name + '.' + funcName] = funcObj; + funcs.push(funcName); + + if (scope) { + this.scopes[name + '.' + funcName] = scope; + } + } + } + Endpoint.trace('***', 'exposing module: ' + name + + ' [funs: ' + funcs.join(', ') + ']'); + return object; + } +} + +/** + * Handle a call to one of the endpoint's methods. + */ +Endpoint.prototype.handleCall = function handleCall(decoded, conn, callback) +{ + Endpoint.trace('<--', 'Request (id ' + decoded.id + '): ' + + decoded.method + '(' + decoded.params.join(', ') + ')'); + + if (!this.functions.hasOwnProperty(decoded.method)) { + callback(new Error("Unknown RPC call '"+decoded.method+"'")); + return; + } + + var method = this.functions[decoded.method]; + var scope = this.scopes[decoded.method] || this.defaultScope; + + // Try to call the method, but intercept errors and call our + // error handler. + try { + method.call(scope, decoded.params, conn, callback); + } catch (err) { + callback(err); + } +}; + +Endpoint.prototype.exposeModule = Endpoint.prototype.expose; + +/** + * JSON-RPC Client. + */ +var Client = function (port, host, user, password) +{ + Endpoint.call(this); -//===----------------------------------------------------------------------===// -// Server Client -//===----------------------------------------------------------------------===// -var Client = function(port, host, user, password) { this.port = port; this.host = host; this.user = user; this.password = password; - - this.call = function(method, params, callback, errback, path) { - var client = http.createClient(port, host); - - // First we encode the request into JSON - var requestJSON = JSON.stringify({ - 'id': '' + (new Date()).getTime(), - 'method': method, - 'params': params - }); - - var headers = {}; - - if (user && password) { - var buff = new Buffer(this.user + ":" + this.password) - .toString('base64'); - var auth = 'Basic ' + buff; - headers['Authorization'] = auth; - } +}; - // Then we build some basic headers. - headers['Host'] = host; - headers['Content-Length'] = requestJSON.length; +util.inherits(Client, Endpoint); - // Now we'll make a request to the server - var request = client.request('POST', path || '/', headers); - request.write(requestJSON); - request.on('response', function(response) { - // We need to buffer the response chunks in a nonblocking way. - var buffer = ''; - response.on('data', function(chunk) { - buffer = buffer + chunk; - }); - // When all the responses are finished, we decode the JSON and - // depending on whether it's got a result or an error, we call - // emitSuccess or emitError on the promise. - response.on('end', function() { - var decoded = JSON.parse(buffer); - if(decoded.hasOwnProperty('result')) { - if (callback) - callback(decoded.result); - } - else { - if (errback) - errback(decoded.error); - } - }); - }); - }; -} -//===----------------------------------------------------------------------===// -// Server -//===----------------------------------------------------------------------===// -function Server() { +/** + * Make HTTP connection/request. + * + * In HTTP mode, we get to submit exactly one message and receive up to n + * messages. + */ +Client.prototype.connectHttp = function connectHttp(method, params, opts, callback) +{ + if ("function" === typeof opts) { + callback = opts; + opts = {}; + } + opts = opts || {}; + + var client = http.createClient(this.port, this.host); + + var id = 1; + + // First we encode the request into JSON + var requestJSON = JSON.stringify({ + 'id': id, + 'method': method, + 'params': params, + 'jsonrpc': '2.0' + }); + + // Report errors from the http client. This also prevents crashes since + // an exception is thrown if we don't handle this event. + client.on('error', function(err) { + callback(err); + }); + + var headers = {}; + + if (this.user && this.password) { + var buff = new Buffer(this.user + ":" + this.password).toString('base64'); + var auth = 'Basic ' + buff; + headers['Authorization'] = auth; + } + + // Then we build some basic headers. + headers['Host'] = this.host; + headers['Content-Length'] = Buffer.byteLength(requestJSON, 'utf8'); + + // Now we'll make a request to the server + var request = client.request('POST', opts.path || '/', headers); + request.write(requestJSON); + request.on('response', callback.bind(this, id, request)); +}; + +/** + * Make Socket connection. + * + * This implements JSON-RPC over a raw socket. This mode allows us to send and + * receive as many messages as we like once the socket is established. + */ +Client.prototype.connectSocket = function connectSocket(callback) +{ var self = this; - this.functions = {}; - this.server = http.createServer(function(req, res) { - Server.trace('<--', 'accepted request'); - if(req.method === 'POST') { - self.handlePOST(req, res); + + var socket = net.connect(this.port, this.host, function () { + // Submit non-standard "auth" message for raw sockets. + if ("string" === typeof self.user && + "string" === typeof self.password) { + conn.call("auth", [self.user, self.password], function (err) { + if (err) { + callback(err); + } else { + callback(null, conn); + } + }); + return; } - else { - Server.handleNonPOST(req, res); + if ("function" === typeof callback) { + callback(null, conn); } }); -} + var conn = new SocketConnection(self, socket); + var parser = new JsonParser(); + parser.onValue = function (decoded) { + if (this.stack.length) return; + + conn.handleMessage(decoded); + }; + socket.on('data', function (chunk) { + try { + parser.write(chunk); + } catch(err) { + Endpoint.trace('<--', err.toString()); + } + }); + + return conn; +}; + +Client.prototype.stream = function (method, params, opts, callback) +{ + if ("function" === typeof opts) { + callback = opts; + opts = {}; + } + opts = opts || {}; + + this.connectHttp(method, params, opts, function (id, request, response) { + if ("function" === typeof callback) { + var connection = new events.EventEmitter(); + connection.id = id; + connection.req = request; + connection.res = response; + connection.expose = function (method, callback) { + connection.on('call:'+method, function (data) { + callback.call(null, data.params || []); + }); + }; + connection.end = function () { + this.req.connection.end(); + }; + // We need to buffer the response chunks in a nonblocking way. + var parser = new JsonParser(); + parser.onValue = function (decoded) { + if (this.stack.length) return; -//===----------------------------------------------------------------------===// -// exposeModule -//===----------------------------------------------------------------------===// -Server.prototype.exposeModule = function(mod, object) { - var funcs = []; - for(var funcName in object) { - var funcObj = object[funcName]; - if(typeof(funcObj) == 'function') { - this.functions[mod + '.' + funcName] = funcObj; - funcs.push(funcName); + connection.emit('data', decoded); + if (decoded.hasOwnProperty('result') || + decoded.hasOwnProperty('error') && + decoded.id === id && + "function" === typeof callback) { + connection.emit('result', decoded); + } else if (decoded.hasOwnProperty('method')) { + connection.emit('call:'+decoded.method, decoded); + } + }; + // Handle headers + connection.res.once('data', function (data) { + if (connection.res.statusCode === 200) { + callback(null, connection); + } else { + callback(new Error(""+connection.res.statusCode+" "+data)); + } + }); + connection.res.on('data', function (chunk) { + try { + parser.write(chunk); + } catch(err) { + // TODO: Is ignoring invalid data the right thing to do? + } + }); + connection.res.on('end', function () { + // TODO: Issue an error if there has been no valid response message + }); } + }); +}; + +Client.prototype.call = function (method, params, opts, callback) +{ + if ("function" === typeof opts) { + callback = opts; + opts = {}; } - Server.trace('***', 'exposing module: ' + mod + ' [funs: ' + funcs.join(', ') - + ']'); - return object; -} + opts = opts || {}; + this.connectHttp(method, params, opts, function (id, request, response) { + var data = ''; + response.on('data', function (chunk) { + data += chunk; + }); + response.on('end', function () { + if (response.statusCode !== 200) { + callback(new Error(""+response.statusCode+" "+data)); + return; + } + var decoded = JSON.parse(data); + if ("function" === typeof callback) { + if (!decoded.error) { + decoded.error = null; + } + callback(decoded.error, decoded.result); + } + }); + }); +}; +/** + * JSON-RPC Server. + */ +function Server(opts) { + Endpoint.call(this); -//===----------------------------------------------------------------------===// -// expose -//===----------------------------------------------------------------------===// -Server.prototype.expose = function(name, func) { - Server.trace('***', 'exposing: ' + name); - this.functions[name] = func; + opts = opts || {}; + opts.type = opts.type || 'http'; } +util.inherits(Server, Endpoint); - -//===----------------------------------------------------------------------===// -// trace -//===----------------------------------------------------------------------===// -Server.trace = function(direction, message) { - sys.puts(' ' + direction + ' ' + message); +/** + * Start listening to incoming connections. + */ +Server.prototype.listen = function listen(port, host) +{ + var server = http.createServer(this.handleHttp.bind(this)); + server.listen(port, host); + Endpoint.trace('***', 'Server listening on http://' + + (host || '127.0.0.1') + ':' + port + '/'); + return server; } +Server.prototype.listenRaw = function listenRaw(port, host) +{ + var server = net.createServer(this.handleRaw.bind(this)); + server.listen(port, host); + Endpoint.trace('***', 'Server listening on socket://' + + (host || '127.0.0.1') + ':' + port + '/'); + return server; +}; -//===----------------------------------------------------------------------===// -// listen -//===----------------------------------------------------------------------===// -Server.prototype.listen = function(port, host) { - this.server.listen(port, host); - Server.trace('***', 'Server listening on http://' + (host || '127.0.0.1') + - ':' + port + '/'); -} +Server.prototype.listenHybrid = function listenHybrid(port, host) { + var httpServer = http.createServer(this.handleHttp.bind(this)); + var server = net.createServer(this.handleHybrid.bind(this, httpServer)); + server.listen(port, host); + Endpoint.trace('***', 'Server (hybrid) listening on socket://' + + (host || '127.0.0.1') + ':' + port + '/'); + return server; +}; + +/** + * Handle a low level server error. + */ +Server.handleHttpError = function(req, res, code, message) +{ + var headers = {'Content-Type': 'text/plain', + 'Content-Length': message.length, + 'Allow': 'POST'}; + if (code === 401) { + headers['WWW-Authenticate'] = 'Basic realm="JSON-RPC"'; + } -//===----------------------------------------------------------------------===// -// handleInvalidRequest -//===----------------------------------------------------------------------===// -Server.handleInvalidRequest = function(req, res) { - res.writeHead(400, {'Content-Type': 'text/plain', - 'Content-Length': INVALID_REQUEST.length}); - res.write(INVALID_REQUEST); + res.writeHead(code, headers); + res.write(message); res.end(); -} +}; +/** + * Handle HTTP POST request. + */ +Server.prototype.handleHttp = function(req, res) +{ + Endpoint.trace('<--', 'Accepted http request'); + + if (req.method !== 'POST') { + Server.handleHttpError(req, res, 405, METHOD_NOT_ALLOWED); + return; + } -//===----------------------------------------------------------------------===// -// handlePOST -//===----------------------------------------------------------------------===// -Server.prototype.handlePOST = function(req, res) { var buffer = ''; var self = this; + + // Check authentication if we require it + if (this.authHandler) { + var authHeader = req.headers['authorization'] || '', // get the header + authToken = authHeader.split(/\s+/).pop() || '', // get the token + auth = new Buffer(authToken, 'base64').toString(), // base64 -> string + parts = auth.split(/:/), // split on colon + username = parts[0], + password = parts[1]; + if (!this.authHandler(username, password)) { + Server.handleHttpError(req, res, 401, UNAUTHORIZED); + return; + } + } + var handle = function (buf) { var decoded = JSON.parse(buf); - + // Check for the required fields, and if they aren't there, then - // dispatch to the handleInvalidRequest function. - if(!(decoded.method && decoded.params && decoded.id)) { - return Server.handleInvalidRequest(req, res); + // dispatch to the handleHttpError function. + if (!(decoded.method && decoded.params && decoded.id)) { + Endpoint.trace('-->', 'Response (invalid request)'); + Server.handleHttpError(req, res, 400, INVALID_REQUEST); + return; } - if(!self.functions.hasOwnProperty(decoded.method)) { - return Server.handleInvalidRequest(req, res); - } - - // Build our success handler - var onSuccess = function(funcResp) { - Server.trace('-->', 'response (id ' + decoded.id + '): ' + - JSON.stringify(funcResp)); - - var encoded = JSON.stringify({ - 'result': funcResp, - 'error': null, - 'id': decoded.id - }); - res.writeHead(200, {'Content-Type': 'application/json', - 'Content-Length': encoded.length}); - res.write(encoded); - res.end(); + var reply = function (json) { + var encoded = JSON.stringify(json); + + if (!conn.isStreaming) { + res.writeHead(200, {'Content-Type': 'application/json', + 'Content-Length': encoded.length}); + res.write(encoded); + res.end(); + } else { + res.writeHead(200, {'Content-Type': 'application/json'}); + res.write(encoded); + // Keep connection open + } }; - - // Build our failure handler (note that error must not be null) - var onFailure = function(failure) { - Server.trace('-->', 'failure: ' + JSON.stringify(failure)); - var encoded = JSON.stringify({ - 'result': null, - 'error': failure || 'Unspecified Failure', + + var callback = function(err, result) { + if (err) { + Endpoint.trace('-->', 'Failure (id ' + decoded.id + '): ' + + (err.stack ? err.stack : err.toString())); + err = err.toString(); + result = null; + } else { + Endpoint.trace('-->', 'Response (id ' + decoded.id + '): ' + + JSON.stringify(result)); + err = null; + } + + // TODO: Not sure if we should return a message if decoded.id == null + reply({ + 'result': result, + 'error': err, 'id': decoded.id }); - res.writeHead(200, {'Content-Type': 'application/json', - 'Content-Length': encoded.length}); - res.write(encoded); - res.end(); }; - - Server.trace('<--', 'request (id ' + decoded.id + '): ' + - decoded.method + '(' + decoded.params.join(', ') + ')'); - - // Try to call the method, but intercept errors and call our - // onFailure handler. - var method = self.functions[decoded.method]; - var args = decoded.params.push(function(resp) { - onSuccess(resp); - }); - try { - method.apply(null, decoded.params); - } - catch(err) { - return onFailure(err); - } + var conn = new HttpServerConnection(self, req, res); - } // function handle(buf) + self.handleCall(decoded, conn, callback); + }; // function handle(buf) req.addListener('data', function(chunk) { buffer = buffer + chunk; @@ -213,20 +436,337 @@ Server.prototype.handlePOST = function(req, res) { req.addListener('end', function() { handle(buffer); }); -} +}; +Server.prototype.handleRaw = function handleRaw(socket) +{ + Endpoint.trace('<--', 'Accepted socket connection'); -//===----------------------------------------------------------------------===// -// handleNonPOST -//===----------------------------------------------------------------------===// -Server.handleNonPOST = function(req, res) { - res.writeHead(405, {'Content-Type': 'text/plain', - 'Content-Length': METHOD_NOT_ALLOWED.length, - 'Allow': 'POST'}); - res.write(METHOD_NOT_ALLOWED); - res.end(); -} + var self = this; + + var conn = new SocketConnection(this, socket); + var parser = new JsonParser(); + var requireAuth = !!this.authHandler; + + parser.onValue = function (decoded) { + if (this.stack.length) return; + + // We're on a raw TCP socket. To enable authentication we implement a simple + // authentication scheme that is non-standard, but is easy to call from any + // client library. + // + // The authentication message is to be sent as follows: + // {"method": "auth", "params": ["myuser", "mypass"], id: 0} + if (requireAuth) { + if (decoded.method !== "auth" ) { + // Try to notify client about failure to authenticate + if ("number" === typeof decoded.id) { + conn.sendReply("Error: Unauthorized", null, decoded.id); + } + } else { + // Handle "auth" message + if (Array.isArray(decoded.params) && + decoded.params.length === 2 && + self.authHandler(decoded.params[0], decoded.params[1])) { + // Authorization completed + requireAuth = false; + + // Notify client about success + if ("number" === typeof decoded.id) { + conn.sendReply(null, true, decoded.id); + } + } else { + if ("number" === typeof decoded.id) { + conn.sendReply("Error: Invalid credentials", null, decoded.id); + } + } + } + // Make sure we explicitly return here - the client was not yet auth'd. + return; + } else { + conn.handleMessage(decoded); + } + }; + + socket.on('data', function (chunk) { + try { + parser.write(chunk); + } catch(err) { + // TODO: Is ignoring invalid data the right thing to do? + } + }); +}; + +Server.prototype.handleHybrid = function handleHybrid(httpServer, socket) +{ + var self = this; + socket.once('data', function (chunk) { + // If first byte is a capital letter, treat connection as HTTP + if (chunk[0] >= 65 && chunk[0] <= 90) { + httpServer.emit('connection', socket); + } else { + self.handleRaw(socket); + } + // Re-emit first chunk + socket.emit('data', chunk); + }); +}; + +/** + * Set the server to require authentication. + * + * Can be called with a custom handler function: + * server.enableAuth(function (user, password) { + * return true; // Do authentication and return result as boolean + * }); + * + * Or just with a single valid username and password: + * sever.enableAuth("myuser", "supersecretpassword"); + */ +Server.prototype.enableAuth = function enableAuth(handler, password) { + if ("function" !== typeof handler) { + var user = "" + handler; + password = "" + password; + handler = function checkAuth(suppliedUser, suppliedPassword) { + return user === suppliedUser && password === suppliedPassword; + }; + } + + this.authHandler = handler; +}; + +var Connection = function Connection(ep) { + events.EventEmitter.call(this); + + this.endpoint = ep; + this.callbacks = []; + this.latestId = 0; + + // Default error handler (prevents "uncaught error event") + this.on('error', function () {}); +}; + +util.inherits(Connection, events.EventEmitter); + +/** + * Make a standard RPC call to the other endpoint. + * + * Note that some ways to make RPC calls bypass this method, for example HTTP + * calls and responses are done in other places. + */ +Connection.prototype.call = function call(method, params, callback) +{ + if (!Array.isArray(params)) { + params = [params]; + } + + var id = null; + if ("function" === typeof callback) { + id = ++this.latestId; + this.callbacks[id] = callback; + } + + Endpoint.trace('-->', 'Call (method '+method+'): ' + JSON.stringify(params)); + var data = JSON.stringify({ + method: method, + params: params, + id: id + }); + this.write(data); +}; + +/** + * Dummy method for sending data. + * + * Connection types that support sending additional data will override this + * method. + */ +Connection.prototype.write = function write(data) +{ + throw new Error("Tried to write data on unsupported connection type."); +}; + +/** + * Keep the connection open. + * + * This method is used to tell a HttpServerConnection to stay open. In order + * to keep it compatible with other connection types, we add it here and make + * it register a connection end handler. + */ +Connection.prototype.stream = function (onend) +{ + if ("function" === typeof onend) { + this.on('end', onend); + } +}; + +Connection.prototype.handleMessage = function handleMessage(msg) +{ + if (msg.hasOwnProperty('result') || + msg.hasOwnProperty('error') && + msg.hasOwnProperty('id') && + "function" === typeof this.callbacks[msg.id]) { + try { + this.callbacks[msg.id](msg.error, msg.result); + } catch(err) { + // TODO: What do we do with erroneous callbacks? + } + } else if (msg.hasOwnProperty('method')) { + this.endpoint.handleCall(msg, this, (function (err, result) { + if (err) { + Endpoint.trace('-->', 'Failure (id ' + msg.id + '): ' + + (err.stack ? err.stack : err.toString())); + } + + if ("undefined" === msg.id || null === msg.id) return; + + if (err) { + err = err.toString(); + result = null; + } else { + Endpoint.trace('-->', 'Response (id ' + msg.id + '): ' + + JSON.stringify(result)); + err = null; + } + + this.sendReply(err, result, msg.id); + }).bind(this)); + } +}; + +Connection.prototype.sendReply = function sendReply(err, result, id) { + var data = JSON.stringify({ + result: result, + error: err, + id: id + }); + this.write(data); +}; + +var HttpServerConnection = function HttpServerConnection(server, req, res) +{ + Connection.call(this, server); + + var self = this; + + this.req = req; + this.res = res; + this.isStreaming = false; + + this.res.connection.on('end', function () { + self.emit('end'); + }); +}; + +util.inherits(HttpServerConnection, Connection); + +/** + * Can be called before the response callback to keep the connection open. + */ +HttpServerConnection.prototype.stream = function (onend) +{ + Connection.prototype.stream.call(this, onend); + + this.isStreaming = true; +}; + +/** + * Send the client additional data. + * + * An HTTP connection can be kept open and additional RPC calls sent through if + * the client supports it. + */ +HttpServerConnection.prototype.write = function (data) +{ + if (!this.isStreaming) { + throw new Error("Cannot send extra messages via non-streaming HTTP"); + } + + if (!this.res.connection.writable) { + // Client disconnected, we'll quietly fail + return; + } + + this.res.write(data); +}; + +/** + * Socket connection. + * + * Socket connections are mostly symmetric, so we are using a single class for + * representing both the server and client perspective. + */ +var SocketConnection = function SocketConnection(endpoint, conn) +{ + Connection.call(this, endpoint); + + var self = this; + + this.conn = conn; + this.autoReconnect = true; + this.ended = true; + + this.conn.on('connect', function () { + self.emit('connect'); + }); + + this.conn.on('end', function () { + self.emit('end'); + }); + + this.conn.on('error', function () { + self.emit('error'); + }); + + this.conn.on('close', function (hadError) { + self.emit('close', hadError); + + // Handle automatic reconnections if we are the client + if (self.endpoint instanceof Client && + self.autoReconnect && + !self.ended) { + if (hadError) { + // If there was an error, we'll wait a moment before retrying + setTimeout(self.reconnect.bind(self), 200); + } else { + self.reconnect(); + } + } + }); +}; + +util.inherits(SocketConnection, Connection); + +SocketConnection.prototype.write = function write(data) +{ + if (!this.conn.writable) { + // Other side disconnected, we'll quietly fail + return; + } + + this.conn.write(data); +}; + +SocketConnection.prototype.end = function end() +{ + this.ended = true; + this.conn.end(); +}; + +SocketConnection.prototype.reconnect = function reconnect() +{ + this.ended = false; + if (this.endpoint instanceof Client) { + this.conn.connect(this.endpoint.port, this.endpoint.host); + } else { + throw new Error('Cannot reconnect a connection from the server-side.'); + } +}; +exports.Endpoint = Endpoint; +exports.Server = Server; +exports.Client = Client; -module.exports.Server = Server; -module.exports.Client = Client; +exports.Connection = Connection; +exports.HttpServerConnection = HttpServerConnection; +exports.SocketConnection = SocketConnection; diff --git a/test/jsonrpc-test.js b/test/jsonrpc-test.js index 893a6b2..27abe76 100644 --- a/test/jsonrpc-test.js +++ b/test/jsonrpc-test.js @@ -1,156 +1,170 @@ -process.mixin(GLOBAL, require('./test')); +require('./test').extend(global); -var sys = require('sys'); -var jsonrpc = require('../src/jsonrpc'); +var util = require('util'); +var rpc = require('../src/jsonrpc'); +var events = require('events'); + +var server = new rpc.Server(); + +rpc.Endpoint.trace = function () {}; // MOCK REQUEST/RESPONSE OBJECTS var MockRequest = function(method) { - this.method = method; - process.EventEmitter.call(this); + this.method = method; + events.EventEmitter.call(this); }; -sys.inherits(MockRequest, process.EventEmitter); +util.inherits(MockRequest, events.EventEmitter); var MockResponse = function() { - process.EventEmitter.call(this); - this.sendHeader = function(httpCode, httpHeaders) { - this.httpCode = httpCode; - this.httpHeaders = httpCode; - }; - this.sendBody = function(httpBody) { - this.httpBody = httpBody; - }; - this.finish = function() {}; + events.EventEmitter.call(this); + this.writeHead = this.sendHeader = function(httpCode, httpHeaders) { + this.httpCode = httpCode; + this.httpHeaders = httpCode; + }; + this.write = this.sendBody = function(httpBody) { + this.httpBody = httpBody; + }; + this.end = this.finish = function() {}; + this.connection = new events.EventEmitter(); }; -sys.inherits(MockResponse, process.EventEmitter); +util.inherits(MockResponse, events.EventEmitter); // A SIMPLE MODULE var TestModule = { - foo: function (a, b) { - return ['foo', 'bar', a, b]; - }, + foo: function (a, b) { + return ['foo', 'bar', a, b]; + }, - other: 'hello' + other: 'hello' }; // EXPOSING FUNCTIONS -test('jsonrpc.expose', function() { - var echo = function(data) { - return data; - }; - jsonrpc.expose('echo', echo); - assert(jsonrpc.functions.echo === echo); +test('Server.expose', function() { + var echo = function(args, opts, callback) { + callback(null, args[0]); + }; + server.expose('echo', echo); + assert(server.functions.echo === echo); }) -test('jsonrpc.exposeModule', function() { - jsonrpc.exposeModule('test', TestModule); - sys.puts(jsonrpc.functions['test.foo']); - sys.puts(TestModule.foo); - assert(jsonrpc.functions['test.foo'] == TestModule.foo); +test('Server.exposeModule', function() { + server.exposeModule('test', TestModule); + assert(server.functions['test.foo'] == TestModule.foo); }); // INVALID REQUEST -test('GET jsonrpc.handleRequest', function() { - var req = new MockRequest('GET'); - var res = new MockResponse(); - jsonrpc.handleRequest(req, res); - assert(res.httpCode === 405); +test('GET Server.handleNonPOST', function() { + var req = new MockRequest('GET'); + var res = new MockResponse(); + server.handleHttp(req, res); + assert(res.httpCode === 405); }); function testBadRequest(testJSON) { - var req = new MockRequest('POST'); - var res = new MockResponse(); - jsonrpc.handleRequest(req, res); - req.emit('body', testJSON); - req.emit('complete'); - sys.puts(res.httpCode); - assert(res.httpCode === 400); + var req = new MockRequest('POST'); + var res = new MockResponse(); + server.handleHttp(req, res); + req.emit('data', testJSON); + req.emit('end'); + assert(res.httpCode === 400); } test('Missing object attribute (method)', function() { - var testJSON = '{ "params": ["Hello, World!"], "id": 1 }'; - testBadRequest(testJSON); + var testJSON = '{ "params": ["Hello, World!"], "id": 1 }'; + testBadRequest(testJSON); }); test('Missing object attribute (params)', function() { - var testJSON = '{ "method": "echo", "id": 1 }'; - testBadRequest(testJSON); + var testJSON = '{ "method": "echo", "id": 1 }'; + testBadRequest(testJSON); }); test('Missing object attribute (id)', function() { - var testJSON = '{ "method": "echo", "params": ["Hello, World!"] }'; - testBadRequest(testJSON); + var testJSON = '{ "method": "echo", "params": ["Hello, World!"] }'; + testBadRequest(testJSON); }); test('Unregistered method', function() { - var testJSON = '{ "method": "notRegistered", "params": ["Hello, World!"], "id": 1 }'; - testBadRequest(testJSON); + var testJSON = '{ "method": "notRegistered", "params": ["Hello, World!"], "id": 1 }'; + var req = new MockRequest('POST'); + var res = new MockResponse(); + try { + server.handleHttp(req, res); + }catch (e) {}; + req.emit('data', testJSON); + req.emit('end'); + assert(res.httpCode === 200); + var decoded = JSON.parse(res.httpBody); + assert(decoded.id === 1); + assert(decoded.error === 'Error: Unknown RPC call \'notRegistered\''); + assert(decoded.result === null); }); // VALID REQUEST test('Simple synchronous echo', function() { - var testJSON = '{ "method": "echo", "params": ["Hello, World!"], "id": 1 }'; - var req = new MockRequest('POST'); - var res = new MockResponse(); - jsonrpc.handleRequest(req, res); - req.emit('body', testJSON); - req.emit('complete'); - assert(res.httpCode === 200); - var decoded = JSON.parse(res.httpBody); - assert(decoded.id === 1); - assert(decoded.error === null); - assert(decoded.result == 'Hello, World!'); + var testJSON = '{ "method": "echo", "params": ["Hello, World!"], "id": 1 }'; + var req = new MockRequest('POST'); + var res = new MockResponse(); + server.handleHttp(req, res); + req.emit('data', testJSON); + req.emit('end'); + assert(res.httpCode === 200); + var decoded = JSON.parse(res.httpBody); + assert(decoded.id === 1); + assert(decoded.error === null); + assert(decoded.result == 'Hello, World!'); }); test('Using promise', function() { - // Expose a function that just returns a promise that we can control. - var promise = new process.Promise(); - jsonrpc.expose('promiseEcho', function(data) { - return promise; - }); - // Build a request to call that function - var testJSON = '{ "method": "promiseEcho", "params": ["Hello, World!"], "id": 1 }'; - var req = new MockRequest('POST'); - var res = new MockResponse(); - // Have the server handle that request - jsonrpc.handleRequest(req, res); - req.emit('body', testJSON); - req.emit('complete'); - // Now the request has completed, and in the above synchronous test, we - // would be finished. However, this function is smarter and only completes - // when the promise completes. Therefore, we should not have a response - // yet. - assert(res['httpCode'] == null); - // We can force the promise to emit a success code, with a message. - promise.emitSuccess('Hello, World!'); - // Aha, now that the promise has finished, our request has finished as well. - assert(res.httpCode === 200); - var decoded = JSON.parse(res.httpBody); - assert(decoded.id === 1); - assert(decoded.error === null); - assert(decoded.result == 'Hello, World!'); + // Expose a function that just returns a promise that we can control. + var callbackRef = null; + server.expose('promiseEcho', function(args, opts, callback) { + callbackRef = callback; + }); + // Build a request to call that function + var testJSON = '{ "method": "promiseEcho", "params": ["Hello, World!"], "id": 1 }'; + var req = new MockRequest('POST'); + var res = new MockResponse(); + // Have the server handle that request + server.handleHttp(req, res); + req.emit('data', testJSON); + req.emit('end'); + // Now the request has completed, and in the above synchronous test, we + // would be finished. However, this function is smarter and only completes + // when the promise completes. Therefore, we should not have a response + // yet. + assert(res['httpCode'] == null); + // We can force the promise to emit a success code, with a message. + callbackRef(null, 'Hello, World!'); + // Aha, now that the promise has finished, our request has finished as well. + assert(res.httpCode === 200); + var decoded = JSON.parse(res.httpBody); + assert(decoded.id === 1); + assert(decoded.error === null); + assert(decoded.result == 'Hello, World!'); }); test('Triggering an errback', function() { - var promise = new process.Promise(); - jsonrpc.expose('errbackEcho', function(data) { - return promise; - }); - var testJSON = '{ "method": "errbackEcho", "params": ["Hello, World!"], "id": 1 }'; - var req = new MockRequest('POST'); - var res = new MockResponse(); - jsonrpc.handleRequest(req, res); - req.emit('body', testJSON); - req.emit('complete'); - assert(res['httpCode'] == null); - // This time, unlike the above test, we trigger an error and expect to see - // it in the error attribute of the object returned. - promise.emitError('This is an error'); - assert(res.httpCode === 200); - var decoded = JSON.parse(res.httpBody); - assert(decoded.id === 1); - assert(decoded.error == 'This is an error'); - assert(decoded.result == null); -}) \ No newline at end of file + var callbackRef = null; + server.expose('errbackEcho', function(args, opts, callback) { + callbackRef = callback; + }); + var testJSON = '{ "method": "errbackEcho", "params": ["Hello, World!"], "id": 1 }'; + var req = new MockRequest('POST'); + var res = new MockResponse(); + server.handleHttp(req, res); + req.emit('data', testJSON); + req.emit('end'); + assert(res['httpCode'] == null); + // This time, unlike the above test, we trigger an error and expect to see + // it in the error attribute of the object returned. + callbackRef('This is an error'); + assert(res.httpCode === 200); + var decoded = JSON.parse(res.httpBody); + assert(decoded.id === 1); + assert(decoded.error == 'This is an error'); + assert(decoded.result == null); +}) diff --git a/test/test.js b/test/test.js index 943507f..504316d 100644 --- a/test/test.js +++ b/test/test.js @@ -1,77 +1,81 @@ -var sys = require('sys'); +var util = require('util'); -TEST = { - passed: 0, - failed: 0, - assertions: 0, +var TEST = module.exports = { + passed: 0, + failed: 0, + assertions: 0, - test: function (desc, block) { - var _puts = sys.puts, - output = "", - result = '?', - _boom = null; - sys.puts = function (s) { output += s + "\n"; } - try { - sys.print(" " + desc + " ..."); - block(); - result = '.'; - } catch(boom) { - if ( boom == 'FAIL' ) { - result = 'F'; - } else { - result = 'E'; - _boom = boom; - sys.puts(boom.toString()); - } - } - sys.puts = _puts; - if ( result == '.' ) { - sys.print(" OK\n"); - TEST.passed += 1; + output: "", + + test: function (desc, block) { + var result = '?', + _boom = null; + + TEST.output = ""; + try { + TEST.output += " " + desc + " ..."; + block(); + result = '.'; + } catch(boom) { + if ( boom == 'FAIL' ) { + result = 'F'; } else { - sys.print(" FAIL\n"); - sys.print(output.replace(/^/, " ") + "\n"); - TEST.failed += 1; - if ( _boom ) throw _boom; + result = 'E'; + _boom = boom; + TEST.output += boom.toString(); } - }, + } + if ( result == '.' ) { + process.stdout.write(TEST.output + " OK\n"); + TEST.passed += 1; + } else { + process.stdout.write(TEST.output + " FAIL\n"); + process.stdout.write(TEST.output.replace(/^/, " ") + "\n"); + TEST.failed += 1; + if ( _boom ) throw _boom; + } + }, - assert: function (value, desc) { - TEST.assertions += 1; - if ( desc ) sys.puts("ASSERT: " + desc); - if ( !value ) throw 'FAIL'; - }, + assert: function (value, desc) { + TEST.assertions += 1; + if ( desc ) TEST.output += "ASSERT: " + desc; + if ( !value ) throw 'FAIL'; + }, - assert_equal: function (expect, is) { - assert( - expect == is, - sys.inspect(expect) + " == " + sys.inspect(is) - ); - }, + assert_equal: function (expect, is) { + assert( + expect == is, + util.inspect(expect) + " == " + util.inspect(is) + ); + }, - assert_boom: function (message, block) { - var error = null; - try { block() } - catch (boom) { error = boom } + assert_boom: function (message, block) { + var error = null; + try { block(); } + catch (boom) { error = boom; } - if ( !error ) { - sys.puts('NO BOOM'); - throw 'FAIL' - } - if ( error != message ) { - sys.puts('BOOM: ' + sys.inspect(error) + - ' [' + sys.inspect(message) + ' expected]'); - throw 'FAIL' - } - } -}; + if ( !error ) { + TEST.output += 'NO BOOM'; + throw 'FAIL'; + } + if ( error != message ) { + TEST.output += 'BOOM: ' + util.inspect(error) + + ' [' + util.inspect(message) + ' expected]'; + throw 'FAIL'; + } + }, -process.mixin(exports, TEST); + extend: function (scope) { + Object.keys(TEST).forEach(function (key) { + scope[key] = TEST[key]; + }); + } +}; process.addListener('exit', function (code) { - if ( !TEST.exit ) { - TEST.exit = true; - sys.puts("" + TEST.passed + " passed, " + TEST.failed + " failed"); - if ( TEST.failed > 0 ) { process.exit(1) }; - } + if ( !TEST.exit ) { + TEST.exit = true; + console.log("" + TEST.passed + " passed, " + TEST.failed + " failed"); + if ( TEST.failed > 0 ) { process.exit(1) }; + } });