// Initialize libevent core
registerEvents(static_cast<event_base*>(event_init()));
- // Run pre-serve callback function if we have one
- if (preServeCallback_) {
- preServeCallback_(preServeCallbackArg_);
+ // Run the preServe event
+ if (eventHandler_ != NULL) {
+ eventHandler_->preServe();
}
// Run libevent engine, never returns, invokes calls to eventHandler
*/
std::stack<TConnection*> connectionStack_;
- // Pointer to optional function called after opening the listen socket and
- // before running the event loop, along with its argument data.
- void (*preServeCallback_)(void*);
- void* preServeCallbackArg_;
-
void handleEvent(int fd, short which);
public:
port_(port),
frameResponses_(true),
threadPoolProcessing_(false),
- eventBase_(NULL),
- preServeCallback_(NULL),
- preServeCallbackArg_(NULL) {}
+ eventBase_(NULL) {}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
port_(port),
frameResponses_(true),
threadManager_(threadManager),
- eventBase_(NULL),
- preServeCallback_(NULL),
- preServeCallbackArg_(NULL) {
+ eventBase_(NULL) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
serverSocket_(0),
port_(port),
frameResponses_(true),
- threadManager_(threadManager),
- preServeCallback_(NULL),
- preServeCallbackArg_(NULL) {
+ threadManager_(threadManager) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
void serve();
- void setPreServeCallback(void(*fn_ptr)(void*), void* arg = NULL) {
- preServeCallback_ = fn_ptr;
- preServeCallbackArg_ = arg;
- }
-
};
/**
#include <boost/shared_ptr.hpp>
-namespace facebook { namespace thrift { namespace server {
+namespace facebook { namespace thrift { namespace server {
using facebook::thrift::TProcessor;
using facebook::thrift::protocol::TBinaryProtocolFactory;
+using facebook::thrift::protocol::TProtocol;
using facebook::thrift::protocol::TProtocolFactory;
using facebook::thrift::transport::TServerTransport;
using facebook::thrift::transport::TTransport;
using facebook::thrift::transport::TTransportFactory;
+/**
+ * Virtual interface class that can handle events from the server core. To
+ * use this you should subclass it and implement the methods that you care
+ * about. Your subclass can also store local data that you may care about,
+ * such as additional "arguments" to these methods (stored in the object
+ * instance's state).
+ */
+class TServerEventHandler {
+ public:
+
+ virtual ~TServerEventHandler() {}
+
+ /**
+ * Called before the server begins.
+ */
+ virtual void preServe() {}
+
+ /**
+ * Called when a new client has connected and is about to being processing.
+ */
+ virtual void clientBegin(boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output) {}
+
+ /**
+ * Called when a client has finished making requests.
+ */
+ virtual void clientEnd(boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output) {}
+
+ protected:
+
+ /**
+ * Prevent direct instantiation.
+ */
+ TServerEventHandler() {}
+
+};
+
/**
* Thrift server.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TServer : public concurrency::Runnable {
-public:
+ public:
+
virtual ~TServer() {}
virtual void serve() = 0;
virtual void run() {
serve();
}
-
+
boost::shared_ptr<TProcessor> getProcessor() {
return processor_;
}
boost::shared_ptr<TTransportFactory> getOutputTransportFactory() {
return outputTransportFactory_;
}
-
+
boost::shared_ptr<TProtocolFactory> getInputProtocolFactory() {
return inputProtocolFactory_;
}
return outputProtocolFactory_;
}
+ boost::shared_ptr<TServerEventHandler> getEventHandler() {
+ return eventHandler_;
+ }
+
protected:
TServer(boost::shared_ptr<TProcessor> processor):
processor_(processor) {
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory) {}
-
+
// Class variables
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TServerTransport> serverTransport_;
boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
+ boost::shared_ptr<TServerEventHandler> eventHandler_;
+
void setInputTransportFactory(boost::shared_ptr<TTransportFactory> inputTransportFactory) {
inputTransportFactory_ = inputTransportFactory;
}
outputProtocolFactory_ = outputProtocolFactory;
}
+ void setServerEventHandler(boost::shared_ptr<TServerEventHandler> eventHandler) {
+ eventHandler_ = eventHandler;
+ }
+
};
-
+
}}} // facebook::thrift::server
#endif // #ifndef _THRIFT_SERVER_TSERVER_H_
#include <string>
#include <iostream>
-namespace facebook { namespace thrift { namespace server {
+namespace facebook { namespace thrift { namespace server {
using namespace std;
using namespace facebook::thrift;
return;
}
+ // Run the preServe event
+ if (eventHandler_ != NULL) {
+ eventHandler_->preServe();
+ }
+
// Fetch client from server
while (!stop_) {
try {
outputTransport = outputTransportFactory_->getTransport(client);
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+ if (eventHandler_ != NULL) {
+ eventHandler_->clientBegin(inputProtocol, outputProtocol);
+ }
try {
while (processor_->process(inputProtocol, outputProtocol)) {
// Peek ahead, is the remote side closed?
} catch (TException& tx) {
cerr << "TSimpleServer exception: " << tx.what() << endl;
}
+ if (eventHandler_ != NULL) {
+ eventHandler_->clientEnd(inputProtocol, outputProtocol);
+ }
inputTransport->close();
outputTransport->close();
- client->close();
+ client->close();
} catch (TTransportException& ttx) {
if (inputTransport != NULL) { inputTransport->close(); }
if (outputTransport != NULL) { outputTransport->close(); }
#include "server/TServer.h"
#include "transport/TServerTransport.h"
-namespace facebook { namespace thrift { namespace server {
+namespace facebook { namespace thrift { namespace server {
/**
* This is the most basic simple server. It is single-threaded and runs a
boost::shared_ptr<TTransportFactory> outputTransportFactory,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
- TServer(processor, serverTransport,
+ TServer(processor, serverTransport,
inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory),
stop_(false) {}
-
+
~TSimpleServer() {}
void serve();
#include <string>
#include <iostream>
-namespace facebook { namespace thrift { namespace server {
+namespace facebook { namespace thrift { namespace server {
using boost::shared_ptr;
using namespace std;
using namespace facebook::thrift::protocol;;
using namespace facebook::thrift::transport;
-class TThreadPoolServer::Task: public Runnable {
-
+class TThreadPoolServer::Task : public Runnable {
+
public:
-
- Task(shared_ptr<TProcessor> processor,
+
+ Task(TThreadPoolServer &server,
+ shared_ptr<TProcessor> processor,
shared_ptr<TProtocol> input,
shared_ptr<TProtocol> output) :
+ server_(server),
processor_(processor),
input_(input),
output_(output) {
}
~Task() {}
-
+
void run() {
+ boost::shared_ptr<TServerEventHandler> eventHandler =
+ server_.getEventHandler();
+ if (eventHandler != NULL) {
+ eventHandler->clientBegin(input_, output_);
+ }
try {
while (processor_->process(input_, output_)) {
if (!input_->getTransport()->peek()) {
} catch (...) {
cerr << "TThreadPoolServer uncaught exception." << endl;
}
+ if (eventHandler != NULL) {
+ eventHandler->clientEnd(input_, output_);
+ }
input_->getTransport()->close();
output_->getTransport()->close();
}
private:
+ TServer& server_;
shared_ptr<TProcessor> processor_;
shared_ptr<TProtocol> input_;
shared_ptr<TProtocol> output_;
};
-
+
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<ThreadManager> threadManager) :
- TServer(processor, serverTransport, transportFactory, protocolFactory),
+ TServer(processor, serverTransport, transportFactory, protocolFactory),
threadManager_(threadManager),
stop_(false), timeout_(0) {}
shared_ptr<TTransportFactory> inputTransportFactory,
shared_ptr<TTransportFactory> outputTransportFactory,
shared_ptr<TProtocolFactory> inputProtocolFactory,
- shared_ptr<TProtocolFactory> outputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
shared_ptr<ThreadManager> threadManager) :
TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory),
cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl;
return;
}
-
+
+ // Run the preServe event
+ if (eventHandler_ != NULL) {
+ eventHandler_->preServe();
+ }
+
while (!stop_) {
try {
client.reset();
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)), timeout_);
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
} catch (TTransportException& ttx) {
if (inputTransport != NULL) { inputTransport->close(); }
#include <boost/shared_ptr.hpp>
-namespace facebook { namespace thrift { namespace server {
+namespace facebook { namespace thrift { namespace server {
using facebook::thrift::concurrency::ThreadManager;
using facebook::thrift::protocol::TProtocolFactory;
class TThreadPoolServer : public TServer {
public:
class Task;
-
+
TThreadPoolServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TServerTransport> serverTransport,
boost::shared_ptr<TTransportFactory> transportFactory,
boost::shared_ptr<TTransportFactory> inputTransportFactory,
boost::shared_ptr<TTransportFactory> outputTransportFactory,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
- boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
+ boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
boost::shared_ptr<ThreadManager> threadManager);
virtual ~TThreadPoolServer();
virtual int64_t getTimeout() const;
virtual void setTimeout(int64_t value);
-
+
virtual void stop() {
stop_ = true;
serverTransport_->interrupt();
volatile bool stop_;
volatile int64_t timeout_;
-
+
};
}}} // facebook::thrift::server
#include <pthread.h>
#include <unistd.h>
-namespace facebook { namespace thrift { namespace server {
+namespace facebook { namespace thrift { namespace server {
using boost::shared_ptr;
using namespace std;
using namespace facebook::thrift::concurrency;
class TThreadedServer::Task: public Runnable {
-
+
public:
-
- Task(TThreadedServer* server,
+
+ Task(TThreadedServer& server,
shared_ptr<TProcessor> processor,
shared_ptr<TProtocol> input,
shared_ptr<TProtocol> output) :
}
~Task() {}
-
+
void run() {
+ boost::shared_ptr<TServerEventHandler> eventHandler =
+ server_.getEventHandler();
+ if (eventHandler != NULL) {
+ eventHandler->clientBegin(input_, output_);
+ }
try {
while (processor_->process(input_, output_)) {
if (!input_->getTransport()->peek()) {
} catch (...) {
cerr << "TThreadedServer uncaught exception." << endl;
}
+ if (eventHandler != NULL) {
+ eventHandler->clientEnd(input_, output_);
+ }
input_->getTransport()->close();
output_->getTransport()->close();
-
+
// Remove this task from parent bookkeeping
{
- Synchronized s(server_->tasksMonitor_);
- server_->tasks_.erase(this);
- if (server_->tasks_.empty()) {
- server_->tasksMonitor_.notify();
+ Synchronized s(server_.tasksMonitor_);
+ server_.tasks_.erase(this);
+ if (server_.tasks_.empty()) {
+ server_.tasksMonitor_.notify();
}
}
}
private:
- TThreadedServer* server_;
+ TThreadedServer& server_;
friend class TThreadedServer;
shared_ptr<TProcessor> processor_;
return;
}
- while (!stop_) {
+ // Run the preServe event
+ if (eventHandler_ != NULL) {
+ eventHandler_->preServe();
+ }
+
+ while (!stop_) {
try {
client.reset();
inputTransport.reset();
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
- TThreadedServer::Task* task = new TThreadedServer::Task(this,
- processor_,
+ TThreadedServer::Task* task = new TThreadedServer::Task(*this,
+ processor_,
inputProtocol,
outputProtocol);
-
+
// Create a task
shared_ptr<Runnable> runnable =
shared_ptr<Runnable>(task);
// Create a thread for this task
shared_ptr<Thread> thread =
shared_ptr<Thread>(threadFactory_->newThread(runnable));
-
+
// Insert thread into the set of threads
{
Synchronized s(tasksMonitor_);
#include <boost/shared_ptr.hpp>
-namespace facebook { namespace thrift { namespace server {
+namespace facebook { namespace thrift { namespace server {
using facebook::thrift::TProcessor;
using facebook::thrift::transport::TServerTransport;
public:
class Task;
-
+
TThreadedServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TServerTransport> serverTransport,
boost::shared_ptr<TTransportFactory> transportFactory,