From: Roger Meier Date: Sat, 9 Apr 2011 11:10:04 +0000 (+0000) Subject: THRIFT-1129 Add BufferedTransport (non-framed) to Node.js library X-Git-Tag: 0.7.0~119 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=5c819c084c09faace575f3889869a8a0b0dd7c95;p=common%2Fthrift.git THRIFT-1129 Add BufferedTransport (non-framed) to Node.js library Patch: Wade Simmons git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1090565 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/nodejs/examples/client_multitransport.js b/lib/nodejs/examples/client_multitransport.js new file mode 100644 index 00000000..5c9e109f --- /dev/null +++ b/lib/nodejs/examples/client_multitransport.js @@ -0,0 +1,40 @@ +var thrift = require('thrift'), + ttransport = require('thrift/transport'); + +var UserStorage = require('./gen-nodejs/UserStorage'), + ttypes = require('./gen-nodejs/user_types'); + +var f_conn = thrift.createConnection('localhost', 9090), // default: framed + f_client = thrift.createClient(UserStorage, f_conn); +var b_conn = thrift.createConnection('localhost', 9091, {transport: ttransport.TBufferedTransport}), + b_client = thrift.createClient(UserStorage, b_conn); +var user1 = new ttypes.UserProfile({uid: 1, + name: "Mark Slee", + blurb: "I'll find something to put here."}); +var user2 = new ttypes.UserProfile({uid: 2, + name: "Satoshi Tagomori", + blurb: "ok, let's test with buffered transport."}); + +f_conn.on('error', function(err) { + console.error("framed:", err); +}); + +f_client.store(user1, function(err, response) { + if (err) { console.error(err); return; } + + console.log("stored:", user1.uid, " as ", user1.name); + b_client.retrieve(user1.uid, function(err, responseUser) { + if (err) { console.error(err); return; } + console.log("retrieved:", responseUser.uid, " as ", responseUser.name); + }); +}); + +b_client.store(user2, function(err, response) { + if (err) { console.error(err); return; } + + console.log("stored:", user2.uid, " as ", user2.name); + f_client.retrieve(user2.uid, function(err, responseUser) { + if (err) { console.error(err); return; } + console.log("retrieved:", responseUser.uid, " as ", responseUser.name); + }); +}); diff --git a/lib/nodejs/examples/server_multitransport.js b/lib/nodejs/examples/server_multitransport.js new file mode 100644 index 00000000..e5d6d760 --- /dev/null +++ b/lib/nodejs/examples/server_multitransport.js @@ -0,0 +1,28 @@ +var thrift = require('thrift'), + ttransport = require('thrift/transport'); + +var UserStorage = require('./gen-nodejs/UserStorage'), + ttypes = require('./gen-nodejs/user_types'); + +var users = {}; + +var store = function(user, success) { + console.log("stored:", user.uid); + users[user.uid] = user; + success(); +}; +var retrieve = function(uid, success) { + console.log("retrieved:", uid); + success(users[uid]); +}; + +var server_framed = thrift.createServer(UserStorage, { + store: store, + retrieve: retrieve +}); +server_framed.listen(9090); +var server_buffered = thrift.createServer(UserStorage, { + store: store, + retrieve: retrieve +}, {transport: ttransport.TBufferedTransport}); +server_buffered.listen(9091); diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js index e469f496..35302274 100644 --- a/lib/nodejs/lib/thrift/connection.js +++ b/lib/nodejs/lib/thrift/connection.js @@ -19,60 +19,26 @@ var sys = require('sys'), EventEmitter = require("events").EventEmitter, net = require('net'), - TMemoryBuffer = require('./transport').TMemoryBuffer, - TBinaryProtocol = require('./protocol').TBinaryProtocol; + ttransport = require('./transport'), + tprotocol = require('./protocol'); var BinaryParser = require('./binary_parser').BinaryParser; BinaryParser.bigEndian = true; -var int32FramedReceiver = exports.int32FramedReceiver = function (callback) { - var frameLeft = 0, - framePos = 0, - frame = null; - - return function(data) { - // var buf = new Buffer(data, 'binary'); - // console.log(buf); - // framed transport - while (data.length) { - if (frameLeft === 0) { - // TODO assumes we have all 4 bytes - if (data.length < 4) { - throw Error("Not enough bytes"); - } - frameLeft = BinaryParser.toInt(data.slice(0,4)); - frame = new Buffer(frameLeft); - framePos = 0; - data = data.slice(4, data.length); - } - - if (data.length >= frameLeft) { - data.copy(frame, framePos, 0, frameLeft); - data = data.slice(frameLeft, data.length); - - frameLeft = 0; - callback(frame); - } else if (data.length) { - data.copy(frame, framePos, 0, data.length); - frameLeft -= data.length; - framePos += data.length; - data = data.slice(data.length, data.length); - } - } - }; -} - var Connection = exports.Connection = function(stream, options) { var self = this; EventEmitter.call(this); this.connection = stream; - this.offline_queue = []; this.options = options || {}; + this.transport = this.options.transport || ttransport.TFramedTransport; + this.protocol = this.options.protocol || tprotocol.TBinaryProtocol; + this.offline_queue = []; this.connected = false; this.connection.addListener("connect", function() { self.connected = true; + this.setTimeout(self.options.timeout || 0); this.setNoDelay(); this.frameLeft = 0; @@ -99,17 +65,47 @@ var Connection = exports.Connection = function(stream, options) { self.emit("timeout"); }); - this.connection.addListener("data", int32FramedReceiver(function(data) { - // console.log(typeof(data)); - var input = new TBinaryProtocol(new TMemoryBuffer(data)); - var r = input.readMessageBegin(); - // console.log(r); - self.client['recv_' + r.fname](input, r.mtype, r.rseqid); - // self.emit("data", data); + this.connection.addListener("data", self.transport.receiver(function(transport_with_data) { + var message = new self.protocol(transport_with_data); + try { + var header = message.readMessageBegin(); + var dummy_seqid = header.rseqid * -1; + var client = self.client; + client._reqs[dummy_seqid] = function(err, success){ + transport_with_data.commitPosition(); + + var callback = client._reqs[header.rseqid]; + delete client._reqs[header.rseqid]; + if (callback) { + callback(err, success); + } + }; + client['recv_' + header.fname](message, header.mtype, dummy_seqid); + } + catch (e) { + if (e instanceof ttransport.InputBufferUnderrunError) { + transport_with_data.rollbackPosition(); + } + else { + throw e; + } + } })); -} +}; sys.inherits(Connection, EventEmitter); +Connection.prototype.end = function() { + this.connection.end(); +} + +Connection.prototype.write = function(data) { + if (!this.connected) { + this.offline_queue.push(data); + return; + } + this.connection.write(data); +} + exports.createConnection = function(host, port, options) { var stream = net.createConnection(port, host); var connection = new Connection(stream, options); @@ -123,28 +119,12 @@ exports.createClient = function(cls, connection) { if (cls.Client) { cls = cls.Client; } - var client = new cls(new TMemoryBuffer(undefined, function(buf) { + var client = new cls(new connection.transport(undefined, function(buf) { connection.write(buf); - }), TBinaryProtocol); + }), connection.protocol); // TODO clean this up connection.client = client; return client; } - -Connection.prototype.end = function() { - this.connection.end(); -} - -Connection.prototype.write = function(buf) { - // TODO: optimize this better, allocate one buffer instead of both: - var msg = new Buffer(buf.length + 4); - BinaryParser.fromInt(buf.length).copy(msg, 0, 0, 4); - buf.copy(msg, 4, 0, buf.length); - if (!this.connected) { - this.offline_queue.push(msg); - } else { - this.connection.write(msg); - } -} diff --git a/lib/nodejs/lib/thrift/server.js b/lib/nodejs/lib/thrift/server.js index 43a7967a..a73f23e7 100644 --- a/lib/nodejs/lib/thrift/server.js +++ b/lib/nodejs/lib/thrift/server.js @@ -19,33 +19,41 @@ var sys = require('sys'), net = require('net'); +var ttransport = require('./transport'); var BinaryParser = require('./binary_parser').BinaryParser, - TMemoryBuffer = require('./transport').TMemoryBuffer, - TBinaryProtocol = require('./protocol').TBinaryProtocol, - int32FramedReceiver = require('./connection').int32FramedReceiver; + TBinaryProtocol = require('./protocol').TBinaryProtocol; -exports.createServer = function(cls, handler) { +exports.createServer = function(cls, handler, options) { if (cls.Processor) { cls = cls.Processor; } var processor = new cls(handler); + var transport = (options && options.transport) ? options.transport : ttransport.TFramedTransport; + var protocol = (options && options.protocol) ? options.protocol : TBinaryProtocol; return net.createServer(function(stream) { - stream.on('data', int32FramedReceiver(function(data) { - var input = new TBinaryProtocol(new TMemoryBuffer(data)); - var output = new TBinaryProtocol(new TMemoryBuffer(undefined, function(buf) { - // TODO: optimize this better, allocate one buffer instead of both: - var msg = new Buffer(buf.length + 4); - BinaryParser.fromInt(buf.length).copy(msg, 0, 0, 4); - buf.copy(msg, 4, 0, buf.length); - stream.write(msg); + stream.on('data', transport.receiver(function(transport_with_data) { + var input = new protocol(transport_with_data); + var output = new protocol(new transport(undefined, function(buf) { + stream.write(buf); })); - processor.process(input, output); + try { + processor.process(input, output); + transport_with_data.commitPosition(); + } + catch (e) { + if (e instanceof ttransport.InputBufferUnderrunError) { + transport_with_data.rollbackPosition(); + } + else { + throw e; + } + } })); stream.on('end', function() { stream.end(); }); }); -} +}; diff --git a/lib/nodejs/lib/thrift/transport.js b/lib/nodejs/lib/thrift/transport.js index 0c10ef0a..a926a6d8 100644 --- a/lib/nodejs/lib/thrift/transport.js +++ b/lib/nodejs/lib/thrift/transport.js @@ -16,72 +16,225 @@ * specific language governing permissions and limitations * under the License. */ -var TMemoryBuffer = exports.TMemoryBuffer = function(buffer, flushCallback) { - if (buffer !== undefined) { - this.recv_buf = buffer; - } else { - this.recv_buf = new Buffer(0); - } - this.recv_buf_sz = this.recv_buf.length; - this.send_buf = []; - this.rpos = 0; - this.flushCallback = flushCallback; -} +var BinaryParser = require('./binary_parser').BinaryParser; -TMemoryBuffer.prototype.isOpen = function() { - // TODO - return true; -} +var emptyBuf = new Buffer(0); -TMemoryBuffer.prototype.open = function() { -} +var InputBufferUnderrunError = exports.InputBufferUnderrunError = function() { +}; -TMemoryBuffer.prototype.close = function() { -} +var TFramedTransport = exports.TFramedTransport = function(buffer, callback) { + this.inBuf = buffer || emptyBuf; + this.outBuffers = []; + this.outCount = 0; + this.readPos = 0; + this.onFlush = callback; +}; +TFramedTransport.receiver = function(callback) { + var frameLeft = 0, + framePos = 0, + frame = null; + var residual = null; -TMemoryBuffer.prototype.read = function(len) { - var avail = this.recv_buf_sz - this.rpos; - // console.log("avail: " + avail); + return function(data) { + // Prepend any residual data from our previous read + if (residual) { + var dat = new Buffer(data.length + residual.length); + residual.copy(dat, 0, 0); + data.copy(dat, residual.length, 0); + residual = null; + } - if(avail == 0) - return new Buffer(0); + // framed transport + while (data.length) { + if (frameLeft === 0) { + // TODO assumes we have all 4 bytes + if (data.length < 4) { + console.log("Expecting > 4 bytes, found only " + data.length); + residual = data; + break; + //throw Error("Expecting > 4 bytes, found only " + data.length); + } + frameLeft = BinaryParser.toInt(data.slice(0,4)); + frame = new Buffer(frameLeft); + framePos = 0; + data = data.slice(4, data.length); + } + + if (data.length >= frameLeft) { + data.copy(frame, framePos, 0, frameLeft); + data = data.slice(frameLeft, data.length); + + frameLeft = 0; + callback(new TFramedTransport(frame)); + } else if (data.length) { + data.copy(frame, framePos, 0, data.length); + frameLeft -= data.length; + framePos += data.length; + data = data.slice(data.length, data.length); + } + } + }; +}; - var give = len +TFramedTransport.prototype = { + commitPosition: function(){}, + rollbackPosition: function(){}, - if(avail < len) { - console.log("asked for: " + len); - throw new Error("asked for too much"); - give = avail - } + // TODO: Implement open/close support + isOpen: function() {return true;}, + open: function() {}, + close: function() {}, - // console.log(this.rpos + "," + give); - var ret = this.recv_buf.slice(this.rpos,this.rpos + give) - this.rpos += give - // console.log(ret); + read: function(len) { // this function will be used for each frames. + var end = this.readPos + len; - //clear buf when complete? - return ret + if (this.inBuf.length < end) { + throw new Error('read(' + len + ') failed - not enough data'); + } -} + var buf = this.inBuf.slice(this.readPos, end); + this.readPos = end; + return buf; + }, -TMemoryBuffer.prototype.readAll = function() { - return this.recv_buf; -} + readAll: function() { + return this.inBuf; + }, -TMemoryBuffer.prototype.write = function(buf) { - // TODO - if (typeof(buf) === "string") { - for (var i = 0; i < buf.length; ++i) { - this.send_buf.push(buf.charCodeAt(i)); + write: function(buf, encoding) { + if (typeof(buf) === "string") { + // Defaulting to ascii encoding here since that's more like the original + // code, but I feel like 'utf8' would be a better choice. + buf = new Buffer(buf, encoding || 'ascii'); } - } else { - for (var i = 0; i < buf.length; ++i) { - this.send_buf.push(buf[i]); + this.outBuffers.push(buf); + this.outCount += buf.length; + }, + + flush: function() { + var out = new Buffer(this.outCount), + pos = 0; + this.outBuffers.forEach(function(buf) { + buf.copy(out, pos, 0); + pos += buf.length; + }); + + if (this.onFlush) { + // TODO: optimize this better, allocate one buffer instead of both: + var msg = new Buffer(out.length + 4); + BinaryParser.fromInt(out.length).copy(msg, 0, 0, 4); + out.copy(msg, 4, 0, out.length); + this.onFlush(msg); } + + this.outBuffers = []; + this.outCount = 0; } -} +}; + +var TBufferedTransport = exports.TBufferedTransport = function(buffer, callback) { + this.defaultReadBufferSize = 1024; + this.writeBufferSize = 512; // Soft Limit + this.inBuf = new Buffer(this.defaultReadBufferSize); + this.readCursor = 0; + this.writeCursor = 0; // for input buffer + this.outBuffers = []; + this.outCount = 0; + this.onFlush = callback; +}; +TBufferedTransport.receiver = function(callback) { + var reader = new TBufferedTransport(); + + return function(data) { + if (reader.writeCursor + data.length > reader.inBuf.length) { + var buf = new Buffer(reader.writeCursor + data.length); + reader.inBuf.copy(buf, 0, 0, reader.writeCursor); + reader.inBuf = buf; + } + data.copy(reader.inBuf, reader.writeCursor, 0); + reader.writeCursor += data.length; + + callback(reader); + }; +}; + +TBufferedTransport.prototype = { + commitPosition: function(){ + var unreadedSize = this.writeCursor - this.readCursor; + var bufSize = (unreadedSize * 2 > this.defaultReadBufferSize) ? unreadedSize * 2 : this.defaultReadBufferSize; + var buf = new Buffer(bufSize); + if (unreadedSize > 0) { + this.inBuf.copy(buf, 0, this.readCursor, unreadedSize); + } + this.readCursor = 0; + this.writeCursor = unreadedSize; + this.inBuf = buf; + }, + rollbackPosition: function(){ + this.readCursor = 0; + }, + + // TODO: Implement open/close support + isOpen: function() {return true;}, + open: function() {}, + close: function() {}, -TMemoryBuffer.prototype.flush = function() { - this.flushCallback(new Buffer(this.send_buf)); - this.send_buf = []; -} + read: function(len) { + if (this.readCursor + len > this.writeCursor) { + throw new InputBufferUnderrunError(); + } + var buf = new Buffer(len); + this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len); + this.readCursor += len; + return buf; + }, + + readAll: function() { + if (this.readCursor >= this.writeCursor) { + throw new InputBufferUnderrunError(); + } + var buf = new Buffer(this.writeCursor - this.readCursor); + this.inBuf.copy(buf, 0, this.readCursor, this.writeCursor - this.readCursor); + this.readCursor = this.writeCursor; + return buf; + }, + + write: function(buf, encoding) { + if (typeof(buf) === "string") { + // Defaulting to ascii encoding here since that's more like the original + // code, but I feel like 'utf8' would be a better choice. + buf = new Buffer(buf, encoding || 'ascii'); + } + if (this.outCount + buf.length > this.writeBufferSize) { + this.flush(); + } + + this.outBuffers.push(buf); + this.outCount += buf.length; + + if (this.outCount >= this.writeBufferSize) { + this.flush(); + } + }, + + flush: function() { + if (this.outCount < 1) { + return; + } + + var msg = new Buffer(this.outCount), + pos = 0; + this.outBuffers.forEach(function(buf) { + buf.copy(msg, pos, 0); + pos += buf.length; + }); + + if (this.onFlush) { + this.onFlush(msg); + } + + this.outBuffers = []; + this.outCount = 0; + } +};