From: ra Date: Tue, 22 Apr 2014 13:37:37 +0000 (-0700) Subject: THRIFT-2405:Node.js Multiplexer tests fail (silently) X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=7201c0d38ffb1505fdddcc9b65b16621f7e493c3;p=common%2Fthrift.git THRIFT-2405:Node.js Multiplexer tests fail (silently) Client: node Patch: Randy Abernethy Repairs client side multiplex protocol. --- diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js index 36451d55..a3c2d794 100644 --- a/lib/nodejs/lib/thrift/connection.js +++ b/lib/nodejs/lib/thrift/connection.js @@ -21,14 +21,16 @@ var util = require('util'), net = require('net'), tls = require('tls'), ttransport = require('./transport'), - tprotocol = require('./protocol'); + tprotocol = require('./protocol'), + thrift = require('./thrift'); var binary = require('./binary'); var Connection = exports.Connection = function(stream, options) { var self = this; EventEmitter.call(this); - + + this.seqId2Service = {}; this.connection = stream; this.options = options || {}; this.transport = this.options.transport || ttransport.TBufferedTransport; @@ -37,18 +39,21 @@ var Connection = exports.Connection = function(stream, options) { this.connected = false; this._debug = this.options.debug || false; - if (this.options.max_attempts - && !isNaN(this.options.max_attempts) && this.options.max_attempts > 0) { + if (this.options.max_attempts && + !isNaN(this.options.max_attempts) && + this.options.max_attempts > 0) { this.max_attempts = +this.options.max_attempts; } this.retry_max_delay = null; - if (this.options.retry_max_delay !== undefined - && !isNaN(this.options.retry_max_delay) && this.options.retry_max_delay > 0) { + if (this.options.retry_max_delay !== undefined && + !isNaN(this.options.retry_max_delay) && + this.options.retry_max_delay > 0) { this.retry_max_delay = this.options.retry_max_delay; } this.connect_timeout = false; - if (this.options.connect_timeout - && !isNaN(this.options.connect_timeout) && this.options.connect_timeout > 0) { + if (this.options.connect_timeout && + !isNaN(this.options.connect_timeout) && + this.options.connect_timeout > 0) { this.connect_timeout = +this.options.connect_timeout; } this.connection.addListener("connect", function() { @@ -89,9 +94,9 @@ var Connection = exports.Connection = function(stream, options) { this.connection.addListener("error", function(err) { // Only emit the error if no-one else is listening on the connection // or if someone is listening on us - if (self.connection.listeners('error').length === 1 - || self.listeners('error').length > 0) { - self.emit("error", err) + if (self.connection.listeners('error').length === 1 || + self.listeners('error').length > 0) { + self.emit("error", err); } // "error" events get turned into exceptions if they aren't listened for. If the user handled this error // then we should try to reconnect. @@ -114,7 +119,25 @@ var Connection = exports.Connection = function(stream, options) { var header = message.readMessageBegin(); var dummy_seqid = header.rseqid * -1; var client = self.client; - client._reqs[dummy_seqid] = function(err, success){ + //The Multiplexed Protocol stores a hash of seqid to service names + // in seqId2Service. If the SeqId is found in the hash we need to + // lookup the appropriate client for this call. + // The connection.client object is a single client object when not + // multiplexing, when using multiplexing it is a service name keyed + // hash of client objects. + //NOTE: The 2 way interdependencies between protocols, transports, + // connections and clients in the Node.js implementation are irregular + // and make the implementation difficult to extend and maintain. We + // should bring this stuff inline with typical thrift I/O stack + // operation soon. + // --ra + var service_name = self.seqId2Service[header.rseqid]; + if (service_name) { + client = self.client[service_name]; + delete self.seqId2Service[header.rseqid]; + } + /*jshint -W083 */ + client._reqs[dummy_seqid] = function(err, success){ transport_with_data.commitPosition(); var callback = client._reqs[header.rseqid]; @@ -123,13 +146,15 @@ var Connection = exports.Connection = function(stream, options) { callback(err, success); } }; - - if(!client['recv_' + header.fname]) { - // msg was for another serivce, just drop it - delete client._reqs[dummy_seqid] - return + /*jshint +W083 */ + + if(client['recv_' + header.fname]) { + client['recv_' + header.fname](message, header.mtype, dummy_seqid); + } else { + delete client._reqs[dummy_seqid]; + throw new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME, + "Received a response to an unknown RPC function"); } - client['recv_' + header.fname](message, header.mtype, dummy_seqid); } } catch (e) { @@ -146,7 +171,7 @@ util.inherits(Connection, EventEmitter); Connection.prototype.end = function() { this.connection.end(); -} +}; Connection.prototype.initialize_retry_vars = function () { this.retry_timer = null; @@ -162,14 +187,14 @@ Connection.prototype.write = function(data) { return; } this.connection.write(data); -} +}; Connection.prototype.connection_gone = function () { var self = this; // If a retry is already in progress, just let that happen if (this.retry_timer) { - return; + return; } if (!this.max_attempts) { self.emit("close"); @@ -228,7 +253,7 @@ exports.createConnection = function(host, port, options) { connection.port = port; return connection; -} +}; exports.createSSLConnection = function(host, port, options) { var stream = tls.connect(port, host, options); @@ -237,7 +262,7 @@ exports.createSSLConnection = function(host, port, options) { connection.port = port; return connection; -} +}; exports.createClient = function(cls, connection) { @@ -252,7 +277,7 @@ exports.createClient = function(cls, connection) { connection.client = client; return client; -} +}; var child_process = require('child_process'); var StdIOConnection = exports.StdIOConnection = function(command, options) { @@ -326,14 +351,13 @@ var StdIOConnection = exports.StdIOConnection = function(command, options) { } } })); - }; util.inherits(StdIOConnection, EventEmitter); StdIOConnection.prototype.end = function() { this.connection.end(); -} +}; StdIOConnection.prototype.write = function(data) { if (!this.connected) { @@ -341,10 +365,10 @@ StdIOConnection.prototype.write = function(data) { return; } this.connection.write(data); -} +}; + exports.createStdIOConnection = function(command,options){ return new StdIOConnection(command,options); - }; exports.createStdIOClient = function(cls,connection) { @@ -360,4 +384,4 @@ exports.createStdIOClient = function(cls,connection) { connection.client = client; return client; -} +}; diff --git a/lib/nodejs/lib/thrift/multiplexed_processor.js b/lib/nodejs/lib/thrift/multiplexed_processor.js index 2931c4f3..1aef4c35 100644 --- a/lib/nodejs/lib/thrift/multiplexed_processor.js +++ b/lib/nodejs/lib/thrift/multiplexed_processor.js @@ -38,7 +38,7 @@ MultiplexedProcessor.prototype.process = function(inp, out) { var sname = p[0]; var fname = p[1]; - if (! sname in this.services) { + if (! (sname in this.services)) { throw new Thrift.TException("TMultiplexedProcessor: Unknown service: " + sname); } diff --git a/lib/nodejs/lib/thrift/multiplexed_protocol.js b/lib/nodejs/lib/thrift/multiplexed_protocol.js index 9a955abd..68440af3 100644 --- a/lib/nodejs/lib/thrift/multiplexed_protocol.js +++ b/lib/nodejs/lib/thrift/multiplexed_protocol.js @@ -19,27 +19,31 @@ var util = require('util'); var Thrift = require('./thrift'); -var Wrapper = exports.Wrapper = function(service_name, protocol) { +var Wrapper = exports.Wrapper = function(service_name, protocol, connection) { var MultiplexProtocol = function(trans, strictRead, strictWrite) { protocol.call(this, trans, strictRead, strictWrite); - } + }; util.inherits(MultiplexProtocol, protocol); MultiplexProtocol.prototype.writeMessageBegin = function(name, type, seqid) { - - if (type == Thrift.MessageType.CALL || type == Thrift.MessageType.ONEWAY) - MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, service_name + ":" + name, type, seqid); - else + if (type == Thrift.MessageType.CALL || type == Thrift.MessageType.ONEWAY) { + connection.seqId2Service[seqid] = service_name; + MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, + service_name + ":" + name, + type, + seqid); + } else { MultiplexProtocol.super_.prototype.writeMessageBegin.call(this, name, type, seqid); - } + } + }; return MultiplexProtocol; -} +}; var Multiplexer = exports.Multiplexer = function() { this.seqid = 0; -} +}; Multiplexer.prototype.createClient = function(service_name, cls, connection) { if (cls.Client) { @@ -49,15 +53,15 @@ Multiplexer.prototype.createClient = function(service_name, cls, connection) { cls.prototype.new_seqid = function() { self.seqid += 1; return self.seqid; - } - + }; var client = new cls(new connection.transport(undefined, function(buf) { connection.write(buf); - }), new Wrapper(service_name, connection.protocol)); - - - // TODO clean this up - connection.client = client; + }), new Wrapper(service_name, connection.protocol, connection)); + + if (typeof connection.client !== 'object') { + connection.client = {}; + } + connection.client[service_name] = client; return client; -} +};