From 5b8c9a9d4caaca36c6d79f2754fd3be254b2803d Mon Sep 17 00:00:00 2001 From: henrique Date: Tue, 27 Aug 2013 15:06:42 +0200 Subject: [PATCH] THRIFT-2058:Add reconnect support to node.js library Client: nodejs Patch: Hamed Madani --- lib/nodejs/lib/thrift/connection.js | 82 ++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/lib/nodejs/lib/thrift/connection.js b/lib/nodejs/lib/thrift/connection.js index a49d427d..904d7bd8 100644 --- a/lib/nodejs/lib/thrift/connection.js +++ b/lib/nodejs/lib/thrift/connection.js @@ -35,6 +35,21 @@ var Connection = exports.Connection = function(stream, options) { this.offline_queue = []; this.connected = false; + this._debug = options.debug || false; + if (options.max_attempts + && !isNaN(options.max_attempts) && options.max_attempts > 0) { + this.max_attempts = +options.max_attempts; + } + this.retry_max_delay = null; + if (options.retry_max_delay !== undefined + && !isNaN(options.retry_max_delay) && options.retry_max_delay > 0) { + this.retry_max_delay = options.retry_max_delay; + } + this.connect_timeout = false; + if (options.connect_timeout + && !isNaN(options.connect_timeout) && options.connect_timeout > 0) { + this.connect_timeout = +options.connect_timeout; + } this.connection.addListener("connect", function() { self.connected = true; @@ -43,6 +58,7 @@ var Connection = exports.Connection = function(stream, options) { this.frameLeft = 0; this.framePos = 0; this.frame = null; + self.initialize_retry_vars(); self.offline_queue.forEach(function(data) { self.connection.write(data); @@ -58,11 +74,14 @@ var Connection = exports.Connection = function(stream, options) { || self.listeners('error').length > 0) { self.emit("error", err) } + // "error" events get turned into exceptions if they aren't listened for. If the user handled this error + // then we should try to reconnect. + self.connection_gone(); }); // Add a close listener this.connection.addListener("close", function() { - self.emit("close"); + self.connection_gone(); // handle close event. try to reconnect }); this.connection.addListener("timeout", function() { @@ -102,6 +121,14 @@ Connection.prototype.end = function() { this.connection.end(); } +Connection.prototype.initialize_retry_vars = function () { + this.retry_timer = null; + this.retry_totaltime = 0; + this.retry_delay = 150; + this.retry_backoff = 1.7; + this.attempts = 0; +}; + Connection.prototype.write = function(data) { if (!this.connected) { this.offline_queue.push(data); @@ -110,6 +137,59 @@ Connection.prototype.write = function(data) { this.connection.write(data); } +Connection.prototype.connection_gone = function () { + var self = this; + + // If a retry is already in progress, just let that happen + if (this.retry_timer || !this.max_attempts) { + return; + } + + this.connected = false; + this.ready = false; + + if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) { + this.retry_delay = this.retry_max_delay; + } else { + this.retry_delay = Math.floor(this.retry_delay * this.retry_backoff); + } + + if (self._debug) { + console.log("Retry connection in " + this.retry_delay + " ms"); + } + + if (this.max_attempts && this.attempts >= this.max_attempts) { + this.retry_timer = null; + console.error("thrift: Couldn't get thrift connection after " + this.max_attempts + " attempts."); + self.emit("close"); + return; + } + + this.attempts += 1; + this.emit("reconnecting", { + delay: self.retry_delay, + attempt: self.attempts + }); + + this.retry_timer = setTimeout(function () { + if (self._debug) { + console.log("Retrying connection..."); + } + + self.retry_totaltime += self.retry_delay; + + if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) { + self.retry_timer = null; + console.error("thrift: Couldn't get thrift connection after " + self.retry_totaltime + "ms."); + self.emit("close"); + return; + } + + self.connection.connect(self.port, self.host); + self.retry_timer = null; + }, this.retry_delay); +}; + exports.createConnection = function(host, port, options) { var stream = net.createConnection(port, host); var connection = new Connection(stream, options); -- 2.17.1