From 452dd5b42b2074978a1cce76f4e98548b6021cc4 Mon Sep 17 00:00:00 2001 From: henrique Date: Tue, 27 Aug 2013 15:37:01 +0200 Subject: [PATCH] THRIFT-1893 HTTP/JSON server/client for node.js Client: nodejs Patch: Phillip Campbell --- lib/js/thrift.js | 6 +- lib/nodejs/examples/server_http.js | 53 ++++ lib/nodejs/lib/thrift/index.js | 6 +- lib/nodejs/lib/thrift/protocol.js | 479 ++++++++++++++++++++++++++++- lib/nodejs/lib/thrift/server.js | 46 +++ lib/nodejs/package.json | 3 + 6 files changed, 579 insertions(+), 14 deletions(-) create mode 100644 lib/nodejs/examples/server_http.js diff --git a/lib/js/thrift.js b/lib/js/thrift.js index 39b0a5cc..e59f07ff 100644 --- a/lib/js/thrift.js +++ b/lib/js/thrift.js @@ -552,7 +552,7 @@ Thrift.Protocol.prototype = { // Reading functions - readMessageBegin: function(name, messageType, seqid) { + readMessageBegin: function() { this.rstack = []; this.rpos = []; @@ -658,7 +658,7 @@ Thrift.Protocol.prototype = { }, - readMapBegin: function(keyType, valType, size) { + readMapBegin: function() { var map = this.rstack.pop(); var r = {}; @@ -677,7 +677,7 @@ Thrift.Protocol.prototype = { this.readFieldEnd(); }, - readListBegin: function(elemType, size) { + readListBegin: function() { var list = this.rstack[this.rstack.length - 1]; var r = {}; diff --git a/lib/nodejs/examples/server_http.js b/lib/nodejs/examples/server_http.js new file mode 100644 index 00000000..ef2dc83a --- /dev/null +++ b/lib/nodejs/examples/server_http.js @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +var connect = require('connect'); +var thrift = require('thrift'); + +var UserStorage = require('./gen-nodejs/UserStorage'), + ttypes = require('./gen-nodejs/user_types'); + +var users = {}; + +var store = function(user, result) { + console.log("stored:", user.uid); + users[user.uid] = user; + result(null); +}; +var retrieve = function(uid, result) { + console.log("retrieved:", uid); + result(null, users[uid]); +}; + +var server_http = thrift.createHttpServer(UserStorage, { + store: store, + retrieve: retrieve +}); +server_http.listen(9090); + +var server_connect = connect(thrift.httpMiddleware(UserStorage, { + store: store, + retrieve: retrieve +})); +server_http.listen(9091); + +var server_connect_json = connect(thrift.httpMiddleware(UserStorage, { + store: store, + retrieve: retrieve +}, {protocol: thrift.TJSONProtocol})); +server_connect_json.listen(9092); diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js index 2554a2f8..bf340850 100644 --- a/lib/nodejs/lib/thrift/index.js +++ b/lib/nodejs/lib/thrift/index.js @@ -25,7 +25,10 @@ exports.createConnection = connection.createConnection; exports.createStdIOClient = connection.createStdIOClient; exports.createStdIOConnection = connection.createStdIOConnection; -exports.createServer = require('./server').createServer; +server = require('./server') +exports.createServer = server.createServer; +exports.createHttpServer = server.createHttpServer; +exports.httpMiddleware = server.httpMiddleware exports.Int64 = require('node-int64') @@ -36,3 +39,4 @@ exports.Int64 = require('node-int64') exports.TFramedTransport = require('./transport').TFramedTransport; exports.TBufferedTransport = require('./transport').TBufferedTransport; exports.TBinaryProtocol = require('./protocol').TBinaryProtocol; +exports.TJSONProtocol = require('./protocol').TJSONProtocol; diff --git a/lib/nodejs/lib/thrift/protocol.js b/lib/nodejs/lib/thrift/protocol.js index cbf6c9a7..f3cd9389 100644 --- a/lib/nodejs/lib/thrift/protocol.js +++ b/lib/nodejs/lib/thrift/protocol.js @@ -36,6 +36,13 @@ var TProtocolException = function(type, message) { } util.inherits(TProtocolException, Error); +// NastyHaxx. JavaScript forces hex constants to be +// positive, converting this into a long. If we hardcode the int value +// instead it'll stay in 32 bit-land. +var VERSION_MASK = -65536, // 0xffff0000 + VERSION_1 = -2147418112, // 0x80010000 + TYPE_MASK = 0x000000ff; + var TBinaryProtocol = exports.TBinaryProtocol = function(trans, strictRead, strictWrite) { this.trans = trans; this.strictRead = (strictRead !== undefined ? strictRead : false); @@ -46,14 +53,6 @@ TBinaryProtocol.prototype.flush = function() { return this.trans.flush(); } -// NastyHaxx. JavaScript forces hex constants to be -// positive, converting this into a long. If we hardcode the int value -// instead it'll stay in 32 bit-land. - -var VERSION_MASK = -65536, // 0xffff0000 - VERSION_1 = -2147418112, // 0x80010000 - TYPE_MASK = 0x000000ff; - TBinaryProtocol.prototype.writeMessageBegin = function(name, type, seqid) { if (this.strictWrite) { this.writeI32(VERSION_1 | type); @@ -105,7 +104,6 @@ TBinaryProtocol.prototype.writeListEnd = function() { } TBinaryProtocol.prototype.writeSetBegin = function(etype, size) { - console.log('write set', etype, size); this.writeByte(etype); this.writeI32(size); } @@ -287,7 +285,6 @@ TBinaryProtocol.prototype.getTransport = function() { } TBinaryProtocol.prototype.skip = function(type) { - // console.log("skip: " + type); switch (type) { case Type.STOP: return; @@ -350,3 +347,465 @@ TBinaryProtocol.prototype.skip = function(type) { throw Error("Invalid type: " + type); } } + +var TJSONProtocol = exports.TJSONProtocol = function(trans) { + this.trans = trans; +} + +TJSONProtocol.Type = {}; +TJSONProtocol.Type[Thrift.Type.BOOL] = '"tf"'; +TJSONProtocol.Type[Thrift.Type.BYTE] = '"i8"'; +TJSONProtocol.Type[Thrift.Type.I16] = '"i16"'; +TJSONProtocol.Type[Thrift.Type.I32] = '"i32"'; +TJSONProtocol.Type[Thrift.Type.I64] = '"i64"'; +TJSONProtocol.Type[Thrift.Type.DOUBLE] = '"dbl"'; +TJSONProtocol.Type[Thrift.Type.STRUCT] = '"rec"'; +TJSONProtocol.Type[Thrift.Type.STRING] = '"str"'; +TJSONProtocol.Type[Thrift.Type.MAP] = '"map"'; +TJSONProtocol.Type[Thrift.Type.LIST] = '"lst"'; +TJSONProtocol.Type[Thrift.Type.SET] = '"set"'; + + +TJSONProtocol.RType = {}; +TJSONProtocol.RType.tf = Thrift.Type.BOOL; +TJSONProtocol.RType.i8 = Thrift.Type.BYTE; +TJSONProtocol.RType.i16 = Thrift.Type.I16; +TJSONProtocol.RType.i32 = Thrift.Type.I32; +TJSONProtocol.RType.i64 = Thrift.Type.I64; +TJSONProtocol.RType.dbl = Thrift.Type.DOUBLE; +TJSONProtocol.RType.rec = Thrift.Type.STRUCT; +TJSONProtocol.RType.str = Thrift.Type.STRING; +TJSONProtocol.RType.map = Thrift.Type.MAP; +TJSONProtocol.RType.lst = Thrift.Type.LIST; +TJSONProtocol.RType.set = Thrift.Type.SET; + +TJSONProtocol.Version = 1; + +TJSONProtocol.prototype.flush = function() { + return this.trans.flush(); +} + +TJSONProtocol.prototype.writeMessageBegin = function(name, messageType, seqid) { + this.tstack = []; + this.tpos = []; + + this.tstack.push([TJSONProtocol.Version, '"' + name + '"', messageType, seqid]); +} + +TJSONProtocol.prototype.writeMessageEnd = function() { + var obj = this.tstack.pop(); + + this.wobj = this.tstack.pop(); + this.wobj.push(obj); + + this.wbuf = '[' + this.wobj.join(',') + ']'; + + this.trans.write(this.wbuf); +} + +TJSONProtocol.prototype.writeStructBegin = function(name) { + this.tpos.push(this.tstack.length); + this.tstack.push({}); +} + +TJSONProtocol.prototype.writeStructEnd = function() { + var p = this.tpos.pop(); + var struct = this.tstack[p]; + var str = '{'; + var first = true; + for (var key in struct) { + if (first) { + first = false; + } else { + str += ','; + } + + str += key + ':' + struct[key]; + } + + str += '}'; + this.tstack[p] = str; +} + +TJSONProtocol.prototype.writeFieldBegin = function(name, fieldType, fieldId) { + this.tpos.push(this.tstack.length); + this.tstack.push({ 'fieldId': '"' + + fieldId + '"', 'fieldType': TJSONProtocol.Type[fieldType] + }); +} + +TJSONProtocol.prototype.writeFieldEnd = function() { + var value = this.tstack.pop(); + var fieldInfo = this.tstack.pop(); + + this.tstack[this.tstack.length - 1][fieldInfo.fieldId] = '{' + + fieldInfo.fieldType + ':' + value + '}'; + this.tpos.pop(); +} + +TJSONProtocol.prototype.writeFieldStop = function() { +} + +TJSONProtocol.prototype.writeMapBegin = function(ktype, vtype, size) { + //size is invalid, we'll set it on end. + this.tpos.push(this.tstack.length); + this.tstack.push([TJSONProtocol.Type[ktype], TJSONProtocol.Type[vtype], 0]); +} + +TJSONProtocol.prototype.writeMapEnd = function() { + var p = this.tpos.pop(); + + if (p == this.tstack.length) { + return; + } + + if ((this.tstack.length - p - 1) % 2 !== 0) { + this.tstack.push(''); + } + + var size = (this.tstack.length - p - 1) / 2; + + this.tstack[p][this.tstack[p].length - 1] = size; + + var map = '}'; + var first = true; + while (this.tstack.length > p + 1) { + var v = this.tstack.pop(); + var k = this.tstack.pop(); + if (first) { + first = false; + } else { + map = ',' + map; + } + + if (! isNaN(k)) { k = '"' + k + '"'; } //json "keys" need to be strings + map = k + ':' + v + map; + } + map = '{' + map; + + this.tstack[p].push(map); + this.tstack[p] = '[' + this.tstack[p].join(',') + ']'; +} + +TJSONProtocol.prototype.writeListBegin = function(etype, size) { + this.tpos.push(this.tstack.length); + this.tstack.push([TJSONProtocol.Type[etype], size]); +} + +TJSONProtocol.prototype.writeListEnd = function() { + var p = this.tpos.pop(); + + while (this.tstack.length > p + 1) { + var tmpVal = this.tstack[p + 1]; + this.tstack.splice(p + 1, 1); + this.tstack[p].push(tmpVal); + } + + this.tstack[p] = '[' + this.tstack[p].join(',') + ']'; +} + +TJSONProtocol.prototype.writeSetBegin = function(etype, size) { + this.tpos.push(this.tstack.length); + this.tstack.push([TJSONProtocol.Type[etype], size]); +} + +TJSONProtocol.prototype.writeSetEnd = function() { + var p = this.tpos.pop(); + + while (this.tstack.length > p + 1) { + var tmpVal = this.tstack[p + 1]; + this.tstack.splice(p + 1, 1); + this.tstack[p].push(tmpVal); + } + + this.tstack[p] = '[' + this.tstack[p].join(',') + ']'; +} + +TJSONProtocol.prototype.writeBool = function(bool) { + this.tstack.push(bool ? 1 : 0); +} + +TJSONProtocol.prototype.writeByte = function(byte) { + this.tstack.push(byte); +} + +TJSONProtocol.prototype.writeI16 = function(i16) { + this.tstack.push(i16); +} + +TJSONProtocol.prototype.writeI32 = function(i32) { + this.tstack.push(i32); +} + +TJSONProtocol.prototype.writeI64 = function(i64) { + this.tstack.push(i64); +} + +TJSONProtocol.prototype.writeDouble = function(dub) { + this.tstack.push(dub); +} + +TJSONProtocol.prototype.writeString = function(str) { + // We do not encode uri components for wire transfer: + if (str === null) { + this.tstack.push(null); + } else { + // concat may be slower than building a byte buffer + var escapedString = ''; + for (var i = 0; i < str.length; i++) { + var ch = str.charAt(i); // a single double quote: " + if (ch === '\\\\"') { + escapedString += '\\\\\\\\\\\\"'; // write out as: \\\\" + } else if (ch === '\\\\\\\\') { // a single backslash: \\\\ + escapedString += '\\\\\\\\\\\\\\\\'; // write out as: \\\\\\\\ + /* Currently escaped forward slashes break TJSONProtocol. + * As it stands, we can simply pass forward slashes into + * our strings across the wire without being escaped. + * I think this is the protocol's bug, not thrift.js + * } else if(ch === '/') { // a single forward slash: / + * escapedString += '\\\\\\\\/'; // write out as \\\\/ + * } + */ + } else if (ch === '\\\\b') { // a single backspace: invisible + escapedString += '\\\\\\\\b'; // write out as: \\\\b" + } else if (ch === '\\\\f') { // a single formfeed: invisible + escapedString += '\\\\\\\\f'; // write out as: \\\\f" + } else if (ch === '\\\\n') { // a single newline: invisible + escapedString += '\\\\\\\\n'; // write out as: \\\\n" + } else if (ch === '\\\\r') { // a single return: invisible + escapedString += '\\\\\\\\r'; // write out as: \\\\r" + } else if (ch === '\\\\t') { // a single tab: invisible + escapedString += '\\\\\\\\t'; // write out as: \\\\t" + } else { + escapedString += ch; // Else it need not be escaped + } + } + this.tstack.push('"' + escapedString + '"'); + } +} + +TJSONProtocol.prototype.writeBinary = function(arg) { + this.writeString(arg); +} + +TJSONProtocol.prototype.readMessageBegin = function() { + this.rstack = []; + this.rpos = []; + + this.robj = JSON.parse(this.trans.readAll()); + + var r = {}; + var version = this.robj.shift(); + + if (version != TJSONProtocol.Version) { + throw 'Wrong thrift protocol version: ' + version; + } + + r.fname = this.robj.shift(); + r.mtype = this.robj.shift(); + r.rseqid = this.robj.shift(); + + + //get to the main obj + this.rstack.push(this.robj.shift()); + + return r; +} + +TJSONProtocol.prototype.readMessageEnd = function() { +} + +TJSONProtocol.prototype.readStructBegin = function() { + var r = {}; + r.fname = ''; + + //incase this is an array of structs + if (this.rstack[this.rstack.length - 1] instanceof Array) { + this.rstack.push(this.rstack[this.rstack.length - 1].shift()); + } + + return r; +} + +TJSONProtocol.prototype.readStructEnd = function() { + if (this.rstack[this.rstack.length - 2] instanceof Array) { + this.rstack.pop(); + } +} + +TJSONProtocol.prototype.readFieldBegin = function() { + var r = {}; + + var fid = -1; + var ftype = Thrift.Type.STOP; + + //get a fieldId + for (var f in (this.rstack[this.rstack.length - 1])) { + if (f === null) { + continue; + } + + fid = parseInt(f, 10); + this.rpos.push(this.rstack.length); + + var field = this.rstack[this.rstack.length - 1][fid]; + + //remove so we don't see it again + delete this.rstack[this.rstack.length - 1][fid]; + + this.rstack.push(field); + + break; + } + + if (fid != -1) { + //should only be 1 of these but this is the only + //way to match a key + for (var i in (this.rstack[this.rstack.length - 1])) { + if (TJSONProtocol.RType[i] === null) { + continue; + } + + ftype = TJSONProtocol.RType[i]; + this.rstack[this.rstack.length - 1] = this.rstack[this.rstack.length - 1][i]; + } + } + + r.fname = ''; + r.ftype = ftype; + r.fid = fid; + + return r; +} + +TJSONProtocol.prototype.readFieldEnd = function() { + var pos = this.rpos.pop(); + + //get back to the right place in the stack + while (this.rstack.length > pos) { + this.rstack.pop(); + } +} + +TJSONProtocol.prototype.readMapBegin = function() { + var map = this.rstack.pop(); + + var r = {}; + r.ktype = TJSONProtocol.RType[map.shift()]; + r.vtype = TJSONProtocol.RType[map.shift()]; + r.size = map.shift(); + + + this.rpos.push(this.rstack.length); + this.rstack.push(map.shift()); + + return r; +} + +TJSONProtocol.prototype.readMapEnd = function() { + this.readFieldEnd(); +} + +TJSONProtocol.prototype.readListBegin = function() { + var list = this.rstack[this.rstack.length - 1]; + + var r = {}; + r.etype = TJSONProtocol.RType[list.shift()]; + r.size = list.shift(); + + this.rpos.push(this.rstack.length); + this.rstack.push(list); + + return r; +} + +TJSONProtocol.prototype.readListEnd = function() { + this.readFieldEnd(); +} + +TJSONProtocol.prototype.readSetBegin = function() { + return this.readListBegin(); +} + +TJSONProtocol.prototype.readSetEnd = function() { + return this.readListEnd(); +} + +TJSONProtocol.prototype.readBool = function() { + var r = this.readI32(); + + if (r !== null && r.value == '1') { + r.value = true; + } else { + r.value = false; + } + + return r; +} + +TJSONProtocol.prototype.readByte = function() { + return this.readI32(); +} + +TJSONProtocol.prototype.readI16 = function() { + return this.readI32(); +} + +TJSONProtocol.prototype.readI32 = function(f) { + if (f === undefined) { + f = this.rstack[this.rstack.length - 1]; + } + + var r = {}; + + if (f instanceof Array) { + if (f.length === 0) { + r.value = undefined; + } else { + r.value = f.shift(); + } + } else if (f instanceof Object) { + for (var i in f) { + if (i === null) { + continue; + } + this.rstack.push(f[i]); + delete f[i]; + + r.value = i; + break; + } + } else { + r.value = f; + this.rstack.pop(); + } + + return r.value; +} + +TJSONProtocol.prototype.readI64 = function() { + return new Int64(this.readI32()); +} + +TJSONProtocol.prototype.readDouble = function() { + return this.readI32(); +} + +TJSONProtocol.prototype.readBinary = function() { + return this.readString(); +} + +TJSONProtocol.prototype.readString = function() { + var r = this.readI32(); + return r; +} + +TJSONProtocol.prototype.getTransport = function() { + return this.trans; +} + +//Method to arbitrarily skip over data. +TJSONProtocol.prototype.skip = function(type) { + throw 'skip not supported yet'; +} + diff --git a/lib/nodejs/lib/thrift/server.js b/lib/nodejs/lib/thrift/server.js index f2190480..228bb036 100644 --- a/lib/nodejs/lib/thrift/server.js +++ b/lib/nodejs/lib/thrift/server.js @@ -17,6 +17,7 @@ * under the License. */ var net = require('net'); +var http = require('http'); var ttransport = require('./transport') , TBinaryProtocol = require('./protocol').TBinaryProtocol; @@ -62,3 +63,48 @@ exports.createServer = function(cls, handler, options) { }); }); }; + +function httpRequestHandler(cls, handler, options) { + if (cls.Processor) { + cls = cls.Processor; + } + var processor = new cls(handler); + var transport = (options && options.transport) ? options.transport : ttransport.TBufferedTransport; + var protocol = (options && options.protocol) ? options.protocol : TBinaryProtocol; + + return function(request, response) { + var self = this; + + request.on('data', transport.receiver(function(transportWithData) { + var input = new protocol(transportWithData); + var output = new protocol(new transport(undefined, function(buf) { + try { + response.write(buf); + } catch (err) { + response.writeHead(500); + } + response.end(); + })); + + try { + processor.process(input, output); + transportWithData.commitPosition(); + } + catch (err) { + if (err instanceof ttransport.InputBufferUnderrunError) { + transportWithData.rollbackPosition(); + } else { + response.writeHead(500); + response.end(); + throw err; + } + } + })); + }; +}; + +exports.httpMiddleware = httpRequestHandler; + +exports.createHttpServer = function(cls, handler, options) { + return http.createServer(httpRequestHandler(cls, handler, options)); +}; diff --git a/lib/nodejs/package.json b/lib/nodejs/package.json index c57a07ba..668c58df 100755 --- a/lib/nodejs/package.json +++ b/lib/nodejs/package.json @@ -27,5 +27,8 @@ "dependencies": { "node-int64": "~0.3.0", "nodeunit": "~0.8.0" + }, + "devDependencies": { + "connect": "2.7.x" } } -- 2.17.1