string finish_cob;
string finish_cob_decl;
string cob_arg;
+ string call_context = ", void* callContext";
+ string call_context_arg = ", callContext";
+ string call_context_decl = ", void*";
string ret_type = "bool ";
if (style == "Cob") {
ifstyle = "CobSv";
pstyle = "Async";
finish_cob = "std::tr1::function<void(bool ok)> cob, ";
finish_cob_decl = "std::tr1::function<void(bool ok)>, ";
+ call_context = ""; // TODO(edhall) remove when callContext is aded to TAsyncProcessor
+ call_context_arg = ""; // ditto
+ call_context_decl = ""; // ditto
cob_arg = "cob, ";
ret_type = "void ";
}
f_header_ <<
indent() << "boost::shared_ptr<" << service_name_ << ifstyle << "If> iface_;" << endl;
f_header_ <<
- indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl;
+ indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid" << call_context << ");" << endl;
indent_down();
// Process function declarations
indent() << "std::map<std::string, void (" <<
service_name_ << pstyle << class_suffix << "::*)(" << finish_cob_decl <<
"int32_t, ::apache::thrift::protocol::TProtocol*, " <<
- "::apache::thrift::protocol::TProtocol*)> processMap_;" << endl;
+ "::apache::thrift::protocol::TProtocol*" << call_context_decl <<
+ ")> processMap_;" << endl;
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
indent(f_header_) <<
- "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl;
+ "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot" << call_context << ");" << endl;
if (gen_templates_) {
indent(f_header_) <<
"void process_" << (*f_iter)->get_name() << "(" << finish_cob <<
- "int32_t seqid, Protocol_* iprot, Protocol_* oprot);" << endl;
+ "int32_t seqid, Protocol_* iprot, Protocol_* oprot" <<
+ call_context << ");" << endl;
}
if (style == "Cob") {
// XXX Factor this out, even if it is a pain.
declare_map <<
indent() << "}" << endl <<
endl <<
- indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl <<
+ indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot" << call_context << ");" << endl <<
indent() << "virtual ~" << service_name_ << pstyle << class_suffix <<
"() {}" << endl;
indent_down();
ret_type << service_name_ << pstyle << class_suffix << template_suffix <<
"::process(" << finish_cob <<
"boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, " <<
- "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" <<
- endl;
+ "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot" <<
+ call_context << ") {" << endl;
indent_up();
out <<
indent() << "}" << endl <<
endl <<
indent() << "return process_fn(" << (style == "Cob" ? "cob, " : "")
- << "iprot, oprot, fname, seqid);" <<
+ << "iprot, oprot, fname, seqid" << call_context_arg << ");" <<
endl;
indent_down();
"::process_fn(" << finish_cob <<
"::apache::thrift::protocol::TProtocol* iprot, " <<
"::apache::thrift::protocol::TProtocol* oprot, " <<
- "std::string& fname, int32_t seqid) {" << endl;
+ "std::string& fname, int32_t seqid" << call_context << ") {" << endl;
indent_up();
// HOT: member function pointer map
indent() << typename_str << "std::map<std::string, void (" <<
service_name_ << pstyle << class_suffix << "::*)(" << finish_cob_decl <<
"int32_t, ::apache::thrift::protocol::TProtocol*, " <<
- "::apache::thrift::protocol::TProtocol*)>::iterator pfn;" << endl <<
+ "::apache::thrift::protocol::TProtocol*" << call_context_decl << ")>::iterator pfn;" << endl <<
indent() << "pfn = processMap_.find(fname);" << endl <<
indent() << "if (pfn == processMap_.end()) {" << endl;
if (extends.empty()) {
indent() << " return "
<< extends << "::process_fn("
<< (style == "Cob" ? "cob, " : "")
- << "iprot, oprot, fname, seqid);" << endl;
+ << "iprot, oprot, fname, seqid" << call_context_arg << ");" << endl;
}
out <<
indent() << "}" << endl <<
- indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot);" << endl;
+ indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot" << call_context_arg << ");" << endl;
// TODO(dreiss): return pfn ret?
if (style == "Cob") {
out <<
"void " << tservice->get_name() << "Processor" << class_suffix << "::" <<
"process_" << tfunction->get_name() << "(int32_t seqid, " <<
- prot_type << "* iprot, " << prot_type << "* oprot)" << endl;
+ prot_type << "* iprot, " << prot_type << "* oprot, void* callContext)" << endl;
scope_up(out);
if (gen_templates_ && !specialized) {
endl <<
indent() << "if (_iprot && _oprot) {" << endl <<
indent() << " return process_" << tfunction->get_name() <<
- "(seqid, _iprot, _oprot);" << endl <<
+ "(seqid, _iprot, _oprot, callContext);" << endl <<
indent() << "}" << endl << endl;
}
out <<
indent() << "void* ctx = NULL;" << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\", callContext);" << endl <<
indent() << "}" << endl <<
indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
// Cob style.
else {
// Processor entry point.
+ // TODO(edhall) update for callContext when TEventServer is ready
if (gen_templates_) {
out <<
indent() << "template <class Protocol_>" << endl;
indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl <<
indent() << "void* ctx = NULL;" << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl <<
indent() << "}" << endl <<
indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl <<
indent() << "try {" << endl;
out <<
endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl <<
indent() << "}" << endl <<
indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
out <<
endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl <<
indent() << "}" << endl <<
indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <server/TClientInfo.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift;
+using namespace apache::thrift::transport;
+
+// Constructor: start with no recorded peer address and no recorded call,
+// and guarantee call_ is always a valid NUL-terminated C string.
+TClientInfoConnection::TClientInfoConnection() {
+ call_[kNameLen - 1] = '\0'; // ensure NUL terminator is there
+ eraseAddr();
+ eraseCall();
+}
+
+// A connection has been made: cache the peer address, restart the
+// connection timer, and reset the call counter. Only AF_INET and
+// AF_INET6 are cached; any other family leaves the slot erased
+// (AF_UNSPEC, set by eraseAddr()).
+void TClientInfoConnection::recordAddr(const sockaddr* addr) {
+ eraseAddr();
+ initTime();
+ ncalls_ = 0;
+ if (addr != NULL) {
+ if (addr->sa_family == AF_INET) {
+ memcpy((void*)&addr_.ipv4, (const void *)addr, sizeof(sockaddr_in));
+ }
+ else if (addr->sa_family == AF_INET6) {
+ memcpy((void*)&addr_.ipv6, (const void *)addr, sizeof(sockaddr_in6));
+ }
+ }
+}
+
+// Mark the cached address as empty; AF_UNSPEC doubles as the
+// "no address" sentinel tested by getAddr().
+void TClientInfoConnection::eraseAddr() {
+ addr_.ipv4.sin_family = AF_UNSPEC;
+}
+
+// Render the cached peer address as text into buf (len bytes) and
+// return buf, or return NULL if no address has been recorded or
+// inet_ntop fails. Callers should size buf at least INET6_ADDRSTRLEN.
+const char* TClientInfoConnection::getAddr(char* buf, int len) const {
+ switch (addr_.ipv4.sin_family) {
+ case AF_INET:
+ return inet_ntop(AF_INET, &addr_.ipv4.sin_addr, buf, len);
+ case AF_INET6:
+ return inet_ntop(AF_INET6, &addr_.ipv6.sin6_addr, buf, len);
+ default:
+ return NULL;
+ }
+}
+
+// Record the name of the Thrift call now being processed (truncated to
+// kNameLen-1 chars) and bump the per-connection call counter.
+void TClientInfoConnection::recordCall(const char* name) {
+ strncpy(call_, name, kNameLen - 1); // NUL terminator set in constructor
+ ncalls_++;
+}
+
+// Clear the call name; getCall() treats the empty string as "no call".
+void TClientInfoConnection::eraseCall() {
+ call_[0] = '\0';
+}
+
+// Return the current/most recent call name, or NULL if none recorded.
+const char* TClientInfoConnection::getCall() const {
+ if (call_[0] == '\0') {
+ return NULL;
+ }
+ return call_;
+}
+
+// Copy out the connection start time (stamped by initTime() via
+// recordAddr()).
+void TClientInfoConnection::getTime(timespec* time) const {
+ *time = time_;
+}
+
+// Number of calls recorded on this connection since recordAddr().
+uint64_t TClientInfoConnection::getNCalls() const {
+ return ncalls_;
+}
+
+// Stamp time_ with the current wall-clock (CLOCK_REALTIME) time.
+void TClientInfoConnection::initTime() {
+ clock_gettime(CLOCK_REALTIME, &time_);
+}
+
+
+// Return the per-connection info slot for descriptor fd. With grow=true,
+// indexing info_ (a StableVector) extends storage as needed; with
+// grow=false an out-of-range fd yields NULL, as does any negative fd.
+// NOTE(review): fd (int) vs info_.size() (size_t) is a signed/unsigned
+// compare; the fd < 0 guard runs first, so the conversion is safe.
+TClientInfoConnection* TClientInfo::getConnection(int fd, bool grow) {
+ if (fd < 0 || (!grow && fd >= info_.size())) {
+ return NULL;
+ }
+ return &info_[fd];
+}
+
+// Number of descriptor slots currently tracked.
+size_t TClientInfo::size() const {
+ return info_.size();
+}
+
+// Server hook: allocate the per-connection context. The descriptor is
+// not known yet, so Connect::callInfo_ stays NULL until processContext()
+// binds it. Ownership of the Connect passes to deleteContext().
+void* TClientInfoServerHandler::createContext(boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output) {
+ (void)input;
+ (void)output;
+ return (void*) new Connect(&clientInfo_);
+}
+
+// Server hook: the connection is finished. Clear its call slot so status
+// reports stop showing it, then free the context allocated in
+// createContext(). The slot itself stays in clientInfo_ for reuse.
+void TClientInfoServerHandler::deleteContext(void* connectionContext,
+ boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output) {
+ Connect* call = static_cast<Connect*>(connectionContext);
+ if (call->callInfo_) {
+ call->callInfo_->eraseCall();
+ }
+ delete call;
+}
+
+// Server hook, run before each processor invocation: on first call,
+// bind this context to the socket descriptor's slot and record the
+// peer address (which also starts the timer and resets the counter).
+// Only exact TSocket transports participate (typeid comparison, not
+// dynamic_cast, so TSocket subclasses are skipped).
+void TClientInfoServerHandler::processContext(void* connectionContext,
+ shared_ptr<TTransport> transport) {
+ Connect* call = static_cast<Connect*>(connectionContext);
+ if (call->callInfo_ == NULL) {
+ if (typeid(*(transport.get())) == typeid(TSocket)) {
+ TSocket* tsocket = static_cast<TSocket*>(transport.get());
+ int fd = tsocket->getSocketFD();
+ if (fd < 0) {
+ return;
+ }
+ // grow=true: first time we've seen this fd, so extend the vector
+ call->callInfo_ = call->clientInfo_->getConnection(fd, true);
+ assert(call->callInfo_ != NULL);
+ socklen_t len;
+ call->callInfo_->recordAddr(tsocket->getCachedAddress(&len));
+ }
+ }
+}
+
+// Build one status line per active client in the format:
+//   FD IPADDR CALLNAME DURATION NCALLS
+// Slots with no recorded call or no recorded address are skipped.
+// Data is updated asynchronously elsewhere, so results approximate
+// reality (see class comment in TClientInfo.h).
+void TClientInfoServerHandler::getStatsStrings(vector<string>& result) {
+ result.clear();
+ timespec now;
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ // NOTE(review): int i vs size_t size() is a signed/unsigned compare;
+ // fine while slot counts stay below INT_MAX.
+ for (int i = 0; i < clientInfo_.size(); ++i) {
+ TClientInfoConnection* info = clientInfo_.getConnection(i, false);
+ const char* callStr = info->getCall();
+ if (callStr == NULL) {
+ continue;
+ }
+
+ char addrBuf[INET6_ADDRSTRLEN];
+ const char* addrStr = info->getAddr(addrBuf, sizeof addrBuf);
+ if (addrStr == NULL) {
+ // cerr << "no addr!" << endl;
+ continue;
+ }
+
+ // elapsed seconds since the connection's timer was started
+ timespec start;
+ double secs = 0.0;
+ info->getTime(&start);
+ secs = (double)(now.tv_sec - start.tv_sec) + (now.tv_nsec - start.tv_nsec)*0.000000001;
+
+ char buf[256];
+ snprintf(buf, sizeof buf, "%d %s %s %.3f %llu", i, addrStr, callStr, secs,
+ (unsigned long long)info->getNCalls());
+
+ result.push_back(buf);
+ }
+}
+
+// Processor hook: record the Thrift function name in this connection's
+// status slot. serverContext is the Connect created by
+// TClientInfoServerHandler::createContext(), or NULL when the server
+// did not supply one (in which case we do nothing). Always returns
+// NULL -- no per-call context is kept.
+void* TClientInfoCallHandler::getContext(const char* fn_name, void* serverContext) {
+ if (serverContext) {
+ TClientInfoConnection* callInfo = static_cast<TClientInfoServerHandler::Connect*>(serverContext)->callInfo_;
+ if (callInfo != NULL) {
+ callInfo->recordCall(fn_name);
+ }
+ }
+ return NULL;
+}
+
+} } } // namespace apache::thrift::server
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_
+#define _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ 1
+
+// for inet_ntop --
+#include <arpa/inet.h>
+#include <server/TServer.h>
+#include <transport/TSocket.h>
+#include <concurrency/Mutex.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using boost::shared_ptr;
+using std::string;
+using std::vector;
+
+/**
+ * StableVector -- a minimal vector class where growth is automatic and
+ * vector elements never move as the vector grows. Allocates new space
+ * as needed, but does not copy old values.
+ *
+ * A level vector stores a list of storage vectors containing the actual
+ * elements. Levels are added as needed, doubling in size each time.
+ * Locking is only done when a level is added. Access is amortized
+ * constant time.
+ */
+template <typename T>
+class StableVector {
+ /// The initial allocation as an exponent of 2
+ static const uint32_t kInitialSizePowOf2 = 10;
+ /// The initial allocation size
+ static const uint32_t kInitialVectorSize = 1 << kInitialSizePowOf2;
+ /// This bound is guaranteed not to be exceeded on 64-bit archs
+ static const int kMaxLevels = 64;
+
+ /// Values are kept in one or more of these
+ typedef vector<T> Vect;
+ /// One or more value vectors are kept in one of these
+ typedef vector<Vect*> LevelVector;
+
+ /// Serializes level allocation in expand()
+ Mutex mutex_;
+ /// current size
+ size_t size_;
+ /// Highest allocated level. NOTE(review): _Atomic_word and
+ /// __gnu_cxx::__atomic_add below are libstdc++ internals --
+ /// non-portable outside GCC; confirm this is acceptable.
+ _Atomic_word vectLvl_;
+ /// NOTE(review): no destructor -- the heap-allocated Vects are never
+ /// freed; presumably fine for process-lifetime use, but confirm.
+ LevelVector vects_;
+
+ public:
+ /**
+ * Constructor -- initialize the level vector and allocate the
+ * initial storage vector
+ */
+ StableVector()
+ : size_(0)
+ , vectLvl_(0) {
+ // reserve up front so vects_ itself never reallocates (and thus
+ // never moves) while readers index into it concurrently
+ vects_.reserve(kMaxLevels);
+ Vect* storageVector(new Vect(1 << kInitialSizePowOf2));
+ vects_.push_back(storageVector);
+ }
+
+ private:
+ /**
+ * make sure the requested number of storage levels have been allocated.
+ */
+ void expand(uint32_t level) {
+ // we need the guard to ensure that we only allocate once.
+ Guard g(mutex_);
+ while (level > vectLvl_) {
+ Vect* levelVect(new Vect(1 << (vectLvl_ + kInitialSizePowOf2)));
+ vects_.push_back(levelVect);
+ // we need to make sure this is done after levelVect is inserted
+ // (what we want is effectively a memory barrier here).
+ __gnu_cxx::__atomic_add(&vectLvl_, 1);
+ }
+ }
+
+ /**
+ * Given an index, determine which level and element of that level is
+ * required. Grows if needed.
+ *
+ * Level 0 holds indices [0, 2^10); level v >= 1 holds indices
+ * [2^(v+9), 2^(v+10)) -- each new level doubles the total capacity.
+ *
+ * NOTE(review): size_ is updated here without holding mutex_, so
+ * with concurrent writers size() is only approximate -- confirm
+ * callers tolerate that.
+ */
+ void which(uint32_t n, uint32_t* vno, uint32_t* idx) {
+ if (n >= size_) {
+ size_ = n + 1;
+ }
+ if (n < kInitialVectorSize) {
+ *idx = n;
+ *vno = 0;
+ } else {
+ // position of the highest set bit above the initial 10 bits
+ // picks the level; subtract the level's base index for idx
+ uint32_t upper = n >> kInitialSizePowOf2;
+ *vno = CHAR_BIT*sizeof(upper) - __builtin_clz(upper);
+ *idx = n - (1 << (*vno + kInitialSizePowOf2 - 1));
+ if (*vno > vectLvl_) {
+ expand(*vno);
+ }
+ }
+ }
+
+ public:
+ /**
+ * Given an index, return a reference to that element, perhaps after
+ * allocating additional space. The reference stays valid for the
+ * life of the StableVector (elements never move).
+ *
+ * @param n a positive integer
+ */
+ T& operator[](uint32_t n) {
+ uint32_t vno;
+ uint32_t idx;
+ which(n, &vno, &idx);
+ return (*vects_[vno])[idx];
+ }
+
+ /**
+ * Return the present size of the vector.
+ */
+ size_t size() const { return size_; }
+};
+
+
+/**
+ * This class embodies the representation of a single connection during
+ * processing. We'll keep one of these per file descriptor in TClientInfo.
+ */
+class TClientInfoConnection {
+ public:
+ /// Fixed size of the call-name buffer (including NUL terminator)
+ const static int kNameLen = 32;
+
+ private:
+ // NOTE(review): the typedef name is missing here ("typedef union
+ // IPAddrUnion { ... };" declares only the union tag); it still
+ // compiles because addr_ uses the tag name, but some compilers warn.
+ typedef union IPAddrUnion {
+ sockaddr_in ipv4;
+ sockaddr_in6 ipv6;
+ };
+
+ char call_[kNameLen]; ///< The name of the thrift call
+ IPAddrUnion addr_; ///< The client's IP address
+ timespec time_; ///< Time processing started
+ uint64_t ncalls_; ///< # of calls processed
+
+ public:
+ /**
+ * Constructor; ensure that no client address or thrift call name is
+ * represented.
+ */
+ TClientInfoConnection();
+
+ /**
+ * A connection has been made; record its address. Since this is the
+ * first we'll know of a connection we start the timer here as well.
+ */
+ void recordAddr(const sockaddr* addr);
+
+ /**
+ * Mark the address as empty/unknown.
+ */
+ void eraseAddr();
+
+ /**
+ * Return a string representing the present address, or NULL if none.
+ * Copies the string into the buffer provided.
+ */
+ const char* getAddr(char* buf, int len) const;
+
+ /**
+ * A call has been made on this connection; record its name. Since this is
+ * called for every thrift call processed, we also do our call count here.
+ */
+ void recordCall(const char* name);
+
+ /**
+ * Invoked when processing has ended to clear the call name.
+ */
+ void eraseCall();
+
+ /**
+ * Return as string the thrift call either currently being processed or
+ * most recently processed if the connection is still open for additional
+ * calls. Returns NULL if a call hasn't been made yet or processing
+ * has ended.
+ */
+ const char* getCall() const;
+
+ /**
+ * Get the timespec for the start of this connection (specifically, when
+ * recordAddr() was first called).
+ */
+ void getTime(timespec* time) const;
+
+ /**
+ * Return the number of calls made on this connection.
+ */
+ uint64_t getNCalls() const;
+
+ private:
+ /// Stamp time_ with the current CLOCK_REALTIME time.
+ void initTime();
+};
+
+
+/**
+ * Store for info about a server's clients -- specifically, the client's IP
+ * address and the call it is executing. This information is indexed by
+ * socket file descriptor and in the present implementation is updated
+ * asynchronously, so it may only approximate reality.
+ */
+class TClientInfo {
+ private:
+ /// One slot per file descriptor; slots never move as this grows
+ StableVector<TClientInfoConnection> info_;
+
+ public:
+ /**
+ * Return the info object for a given file descriptor. If "grow" is true
+ * extend the info vector if required (such as for a file descriptor not seen
+ * before). If "grow" is false and the info vector isn't large enough,
+ * or if "fd" is negative, return NULL.
+ */
+ TClientInfoConnection* getConnection(int fd, bool grow);
+
+ /// Number of descriptor slots currently tracked.
+ size_t size() const;
+};
+
+/**
+ * This derivation of TServerEventHandler encapsulates the main status vector
+ * and provides context to the server's processing loop via overrides.
+ * Together with TClientInfoCallHandler (derived from TProcessorEventHandler)
+ * it integrates client info collection into the server.
+ */
+class TClientInfoServerHandler : public TServerEventHandler {
+ private:
+ /// The main status vector, indexed by socket file descriptor
+ TClientInfo clientInfo_;
+
+ public:
+ /**
+ * One of these is constructed for each open connection/descriptor and links
+ * to both the status vector (clientInfo_) and that descriptor's entry
+ * within it.
+ */
+ struct Connect {
+ TClientInfo* clientInfo_;
+ TClientInfoConnection* callInfo_; ///< NULL until processContext() binds it
+
+ explicit Connect(TClientInfo* clientInfo)
+ : clientInfo_(clientInfo)
+ , callInfo_(NULL) {
+ }
+ };
+
+ /**
+ * Generate processor context; we don't know what descriptor we belong to
+ * yet -- we'll get hooked up in processContext().
+ */
+ void* createContext(boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output);
+
+ /**
+ * Mark our slot as unused and delete the context created in createContext().
+ */
+ void deleteContext(void* processorContext,
+ boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output);
+
+ /**
+ * Called in the processing loop just before the server invokes the
+ * processor itself, on the first call we establish which descriptor
+ * we correspond to and set it to that socket's peer IP address. This
+ * also has the side effect of initializing call counting and connection
+ * timing. We won't know which call we're handling until the handler
+ * first gets called in TClientInfoCallHandler::getContext().
+ */
+ void processContext(void* processorContext,
+ shared_ptr<TTransport> transport);
+
+ /**
+ * Get status report for server in the form of a vector of strings.
+ * Each active client appears as one string in the format:
+ *
+ *    FD IPADDR CALLNAME DURATION NCALLS
+ *
+ * where "FD" is the file descriptor for the client's socket, "IPADDR"
+ * is the IP address (as reported by accept()), "CALLNAME" is the
+ * current or most recent Thrift function name, "DURATION" is the
+ * duration of the connection, while NCALLS is the number of Thrift
+ * calls made since the connection was made. A single space separates
+ * fields.
+ */
+ void getStatsStrings(vector<string>& result);
+};
+
+/**
+ * This class derives from TProcessorEventHandler to gain access to the
+ * function name for the current Thrift call. We need two versions of
+ * this -- TClientInfoCallStatsHandler is the other -- since in the latter
+ * case we pass through to TFunctionStatHandler to perform Thrift call
+ * stats.
+ */
+class TClientInfoCallHandler : public TProcessorEventHandler {
+ public:
+ /// Called at the start of each Thrift call; records fn_name in the
+ /// connection slot carried by serverContext. Always returns NULL.
+ virtual void* getContext(const char* fn_name, void* serverContext);
+};
+
+} } } // namespace apache::thrift::server
+
+#endif // !_FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_
* The return value is passed to all other callbacks
* for that function invocation.
*/
- virtual void* getContext(const char* fn_name) { return NULL; }
+ virtual void* getContext(const char* fn_name, void* serverContext) { return NULL; }
/**
* Expected to free resources associated with a context.
virtual ~TProcessor() {}
virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
- boost::shared_ptr<protocol::TProtocol> out) = 0;
+ boost::shared_ptr<protocol::TProtocol> out,
+ void* connectionContext) = 0;
- bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io) {
- return process(io, io);
+ bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io,
+ void* connectionContext) {
+ return process(io, io, connectionContext);
}
boost::shared_ptr<TProcessorEventHandler> getEventHandler() {
}
bool PeekProcessor::process(boost::shared_ptr<TProtocol> in,
- boost::shared_ptr<TProtocol> out) {
+ boost::shared_ptr<TProtocol> out,
+ void* connectionContext) {
std::string fname;
TMessageType mtype;
// Done peeking at variables
peekEnd();
- bool ret = actualProcessor_->process(pipedProtocol_, out);
+ bool ret = actualProcessor_->process(pipedProtocol_, out, connectionContext);
memoryBuffer_->resetBuffer();
return ret;
}
void setTargetTransport(boost::shared_ptr<apache::thrift::transport::TTransport> targetTransport);
virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> in,
- boost::shared_ptr<apache::thrift::protocol::TProtocol> out);
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> out,
+ void* connectionContext);
// The following three functions can be overloaded by child classes to
// achieve desired peeking behavior
{}
virtual ~StatsProcessor() {};
- virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot) {
+ virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot,
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot,
+ void* serverContext) {
piprot_ = piprot;
void run() {
try {
- while (processor_->process(input_, output_)) {
+ while (processor_->process(input_, output_, NULL)) {
if (!input_->getTransport()->peek()) {
break;
}
} else {
try {
// Invoke the processor
- server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+ server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL);
} catch (TTransportException &ttx) {
GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
server_->decrementActiveProcessors();
/**
* Called when a new client has connected and is about to being processing.
*/
- virtual void clientBegin(boost::shared_ptr<TProtocol> /* input */,
- boost::shared_ptr<TProtocol> /* output */) {}
+ virtual void* createContext(boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output) {
+ (void)input;
+ (void)output;
+ return NULL;
+ }
+
+ /**
+ * Called when a client has finished request-handling to delete server
+ * context.
+ */
+ virtual void deleteContext(void* serverContext,
+ boost::shared_ptr<TProtocol>input,
+ boost::shared_ptr<TProtocol>output) {
+ (void)serverContext;
+ (void)input;
+ (void)output;
+ }
/**
- * Called when a client has finished making requests.
+ * Called when a client is about to call the processor.
*/
- virtual void clientEnd(boost::shared_ptr<TProtocol> /* input */,
- boost::shared_ptr<TProtocol> /* output */) {}
+ virtual void processContext(void* serverContext,
+ boost::shared_ptr<TTransport> transport) {
+ (void)serverContext;
+ (void)transport;
+}
protected:
outputTransport = outputTransportFactory_->getTransport(client);
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+ void* connectionContext = NULL;
if (eventHandler_ != NULL) {
- eventHandler_->clientBegin(inputProtocol, outputProtocol);
+ connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
}
try {
- while (processor_->process(inputProtocol, outputProtocol)) {
- // Peek ahead, is the remote side closed?
- if (!inputTransport->peek()) {
+ for (;;) {
+ if (eventHandler_ != NULL) {
+ eventHandler_->processContext(connectionContext, client);
+ }
+ if (!processor_->process(inputProtocol, outputProtocol, connectionContext) ||
+ // Peek ahead, is the remote side closed?
+ !inputProtocol->getTransport()->peek()) {
break;
}
}
cerr << "TSimpleServer exception: " << tx.what() << endl;
}
if (eventHandler_ != NULL) {
- eventHandler_->clientEnd(inputProtocol, outputProtocol);
+ eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol);
}
inputTransport->close();
outputTransport->close();
Task(TThreadPoolServer &server,
shared_ptr<TProcessor> processor,
shared_ptr<TProtocol> input,
- shared_ptr<TProtocol> output) :
+ shared_ptr<TProtocol> output,
+ shared_ptr<TTransport> transport) :
server_(server),
processor_(processor),
input_(input),
- output_(output) {
+ output_(output),
+ transport_(transport) {
}
~Task() {}
void run() {
boost::shared_ptr<TServerEventHandler> eventHandler =
server_.getEventHandler();
+ void* connectionContext = NULL;
if (eventHandler != NULL) {
- eventHandler->clientBegin(input_, output_);
+ connectionContext = eventHandler->createContext(input_, output_);
}
try {
- while (processor_->process(input_, output_)) {
- if (!input_->getTransport()->peek()) {
+ for (;;) {
+ if (eventHandler != NULL) {
+ eventHandler->processContext(connectionContext, transport_);
+ }
+ if (!processor_->process(input_, output_, connectionContext) ||
+ !input_->getTransport()->peek()) {
break;
}
}
}
if (eventHandler != NULL) {
- eventHandler->clientEnd(input_, output_);
+ eventHandler->deleteContext(connectionContext, input_, output_);
}
try {
shared_ptr<TProcessor> processor_;
shared_ptr<TProtocol> input_;
shared_ptr<TProtocol> output_;
-
+ shared_ptr<TTransport> transport_;
};
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_);
} catch (TTransportException& ttx) {
if (inputTransport != NULL) { inputTransport->close(); }
Task(TThreadedServer& server,
shared_ptr<TProcessor> processor,
shared_ptr<TProtocol> input,
- shared_ptr<TProtocol> output) :
+ shared_ptr<TProtocol> output,
+ shared_ptr<TTransport> transport) :
server_(server),
processor_(processor),
input_(input),
- output_(output) {
+ output_(output),
+ transport_(transport) {
}
~Task() {}
void run() {
boost::shared_ptr<TServerEventHandler> eventHandler =
server_.getEventHandler();
+ void* connectionContext = NULL;
if (eventHandler != NULL) {
- eventHandler->clientBegin(input_, output_);
+ connectionContext = eventHandler->createContext(input_, output_);
}
try {
- while (processor_->process(input_, output_)) {
- if (!input_->getTransport()->peek()) {
+ for (;;) {
+ if (eventHandler != NULL) {
+ eventHandler->processContext(connectionContext, transport_);
+ }
+ if (!processor_->process(input_, output_, connectionContext) ||
+ !input_->getTransport()->peek()) {
break;
}
}
GlobalOutput("TThreadedServer uncaught exception.");
}
if (eventHandler != NULL) {
- eventHandler->clientEnd(input_, output_);
+ eventHandler->deleteContext(connectionContext, input_, output_);
}
try {
shared_ptr<TProcessor> processor_;
shared_ptr<TProtocol> input_;
shared_ptr<TProtocol> output_;
+ shared_ptr<TTransport> transport_;
};
TThreadedServer::Task* task = new TThreadedServer::Task(*this,
processor_,
inputProtocol,
- outputProtocol);
+ outputProtocol,
+ client);
// Create a task
shared_ptr<Runnable> runnable =
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
- processor_->process(inputProtocol, outputProtocol);
+ processor_->process(inputProtocol, outputProtocol, NULL);
numProcessed++;
if ( (numEvents > 0) && (numProcessed == numEvents)) {
return;
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
- processor_->process(inputProtocol, outputProtocol);
+ processor_->process(inputProtocol, outputProtocol, NULL);
if (curChunk != inputTransport_->getCurChunk()) {
break;
}
if (recvTimeout_ > 0) {
client->setRecvTimeout(recvTimeout_);
}
-
+ client->setCachedAddress((sockaddr*) &clientAddress, size);
+
return client;
}
maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+ cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
}
TSocket::TSocket() :
maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+ cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
}
TSocket::TSocket(int socket) :
maxRecvRetries_(5) {
recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+ cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
}
TSocket::~TSocket() {
done:
// Set socket back to normal mode (blocking)
fcntl(socket_, F_SETFL, flags);
+
+ setCachedAddress(res->ai_addr, res->ai_addrlen);
}
void TSocket::open() {
std::string TSocket::getPeerHost() {
if (peerHost_.empty()) {
struct sockaddr_storage addr;
- socklen_t addrLen = sizeof(addr);
+ struct sockaddr* addrPtr;
+ socklen_t addrLen;
if (socket_ < 0) {
return host_;
}
- int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
+ addrPtr = getCachedAddress(&addrLen);
+
+ if (addrPtr == NULL) {
+ addrLen = sizeof(addr);
+ if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+ return peerHost_;
+ }
+ addrPtr = (sockaddr*)&addr;
- if (rv != 0) {
- return peerHost_;
+ setCachedAddress(addrPtr, addrLen);
}
char clienthost[NI_MAXHOST];
char clientservice[NI_MAXSERV];
- getnameinfo((sockaddr*) &addr, addrLen,
+ getnameinfo((sockaddr*) addrPtr, addrLen,
clienthost, sizeof(clienthost),
clientservice, sizeof(clientservice), 0);
std::string TSocket::getPeerAddress() {
if (peerAddress_.empty()) {
struct sockaddr_storage addr;
- socklen_t addrLen = sizeof(addr);
+ struct sockaddr* addrPtr;
+ socklen_t addrLen;
if (socket_ < 0) {
return peerAddress_;
}
- int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen);
+ addrPtr = getCachedAddress(&addrLen);
- if (rv != 0) {
- return peerAddress_;
+ if (addrPtr == NULL) {
+ addrLen = sizeof(addr);
+ if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) {
+ return peerAddress_;
+ }
+ addrPtr = (sockaddr*)&addr;
+
+ setCachedAddress(addrPtr, addrLen);
}
char clienthost[NI_MAXHOST];
char clientservice[NI_MAXSERV];
- getnameinfo((sockaddr*) &addr, addrLen,
+ getnameinfo(addrPtr, addrLen,
clienthost, sizeof(clienthost),
clientservice, sizeof(clientservice),
NI_NUMERICHOST|NI_NUMERICSERV);
return peerPort_;
}
+void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
+ switch (addr->sa_family) {
+ case AF_INET:
+ if (len == sizeof(sockaddr_in)) {
+ memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
+ }
+ break;
+
+ case AF_INET6:
+ if (len == sizeof(sockaddr_in6)) {
+ memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
+ }
+ break;
+ }
+}
+
+sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
+ switch (cachedPeerAddr_.ipv4.sin_family) {
+ case AF_INET:
+ *len = sizeof(sockaddr_in);
+ return (sockaddr*) &cachedPeerAddr_.ipv4;
+
+ case AF_INET6:
+ *len = sizeof(sockaddr_in6);
+ return (sockaddr*) &cachedPeerAddr_.ipv6;
+
+ default:
+ return NULL;
+ }
+}
+
bool TSocket::useLowMinRto_ = false;
void TSocket::setUseLowMinRto(bool useLowMinRto) {
useLowMinRto_ = useLowMinRto;
**/
int getPeerPort();
+ /**
+ * Returns the underlying socket file descriptor.
+ */
+ int getSocketFD() {
+ return socket_;
+ }
+
+ /*
+ * Returns a cached copy of the peer address.
+ */
+ sockaddr* getCachedAddress(socklen_t* len) const;
+
/**
* Sets whether to use a low minimum TCP retransmission timeout.
*/
/** connect, called by open */
void openConnection(struct addrinfo *res);
+ /**
+ * Set a cache of the peer address (used when trivially available: e.g.
+ * accept() or connect()). Only caches IPV4 and IPV6; unset for others.
+ */
+ void setCachedAddress(const sockaddr* addr, socklen_t len);
+
/** Host to connect to */
std::string host_;
/** Recv timeout timeval */
struct timeval recvTimeval_;
+ /** Cached peer address */
+ union {
+ sockaddr_in ipv4;
+ sockaddr_in6 ipv6;
+ } cachedPeerAddr_;
+
+ /** Connection start time */
+ timespec startTime_;
+
/** Whether to use low minimum TCP retransmission timeout */
static bool useLowMinRto_;
class TestProcessorEventHandler : public TProcessorEventHandler {
- virtual void* getContext(const char* fn_name) {
+ virtual void* getContext(const char* fn_name, void* serverContext) {
return new std::string(fn_name);
}
virtual void freeContext(void* ctx, const char* fn_name) {