From 23248713345e36d8ed66704a9a58a5f39a48d942 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 6 Oct 2010 17:10:08 +0000 Subject: [PATCH] THRIFT-928. cpp: Thrift Server Client Stats Add the ability for Thrift servers to monitor client connections. It is activated by #including server/TClientInfo.h and creating 1) a TClientInfoCallHandler passed to the processor with setEventHandler() and 2) a TClientInforServerHandler passed to the server with setServerEventHandler(). The result vector, showing active connections, provides client address and the thrift call it is executing (or last executed), the time connected, and the number of calls made since connection. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005139 13f79535-47bb-0310-9956-ffa450edef68 --- compiler/cpp/src/generate/t_cpp_generator.cc | 45 +-- contrib/fb303/TClientInfo.cpp | 179 +++++++++++ contrib/fb303/TClientInfo.h | 320 +++++++++++++++++++ lib/cpp/src/TProcessor.h | 10 +- lib/cpp/src/processor/PeekProcessor.cpp | 5 +- lib/cpp/src/processor/PeekProcessor.h | 3 +- lib/cpp/src/processor/StatsProcessor.h | 4 +- lib/cpp/src/server/TNonblockingServer.cpp | 4 +- lib/cpp/src/server/TServer.h | 29 +- lib/cpp/src/server/TSimpleServer.cpp | 15 +- lib/cpp/src/server/TThreadPoolServer.cpp | 23 +- lib/cpp/src/server/TThreadedServer.cpp | 23 +- lib/cpp/src/transport/TFileTransport.cpp | 4 +- lib/cpp/src/transport/TServerSocket.cpp | 3 +- lib/cpp/src/transport/TSocket.cpp | 70 +++- lib/cpp/src/transport/TSocket.h | 27 ++ test/cpp/src/TestServer.cpp | 2 +- 17 files changed, 699 insertions(+), 67 deletions(-) create mode 100644 contrib/fb303/TClientInfo.cpp create mode 100644 contrib/fb303/TClientInfo.h diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc index c8d9dda1..a0e11f83 100644 --- a/compiler/cpp/src/generate/t_cpp_generator.cc +++ b/compiler/cpp/src/generate/t_cpp_generator.cc @@ -2403,12 +2403,18 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty string finish_cob; string finish_cob_decl; string cob_arg; + string call_context = ", void* callContext"; + string call_context_arg = ", callContext"; + string call_context_decl = ", void*"; string ret_type = "bool "; if (style == "Cob") { ifstyle = "CobSv"; pstyle = "Async"; finish_cob = "std::tr1::function cob, "; finish_cob_decl = "std::tr1::function, "; + call_context = ""; // TODO(edhall) remove when callContext is aded to TAsyncProcessor + call_context_arg = ""; // ditto + call_context_decl = ""; // ditto cob_arg = "cob, "; ret_type = "void "; } @@ -2452,7 +2458,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty f_header_ << indent() << "boost::shared_ptr<" << service_name_ << ifstyle << "If> iface_;" << endl; f_header_ << - indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl; + indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid" << call_context << ");" << endl; indent_down(); // Process function declarations @@ -2463,14 +2469,16 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty indent() << "std::map processMap_;" << endl; + "::apache::thrift::protocol::TProtocol*" << call_context_decl << + ")> processMap_;" << endl; for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { indent(f_header_) << - "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl; + "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot" << call_context << ");" << endl; if (gen_templates_) { indent(f_header_) << "void process_" << (*f_iter)->get_name() << "(" << finish_cob << - "int32_t seqid, Protocol_* iprot, Protocol_* oprot);" << endl; + "int32_t seqid, Protocol_* iprot, Protocol_* oprot" << + call_context << ");" << endl; } if (style == "Cob") { // XXX Factor this out, even if it is a pain. @@ -2539,7 +2547,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty declare_map << indent() << "}" << endl << endl << - indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl << + indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot" << call_context << ");" << endl << indent() << "virtual ~" << service_name_ << pstyle << class_suffix << "() {}" << endl; indent_down(); @@ -2553,8 +2561,8 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty ret_type << service_name_ << pstyle << class_suffix << template_suffix << "::process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, " << - "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << - endl; + "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot" << + call_context << ") {" << endl; indent_up(); out << @@ -2581,7 +2589,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty indent() << "}" << endl << endl << indent() << "return process_fn(" << (style == "Cob" ? "cob, " : "") - << "iprot, oprot, fname, seqid);" << + << "iprot, oprot, fname, seqid" << call_context_arg << ");" << endl; indent_down(); @@ -2595,7 +2603,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty "::process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, " << "::apache::thrift::protocol::TProtocol* oprot, " << - "std::string& fname, int32_t seqid) {" << endl; + "std::string& fname, int32_t seqid" << call_context << ") {" << endl; indent_up(); // HOT: member function pointer map @@ -2603,7 +2611,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty indent() << typename_str << "std::map::iterator pfn;" << endl << + "::apache::thrift::protocol::TProtocol*" << call_context_decl << ")>::iterator pfn;" << endl << indent() << "pfn = processMap_.find(fname);" << endl << indent() << "if (pfn == processMap_.end()) {" << endl; if (extends.empty()) { @@ -2623,11 +2631,11 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty indent() << " return " << extends << "::process_fn(" << (style == "Cob" ? "cob, " : "") - << "iprot, oprot, fname, seqid);" << endl; + << "iprot, oprot, fname, seqid" << call_context_arg << ");" << endl; } out << indent() << "}" << endl << - indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot);" << endl; + indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot" << call_context_arg << ");" << endl; // TODO(dreiss): return pfn ret? if (style == "Cob") { @@ -2746,7 +2754,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, out << "void " << tservice->get_name() << "Processor" << class_suffix << "::" << "process_" << tfunction->get_name() << "(int32_t seqid, " << - prot_type << "* iprot, " << prot_type << "* oprot)" << endl; + prot_type << "* iprot, " << prot_type << "* oprot, void* callContext)" << endl; scope_up(out); if (gen_templates_ && !specialized) { @@ -2759,7 +2767,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, endl << indent() << "if (_iprot && _oprot) {" << endl << indent() << " return process_" << tfunction->get_name() << - "(seqid, _iprot, _oprot);" << endl << + "(seqid, _iprot, _oprot, callContext);" << endl << indent() << "}" << endl << endl; } @@ -2769,7 +2777,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, out << indent() << "void* ctx = NULL;" << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\", callContext);" << endl << indent() << "}" << endl << indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << @@ -2901,6 +2909,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, // Cob style. else { // Processor entry point. + // TODO(edhall) update for callContext when TEventServer is ready if (gen_templates_) { out << indent() << "template " << endl; @@ -2932,7 +2941,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl << indent() << "void* ctx = NULL;" << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl << indent() << "}" << endl << indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl << indent() << "try {" << endl; @@ -3073,7 +3082,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, out << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl << indent() << "}" << endl << indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << @@ -3142,7 +3151,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, out << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\");" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << service_func_name << "\", NULL);" << endl << indent() << "}" << endl << indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << service_func_name << "\");" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << diff --git a/contrib/fb303/TClientInfo.cpp b/contrib/fb303/TClientInfo.cpp new file mode 100644 index 00000000..d0930179 --- /dev/null +++ b/contrib/fb303/TClientInfo.cpp @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +namespace apache { namespace thrift { namespace server { + +using namespace apache::thrift; +using namespace apache::thrift::transport; + +TClientInfoConnection::TClientInfoConnection() { + call_[kNameLen - 1] = '\0'; // insure NUL terminator is there + eraseAddr(); + eraseCall(); +} + +void TClientInfoConnection::recordAddr(const sockaddr* addr) { + eraseAddr(); + initTime(); + ncalls_ = 0; + if (addr != NULL) { + if (addr->sa_family == AF_INET) { + memcpy((void*)&addr_.ipv4, (const void *)addr, sizeof(sockaddr_in)); + } + else if (addr->sa_family == AF_INET6) { + memcpy((void*)&addr_.ipv6, (const void *)addr, sizeof(sockaddr_in6)); + } + } +} + +void TClientInfoConnection::eraseAddr() { + addr_.ipv4.sin_family = AF_UNSPEC; +} + +const char* TClientInfoConnection::getAddr(char* buf, int len) const { + switch (addr_.ipv4.sin_family) { + case AF_INET: + return inet_ntop(AF_INET, &addr_.ipv4.sin_addr, buf, len); + case AF_INET6: + return inet_ntop(AF_INET6, &addr_.ipv6.sin6_addr, buf, len); + default: + return NULL; + } +} + +void TClientInfoConnection::recordCall(const char* name) { + strncpy(call_, name, kNameLen - 1); // NUL terminator set in constructor + ncalls_++; +} + +void TClientInfoConnection::eraseCall() { + call_[0] = '\0'; +} + +const char* TClientInfoConnection::getCall() const { + if (call_[0] == '\0') { + return NULL; + } + return call_; +} + +void TClientInfoConnection::getTime(timespec* time) const { + *time = time_; +} + +uint64_t TClientInfoConnection::getNCalls() const { + return ncalls_; +} + +void TClientInfoConnection::initTime() { + clock_gettime(CLOCK_REALTIME, &time_); +} + + +TClientInfoConnection* TClientInfo::getConnection(int fd, bool grow) { + if (fd < 0 || (!grow && fd >= info_.size())) { + return NULL; + } + return &info_[fd]; +} + +size_t TClientInfo::size() const { + return info_.size(); +} + +void* TClientInfoServerHandler::createContext(boost::shared_ptr input, + boost::shared_ptr output) { + (void)input; + (void)output; + return (void*) new Connect(&clientInfo_); +} + +void TClientInfoServerHandler::deleteContext(void* connectionContext, + boost::shared_ptr input, + boost::shared_ptr output) { + Connect* call = static_cast(connectionContext); + if (call->callInfo_) { + call->callInfo_->eraseCall(); + } + delete call; +} + +void TClientInfoServerHandler::processContext(void* connectionContext, + shared_ptr transport) { + Connect* call = static_cast(connectionContext); + if (call->callInfo_ == NULL) { + if (typeid(*(transport.get())) == typeid(TSocket)) { + TSocket* tsocket = static_cast(transport.get()); + int fd = tsocket->getSocketFD(); + if (fd < 0) { + return; + } + call->callInfo_ = call->clientInfo_->getConnection(fd, true); + assert(call->callInfo_ != NULL); + socklen_t len; + call->callInfo_->recordAddr(tsocket->getCachedAddress(&len)); + } + } +} + +void TClientInfoServerHandler::getStatsStrings(vector& result) { + result.clear(); + timespec now; + clock_gettime(CLOCK_REALTIME, &now); + + for (int i = 0; i < clientInfo_.size(); ++i) { + TClientInfoConnection* info = clientInfo_.getConnection(i, false); + const char* callStr = info->getCall(); + if (callStr == NULL) { + continue; + } + + char addrBuf[INET6_ADDRSTRLEN]; + const char* addrStr = info->getAddr(addrBuf, sizeof addrBuf); + if (addrStr == NULL) { + // cerr << "no addr!" << endl; + continue; + } + + timespec start; + double secs = 0.0; + info->getTime(&start); + secs = (double)(now.tv_sec - start.tv_sec) + (now.tv_nsec - start.tv_nsec)*0.000000001; + + char buf[256]; + snprintf(buf, sizeof buf, "%d %s %s %.3f %llu", i, addrStr, callStr, secs, + (unsigned long long)info->getNCalls()); + + result.push_back(buf); + } +} + +void* TClientInfoCallHandler::getContext(const char* fn_name, void* serverContext) { + if (serverContext) { + TClientInfoConnection* callInfo = static_cast(serverContext)->callInfo_; + if (callInfo != NULL) { + callInfo->recordCall(fn_name); + } + } + return NULL; +} + +} } } // namespace apache::thrift::server diff --git a/contrib/fb303/TClientInfo.h b/contrib/fb303/TClientInfo.h new file mode 100644 index 00000000..9b2d284f --- /dev/null +++ b/contrib/fb303/TClientInfo.h @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ +#define _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ 1 + +// for inet_ntop -- +#include +#include +#include +#include + +namespace apache { namespace thrift { namespace server { + +using namespace apache::thrift; +using namespace apache::thrift::transport; +using namespace apache::thrift::concurrency; +using boost::shared_ptr; +using std::string; +using std::vector; + +/** + * StableVector -- a minimal vector class where growth is automatic and + * vector elements never move as the vector grows. Allocates new space + * as needed, but does not copy old values. + * + * A level vector stores a list of storage vectors containing the actual + * elements. Levels are added as needed, doubling in size each time. + * Locking is only done when a level is added. Access is amortized + * constant time. + */ +template +class StableVector { + /// The initial allocation as an exponent of 2 + static const uint32_t kInitialSizePowOf2 = 10; + /// The initial allocation size + static const uint32_t kInitialVectorSize = 1 << kInitialSizePowOf2; + /// This bound is guaranteed not to be exceeded on 64-bit archs + static const int kMaxLevels = 64; + + /// Values are kept in one or more of these + typedef vector Vect; + /// One or more value vectors are kept in one of these + typedef vector LevelVector; + + Mutex mutex_; + /// current size + size_t size_; + _Atomic_word vectLvl_; + LevelVector vects_; + + public: + /** + * Constructor -- initialize the level vector and allocate the + * initial storage vector + */ + StableVector() + : size_(0) + , vectLvl_(0) { + vects_.reserve(kMaxLevels); + Vect* storageVector(new Vect(1 << kInitialSizePowOf2)); + vects_.push_back(storageVector); + } + + private: + /** + * make sure the requested number of storage levels have been allocated. + */ + void expand(uint32_t level) { + // we need the guard to insure that we only allocate once. + Guard g(mutex_); + while (level > vectLvl_) { + Vect* levelVect(new Vect(1 << (vectLvl_ + kInitialSizePowOf2))); + vects_.push_back(levelVect); + // we need to make sure this is done after levelVect is inserted + // (what we want is effectively a memory barrier here). + __gnu_cxx::__atomic_add(&vectLvl_, 1); + } + } + + /** + * Given an index, determine which level and element of that level is + * required. Grows if needed. + */ + void which(uint32_t n, uint32_t* vno, uint32_t* idx) { + if (n >= size_) { + size_ = n + 1; + } + if (n < kInitialVectorSize) { + *idx = n; + *vno = 0; + } else { + uint32_t upper = n >> kInitialSizePowOf2; + *vno = CHAR_BIT*sizeof(upper) - __builtin_clz(upper); + *idx = n - (1 << (*vno + kInitialSizePowOf2 - 1)); + if (*vno > vectLvl_) { + expand(*vno); + } + } + } + + public: + /** + * Given an index, return a reference to that element, perhaps after + * allocating additional space. + * + * @param n a positive integer + */ + T& operator[](uint32_t n) { + uint32_t vno; + uint32_t idx; + which(n, &vno, &idx); + return (*vects_[vno])[idx]; + } + + /** + * Return the present size of the vector. + */ + size_t size() const { return size_; } +}; + + +/** + * This class embodies the representation of a single connection during + * processing. We'll keep one of these per file descriptor in TClientInfo. + */ +class TClientInfoConnection { + public: + const static int kNameLen = 32; + + private: + typedef union IPAddrUnion { + sockaddr_in ipv4; + sockaddr_in6 ipv6; + }; + + char call_[kNameLen]; ///< The name of the thrift call + IPAddrUnion addr_; ///< The client's IP address + timespec time_; ///< Time processing started + uint64_t ncalls_; ///< # of calls processed + + public: + /** + * Constructor; insure that no client address or thrift call name is + * represented. + */ + TClientInfoConnection(); + + /** + * A connection has been made; record its address. Since this is the + * first we'll know of a connection we start the timer here as well. + */ + void recordAddr(const sockaddr* addr); + + /** + * Mark the address as empty/unknown. + */ + void eraseAddr(); + + /** + * Return a string representing the present address, or NULL if none. + * Copies the string into the buffer provided. + */ + const char* getAddr(char* buf, int len) const; + + /** + * A call has been made on this connection; record its name. Since this is + * called for every thrift call processed, we also do our call count here. + */ + void recordCall(const char* name); + + /** + * Invoked when processing has ended to clear the call name. + */ + void eraseCall(); + + /** + * Return as string the thrift call either currently being processed or + * most recently processed if the connection is still open for additonal + * calls. Returns NULL if a call hasn't been made yet or processing + * has ended. + */ + const char* getCall() const; + + /** + * Get the timespec for the start of this connection (specifically, when + * recordAddr() was first called). + */ + void getTime(timespec* time) const; + + /** + * Return the number of calls made on this connection. + */ + uint64_t getNCalls() const; + + private: + void initTime(); +}; + + +/** + * Store for info about a server's clients -- specifically, the client's IP + * address and the call it is executing. This information is indexed by + * socket file descriptor and in the present implementation is updated + * asynchronously, so it may only approximate reality. + */ +class TClientInfo { + private: + StableVector info_; + + public: + /** + * Return the info object for a given file descriptor. If "grow" is true + * extend the info vector if required (such as for a file descriptor not seen + * before). If "grow" is false and the info vector isn't large enough, + * or if "fd" is negative, return NULL. + */ + TClientInfoConnection* getConnection(int fd, bool grow); + + size_t size() const; +}; + +/** + * This derivation of TServerEventHandler encapsulates the main status vector + * and provides context to the server's processing loop via overrides. + * Together with TClientInfoCallHandler (derived from TProcessorEventHandler) + * it integrates client info collection into the server. + */ +class TClientInfoServerHandler : public TServerEventHandler { + private: + TClientInfo clientInfo_; + + public: + /** + * One of these is constructed for each open connection/descriptor and links + * to both the status vector (clientInfo_) and that descriptor's entry + * within it. + */ + struct Connect { + TClientInfo* clientInfo_; + TClientInfoConnection* callInfo_; + + explicit Connect(TClientInfo* clientInfo) + : clientInfo_(clientInfo) + , callInfo_(NULL) { + } + }; + + /** + * Generate processor context; we don't know what descriptor we belong to + * yet -- we'll get hooked up in contextProcess(). + */ + void* createContext(boost::shared_ptr input, + boost::shared_ptr output); + + /** + * Mark our slot as unused and delete the context created in createContext(). + */ + void deleteContext(void* processorContext, + boost::shared_ptr input, + boost::shared_ptr output); + + /** + * Called in the processing loop just before the server invokes the + * processor itself, on the first call we establish which descriptor + * we correspond to and set it to that socket's peer IP address. This + * also has the side effect of initializing call counting and connection + * timing. We won't know which call we're handling until the handler + * first gets called in TClientInfoCallHandler::getContext(). + */ + void processContext(void* processorContext, + shared_ptr transport); + + /** + * Get status report for server in the form of a vector of strings. + * Each active client appears as one string in the format: + * + * FD IPADDR CALLNAME DURATION NCALLS + * + * where "FD" is the file descriptor for the client's socket, "IPADDR" + * is the IP address (as reported by accept()), "CALLNAME" is the + * current or most recent Thrift function name, "DURATION" is the + * duration of the connection, while NCALLS is the number of Thrift + * calls made since the connection was made. A single space separates + * fields. + */ + void getStatsStrings(vector& result); +}; + +/** + * This class derives from TProcessorEventHandler to gain access to the + * function name for the current Thrift call. We need two versions of + * this -- TClientInfoCallStatsHandler is the other -- since in the latter + * case we pass through to TFunctionStatHandler to perform Thrift call + * stats. + */ +class TClientInfoCallHandler : public TProcessorEventHandler { + public: + virtual void* getContext(const char* fn_name, void* serverContext); +}; + +} } } // namespace apache::thrift::server + +#endif // !_FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index 7858166e..16b46df0 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -44,7 +44,7 @@ class TProcessorEventHandler { * The return value is passed to all other callbacks * for that function invocation. */ - virtual void* getContext(const char* fn_name) { return NULL; } + virtual void* getContext(const char* fn_name, void* serverContext) { return NULL; } /** * Expected to free resources associated with a context. @@ -112,10 +112,12 @@ class TProcessor { virtual ~TProcessor() {} virtual bool process(boost::shared_ptr in, - boost::shared_ptr out) = 0; + boost::shared_ptr out, + void* connectionContext) = 0; - bool process(boost::shared_ptr io) { - return process(io, io); + bool process(boost::shared_ptr io, + void* connectionContext) { + return process(io, io, connectionContext); } boost::shared_ptr getEventHandler() { diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp index c721861b..076975b5 100644 --- a/lib/cpp/src/processor/PeekProcessor.cpp +++ b/lib/cpp/src/processor/PeekProcessor.cpp @@ -58,7 +58,8 @@ void PeekProcessor::setTargetTransport(boost::shared_ptr targetTrans } bool PeekProcessor::process(boost::shared_ptr in, - boost::shared_ptr out) { + boost::shared_ptr out, + void* connectionContext) { std::string fname; TMessageType mtype; @@ -100,7 +101,7 @@ bool PeekProcessor::process(boost::shared_ptr in, // Done peeking at variables peekEnd(); - bool ret = actualProcessor_->process(pipedProtocol_, out); + bool ret = actualProcessor_->process(pipedProtocol_, out, connectionContext); memoryBuffer_->resetBuffer(); return ret; } diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h index 0f7c016a..cb703f65 100644 --- a/lib/cpp/src/processor/PeekProcessor.h +++ b/lib/cpp/src/processor/PeekProcessor.h @@ -53,7 +53,8 @@ class PeekProcessor : public apache::thrift::TProcessor { void setTargetTransport(boost::shared_ptr targetTransport); virtual bool process(boost::shared_ptr in, - boost::shared_ptr out); + boost::shared_ptr out, + void* connectionContext); // The following three functions can be overloaded by child classes to // achieve desired peeking behavior diff --git a/lib/cpp/src/processor/StatsProcessor.h b/lib/cpp/src/processor/StatsProcessor.h index 820b3ad4..8600c6b0 100644 --- a/lib/cpp/src/processor/StatsProcessor.h +++ b/lib/cpp/src/processor/StatsProcessor.h @@ -39,7 +39,9 @@ public: {} virtual ~StatsProcessor() {}; - virtual bool process(boost::shared_ptr piprot, boost::shared_ptr poprot) { + virtual bool process(boost::shared_ptr piprot, + boost::shared_ptr poprot, + void* serverContext) { piprot_ = piprot; diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 85fe2657..4245d5e9 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -56,7 +56,7 @@ class TConnection::Task: public Runnable { void run() { try { - while (processor_->process(input_, output_)) { + while (processor_->process(input_, output_, NULL)) { if (!input_->getTransport()->peek()) { break; } @@ -293,7 +293,7 @@ void TConnection::transition() { } else { try { // Invoke the processor - server_->getProcessor()->process(inputProtocol_, outputProtocol_); + server_->getProcessor()->process(inputProtocol_, outputProtocol_, NULL); } catch (TTransportException &ttx) { GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what()); server_->decrementActiveProcessors(); diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index 5c4c588d..4dddfeae 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -57,14 +57,33 @@ class TServerEventHandler { /** * Called when a new client has connected and is about to being processing. */ - virtual void clientBegin(boost::shared_ptr /* input */, - boost::shared_ptr /* output */) {} + virtual void* createContext(boost::shared_ptr input, + boost::shared_ptr output) { + (void)input; + (void)output; + return NULL; + } + + /** + * Called when a client has finished request-handling to delete server + * context. + */ + virtual void deleteContext(void* serverContext, + boost::shared_ptrinput, + boost::shared_ptroutput) { + (void)serverContext; + (void)input; + (void)output; + } /** - * Called when a client has finished making requests. + * Called when a client is about to call the processor. */ - virtual void clientEnd(boost::shared_ptr /* input */, - boost::shared_ptr /* output */) {} + virtual void processContext(void* serverContext, + boost::shared_ptr transport) { + (void)serverContext; + (void)transport; +} protected: diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp index 394ce21e..438a587e 100644 --- a/lib/cpp/src/server/TSimpleServer.cpp +++ b/lib/cpp/src/server/TSimpleServer.cpp @@ -63,13 +63,18 @@ void TSimpleServer::serve() { outputTransport = outputTransportFactory_->getTransport(client); inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); + void* connectionContext = NULL; if (eventHandler_ != NULL) { - eventHandler_->clientBegin(inputProtocol, outputProtocol); + connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol); } try { - while (processor_->process(inputProtocol, outputProtocol)) { - // Peek ahead, is the remote side closed? - if (!inputTransport->peek()) { + for (;;) { + if (eventHandler_ != NULL) { + eventHandler_->processContext(connectionContext, client); + } + if (!processor_->process(inputProtocol, outputProtocol, connectionContext) || + // Peek ahead, is the remote side closed? + !inputProtocol->getTransport()->peek()) { break; } } @@ -79,7 +84,7 @@ void TSimpleServer::serve() { cerr << "TSimpleServer exception: " << tx.what() << endl; } if (eventHandler_ != NULL) { - eventHandler_->clientEnd(inputProtocol, outputProtocol); + eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol); } inputTransport->close(); outputTransport->close(); diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp index 6eea3dbd..18319be7 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/server/TThreadPoolServer.cpp @@ -40,11 +40,13 @@ public: Task(TThreadPoolServer &server, shared_ptr processor, shared_ptr input, - shared_ptr output) : + shared_ptr output, + shared_ptr transport) : server_(server), processor_(processor), input_(input), - output_(output) { + output_(output), + transport_(transport) { } ~Task() {} @@ -52,12 +54,17 @@ public: void run() { boost::shared_ptr eventHandler = server_.getEventHandler(); + void* connectionContext = NULL; if (eventHandler != NULL) { - eventHandler->clientBegin(input_, output_); + connectionContext = eventHandler->createContext(input_, output_); } try { - while (processor_->process(input_, output_)) { - if (!input_->getTransport()->peek()) { + for (;;) { + if (eventHandler != NULL) { + eventHandler->processContext(connectionContext, transport_); + } + if (!processor_->process(input_, output_, connectionContext) || + !input_->getTransport()->peek()) { break; } } @@ -78,7 +85,7 @@ public: } if (eventHandler != NULL) { - eventHandler->clientEnd(input_, output_); + eventHandler->deleteContext(connectionContext, input_, output_); } try { @@ -101,7 +108,7 @@ public: shared_ptr processor_; shared_ptr input_; shared_ptr output_; - + shared_ptr transport_; }; TThreadPoolServer::TThreadPoolServer(shared_ptr processor, @@ -167,7 +174,7 @@ void TThreadPoolServer::serve() { outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); // Add to threadmanager pool - threadManager_->add(shared_ptr(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_); + threadManager_->add(shared_ptr(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_); } catch (TTransportException& ttx) { if (inputTransport != NULL) { inputTransport->close(); } diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp index cc30f8ff..11718cac 100644 --- a/lib/cpp/src/server/TThreadedServer.cpp +++ b/lib/cpp/src/server/TThreadedServer.cpp @@ -42,11 +42,13 @@ public: Task(TThreadedServer& server, shared_ptr processor, shared_ptr input, - shared_ptr output) : + shared_ptr output, + shared_ptr transport) : server_(server), processor_(processor), input_(input), - output_(output) { + output_(output), + transport_(transport) { } ~Task() {} @@ -54,12 +56,17 @@ public: void run() { boost::shared_ptr eventHandler = server_.getEventHandler(); + void* connectionContext = NULL; if (eventHandler != NULL) { - eventHandler->clientBegin(input_, output_); + connectionContext = eventHandler->createContext(input_, output_); } try { - while (processor_->process(input_, output_)) { - if (!input_->getTransport()->peek()) { + for (;;) { + if (eventHandler != NULL) { + eventHandler->processContext(connectionContext, transport_); + } + if (!processor_->process(input_, output_, connectionContext) || + !input_->getTransport()->peek()) { break; } } @@ -73,7 +80,7 @@ public: GlobalOutput("TThreadedServer uncaught exception."); } if (eventHandler != NULL) { - eventHandler->clientEnd(input_, output_); + eventHandler->deleteContext(connectionContext, input_, output_); } try { @@ -107,6 +114,7 @@ public: shared_ptr processor_; shared_ptr input_; shared_ptr output_; + shared_ptr transport_; }; @@ -173,7 +181,8 @@ void TThreadedServer::serve() { TThreadedServer::Task* task = new TThreadedServer::Task(*this, processor_, inputProtocol, - outputProtocol); + outputProtocol, + client); // Create a task shared_ptr runnable = diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index 0b41694e..40841acc 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -966,7 +966,7 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) { // bad form to use exceptions for flow control but there is really // no other way around it try { - processor_->process(inputProtocol, outputProtocol); + processor_->process(inputProtocol, outputProtocol, NULL); numProcessed++; if ( (numEvents > 0) && (numProcessed == numEvents)) { return; @@ -998,7 +998,7 @@ void TFileProcessor::processChunk() { // bad form to use exceptions for flow control but there is really // no other way around it try { - processor_->process(inputProtocol, outputProtocol); + processor_->process(inputProtocol, outputProtocol, NULL); if (curChunk != inputTransport_->getCurChunk()) { break; } diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp index 836f6ba0..90a27ce8 100644 --- a/lib/cpp/src/transport/TServerSocket.cpp +++ b/lib/cpp/src/transport/TServerSocket.cpp @@ -393,7 +393,8 @@ shared_ptr TServerSocket::acceptImpl() { if (recvTimeout_ > 0) { client->setRecvTimeout(recvTimeout_); } - + client->setCachedAddress((sockaddr*) &clientAddress, size); + return client; } diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp index 951ddcf1..ee76c3fe 100644 --- a/lib/cpp/src/transport/TSocket.cpp +++ b/lib/cpp/src/transport/TSocket.cpp @@ -78,6 +78,7 @@ TSocket::TSocket(string path) : maxRecvRetries_(5) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); + cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; } TSocket::TSocket() : @@ -94,6 +95,7 @@ TSocket::TSocket() : maxRecvRetries_(5) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); + cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; } TSocket::TSocket(int socket) : @@ -110,6 +112,7 @@ TSocket::TSocket(int socket) : maxRecvRetries_(5) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); + cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; } TSocket::~TSocket() { @@ -273,6 +276,8 @@ void TSocket::openConnection(struct addrinfo *res) { done: // Set socket back to normal mode (blocking) fcntl(socket_, F_SETFL, flags); + + setCachedAddress(res->ai_addr, res->ai_addrlen); } void TSocket::open() { @@ -600,22 +605,29 @@ string TSocket::getSocketInfo() { std::string TSocket::getPeerHost() { if (peerHost_.empty()) { struct sockaddr_storage addr; - socklen_t addrLen = sizeof(addr); + struct sockaddr* addrPtr; + socklen_t addrLen; if (socket_ < 0) { return host_; } - int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen); + addrPtr = getCachedAddress(&addrLen); + + if (addrPtr == NULL) { + addrLen = sizeof(addr); + if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) { + return peerHost_; + } + addrPtr = (sockaddr*)&addr; - if (rv != 0) { - return peerHost_; + setCachedAddress(addrPtr, addrLen); } char clienthost[NI_MAXHOST]; char clientservice[NI_MAXSERV]; - getnameinfo((sockaddr*) &addr, addrLen, + getnameinfo((sockaddr*) addrPtr, addrLen, clienthost, sizeof(clienthost), clientservice, sizeof(clientservice), 0); @@ -627,22 +639,29 @@ std::string TSocket::getPeerHost() { std::string TSocket::getPeerAddress() { if (peerAddress_.empty()) { struct sockaddr_storage addr; - socklen_t addrLen = sizeof(addr); + struct sockaddr* addrPtr; + socklen_t addrLen; if (socket_ < 0) { return peerAddress_; } - int rv = getpeername(socket_, (sockaddr*) &addr, &addrLen); + addrPtr = getCachedAddress(&addrLen); - if (rv != 0) { - return peerAddress_; + if (addrPtr == NULL) { + addrLen = sizeof(addr); + if (getpeername(socket_, (sockaddr*) &addr, &addrLen) != 0) { + return peerAddress_; + } + addrPtr = (sockaddr*)&addr; + + setCachedAddress(addrPtr, addrLen); } char clienthost[NI_MAXHOST]; char clientservice[NI_MAXSERV]; - getnameinfo((sockaddr*) &addr, addrLen, + getnameinfo(addrPtr, addrLen, clienthost, sizeof(clienthost), clientservice, sizeof(clientservice), NI_NUMERICHOST|NI_NUMERICSERV); @@ -658,6 +677,37 @@ int TSocket::getPeerPort() { return peerPort_; } +void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) { + switch (addr->sa_family) { + case AF_INET: + if (len == sizeof(sockaddr_in)) { + memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len); + } + break; + + case AF_INET6: + if (len == sizeof(sockaddr_in6)) { + memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len); + } + break; + } +} + +sockaddr* TSocket::getCachedAddress(socklen_t* len) const { + switch (cachedPeerAddr_.ipv4.sin_family) { + case AF_INET: + *len = sizeof(sockaddr_in); + return (sockaddr*) &cachedPeerAddr_.ipv4; + + case AF_INET6: + *len = sizeof(sockaddr_in6); + return (sockaddr*) &cachedPeerAddr_.ipv6; + + default: + return NULL; + } +} + bool TSocket::useLowMinRto_ = false; void TSocket::setUseLowMinRto(bool useLowMinRto) { useLowMinRto_ = useLowMinRto; diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index f195438e..47a702d1 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -192,6 +192,18 @@ class TSocket : public TVirtualTransport { **/ int getPeerPort(); + /** + * Returns the underlying socket file descriptor. + */ + int getSocketFD() { + return socket_; + } + + /* + * Returns a cached copy of the peer address. + */ + sockaddr* getCachedAddress(socklen_t* len) const; + /** * Sets whether to use a low minimum TCP retransmission timeout. */ @@ -211,6 +223,12 @@ class TSocket : public TVirtualTransport { /** connect, called by open */ void openConnection(struct addrinfo *res); + /** + * Set a cache of the peer address (used when trivially available: e.g. + * accept() or connect()). Only caches IPV4 and IPV6; unset for others. + */ + void setCachedAddress(const sockaddr* addr, socklen_t len); + /** Host to connect to */ std::string host_; @@ -256,6 +274,15 @@ class TSocket : public TVirtualTransport { /** Recv timeout timeval */ struct timeval recvTimeval_; + /** Cached peer address */ + union { + sockaddr_in ipv4; + sockaddr_in6 ipv6; + } cachedPeerAddr_; + + /** Connection start time */ + timespec startTime_; + /** Whether to use low minimum TCP retransmission timeout */ static bool useLowMinRto_; diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index 685957ab..d30475b9 100644 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -289,7 +289,7 @@ class TestHandler : public ThriftTestIf { class TestProcessorEventHandler : public TProcessorEventHandler { - virtual void* getContext(const char* fn_name) { + virtual void* getContext(const char* fn_name, void* serverContext) { return new std::string(fn_name); } virtual void freeContext(void* ctx, const char* fn_name) { -- 2.17.1