From 857a8b8bf81d4ead905e02527727a751720ffdbc Mon Sep 17 00:00:00 2001 From: Roger Meier Date: Fri, 14 Oct 2011 06:35:28 +0000 Subject: [PATCH] THRIFT-1261 STDIO support for node-thrift Patch: Jordan Shaw git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1183221 13f79535-47bb-0310-9956-ffa450edef68 --- lib/nodejs/lib/thrift/connection.js | 112 ++++++++++++++++++++++++++++ lib/nodejs/lib/thrift/index.js | 2 + 2 files changed, 114 insertions(+) diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js index 35302274..b7f9b7a7 100644 --- a/lib/nodejs/lib/thrift/connection.js +++ b/lib/nodejs/lib/thrift/connection.js @@ -128,3 +128,115 @@ exports.createClient = function(cls, connection) { return client; } + +var child_process = require('child_process'); +var StdIOConnection = exports.StdIOConnection = function(command, options) { + var command_parts = command.split(' '); + command = command_parts[0]; + var args = command_parts.splice(1,command_parts.length -1); + var child = this.child = child_process.spawn(command,args); + + var self = this; + EventEmitter.call(this); + + this._debug = options.debug || false; + this.connection = child.stdin; + this.options = options || {}; + this.transport = this.options.transport || ttransport.TFramedTransport; + this.protocol = this.options.protocol || tprotocol.TBinaryProtocol; + this.offline_queue = []; + + if(this._debug === true){ + + this.child.stderr.on('data',function(err){ + console.log(err.toString(),'CHILD ERROR'); + + }); + + this.child.on('exit',function(code,signal){ + console.log(code+':'+signal,'CHILD EXITED'); + + }); + + } + + this.frameLeft = 0; + this.framePos = 0; + this.frame = null; + this.connected = true; + + self.offline_queue.forEach(function(data) { + self.connection.write(data); + }); + + + this.connection.addListener("error", function(err) { + self.emit("error", err); + }); + + // Add a close listener + this.connection.addListener("close", function() { + self.emit("close"); + }); + + child.stdout.addListener("data", self.transport.receiver(function(transport_with_data) { + var message = new self.protocol(transport_with_data); + try { + var header = message.readMessageBegin(); + var dummy_seqid = header.rseqid * -1; + var client = self.client; + client._reqs[dummy_seqid] = function(err, success){ + transport_with_data.commitPosition(); + + var callback = client._reqs[header.rseqid]; + delete client._reqs[header.rseqid]; + if (callback) { + callback(err, success); + } + }; + client['recv_' + header.fname](message, header.mtype, dummy_seqid); + } + catch (e) { + if (e instanceof ttransport.InputBufferUnderrunError) { + transport_with_data.rollbackPosition(); + } + else { + throw e; + } + } + })); + +}; + +sys.inherits(StdIOConnection, EventEmitter); + +StdIOConnection.prototype.end = function() { + this.connection.end(); +} + +StdIOConnection.prototype.write = function(data) { + if (!this.connected) { + this.offline_queue.push(data); + return; + } + this.connection.write(data); +} +exports.createStdIOConnection = function(command,options){ + return new StdIOConnection(command,options); + +}; + +exports.createStdIOClient = function(cls,connection) { + if (cls.Client) { + cls = cls.Client; + } + + var client = new cls(new connection.transport(undefined, function(buf) { + connection.write(buf); + }), connection.protocol); + + // TODO clean this up + connection.client = client; + + return client; +} diff --git a/lib/nodejs/lib/thrift/index.js b/lib/nodejs/lib/thrift/index.js index f4fa3cd1..c7cedc0c 100644 --- a/lib/nodejs/lib/thrift/index.js +++ b/lib/nodejs/lib/thrift/index.js @@ -22,5 +22,7 @@ var connection = require('./connection'); exports.Connection = connection.Connection; exports.createClient = connection.createClient; exports.createConnection = connection.createConnection; +exports.createStdIOClient = connection.createStdIOClient; +exports.createStdIOConnection = connection.createStdIOConnection; exports.createServer = require('./server').createServer; -- 2.17.1