From b9ff32ae30af5a667efa7d1d3b39687d5758c742 Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Thu, 16 Nov 2006 01:00:24 +0000 Subject: [PATCH] Thrift: C++ peek() method and TException not Exception git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664876 13f79535-47bb-0310-9956-ffa450edef68 --- compiler/cpp/src/generate/t_cpp_generator.cc | 8 +-- lib/cpp/src/Thrift.h | 14 +++-- .../src/concurrency/PosixThreadFactory.cpp | 13 ++-- lib/cpp/src/server/TNonblockingServer.cpp | 10 +++- lib/cpp/src/server/TNonblockingServer.h | 2 +- lib/cpp/src/server/TSimpleServer.cpp | 13 ++-- lib/cpp/src/server/TThreadPoolServer.cpp | 21 ++++--- lib/cpp/src/transport/TSocket.cpp | 24 +++++++- lib/cpp/src/transport/TSocket.h | 9 +++ lib/cpp/src/transport/TTransport.h | 16 ++++- lib/cpp/src/transport/TTransportException.h | 33 +++++------ lib/cpp/src/transport/TTransportUtils.cpp | 1 - lib/cpp/src/transport/TTransportUtils.h | 20 ++++++- test/cpp/src/TestClient.cpp | 4 +- test/cpp/src/main.cpp | 59 +++++++++++++++++-- 15 files changed, 188 insertions(+), 59 deletions(-) diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc index 3a382144..01345c75 100644 --- a/compiler/cpp/src/generate/t_cpp_generator.cc +++ b/compiler/cpp/src/generate/t_cpp_generator.cc @@ -845,7 +845,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice) { indent() << "if (mtype != facebook::thrift::protocol::T_REPLY || fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl; indent_up(); f_service_ << - indent() << "throw facebook::thrift::Exception(\"Unexpected message type, name, or id\");" << endl; + indent() << "throw facebook::thrift::TException(\"Unexpected message type, name, or id\");" << endl; indent_down(); f_service_ << @@ -879,7 +879,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice) { "return;" << endl; } else { f_service_ << - indent() << "throw facebook::thrift::Exception(\"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl; + indent() << "throw facebook::thrift::TException(\"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl; } // Close function @@ -985,7 +985,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { indent() << "iprot->readMessageBegin(fname, mtype, seqid);" << endl << endl << indent() << "if (mtype != facebook::thrift::protocol::T_CALL) {" << endl << - indent() << " throw facebook::thrift::Exception(\"Unexpected message type\");" << endl << + indent() << " throw facebook::thrift::TException(\"Unexpected message type\");" << endl << indent() << "}" << endl << endl << indent() << "return process_fn(iprot, oprot, fname, seqid);" << @@ -1007,7 +1007,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { indent() << "if (pfn == processMap_.end()) {" << endl; if (extends.empty()) { f_service_ << - indent() << " throw facebook::thrift::Exception(\"Unknown function name: '\"+fname+\"'\");" << endl; + indent() << " throw facebook::thrift::TException(\"Unknown function name: '\"+fname+\"'\");" << endl; } else { f_service_ << indent() << " return " << extends << "Processor::process_fn(iprot, oprot, fname, seqid);" << endl; diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h index 92143cc9..e497255e 100644 --- a/lib/cpp/src/Thrift.h +++ b/lib/cpp/src/Thrift.h @@ -14,15 +14,21 @@ namespace facebook { namespace thrift { -class Exception : public std::exception { +class TException : public std::exception { public: - Exception(const std::string message) : + TException() {} + + TException(const std::string message) : message_(message) {} - ~Exception() throw () {} + ~TException() throw() {} const char* what() { - return message_.c_str(); + if (message_.empty()) { + return "Default TException."; + } else { + return message_.c_str(); + } } private: diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp index 74a3ec31..262fb2f1 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp @@ -138,12 +138,15 @@ class PosixThreadFactory::Impl { * API values. */ static int toPthreadPolicy(POLICY policy) { - switch(policy) { - case OTHER: return SCHED_OTHER; break; - case FIFO: return SCHED_FIFO; break; - case ROUND_ROBIN: return SCHED_RR; break; - default: return SCHED_OTHER; break; + switch (policy) { + case OTHER: + return SCHED_OTHER; + case FIFO: + return SCHED_FIFO; + case ROUND_ROBIN: + return SCHED_RR; } + return SCHED_OTHER; } /** diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 75a209ea..a7c8393d 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -154,10 +154,14 @@ void TConnection::transition() { try { // Invoke the processor server_->getProcessor()->process(inputProtocol_, outputProtocol_); - } catch (TTransportException &x) { - fprintf(stderr, "Server::process %s\n", x.getMessage().c_str()); + } catch (TTransportException &ttx) { + fprintf(stderr, "Server::process() %s\n", ttx.what()); close(); - return; + return; + } catch (TException &x) { + fprintf(stderr, "Server::process() %s\n", x.what()); + close(); + return; } catch (...) { fprintf(stderr, "Server::process() unknown exception\n"); close(); diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index faa572ff..11b58b1a 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -186,7 +186,7 @@ class TConnection { TConnection(int socket, short eventFlags, TNonblockingServer *s) { readBuffer_ = (uint8_t*)malloc(1024); if (readBuffer_ == NULL) { - throw new facebook::thrift::Exception("Out of memory."); + throw new facebook::thrift::TException("Out of memory."); } readBufferSize_ = 1024; diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp index 3eb035ee..b453ce65 100644 --- a/lib/cpp/src/server/TSimpleServer.cpp +++ b/lib/cpp/src/server/TSimpleServer.cpp @@ -21,7 +21,7 @@ void TSimpleServer::serve() { // Start the server listening serverTransport_->listen(); } catch (TTransportException& ttx) { - cerr << "TSimpleServer::run() listen(): " << ttx.getMessage() << endl; + cerr << "TSimpleServer::run() listen(): " << ttx.what() << endl; return; } @@ -32,16 +32,21 @@ void TSimpleServer::serve() { iot = transportFactory_->getIOTransports(client); iop = protocolFactory_->getIOProtocols(iot.first, iot.second); try { - while (processor_->process(iop.first, iop.second)) {} + while (processor_->process(iop.first, iop.second)) { + // Peek ahead, is the remote side closed? + if (!iot.first->peek()) { + break; + } + } } catch (TTransportException& ttx) { - cerr << "TSimpleServer client died: " << ttx.getMessage() << endl; + cerr << "TSimpleServer client died: " << ttx.what() << endl; } iot.first->close(); iot.second->close(); client->close(); } } catch (TTransportException& ttx) { - cerr << "TServerTransport died on accept: " << ttx.getMessage() << endl; + cerr << "TServerTransport died on accept: " << ttx.what() << endl; } // TODO(mcslee): Could this be a timeout case? Or always the real thing? diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp index 7885f0f5..357152bc 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/server/TThreadPoolServer.cpp @@ -8,6 +8,7 @@ namespace facebook { namespace thrift { namespace server { using namespace std; +using namespace facebook::thrift; using namespace facebook::thrift::concurrency; using namespace facebook::thrift::transport; @@ -26,14 +27,18 @@ public: ~Task() {} void run() { - while(true) { - try { - processor_->process(input_, output_); - } catch (TTransportException& ttx) { - break; - } catch(...) { - break; + try { + while (processor_->process(input_, output_)) { + if (!input_->getInputTransport()->peek()) { + break; + } } + } catch (TTransportException& ttx) { + cerr << "TThreadPoolServer client died: " << ttx.what() << endl; + } catch (TException& x) { + cerr << "TThreadPoolServer exception: " << x.what() << endl; + } catch (...) { + cerr << "TThreadPoolServer uncaught exception." << endl; } input_->getInputTransport()->close(); output_->getOutputTransport()->close(); @@ -68,7 +73,7 @@ void TThreadPoolServer::serve() { // Start the server listening serverTransport_->listen(); } catch (TTransportException& ttx) { - cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl; + cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl; return; } diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp index de3bea7b..8b9048c8 100644 --- a/lib/cpp/src/transport/TSocket.cpp +++ b/lib/cpp/src/transport/TSocket.cpp @@ -43,6 +43,8 @@ TSocket::TSocket(string host, int port) : lingerOn_(1), lingerVal_(0), noDelay_(1) { + recvTimeval_.tv_sec = (int)(recvTimeout_/1000); + recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); } TSocket::TSocket(int socket) : @@ -55,6 +57,8 @@ TSocket::TSocket(int socket) : lingerOn_(1), lingerVal_(0), noDelay_(1) { + recvTimeval_.tv_sec = (int)(recvTimeout_/1000); + recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); } TSocket::~TSocket() { @@ -65,6 +69,20 @@ bool TSocket::isOpen() { return (socket_ > 0); } +bool TSocket::peek() { + if (!isOpen()) { + return false; + } + uint8_t buf; + int r = recv(socket_, &buf, 1, MSG_PEEK); + if (r == -1) { + perror("TSocket::peek()"); + close(); + throw TTransportException(TTX_UNKNOWN, "recv() ERROR:" + errno); + } + return (r > 0); +} + void TSocket::open() { // Create socket socket_ = socket(AF_INET, SOCK_STREAM, 0); @@ -322,12 +340,14 @@ void TSocket::setConnTimeout(int ms) { void TSocket::setRecvTimeout(int ms) { recvTimeout_ = ms; + recvTimeval_.tv_sec = (int)(recvTimeout_/1000); + recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); if (socket_ <= 0) { return; } - struct timeval r = {(int)(recvTimeout_/1000), - (int)((recvTimeout_%1000)*1000)}; + // Copy because select may modify + struct timeval r = recvTimeval_; int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r)); if (ret == -1) { perror("TSocket::setRecvTimeout()"); diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index b946b6ae..8137984a 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -2,6 +2,7 @@ #define _THRIFT_TRANSPORT_TSOCKET_H_ 1 #include +#include #include "TTransport.h" #include "TServerSocket.h" @@ -44,6 +45,11 @@ class TSocket : public TTransport { */ bool isOpen(); + /** + * Calls select on the socket to see if there is more data available. + */ + bool peek(); + /** * Creates and opens the UNIX socket. * @@ -131,6 +137,9 @@ class TSocket : public TTransport { /** Nodelay */ bool noDelay_; + + /** Recv timeout timeval */ + struct timeval recvTimeval_; }; }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h index 7b4cbe14..5e4ae6bd 100644 --- a/lib/cpp/src/transport/TTransport.h +++ b/lib/cpp/src/transport/TTransport.h @@ -24,7 +24,21 @@ class TTransport { /** * Whether this transport is open. */ - virtual bool isOpen() { return false; } + virtual bool isOpen() { + return false; + } + + /** + * Tests whether there is more data to read or if the remote side is + * still open. By default this is true whenever the transport is open, + * but implementations should add logic to test for this condition where + * possible (i.e. on a socket). + * This is used by a server to check if it should listen for another + * request. + */ + virtual bool peek() { + return isOpen(); + } /** * Opens the transport for communications. diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h index c54084d3..e02eb70f 100644 --- a/lib/cpp/src/transport/TTransportException.h +++ b/lib/cpp/src/transport/TTransportException.h @@ -23,21 +23,25 @@ enum TTransportExceptionType { * * @author Mark Slee */ -class TTransportException { +class TTransportException : public facebook::thrift::TException { public: TTransportException() : - type_(TTX_UNKNOWN), message_() {} + facebook::thrift::TException(), + type_(TTX_UNKNOWN) {} TTransportException(TTransportExceptionType type) : - type_(type), message_() {} + facebook::thrift::TException(), + type_(type) {} - TTransportException(std::string message) : - type_(TTX_UNKNOWN), message_(message) {} + TTransportException(const std::string message) : + facebook::thrift::TException(message), + type_(TTX_UNKNOWN) {} - TTransportException(TTransportExceptionType type, std::string message) : - type_(type), message_(message) {} + TTransportException(TTransportExceptionType type, const std::string message) : + facebook::thrift::TException(message), + type_(type) {} - ~TTransportException() {} + virtual ~TTransportException() throw() {} /** * Returns an error code that provides information about the type of error @@ -45,21 +49,14 @@ class TTransportException { * * @return Error code */ - TTransportExceptionType getType() { return type_; } + TTransportExceptionType getType() { + return type_; + } - /** - * Returns an informative message about what caused this error. - * - * @return Error string - */ - const std::string& getMessage() { return message_; } - protected: /** Error code */ TTransportExceptionType type_; - /** Description */ - std::string message_; }; }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp index d9f47750..02454e3b 100644 --- a/lib/cpp/src/transport/TTransportUtils.cpp +++ b/lib/cpp/src/transport/TTransportUtils.cpp @@ -16,7 +16,6 @@ uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) { buf += rLen_-rPos_; } // Get more from underlying transport up to buffer size - // TODO: should this be a readAll? rLen_ = transport_->read(rBuf_, rBufSize_); rPos_ = 0; } diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index 8d8d093e..a8003cf2 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -71,6 +71,14 @@ class TBufferedTransport : public TTransport { return transport_->isOpen(); } + bool peek() { + if (rPos_ >= rLen_) { + rLen_ = transport_->read(rBuf_, rBufSize_); + rPos_ = 0; + } + return (rLen_ > rPos_); + } + void open() { transport_->open(); } @@ -177,6 +185,13 @@ class TFramedTransport : public TTransport { return transport_->isOpen(); } + bool peek() { + if (rPos_ < rLen_) { + return true; + } + return transport_->peek(); + } + void close() { transport_->close(); } @@ -260,7 +275,10 @@ class TMemoryBuffer : public TTransport { return true; } - + bool peek() { + return (rPos_ < wPos_); + } + void open() {} void close() {} diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp index e508b0f7..db003478 100644 --- a/test/cpp/src/TestClient.cpp +++ b/test/cpp/src/TestClient.cpp @@ -88,7 +88,7 @@ int main(int argc, char** argv) { try { transport->open(); } catch (TTransportException& ttx) { - printf("Connect failed: %s\n", ttx.getMessage().c_str()); + printf("Connect failed: %s\n", ttx.what()); continue; } @@ -373,7 +373,7 @@ int main(int argc, char** argv) { testClient.testException("Xception"); printf(" void\nFAILURE\n"); - } catch(Xception& e) { + } catch(Xception& e) { printf(" {%u, \"%s\"}\n", e.errorCode, e.message.c_str()); } diff --git a/test/cpp/src/main.cpp b/test/cpp/src/main.cpp index d9643c33..8344a885 100644 --- a/test/cpp/src/main.cpp +++ b/test/cpp/src/main.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -18,19 +19,57 @@ #include #include +#include +#include +using __gnu_cxx::hash_map; +using __gnu_cxx::hash; + using namespace std; using namespace facebook::thrift; using namespace facebook::thrift::protocol; using namespace facebook::thrift::transport; using namespace facebook::thrift::server; +using namespace facebook::thrift::concurrency; using namespace test::stress; +struct eqstr { + bool operator()(const char* s1, const char* s2) const { + return strcmp(s1, s2) == 0; + } +}; + +struct ltstr { + bool operator()(const char* s1, const char* s2) const { + return strcmp(s1, s2) < 0; + } +}; + + +// typedef hash_map, eqstr> count_map; +typedef map count_map; + class Server : public ServiceIf { public: - Server() {}; - void echoVoid() {return;} + Server() {} + + void count(const char* method) { + MutexMonitor m(lock_); + int ct = counts_[method]; + counts_[method] = ++ct; + } + + void echoVoid() { + count("echoVoid"); + return; + } + + count_map getCount() { + MutexMonitor m(lock_); + return counts_; + } + int8_t echoByte(int8_t arg) {return arg;} int32_t echoI32(int32_t arg) {return arg;} int64_t echoI64(int64_t arg) {return arg;} @@ -38,6 +77,11 @@ class Server : public ServiceIf { vector echoList(vector arg) {return arg;} set echoSet(set arg) {return arg;} map echoMap(map arg) {return arg;} + +private: + count_map counts_; + Mutex lock_; + }; class ClientThread: public Runnable { @@ -252,10 +296,10 @@ int main(int argc, char **argv) { shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); - if(runServer) { + // Dispatcher + shared_ptr serviceHandler(new Server()); - // Dispatcher - shared_ptr serviceHandler(new Server()); + if(runServer) { shared_ptr serviceProcessor(new ServiceProcessor(serviceHandler)); @@ -390,6 +434,11 @@ int main(int argc, char **argv) { cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl; + count_map count = serviceHandler->getCount(); + count_map::iterator iter; + for (iter = count.begin(); iter != count.end(); ++iter) { + printf("%s => %d\n", iter->first, iter->second); + } cerr << "done." << endl; } -- 2.17.1