THRIFT-1261 STDIO support for node-thrift
authorRoger Meier <roger@apache.org>
Fri, 14 Oct 2011 06:35:28 +0000 (06:35 +0000)
committerRoger Meier <roger@apache.org>
Fri, 14 Oct 2011 06:35:28 +0000 (06:35 +0000)
Patch: Jordan Shaw

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1183221 13f79535-47bb-0310-9956-ffa450edef68

lib/nodejs/lib/thrift/connection.js
lib/nodejs/lib/thrift/index.js

index 3530227..b7f9b7a 100644 (file)
@@ -128,3 +128,115 @@ exports.createClient = function(cls, connection) {
 
   return client;
 }
+
+var child_process = require('child_process');
+var StdIOConnection = exports.StdIOConnection = function(command, options) {
+       var command_parts = command.split(' ');
+       command = command_parts[0];
+       var args = command_parts.splice(1,command_parts.length -1);
+       var child = this.child = child_process.spawn(command,args);
+
+  var self = this;
+  EventEmitter.call(this);
+
+       this._debug = options.debug || false;
+  this.connection = child.stdin;
+  this.options = options || {};
+  this.transport = this.options.transport || ttransport.TFramedTransport;
+  this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+  this.offline_queue = [];
+
+       if(this._debug === true){
+
+  this.child.stderr.on('data',function(err){
+                       console.log(err.toString(),'CHILD ERROR');
+
+               });
+
+       this.child.on('exit',function(code,signal){
+                       console.log(code+':'+signal,'CHILD EXITED');
+
+               });
+
+       }
+
+       this.frameLeft = 0;
+       this.framePos = 0;
+       this.frame = null;
+       this.connected = true;
+
+       self.offline_queue.forEach(function(data) {
+                       self.connection.write(data);
+       });
+
+
+  this.connection.addListener("error", function(err) {
+    self.emit("error", err);
+  });
+
+  // Add a close listener
+  this.connection.addListener("close", function() {
+    self.emit("close");
+  });
+
+  child.stdout.addListener("data", self.transport.receiver(function(transport_with_data) {
+    var message = new self.protocol(transport_with_data);
+    try {
+      var header = message.readMessageBegin();
+      var dummy_seqid = header.rseqid * -1;
+      var client = self.client;
+      client._reqs[dummy_seqid] = function(err, success){
+        transport_with_data.commitPosition();
+
+        var callback = client._reqs[header.rseqid];
+        delete client._reqs[header.rseqid];
+        if (callback) {
+          callback(err, success);
+        }
+      };
+      client['recv_' + header.fname](message, header.mtype, dummy_seqid);
+    }
+    catch (e) {
+      if (e instanceof ttransport.InputBufferUnderrunError) {
+        transport_with_data.rollbackPosition();
+      }
+      else {
+        throw e;
+      }
+    }
+  }));
+
+};
+
+sys.inherits(StdIOConnection, EventEmitter);     
+
+StdIOConnection.prototype.end = function() {
+  this.connection.end();
+}
+
+StdIOConnection.prototype.write = function(data) {
+  if (!this.connected) {
+    this.offline_queue.push(data);
+    return;
+  }
+  this.connection.write(data);
+}
+exports.createStdIOConnection = function(command,options){
+       return new StdIOConnection(command,options);
+
+};
+
+exports.createStdIOClient = function(cls,connection) {
+  if (cls.Client) {
+    cls = cls.Client;
+  }
+
+  var client = new cls(new connection.transport(undefined, function(buf) {
+               connection.write(buf);
+       }), connection.protocol);
+
+  // TODO clean this up
+  connection.client = client;
+
+  return client;
+}
index f4fa3cd..c7cedc0 100644 (file)
@@ -22,5 +22,7 @@ var connection = require('./connection');
 exports.Connection = connection.Connection;
 exports.createClient = connection.createClient;
 exports.createConnection = connection.createConnection;
+exports.createStdIOClient = connection.createStdIOClient;
+exports.createStdIOConnection = connection.createStdIOConnection;
 
 exports.createServer = require('./server').createServer;