From 16698859316cdae5ef09a907d35331e978969981 Mon Sep 17 00:00:00 2001 From: Marc Slemko Date: Fri, 4 Aug 2006 03:16:10 +0000 Subject: [PATCH] More boosification of thrift driver, server, transport and protocol code Modified TestServer to use thread-pool manager git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664737 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 38 ++++--- lib/cpp/src/TProcessor.h | 9 +- lib/cpp/src/protocol/TBinaryProtocol.cc | 94 ++++++++++------ lib/cpp/src/protocol/TBinaryProtocol.h | 88 +++++++++------ lib/cpp/src/protocol/TProtocol.h | 97 +++++++++++------ lib/cpp/src/protocol/protocol.txt | 16 +-- lib/cpp/src/server/TServer.h | 25 +++-- lib/cpp/src/server/TSimpleServer.cc | 15 +-- lib/cpp/src/server/TSimpleServer.h | 8 +- lib/cpp/src/server/TThreadPoolServer.cc | 121 ++++++++++----------- lib/cpp/src/server/TThreadPoolServer.h | 33 +++++- lib/cpp/src/transport/TBufferedTransport.h | 14 ++- lib/cpp/src/transport/TChunkedTransport.h | 11 +- lib/cpp/src/transport/TServerSocket.cc | 7 +- lib/cpp/src/transport/TServerSocket.h | 5 +- lib/cpp/src/transport/TServerTransport.h | 9 +- 16 files changed, 348 insertions(+), 242 deletions(-) diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 1ec11d49..cee53270 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -1,6 +1,5 @@ -#lib_LTLIBRARIES = libthrift.la - -lib_LIBRARIES = libthrift.a +lib_LTLIBRARIES = libthrift.la +#lib_LIBRARIES = libthrift.a common_cxxflags = -Isrc $(BOOST_CPPFLAGS) common_ldflags = $(BOOST_LDFLAGS) @@ -19,8 +18,8 @@ libthrift_sources = src/concurrency/Monitor.cc \ src/server/TSimpleServer.cc \ src/server/TThreadPoolServer.cc -libthrift_a_SOURCES = $(libthrift_sources) -#libthrift_la_SOURCES = $(libthrift_sources) +#libthrift_a_SOURCES = $(libthrift_sources) +libthrift_la_SOURCES = $(libthrift_sources) libthrift_cxxflags = $(common_cxxflags) libthrift_ldflags = $(common_ldflags) @@ -28,14 +27,27 @@ libthrift_ldflags = $(common_ldflags) libthrift_la_CXXFLAGS = $(libthrift_cxxflags) libthrift_a_CXXFLAGS = $(libthrift_cxxflags) -libthrift_inst_headers = src/concurrency/Exception.h \ +include_thriftdir = $(includedir)/thrift +include_thrift_HEADERS = \ + src/Thrift.h \ + src/TProcessor.h + +include_concurrencydir = $(includedir)/thrift/concurrency +include_concurrency_HEADERS = \ + src/concurrency/Exception.h \ src/concurrency/Monitor.h \ src/concurrency/PosixThreadFactory.h \ src/concurrency/Thread.h \ src/concurrency/ThreadManager.h \ - src/concurrency/TimerManager.h \ + src/concurrency/TimerManager.h + +include_protocoldir = $(includedir)/thrift/protocol +include_protocol_HEADERS = \ src/protocol/TBinaryProtocol.h \ - src/protocol/TProtocol.h \ + src/protocol/TProtocol.h + +include_transportdir = $(includedir)/thrift/transport +include_transport_HEADERS = \ src/transport/TBufferedTransport.h \ src/transport/TChunkedTransport.h \ src/transport/TNullTransport.h \ @@ -43,16 +55,16 @@ libthrift_inst_headers = src/concurrency/Exception.h \ src/transport/TServerTransport.h \ src/transport/TSocket.h \ src/transport/TTransport.h \ - src/transport/TTransport.h \ - src/transport/TTransportException.h \ + src/transport/TTransportException.h + +include_serverdir = $(includedir)/thrift/server +include_server_HEADERS = \ + src/server/TServer.h \ src/server/TSimpleServer.h \ src/server/TThreadPoolServer.h bin_PROGRAMS = concurrency_test -include_HEADERS = $(libconcurrency_inst_headers) \ - $(libthrift_inst_headers) - concurrency_test_SOURCES = src/concurrency/test/Tests.cc \ src/concurrency/test/ThreadFactoryTests.h \ src/concurrency/test/ThreadManagerTests.h \ diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index 2ed24304..ce275393 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -2,10 +2,13 @@ #define T_PROCESSOR_H #include -#include "transport/TTransport.h" +#include +#include namespace facebook { namespace thrift { +using namespace boost; + using namespace facebook::thrift::transport; /** @@ -19,8 +22,8 @@ using namespace facebook::thrift::transport; class TProcessor { public: virtual ~TProcessor() {} - virtual bool process(TTransport* in, TTransport *out) = 0; - virtual bool process(TTransport* io) { return process(io, io); } + virtual bool process(shared_ptr in, shared_ptr out) = 0; + virtual bool process(shared_ptr io) { return process(io, io); } protected: TProcessor() {} }; diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cc b/lib/cpp/src/protocol/TBinaryProtocol.cc index ed482b88..fef8ab48 100644 --- a/lib/cpp/src/protocol/TBinaryProtocol.cc +++ b/lib/cpp/src/protocol/TBinaryProtocol.cc @@ -3,16 +3,28 @@ using std::string; namespace facebook { namespace thrift { namespace protocol { -uint32_t TBinaryProtocol::writeStructBegin(TTransport* out, +uint32_t TBinaryProtocol::writeMessageBegin(shared_ptr out, + const TMessageType messageType, + const uint32_t seqid) const { + return + writeByte(out, (uint8_t)messageType) + + writeU32(out, seqid); +} + +uint32_t TBinaryProtocol::writeMessageEnd(shared_ptr out) const { + return 0; +} + +uint32_t TBinaryProtocol::writeStructBegin(shared_ptr out, const string& name) const { return 0; } -uint32_t TBinaryProtocol::writeStructEnd(TTransport* out) const { +uint32_t TBinaryProtocol::writeStructEnd(shared_ptr out) const { return 0; } -uint32_t TBinaryProtocol::writeFieldBegin(TTransport* out, +uint32_t TBinaryProtocol::writeFieldBegin(shared_ptr out, const string& name, const TType fieldType, const uint16_t fieldId) const { @@ -21,16 +33,16 @@ uint32_t TBinaryProtocol::writeFieldBegin(TTransport* out, writeI32(out, (int32_t)fieldId); } -uint32_t TBinaryProtocol::writeFieldEnd(TTransport* out) const { +uint32_t TBinaryProtocol::writeFieldEnd(shared_ptr out) const { return 0; } -uint32_t TBinaryProtocol::writeFieldStop(TTransport* out) const { +uint32_t TBinaryProtocol::writeFieldStop(shared_ptr out) const { return writeByte(out, (uint8_t)T_STOP); } -uint32_t TBinaryProtocol::writeMapBegin(TTransport* out, +uint32_t TBinaryProtocol::writeMapBegin(shared_ptr out, const TType keyType, const TType valType, const int32_t size) const { @@ -40,11 +52,11 @@ uint32_t TBinaryProtocol::writeMapBegin(TTransport* out, writeI32(out, (int32_t)size); } -uint32_t TBinaryProtocol::writeMapEnd(TTransport* out) const { +uint32_t TBinaryProtocol::writeMapEnd(shared_ptr out) const { return 0; } -uint32_t TBinaryProtocol::writeListBegin(TTransport* out, +uint32_t TBinaryProtocol::writeListBegin(shared_ptr out, const TType elemType, const int32_t size) const { return @@ -52,11 +64,11 @@ uint32_t TBinaryProtocol::writeListBegin(TTransport* out, writeI32(out, (int32_t)size); } -uint32_t TBinaryProtocol::writeListEnd(TTransport* out) const { +uint32_t TBinaryProtocol::writeListEnd(shared_ptr out) const { return 0; } -uint32_t TBinaryProtocol::writeSetBegin(TTransport* out, +uint32_t TBinaryProtocol::writeSetBegin(shared_ptr out, const TType elemType, const int32_t size) const { return @@ -64,45 +76,45 @@ uint32_t TBinaryProtocol::writeSetBegin(TTransport* out, writeI32(out, (int32_t)size); } -uint32_t TBinaryProtocol::writeSetEnd(TTransport* out) const { +uint32_t TBinaryProtocol::writeSetEnd(shared_ptr out) const { return 0; } -uint32_t TBinaryProtocol::writeByte(TTransport* out, +uint32_t TBinaryProtocol::writeByte(shared_ptr out, const uint8_t byte) const { out->write(&byte, 1); return 1; } -uint32_t TBinaryProtocol::writeU32(TTransport* out, +uint32_t TBinaryProtocol::writeU32(shared_ptr out, const uint32_t u32) const { uint32_t net = (uint32_t)htonl(u32); out->write((uint8_t*)&net, 4); return 4; } -uint32_t TBinaryProtocol::writeI32(TTransport* out, +uint32_t TBinaryProtocol::writeI32(shared_ptr out, const int32_t i32) const { int32_t net = (int32_t)htonl(i32); out->write((uint8_t*)&net, 4); return 4; } -uint32_t TBinaryProtocol::writeU64(TTransport* out, +uint32_t TBinaryProtocol::writeU64(shared_ptr out, const uint64_t u64) const { uint64_t net = (uint64_t)htonll(u64); out->write((uint8_t*)&net, 8); return 8; } -uint32_t TBinaryProtocol::writeI64(TTransport* out, +uint32_t TBinaryProtocol::writeI64(shared_ptr out, const int64_t i64) const { int64_t net = (int64_t)htonll(i64); out->write((uint8_t*)&net, 8); return 8; } -uint32_t TBinaryProtocol::writeString(TTransport* out, +uint32_t TBinaryProtocol::writeString(shared_ptr out, const string& str) const { uint32_t result = writeI32(out, str.size()); out->write((uint8_t*)str.data(), str.size()); @@ -113,17 +125,33 @@ uint32_t TBinaryProtocol::writeString(TTransport* out, * Reading functions */ -uint32_t TBinaryProtocol::readStructBegin(TTransport* in, +uint32_t TBinaryProtocol::readMessasgeBegin(shared_ptr in, + TMessageType& messageType, + uint32_t& seqid) const { + + uint32_t result = 0; + uint8_t type; + result+= readByte(in, type); + messageType = (TMessageType)type; + result+= readU32(in, seqid); + return result; +} + +uint32_t TBinaryProtocol::readMessageEnd(shared_ptr in) const{ + return 0; +} + +uint32_t TBinaryProtocol::readStructBegin(shared_ptr in, string& name) const { name = ""; return 0; } -uint32_t TBinaryProtocol::readStructEnd(TTransport* in) const { +uint32_t TBinaryProtocol::readStructEnd(shared_ptr in) const { return 0; } -uint32_t TBinaryProtocol::readFieldBegin(TTransport* in, +uint32_t TBinaryProtocol::readFieldBegin(shared_ptr in, string& name, TType& fieldType, uint16_t& fieldId) const { @@ -141,11 +169,11 @@ uint32_t TBinaryProtocol::readFieldBegin(TTransport* in, return result; } -uint32_t TBinaryProtocol::readFieldEnd(TTransport* in) const { +uint32_t TBinaryProtocol::readFieldEnd(shared_ptr in) const { return 0; } -uint32_t TBinaryProtocol::readMapBegin(TTransport* in, +uint32_t TBinaryProtocol::readMapBegin(shared_ptr in, TType& keyType, TType& valType, int32_t& size) const { @@ -159,11 +187,11 @@ uint32_t TBinaryProtocol::readMapBegin(TTransport* in, return result; } -uint32_t TBinaryProtocol::readMapEnd(TTransport* in) const { +uint32_t TBinaryProtocol::readMapEnd(shared_ptr in) const { return 0; } -uint32_t TBinaryProtocol::readListBegin(TTransport* in, +uint32_t TBinaryProtocol::readListBegin(shared_ptr in, TType& elemType, int32_t& size) const { uint8_t e; @@ -174,11 +202,11 @@ uint32_t TBinaryProtocol::readListBegin(TTransport* in, return result; } -uint32_t TBinaryProtocol::readListEnd(TTransport* in) const { +uint32_t TBinaryProtocol::readListEnd(shared_ptr in) const { return 0; } -uint32_t TBinaryProtocol::readSetBegin(TTransport* in, +uint32_t TBinaryProtocol::readSetBegin(shared_ptr in, TType& elemType, int32_t& size) const { uint8_t e; @@ -189,11 +217,11 @@ uint32_t TBinaryProtocol::readSetBegin(TTransport* in, return result; } -uint32_t TBinaryProtocol::readSetEnd(TTransport* in) const { +uint32_t TBinaryProtocol::readSetEnd(shared_ptr in) const { return 0; } -uint32_t TBinaryProtocol::readByte(TTransport* in, +uint32_t TBinaryProtocol::readByte(shared_ptr in, uint8_t& byte) const { uint8_t b[1]; in->readAll(b, 1); @@ -201,7 +229,7 @@ uint32_t TBinaryProtocol::readByte(TTransport* in, return 1; } -uint32_t TBinaryProtocol::readU32(TTransport* in, +uint32_t TBinaryProtocol::readU32(shared_ptr in, uint32_t& u32) const { uint8_t b[4]; in->readAll(b, 4); @@ -210,7 +238,7 @@ uint32_t TBinaryProtocol::readU32(TTransport* in, return 4; } -uint32_t TBinaryProtocol::readI32(TTransport* in, +uint32_t TBinaryProtocol::readI32(shared_ptr in, int32_t& i32) const { uint8_t b[4]; in->readAll(b, 4); @@ -219,7 +247,7 @@ uint32_t TBinaryProtocol::readI32(TTransport* in, return 4; } -uint32_t TBinaryProtocol::readU64(TTransport* in, +uint32_t TBinaryProtocol::readU64(shared_ptr in, uint64_t& u64) const { uint8_t b[8]; in->readAll(b, 8); @@ -228,7 +256,7 @@ uint32_t TBinaryProtocol::readU64(TTransport* in, return 8; } -uint32_t TBinaryProtocol::readI64(TTransport* in, +uint32_t TBinaryProtocol::readI64(shared_ptr in, int64_t& i64) const { uint8_t b[8]; in->readAll(b, 8); @@ -237,7 +265,7 @@ uint32_t TBinaryProtocol::readI64(TTransport* in, return 8; } -uint32_t TBinaryProtocol::readString(TTransport* in, +uint32_t TBinaryProtocol::readString(shared_ptr in, string& str) const { uint32_t result; int32_t size; diff --git a/lib/cpp/src/protocol/TBinaryProtocol.h b/lib/cpp/src/protocol/TBinaryProtocol.h index 0f0560a1..3456abf7 100644 --- a/lib/cpp/src/protocol/TBinaryProtocol.h +++ b/lib/cpp/src/protocol/TBinaryProtocol.h @@ -1,10 +1,14 @@ #ifndef T_BINARY_PROTOCOL_H #define T_BINARY_PROTOCOL_H -#include "protocol/TProtocol.h" +#include + +#include namespace facebook { namespace thrift { namespace protocol { +using namespace boost; + /** * The default binary protocol for thrift. Writes all data in a very basic * binary format, essentially just spitting out the raw bytes. @@ -12,7 +16,7 @@ namespace facebook { namespace thrift { namespace protocol { * @author Mark Slee */ class TBinaryProtocol : public TProtocol { - public: + public: TBinaryProtocol() {} ~TBinaryProtocol() {} @@ -20,108 +24,122 @@ class TBinaryProtocol : public TProtocol { * Writing functions. */ - uint32_t writeStructBegin (TTransport* out, + virtual uint32_t writeMessageBegin(shared_ptr out, + const TMessageType messageType, + const uint32_t seqid) const; + + virtual uint32_t writeMessageEnd (shared_ptr out) const; + + + uint32_t writeStructBegin (shared_ptr out, const std::string& name) const; - uint32_t writeStructEnd (TTransport* out) const; + uint32_t writeStructEnd (shared_ptr out) const; - uint32_t writeFieldBegin (TTransport* out, + uint32_t writeFieldBegin (shared_ptr out, const std::string& name, const TType fieldType, const uint16_t fieldId) const; - uint32_t writeFieldEnd (TTransport* out) const; + uint32_t writeFieldEnd (shared_ptr out) const; - uint32_t writeFieldStop (TTransport* out) const; + uint32_t writeFieldStop (shared_ptr out) const; - uint32_t writeMapBegin (TTransport* out, + uint32_t writeMapBegin (shared_ptr out, const TType keyType, const TType valType, const int32_t size) const; - uint32_t writeMapEnd (TTransport* out) const; + uint32_t writeMapEnd (shared_ptr out) const; - uint32_t writeListBegin (TTransport* out, + uint32_t writeListBegin (shared_ptr out, const TType elemType, const int32_t size) const; - uint32_t writeListEnd (TTransport* out) const; + uint32_t writeListEnd (shared_ptr out) const; - uint32_t writeSetBegin (TTransport* out, + uint32_t writeSetBegin (shared_ptr out, const TType elemType, const int32_t size) const; - uint32_t writeSetEnd (TTransport* out) const; + uint32_t writeSetEnd (shared_ptr out) const; - uint32_t writeByte (TTransport* out, + uint32_t writeByte (shared_ptr out, const uint8_t byte) const; - uint32_t writeU32 (TTransport* out, + uint32_t writeU32 (shared_ptr out, const uint32_t u32) const; - uint32_t writeI32 (TTransport* out, + uint32_t writeI32 (shared_ptr out, const int32_t i32) const; - uint32_t writeU64 (TTransport* out, + uint32_t writeU64 (shared_ptr out, const uint64_t u64) const; - uint32_t writeI64 (TTransport* out, + uint32_t writeI64 (shared_ptr out, const int64_t i64) const; - uint32_t writeString (TTransport* out, + uint32_t writeString (shared_ptr out, const std::string& str) const; /** * Reading functions */ - uint32_t readStructBegin (TTransport* in, + + uint32_t readMessasgeBegin (shared_ptr in, + TMessageType& messageType, + uint32_t& seqid) const; + + uint32_t readMessageEnd (shared_ptr in) const; + + uint32_t readStructBegin (shared_ptr in, std::string& name) const; - uint32_t readStructEnd (TTransport* in) const; + uint32_t readStructEnd (shared_ptr in) const; - uint32_t readFieldBegin (TTransport* in, + uint32_t readFieldBegin (shared_ptr in, std::string& name, TType& fieldType, uint16_t& fieldId) const; - uint32_t readFieldEnd (TTransport* in) const; + uint32_t readFieldEnd (shared_ptr in) const; - uint32_t readMapBegin (TTransport* in, + uint32_t readMapBegin (shared_ptr in, TType& keyType, TType& valType, int32_t& size) const; - uint32_t readMapEnd (TTransport* in) const; + uint32_t readMapEnd (shared_ptr in) const; - uint32_t readListBegin (TTransport* in, + uint32_t readListBegin (shared_ptr in, TType& elemType, int32_t& size) const; - uint32_t readListEnd (TTransport* in) const; + uint32_t readListEnd (shared_ptr in) const; - uint32_t readSetBegin (TTransport* in, + uint32_t readSetBegin (shared_ptr in, TType& elemType, int32_t& size) const; - uint32_t readSetEnd (TTransport* in) const; + uint32_t readSetEnd (shared_ptr in) const; - uint32_t readByte (TTransport* in, + uint32_t readByte (shared_ptr in, uint8_t& byte) const; - uint32_t readU32 (TTransport* in, + uint32_t readU32 (shared_ptr in, uint32_t& u32) const; - uint32_t readI32 (TTransport* in, + uint32_t readI32 (shared_ptr in, int32_t& i32) const; - uint32_t readU64 (TTransport* in, + uint32_t readU64 (shared_ptr in, uint64_t& u64) const; - uint32_t readI64 (TTransport* in, + uint32_t readI64 (shared_ptr in, int64_t& i64) const; - uint32_t readString (TTransport* in, + uint32_t readString (shared_ptr in, std::string& str) const; }; diff --git a/lib/cpp/src/protocol/TProtocol.h b/lib/cpp/src/protocol/TProtocol.h index 40beaddd..33a6eb77 100644 --- a/lib/cpp/src/protocol/TProtocol.h +++ b/lib/cpp/src/protocol/TProtocol.h @@ -1,15 +1,19 @@ #ifndef T_PROTOCOL_H #define T_PROTOCOL_H +#include + +#include + #include #include #include #include -#include "transport/TTransport.h" - namespace facebook { namespace thrift { namespace protocol { +using namespace boost; + using namespace facebook::thrift::transport; #define ntohll(x) (((uint64_t)(ntohl((int)((x << 32) >> 32))) << 32) | (uint32_t)ntohl(((int)(x >> 32)))) @@ -40,6 +44,14 @@ enum TType { T_LIST = 13 }; +/** + * Enumerated definition of the message types that the Thrift protocol supports. + */ +enum TMessageType { + T_CALL = 1, + T_REPLY = 2 +}; + /** * Abstract class for a thrift protocol driver. These are all the methods that * a protocol must implement. Essentially, there must be some way of reading @@ -60,114 +72,127 @@ class TProtocol { * Writing functions. */ - virtual uint32_t writeStructBegin (TTransport* out, + virtual uint32_t writeMessageBegin (shared_ptr out, + const TMessageType messageType, + const uint32_t seqid) const = 0; + + virtual uint32_t writeMessageEnd (shared_ptr out) const = 0; + + + virtual uint32_t writeStructBegin (shared_ptr out, const std::string& name) const = 0; - virtual uint32_t writeStructEnd (TTransport* out) const = 0; + virtual uint32_t writeStructEnd (shared_ptr out) const = 0; - virtual uint32_t writeFieldBegin (TTransport* out, + virtual uint32_t writeFieldBegin (shared_ptr out, const std::string& name, const TType fieldType, const uint16_t fieldId) const = 0; - virtual uint32_t writeFieldEnd (TTransport* out) const = 0; + virtual uint32_t writeFieldEnd (shared_ptr out) const = 0; - virtual uint32_t writeFieldStop (TTransport* out) const = 0; + virtual uint32_t writeFieldStop (shared_ptr out) const = 0; - virtual uint32_t writeMapBegin (TTransport* out, + virtual uint32_t writeMapBegin (shared_ptr out, const TType keyType, const TType valType, const int32_t size) const = 0; - virtual uint32_t writeMapEnd (TTransport* out) const = 0; + virtual uint32_t writeMapEnd (shared_ptr out) const = 0; - virtual uint32_t writeListBegin (TTransport* out, + virtual uint32_t writeListBegin (shared_ptr out, const TType elemType, const int32_t size) const = 0; - virtual uint32_t writeListEnd (TTransport* out) const = 0; + virtual uint32_t writeListEnd (shared_ptr out) const = 0; - virtual uint32_t writeSetBegin (TTransport* out, + virtual uint32_t writeSetBegin (shared_ptr out, const TType elemType, const int32_t size) const = 0; - virtual uint32_t writeSetEnd (TTransport* out) const = 0; + virtual uint32_t writeSetEnd (shared_ptr out) const = 0; - virtual uint32_t writeByte (TTransport* out, + virtual uint32_t writeByte (shared_ptr out, const uint8_t byte) const = 0; - virtual uint32_t writeU32 (TTransport* out, + virtual uint32_t writeU32 (shared_ptr out, const uint32_t u32) const = 0; - virtual uint32_t writeI32 (TTransport* out, + virtual uint32_t writeI32 (shared_ptr out, const int32_t i32) const = 0; - virtual uint32_t writeU64 (TTransport* out, + virtual uint32_t writeU64 (shared_ptr out, const uint64_t u64) const = 0; - virtual uint32_t writeI64 (TTransport* out, + virtual uint32_t writeI64 (shared_ptr out, const int64_t i64) const = 0; - virtual uint32_t writeString (TTransport* out, + virtual uint32_t writeString (shared_ptr out, const std::string& str) const = 0; /** * Reading functions */ - virtual uint32_t readStructBegin (TTransport* in, + virtual uint32_t readMessasgeBegin (shared_ptr in, + TMessageType& messageType, + uint32_t& seqid) const = 0; + + virtual uint32_t readMessageEnd (shared_ptr in) const = 0; + + virtual uint32_t readStructBegin (shared_ptr in, std::string& name) const = 0; - virtual uint32_t readStructEnd (TTransport* in) const = 0; + virtual uint32_t readStructEnd (shared_ptr in) const = 0; - virtual uint32_t readFieldBegin (TTransport* in, + virtual uint32_t readFieldBegin (shared_ptr in, std::string& name, TType& fieldType, uint16_t& fieldId) const = 0; - virtual uint32_t readFieldEnd (TTransport* in) const = 0; + virtual uint32_t readFieldEnd (shared_ptr in) const = 0; - virtual uint32_t readMapBegin (TTransport* in, + virtual uint32_t readMapBegin (shared_ptr in, TType& keyType, TType& valType, int32_t& size) const = 0; - virtual uint32_t readMapEnd (TTransport* in) const = 0; + virtual uint32_t readMapEnd (shared_ptr in) const = 0; - virtual uint32_t readListBegin (TTransport* in, + virtual uint32_t readListBegin (shared_ptr in, TType& elemType, int32_t& size) const = 0; - virtual uint32_t readListEnd (TTransport* in) const = 0; + virtual uint32_t readListEnd (shared_ptr in) const = 0; - virtual uint32_t readSetBegin (TTransport* in, + virtual uint32_t readSetBegin (shared_ptr in, TType& elemType, int32_t& size) const = 0; - virtual uint32_t readSetEnd (TTransport* in) const = 0; + virtual uint32_t readSetEnd (shared_ptr in) const = 0; - virtual uint32_t readByte (TTransport* in, + virtual uint32_t readByte (shared_ptr in, uint8_t& byte) const = 0; - virtual uint32_t readU32 (TTransport* in, + virtual uint32_t readU32 (shared_ptr in, uint32_t& u32) const = 0; - virtual uint32_t readI32 (TTransport* in, + virtual uint32_t readI32 (shared_ptr in, int32_t& i32) const = 0; - virtual uint32_t readU64 (TTransport* in, + virtual uint32_t readU64 (shared_ptr in, uint64_t& u64) const = 0; - virtual uint32_t readI64 (TTransport* in, + virtual uint32_t readI64 (shared_ptr in, int64_t& i64) const = 0; - virtual uint32_t readString (TTransport* in, + virtual uint32_t readString (shared_ptr in, std::string& str) const = 0; /** * Method to arbitrarily skip over data. */ - uint32_t skip(TTransport* in, TType type) const { + uint32_t skip(shared_ptr in, TType type) const { switch (type) { case T_BYTE: { diff --git a/lib/cpp/src/protocol/protocol.txt b/lib/cpp/src/protocol/protocol.txt index d6db0abc..d66b7eba 100644 --- a/lib/cpp/src/protocol/protocol.txt +++ b/lib/cpp/src/protocol/protocol.txt @@ -25,13 +25,7 @@ For binary formats it is typically not necessary to represent the begin and end package-name : STRING service-name : STRING - arguments : ARGS_b arg-list ARGS_e - arg-list : arg-list arg | NIL - arg : ARG_b arg-identifier arg-value ARG_e - arg-identifier : arg-name | arg-id | arg-name arg-id - arg-name : STRING - arg-id : UINT32 - arg-value : datum + arguments : struct-datum """ service function reply message body """ @@ -79,16 +73,10 @@ For binary formats it is typically not necessary to represent the begin and end """ collection datum """ - collection-type-specifier : ARRAY | MAP | SET | LIST + collection-type-specifier : MAP | SET | LIST collection-datum : list-datum | set-datum | map-datum - array-datum : simple-array-datum | complex-array-datum - - simple-array-datum : ARRAY_b element-count simple-type-specifier simple-data ARRAY_e - - complex-array-datum : ARRAY_b element-count simple-type-specifier simple-data ARRAY_e - list-datum : LIST_b element-count element-type-specifier elements LIST_e element-count : UINT32 diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index f34944b2..68728f2d 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -1,11 +1,14 @@ #ifndef T_SERVER_H #define T_SERVER_H -#include "TProcessor.h" +#include + +#include namespace facebook { namespace thrift { namespace server { using namespace facebook::thrift; +using namespace boost; class TServerOptions; @@ -15,25 +18,25 @@ class TServerOptions; * @author Mark Slee */ class TServer { - public: +public: virtual ~TServer() {} virtual void run() = 0; - - protected: - TServer(TProcessor* processor, TServerOptions* options) : + +protected: + TServer(shared_ptr processor, shared_ptr options) : processor_(processor), options_(options) {} - - TProcessor* processor_; - TServerOptions* options_; + + shared_ptr processor_; + shared_ptr options_; }; - + /** * Class to encapsulate all generic server options. */ class TServerOptions { - public: +public: // TODO(mcslee): Fill in getters/setters here - protected: +protected: // TODO(mcslee): Fill data members in here }; diff --git a/lib/cpp/src/server/TSimpleServer.cc b/lib/cpp/src/server/TSimpleServer.cc index 7199ab9c..2ad5145e 100644 --- a/lib/cpp/src/server/TSimpleServer.cc +++ b/lib/cpp/src/server/TSimpleServer.cc @@ -13,7 +13,8 @@ namespace facebook { namespace thrift { namespace server { * @author Mark Slee */ void TSimpleServer::run() { - TTransport* client = NULL; + + shared_ptr client; try { // Start the server listening @@ -29,8 +30,8 @@ void TSimpleServer::run() { client = serverTransport_->accept(); if (client != NULL) { // Process for as long as we can keep the processor happy! - TBufferedTransport bufferedClient(client); - while (processor_->process(&bufferedClient)) {} + shared_ptr bufferedClient(new TBufferedTransport(client)); + while (processor_->process(bufferedClient)) {} } } catch (TTransportException& ttx) { if (client != NULL) { @@ -43,13 +44,7 @@ void TSimpleServer::run() { // Ensure no resource leaks client->close(); - - // Ensure no memory leaks - delete client; - - // Ensure we don't try to double-free on the next pass - client = NULL; - } + } } // TODO(mcslee): Could this be a timeout case? Or always the real thing? diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h index a4a2d98a..a8242d4a 100644 --- a/lib/cpp/src/server/TSimpleServer.h +++ b/lib/cpp/src/server/TSimpleServer.h @@ -16,9 +16,9 @@ namespace facebook { namespace thrift { namespace server { */ class TSimpleServer : public TServer { public: - TSimpleServer(TProcessor* processor, - TServerOptions* options, - TServerTransport* serverTransport) : + TSimpleServer(shared_ptr processor, + shared_ptr options, + shared_ptr serverTransport) : TServer(processor, options), serverTransport_(serverTransport) {} ~TSimpleServer() {} @@ -26,7 +26,7 @@ class TSimpleServer : public TServer { void run(); protected: - TServerTransport* serverTransport_; + shared_ptr serverTransport_; }; }}} // facebook::thrift::server diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc index 2d6290c7..d53d174c 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cc +++ b/lib/cpp/src/server/TThreadPoolServer.cc @@ -12,87 +12,78 @@ using namespace std; using namespace facebook::thrift::concurrency; using namespace facebook::thrift::transport; -class TThreadPoolServer : public TServer { - - class Task: public Runnable { - - TProcessor* _processor; - TTransport* _transport; - TBufferedTransport _bufferedTransport; +class TThreadPoolServer::Task: public Runnable { - public: + shared_ptr _processor; + shared_ptr _transport; + shared_ptr _bufferedTransport; - Task(TProcessor* processor, - TTransport* transport) : - _processor(processor), - _transport(transport), - _bufferedTransport(_transport) { - } +public: - ~Task() { - delete _transport; - } + Task(shared_ptr processor, + shared_ptr transport) : + _processor(processor), + _transport(transport), + _bufferedTransport(new TBufferedTransport(transport)) { + } + + ~Task() {} - void run() { + void run() { - while(true) { + while(true) { - try { - _processor->process(&_bufferedTransport); - - } catch (TTransportException& ttx) { - - break; - - } catch(...) { - - break; - } + try { + _processor->process(_bufferedTransport); + + } catch (TTransportException& ttx) { + + break; + + } catch(...) { + + break; } - - _bufferedTransport.close(); } - }; - - TThreadPoolServer(TProcessor* processor, - TServerOptions* options, - TServerTransport* serverTransport, - ThreadManager* threadManager) : - TServer(processor, options), - serverTransport_(serverTransport), - threadManager_(threadManager) { + + _bufferedTransport->close(); } +}; + +TThreadPoolServer::TThreadPoolServer(shared_ptr processor, + shared_ptr options, + shared_ptr serverTransport, + shared_ptr threadManager) : + TServer(processor, options), + serverTransport_(serverTransport), + threadManager_(threadManager) { +} - ~TThreadPoolServer() {} +TThreadPoolServer::~TThreadPoolServer() {} - protected: +void TThreadPoolServer::run() { - TServerTransport* serverTransport_; - ThreadManager* threadManager_; + try { + // Start the server listening + serverTransport_->listen(); + } catch (TTransportException& ttx) { + cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl; + return; + } - void run() { - + // Fetch client from server + + while (true) { + try { - // Start the server listening - serverTransport_->listen(); + + threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, + shared_ptr(serverTransport_->accept())))); + } catch (TTransportException& ttx) { - cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl; - return; - } - - // Fetch client from server - - while (true) { - - try { - - threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, serverTransport_->accept())));; - - } catch (TTransportException& ttx) { - break; - } + break; } } -}; +} }}} // facebook::thrift::server diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h index ef63a37b..827491db 100644 --- a/lib/cpp/src/server/TThreadPoolServer.h +++ b/lib/cpp/src/server/TThreadPoolServer.h @@ -1,10 +1,39 @@ #ifndef T_THREADPOOL_SERVER_H #define T_THREADPOOL_SERVER_H -#include "server/TServer.h" -#include "transport/TServerTransport.h" +#include +#include +#include + +#include namespace facebook { namespace thrift { namespace server { + +using namespace facebook::thrift::concurrency; +using namespace facebook::thrift::transport; +using namespace boost; + +class TThreadPoolServer : public TServer { +public: + + class Task; + + TThreadPoolServer(shared_ptr processor, + shared_ptr options, + shared_ptr serverTransport, + shared_ptr threadManager); + + virtual ~TThreadPoolServer(); + + virtual void run(); + +protected: + + shared_ptr serverTransport_; + shared_ptr threadManager_; + +}; + }}} // facebook::thrift::server #endif diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h index b8153fef..922754ea 100644 --- a/lib/cpp/src/transport/TBufferedTransport.h +++ b/lib/cpp/src/transport/TBufferedTransport.h @@ -1,11 +1,15 @@ #ifndef T_BUFFERED_TRANSPORT_H #define T_BUFFERED_TRANSPORT_H -#include "transport/TTransport.h" +#include #include +#include + namespace facebook { namespace thrift { namespace transport { +using namespace boost; + /** * Buffered transport. For reads it will read more data than is requested * and will serve future data out of a local buffer. For writes, data is @@ -15,7 +19,7 @@ namespace facebook { namespace thrift { namespace transport { */ class TBufferedTransport : public TTransport { public: - TBufferedTransport(TTransport* transport) : + TBufferedTransport(shared_ptr transport) : transport_(transport), rBufSize_(512), rPos_(0), rLen_(0), wBufSize_(512), wLen_(0) { @@ -23,7 +27,7 @@ class TBufferedTransport : public TTransport { wBuf_ = new uint8_t[wBufSize_]; } - TBufferedTransport(TTransport* transport, uint32_t sz) : + TBufferedTransport(shared_ptr transport, uint32_t sz) : transport_(transport), rBufSize_(sz), rPos_(0), rLen_(0), wBufSize_(sz), wLen_(0) { @@ -31,7 +35,7 @@ class TBufferedTransport : public TTransport { wBuf_ = new uint8_t[wBufSize_]; } - TBufferedTransport(TTransport* transport, uint32_t rsz, uint32_t wsz) : + TBufferedTransport(shared_ptr transport, uint32_t rsz, uint32_t wsz) : transport_(transport), rBufSize_(rsz), rPos_(0), rLen_(0), wBufSize_(wsz), wLen_(0) { @@ -67,7 +71,7 @@ class TBufferedTransport : public TTransport { void flush(); protected: - TTransport* transport_; + shared_ptr transport_; uint8_t* rBuf_; uint32_t rBufSize_; uint32_t rPos_; diff --git a/lib/cpp/src/transport/TChunkedTransport.h b/lib/cpp/src/transport/TChunkedTransport.h index 16f9e0e4..0fe8d75e 100644 --- a/lib/cpp/src/transport/TChunkedTransport.h +++ b/lib/cpp/src/transport/TChunkedTransport.h @@ -1,11 +1,14 @@ #ifndef T_CHUNKED_TRANSPORT_H #define T_CHUNKED_TRANSPORT_H -#include "transport/TTransport.h" +#include #include +#include namespace facebook { namespace thrift { namespace transport { +using namespace boost; + /** * Chunked transport. All writes go into an in-memory buffer until flush is * called, at which point the transport writes the length of the entire @@ -16,7 +19,7 @@ namespace facebook { namespace thrift { namespace transport { */ class TChunkedTransport : public TTransport { public: - TChunkedTransport(TTransport* transport) : + TChunkedTransport(shared_ptr transport) : transport_(transport), rPos_(0), rLen_(0), wBufSize_(512), wLen_(0) { @@ -24,7 +27,7 @@ class TChunkedTransport : public TTransport { wBuf_ = new uint8_t[wBufSize_]; } - TChunkedTransport(TTransport* transport, uint32_t sz) : + TChunkedTransport(shared_ptr transport, uint32_t sz) : transport_(transport), rPos_(0), rLen_(0), wBufSize_(sz), wLen_(0) { @@ -60,7 +63,7 @@ class TChunkedTransport : public TTransport { void flush(); protected: - TTransport* transport_; + shared_ptr transport_; uint8_t* rBuf_; uint32_t rPos_; uint32_t rLen_; diff --git a/lib/cpp/src/transport/TServerSocket.cc b/lib/cpp/src/transport/TServerSocket.cc index 21230d99..003ddec8 100644 --- a/lib/cpp/src/transport/TServerSocket.cc +++ b/lib/cpp/src/transport/TServerSocket.cc @@ -4,9 +4,12 @@ #include "transport/TSocket.h" #include "transport/TServerSocket.h" +#include namespace facebook { namespace thrift { namespace transport { +using namespace boost; + TServerSocket::TServerSocket(int port) : port_(port), serverSocket_(0), acceptBacklog_(1024) {} @@ -64,7 +67,7 @@ void TServerSocket::listen() { // The socket is now listening! } -TTransport* TServerSocket::acceptImpl() { +shared_ptr TServerSocket::acceptImpl() { if (serverSocket_ <= 0) { throw TTransportException(TTX_NOT_OPEN, "TServerSocket not listening"); } @@ -80,7 +83,7 @@ TTransport* TServerSocket::acceptImpl() { throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno); } - return new TSocket(clientSocket); + return shared_ptr(new TSocket(clientSocket)); } void TServerSocket::close() { diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h index c18a8d23..619a3669 100644 --- a/lib/cpp/src/transport/TServerSocket.h +++ b/lib/cpp/src/transport/TServerSocket.h @@ -1,7 +1,8 @@ #ifndef T_SERVER_SOCKET_H #define T_SERVER_SOCKET_H -#include "transport/TServerTransport.h" +#include +#include namespace facebook { namespace thrift { namespace transport { @@ -22,7 +23,7 @@ class TServerSocket : public TServerTransport { void close(); protected: - TTransport* acceptImpl(); + shared_ptr acceptImpl(); private: diff --git a/lib/cpp/src/transport/TServerTransport.h b/lib/cpp/src/transport/TServerTransport.h index 9bf74d1a..f51e88c9 100644 --- a/lib/cpp/src/transport/TServerTransport.h +++ b/lib/cpp/src/transport/TServerTransport.h @@ -3,9 +3,12 @@ #include "transport/TTransport.h" #include "transport/TTransportException.h" +#include namespace facebook { namespace thrift { namespace transport { +using namespace boost; + /** * Server transport framework. A server needs to have some facility for * creating base transports to read/write from. @@ -34,8 +37,8 @@ class TServerTransport { * @return A new TTransport object * @throws TTransportException if there is an error */ - TTransport* accept() { - TTransport* result = acceptImpl(); + shared_ptr accept() { + shared_ptr result = acceptImpl(); if (result == NULL) { throw TTransportException("accept() may not return NULL"); } @@ -56,7 +59,7 @@ class TServerTransport { * @return A newly allocated TTransport object * @throw TTransportException If an error occurs */ - virtual TTransport* acceptImpl() = 0; + virtual shared_ptr acceptImpl() = 0; }; -- 2.17.1