connection.end();
});
+<a name="int64"></a>
+## Int64
+
+Since JavaScript represents all numbers as doubles, int64 values cannot be accurately represented naturally. To solve this, int64 values in responses will be wrapped with Thirft.Int64 objects. The Int64 implementation used is [broofa/node-int64](https://github.com/broofa/node-int64).
+
+## Libraries using node-thrift
+
+* [yukim/node_cassandra](https://github.com/yukim/node_cassandra)
+
## Custom client and server example
An example based on the one shown on the Thrift front page is included in the examples/ folder.
ttransport = require('./transport'),
tprotocol = require('./protocol');
-var BinaryParser = require('./binary_parser').BinaryParser;
-BinaryParser.bigEndian = true;
+var binary = require('./binary');
var Connection = exports.Connection = function(stream, options) {
var self = this;
});
this.connection.addListener("error", function(err) {
- self.emit("error", err);
+ // Only emit the error if no-one else is listening on the connection
+ // or if someone is listening on us
+ if (self.connection.listeners('error').length === 1
+ || self.listeners('error').length > 0) {
+ self.emit("error", err)
+ }
});
// Add a close listener
exports.createServer = require('./server').createServer;
+exports.Int64 = require('node-int64')
+
/*
* Export transport and protocol so they can be used outside of a
* cassandra/server context
Thrift = require('./thrift'),
Type = Thrift.Type;
-var BinaryParser = require('./binary_parser').BinaryParser;
-BinaryParser.bigEndian = true;
+var binary = require('./binary'),
+ Int64 = require('node-int64');
var UNKNOWN = 0,
INVALID_DATA = 1,
}
TBinaryProtocol.prototype.writeByte = function(byte) {
- this.trans.write(BinaryParser.fromByte(byte));
+ this.trans.write(new Buffer([byte]));
}
TBinaryProtocol.prototype.writeI16 = function(i16) {
- this.trans.write(BinaryParser.fromShort(i16));
+ this.trans.write(binary.writeI16(new Buffer(2), i16));
}
TBinaryProtocol.prototype.writeI32 = function(i32) {
- this.trans.write(BinaryParser.fromInt(i32));
+ this.trans.write(binary.writeI32(new Buffer(4), i32));
}
TBinaryProtocol.prototype.writeI64 = function(i64) {
- this.trans.write(BinaryParser.fromLong(i64));
+ if (i64.buffer) {
+ this.trans.write(i64.buffer);
+ } else {
+ this.trans.write(new Int64(i64).buffer)
+ }
}
TBinaryProtocol.prototype.writeDouble = function(dub) {
- this.trans.write(BinaryParser.fromDouble(dub));
+ this.trans.write(binary.writeDouble(new Buffer(8), dub));
+}
+
+TBinaryProtocol.prototype.writeString = function(arg) {
+ if (typeof(arg) === 'string') {
+ this.writeI32(Buffer.byteLength(arg, 'utf8'))
+ this.trans.write(arg, 'utf8');
+ } else if (arg instanceof Buffer) {
+ this.writeI32(arg.length)
+ this.trans.write(arg);
+ } else {
+ throw new Error('writeString called without a string/Buffer argument: ' + arg)
+ }
}
-TBinaryProtocol.prototype.writeString = function(str) {
- this.writeI32(str.length);
- this.trans.write(str);
+TBinaryProtocol.prototype.writeBinary = function(arg) {
+ if (typeof(arg) === 'string') {
+ this.writeI32(Buffer.byteLength(arg, 'utf8'))
+ this.trans.write(arg, 'utf8');
+ } else if (arg instanceof Buffer) {
+ this.writeI32(arg.length)
+ this.trans.write(arg);
+ } else {
+ throw new Error('writeBinary called without a string/Buffer argument: ' + arg)
+ }
}
TBinaryProtocol.prototype.readMessageBegin = function() {
}
TBinaryProtocol.prototype.readByte = function() {
- var buff = this.trans.read(1);
- return BinaryParser.toByte(buff);
+ return this.trans.readByte();
}
TBinaryProtocol.prototype.readI16 = function() {
- var buff = this.trans.read(2);
- return BinaryParser.toShort(buff);
+ return this.trans.readI16();
}
TBinaryProtocol.prototype.readI32 = function() {
- var buff = this.trans.read(4);
- // console.log("read buf: ");
- // console.log(buff);
- // console.log("result: " + BinaryParser.toInt(buff));
- return BinaryParser.toInt(buff);
+ return this.trans.readI32();
}
TBinaryProtocol.prototype.readI64 = function() {
var buff = this.trans.read(8);
- return BinaryParser.toLong(buff);
+ return new Int64(buff);
}
TBinaryProtocol.prototype.readDouble = function() {
- var buff = this.trans.read(8);
- return BinaryParser.toFloat(buff);
+ return this.trans.readDouble();
}
TBinaryProtocol.prototype.readBinary = function() {
}
TBinaryProtocol.prototype.readString = function() {
- var r = this.readBinary().toString('binary');
- // console.log("readString: " + r);
- return r;
+ var len = this.readI32();
+ return this.trans.readString(len);
}
TBinaryProtocol.prototype.getTransport = function() {
*/
var net = require('net');
-var ttransport = require('./transport');
-var BinaryParser = require('./binary_parser').BinaryParser,
- TBinaryProtocol = require('./protocol').TBinaryProtocol;
+var ttransport = require('./transport')
+ , TBinaryProtocol = require('./protocol').TBinaryProtocol;
exports.createServer = function(cls, handler, options) {
if (cls.Processor) {
var protocol = (options && options.protocol) ? options.protocol : TBinaryProtocol;
return net.createServer(function(stream) {
- stream.on('data', transport.receiver(function(transport_with_data) {
- var input = new protocol(transport_with_data);
+ var self = this;
+ stream.on('data', transport.receiver(function(transportWithData) {
+ var input = new protocol(transportWithData);
var output = new protocol(new transport(undefined, function(buf) {
- stream.write(buf);
+ try {
+ stream.write(buf);
+ } catch (err) {
+ self.emit('error', err);
+ stream.end();
+ }
}));
try {
processor.process(input, output);
- transport_with_data.commitPosition();
+ transportWithData.commitPosition();
}
- catch (e) {
- if (e instanceof ttransport.InputBufferUnderrunError) {
- transport_with_data.rollbackPosition();
+ catch (err) {
+ if (err instanceof ttransport.InputBufferUnderrunError) {
+ transportWithData.rollbackPosition();
}
else {
- throw e;
+ self.emit('error', err);
+ stream.end();
}
}
}));
* specific language governing permissions and limitations
* under the License.
*/
-var BinaryParser = require('./binary_parser').BinaryParser;
-
var emptyBuf = new Buffer(0);
+var binary = require('./binary');
+
var InputBufferUnderrunError = exports.InputBufferUnderrunError = function() {
};
break;
//throw Error("Expecting > 4 bytes, found only " + data.length);
}
- frameLeft = BinaryParser.toInt(data.slice(0,4));
+ frameLeft = binary.readI32(data, 0);
frame = new Buffer(frameLeft);
framePos = 0;
data = data.slice(4, data.length);
return buf;
},
+ readByte: function() {
+ return this.inBuf[this.readPos++];
+ },
+
+ readI16: function() {
+ var i16 = binary.readI16(this.inBuf, this.readPos);
+ this.readPos += 2;
+ return i16;
+ },
+
+ readI32: function() {
+ var i32 = binary.readI32(this.inBuf, this.readPos);
+ this.readPos += 4;
+ return i32;
+ },
+
+ readDouble: function() {
+ var d = binary.readDouble(this.inBuf, this.readPos);
+ this.readPos += 8;
+ return d;
+ },
+
+ readString: function(len) {
+ var str = this.inBuf.toString('utf8', this.readPos, this.readPos + len);
+ this.readPos += len;
+ return str;
+ },
+
readAll: function() {
return this.inBuf;
},
write: function(buf, encoding) {
if (typeof(buf) === "string") {
- // Defaulting to ascii encoding here since that's more like the original
- // code, but I feel like 'utf8' would be a better choice.
- buf = new Buffer(buf, encoding || 'ascii');
+ buf = new Buffer(buf, encoding || 'utf8');
}
this.outBuffers.push(buf);
this.outCount += buf.length;
if (this.onFlush) {
// TODO: optimize this better, allocate one buffer instead of both:
var msg = new Buffer(out.length + 4);
- BinaryParser.fromInt(out.length).copy(msg, 0, 0, 4);
+ binary.writeI32(msg, out.length)
+ frameLeft = binary.readI32(this.inBuf, 0);
out.copy(msg, 4, 0, out.length);
this.onFlush(msg);
}
open: function() {},
close: function() {},
- read: function(len) {
+ ensureAvailable: function(len) {
if (this.readCursor + len > this.writeCursor) {
throw new InputBufferUnderrunError();
}
+ },
+
+ read: function(len) {
+ this.ensureAvailable(len)
var buf = new Buffer(len);
this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len);
this.readCursor += len;
return buf;
},
+ readByte: function() {
+ this.ensureAvailable(1)
+ return this.inBuf[this.readCursor++];
+ },
+
+ readI16: function() {
+ this.ensureAvailable(2)
+ var i16 = binary.readI16(this.inBuf, this.readCursor);
+ this.readCursor += 2;
+ return i16;
+ },
+
+ readI32: function() {
+ this.ensureAvailable(4)
+ var i32 = binary.readI32(this.inBuf, this.readCursor);
+ this.readCursor += 4;
+ return i32;
+ },
+
+ readDouble: function() {
+ this.ensureAvailable(8)
+ var d = binary.readDouble(this.inBuf, this.readCursor);
+ this.readCursor += 8;
+ return d;
+ },
+
+ readString: function(len) {
+ this.ensureAvailable(len)
+ var str = this.inBuf.toString('utf8', this.readCursor, this.readCursor + len);
+ this.readCursor += len;
+ return str;
+ },
+
+
readAll: function() {
if (this.readCursor >= this.writeCursor) {
throw new InputBufferUnderrunError();
},
"directories" : { "lib" : "./lib/thrift" },
"main": "./lib/thrift",
- "engines": { "node": ">= 0.2.4" }
+ "engines": { "node": ">= 0.2.4" },
+ "dependencies": {
+ "node-int64": "0.3.x"
+ }
}
passed();
});
-/*
- * FATAL ERROR: CALL_AND_RETRY_2 Allocation failed - process out of memory
client.testI64(-5, function(err, response) {
if (err) { return failed(err); }
console.log("testI64(-5) = ", response);
passed();
});
- */
client.testI64(-34359738368, function(err, response) {
if (err) { return failed(err); }
"make -C nodejs/ client" \
"make -C nodejs/ server" \
"1"
+do_test "nodejs-cpp" "binary" "framed-ip" \
+ "make -C nodejs/ client" \
+ "cpp/TestServer --transport=framed" \
+ "1"
do_test "cpp-nodejs" "binary" "framed-ip" \
"cpp/TestClient --transport=framed" \
"make -C nodejs/ server" \