From 8d7e1f62a85d91b159887eb406a5855bcbd6245e Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Wed, 7 Jun 2006 06:48:56 +0000 Subject: [PATCH] Update Thrift CPP libraries to work with new generated source, change underlying buffers to use uint8_t* instead of std::string Summary: Major overhaul to the CPP libraries. Reviewed By: aditya Test Plan: Again, keep an eye out for the unit tests commit Notes: Initial perf tests show that Thrift is not only more robust than Pillar, but its implementation is actually around 10-20% faster. We can do about 10 RPC function calls with small data payloads in under 2ms. THAT IS FAST. THAT IS THRIFTY. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664714 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile | 16 +- lib/cpp/TDispatcher.h | 22 -- lib/cpp/TProcessor.h | 24 ++ lib/cpp/Thrift.h | 3 +- lib/cpp/client/TClient.h | 16 -- lib/cpp/client/TSimpleClient.cc | 44 ---- lib/cpp/client/TSimpleClient.h | 21 -- lib/cpp/protocol/TBinaryProtocol.cc | 308 ++++++++++++++++-------- lib/cpp/protocol/TBinaryProtocol.h | 131 ++++++++-- lib/cpp/protocol/TProtocol.h | 267 ++++++++++++++++---- lib/cpp/server/TServer.h | 8 +- lib/cpp/server/TSimpleServer.cc | 78 +++--- lib/cpp/server/TSimpleServer.h | 4 +- lib/cpp/transport/TBufferedTransport.cc | 60 +++++ lib/cpp/transport/TBufferedTransport.h | 75 ++++++ lib/cpp/transport/TNullTransport.h | 24 ++ lib/cpp/transport/TServerSocket.cc | 24 +- lib/cpp/transport/TServerSocket.h | 7 +- lib/cpp/transport/TServerTransport.h | 43 +++- lib/cpp/transport/TSocket.cc | 170 ++++++++----- lib/cpp/transport/TSocket.h | 79 +++++- lib/cpp/transport/TTransport.h | 86 ++++++- lib/cpp/transport/TTransportException.h | 63 +++++ 23 files changed, 1150 insertions(+), 423 deletions(-) delete mode 100644 lib/cpp/TDispatcher.h create mode 100644 lib/cpp/TProcessor.h delete mode 100644 lib/cpp/client/TClient.h delete mode 100644 lib/cpp/client/TSimpleClient.cc delete mode 100644 lib/cpp/client/TSimpleClient.h create mode 100644 lib/cpp/transport/TBufferedTransport.cc create mode 100644 lib/cpp/transport/TBufferedTransport.h create mode 100644 lib/cpp/transport/TNullTransport.h create mode 100644 lib/cpp/transport/TTransportException.h diff --git a/lib/cpp/Makefile b/lib/cpp/Makefile index 2045dbae..f77c493b 100644 --- a/lib/cpp/Makefile +++ b/lib/cpp/Makefile @@ -1,4 +1,9 @@ -# Makefile for Thrift C++ library. +# Makefile for Thrift C++ library. Generates a shared object that can be +# installed to /usr/local/lib +# +# TODO(mcslee): Add the ability to compile separate statis modules that can +# be compiled directly into Thrift applications instead of dynamic runtime +# loading of the full libs # # Author: # Mark Slee @@ -10,16 +15,17 @@ LD = g++ LDFL = -shared -Wall -I. -fPIC -Wl,-soname=libthrift.so # Source files -SRCS = client/TSimpleClient.cc \ - protocol/TBinaryProtocol.cc \ - server/TSimpleServer.cc \ +SRCS = protocol/TBinaryProtocol.cc \ + transport/TBufferedTransport.cc \ transport/TSocket.cc \ - transport/TServerSocket.cc + transport/TServerSocket.cc \ + server/TSimpleServer.cc # Linked library libthrift: $(LD) -o libthrift.so $(LDFL) $(SRCS) +# Clean it up clean: rm -f libthrift.so diff --git a/lib/cpp/TDispatcher.h b/lib/cpp/TDispatcher.h deleted file mode 100644 index f8ff8472..00000000 --- a/lib/cpp/TDispatcher.h +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef T_DISPATCHER_H -#define T_DISPATCHER_H - -#include - -/** - * A dispatcher is a generic object that accepts an input buffer and returns - * a buffer. It can be used in a variety of ways, i.e. as a client that - * sends data over the network and returns a response, or as a server that - * reads an input and returns an output. - * - * @author Mark Slee - */ -class TDispatcher { - public: - virtual ~TDispatcher() {}; - virtual std::string dispatch(const std::string& s) = 0; - protected: - TDispatcher() {} -}; - -#endif diff --git a/lib/cpp/TProcessor.h b/lib/cpp/TProcessor.h new file mode 100644 index 00000000..f01379dc --- /dev/null +++ b/lib/cpp/TProcessor.h @@ -0,0 +1,24 @@ +#ifndef T_PROCESSOR_H +#define T_PROCESSOR_H + +#include +#include "transport/TTransport.h" + +/** + * A processor is a generic object that acts upon two streams of data, one + * an input and the other an output. The definition of this object is loose, + * though the typical case is for some sort of server that either generates + * responses to an input stream or forwards data from one pipe onto another. + * + * @author Mark Slee + */ +class TProcessor { + public: + virtual ~TProcessor() {} + virtual bool process(TTransport* in, TTransport *out) = 0; + virtual bool process(TTransport* io) { return process(io, io); } + protected: + TProcessor() {} +}; + +#endif diff --git a/lib/cpp/Thrift.h b/lib/cpp/Thrift.h index 04fbaa1f..9986e3bd 100644 --- a/lib/cpp/Thrift.h +++ b/lib/cpp/Thrift.h @@ -1,7 +1,8 @@ #ifndef THRIFT_H #define THRIFT_H -#include +#include +#include #include #include #include diff --git a/lib/cpp/client/TClient.h b/lib/cpp/client/TClient.h deleted file mode 100644 index 73dd0938..00000000 --- a/lib/cpp/client/TClient.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef T_CLIENT_H -#define T_CLIENT_H - -#include "TDispatcher.h" - -class TClient : public TDispatcher { - public: - virtual ~TClient() {} - virtual bool open() = 0; - virtual void close() = 0; - protected: - TClient() {} -}; - -#endif - diff --git a/lib/cpp/client/TSimpleClient.cc b/lib/cpp/client/TSimpleClient.cc deleted file mode 100644 index 9069c91b..00000000 --- a/lib/cpp/client/TSimpleClient.cc +++ /dev/null @@ -1,44 +0,0 @@ -#include "TSimpleClient.h" -using std::string; - -TSimpleClient::TSimpleClient(TTransport* transport) : - transport_(transport) {} - -bool TSimpleClient::open() { - return transport_->open(); -} - -void TSimpleClient::close() { - transport_->close(); -} - -std::string TSimpleClient::dispatch(const string& s) { - // Write size header - int32_t size = s.size(); - // fprintf(stderr, "Writing size header %d to server\n", size); - transport_->write(string((char*)&size, 4)); - - // Write data payload - // fprintf(stderr, "Writing %d byte payload to server\n", (int)s.size()); - transport_->write(s); - - // Read response size - // fprintf(stderr, "Reading 4-byte response size header\n"); - string response; - transport_->read(response, 4); - size = *(int32_t*)response.data(); - - // Read response data - if (size < 0) { - // TODO(mcslee): Handle exception - // fprintf(stderr, "Exception case! Response size < 0\n"); - return ""; - } else { - // fprintf(stderr, "Reading %d byte response payload\n", size); - transport_->read(response, size); - // TODO(mcslee): Check that we actually read enough data - // fprintf(stderr, "Done reading payload, returning.\n"); - return response; - } -} - diff --git a/lib/cpp/client/TSimpleClient.h b/lib/cpp/client/TSimpleClient.h deleted file mode 100644 index 249afe51..00000000 --- a/lib/cpp/client/TSimpleClient.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef T_SIMPLE_CLIENT_H -#define T_SIMPLE_CLIENT_H - -#include "client/TClient.h" -#include "transport/TTransport.h" - -class TSimpleClient : public TClient { - public: - TSimpleClient(TTransport* transport); - ~TSimpleClient() {} - - bool open(); - void close(); - std::string dispatch(const std::string& in); - - protected: - TTransport* transport_; -}; - -#endif - diff --git a/lib/cpp/protocol/TBinaryProtocol.cc b/lib/cpp/protocol/TBinaryProtocol.cc index 4c10babd..1f31c8dd 100644 --- a/lib/cpp/protocol/TBinaryProtocol.cc +++ b/lib/cpp/protocol/TBinaryProtocol.cc @@ -1,140 +1,246 @@ #include "protocol/TBinaryProtocol.h" -using namespace std; +using std::string; -string TBinaryProtocol::readFunction(TBuf& buf) const { - // Let readString increment the buffer position - return readString(buf); +uint32_t TBinaryProtocol::writeStructBegin(TTransport* out, + const string& name) const { + return 0; } -string TBinaryProtocol::writeFunction(const string& name, - const string& args) const{ - return writeString(name) + args; +uint32_t TBinaryProtocol::writeStructEnd(TTransport* out) const { + return 0; } -map TBinaryProtocol::readStruct(TBuf& buf) const { - map fieldMap; - - if (buf.len < 4) { - return fieldMap; - } - uint32_t total_size = readU32(buf); - if (buf.len < total_size) { - // Data looks corrupt, we don't have that much, we will try to read what - // we can but be sure not to go over - total_size = buf.len; - } +uint32_t TBinaryProtocol::writeFieldBegin(TTransport* out, + const string& name, + const TType fieldType, + const uint16_t fieldId) const { + return + writeByte(out, (uint8_t)fieldType) + + writeU32(out, (uint32_t)fieldId); +} - // Field headers are 8 bytes, 4 byte fid + 4 byte length - while (total_size > 0 && buf.len > 8) { - uint32_t fid = readU32(buf); - uint32_t flen = readU32(buf); - if (flen > buf.len) { - // flen corrupt, there isn't that much data left - break; - } - fieldMap.insert(make_pair(fid, TBuf(buf.data, flen))); - buf.data += flen; - buf.len -= flen; - total_size -= 8 + flen; - } +uint32_t TBinaryProtocol::writeFieldEnd(TTransport* out) const { + return 0; +} - return fieldMap; +uint32_t TBinaryProtocol::writeFieldStop(TTransport* out) const { + return + writeByte(out, (uint8_t)T_STOP); +} + +uint32_t TBinaryProtocol::writeMapBegin(TTransport* out, + const TType keyType, + const TType valType, + const uint32_t size) const { + return + writeByte(out, (uint8_t)keyType) + + writeByte(out, (uint8_t)valType) + + writeU32(out, size); } -string TBinaryProtocol::writeStruct(const map& s) const { - string result = ""; - map::const_iterator s_iter; - for (s_iter = s.begin(); s_iter != s.end(); ++s_iter) { - result += writeU32(s_iter->first); - result += writeU32(s_iter->second.size()); - result += s_iter->second; - } - return writeU32(result.size()) + result; +uint32_t TBinaryProtocol::writeMapEnd(TTransport* out) const { + return 0; } -string TBinaryProtocol::readString(TBuf& buf) const { - uint32_t len = readU32(buf); - if (len == 0) { - return ""; - } - string result((const char*)(buf.data), len); - buf.data += len; - buf.len -= len; - return result; +uint32_t TBinaryProtocol::writeListBegin(TTransport* out, + const TType elemType, + const uint32_t size) const { + return + writeByte(out, (uint8_t) elemType) + + writeU32(out, size); } -uint8_t TBinaryProtocol::readByte(TBuf& buf) const { - if (buf.len == 0) { - return 0; - } - uint8_t result = (uint8_t)buf.data[0]; - buf.data += 1; - buf.len -= 1; - return result; +uint32_t TBinaryProtocol::writeListEnd(TTransport* out) const { + return 0; +} + +uint32_t TBinaryProtocol::writeSetBegin(TTransport* out, + const TType elemType, + const uint32_t size) const { + return + writeByte(out, (uint8_t)elemType) + + writeU32(out, size); +} + +uint32_t TBinaryProtocol::writeSetEnd(TTransport* out) const { + return 0; +} + +uint32_t TBinaryProtocol::writeByte(TTransport* out, + const uint8_t byte) const { + out->write(&byte, 1); + return 1; +} + +uint32_t TBinaryProtocol::writeU32(TTransport* out, + const uint32_t u32) const { + uint32_t net = (uint32_t)htonl(u32); + out->write((uint8_t*)&net, 4); + return 4; +} + +uint32_t TBinaryProtocol::writeI32(TTransport* out, + const int32_t i32) const { + int32_t net = (int32_t)htonl(i32); + out->write((uint8_t*)&net, 4); + return 4; } -uint32_t TBinaryProtocol::readU32(TBuf& buf) const { - if (buf.len < 4) { - return 0; +uint32_t TBinaryProtocol::writeU64(TTransport* out, + const uint64_t u64) const { + uint64_t net = (uint64_t)htonll(u64); + out->write((uint8_t*)&net, 8); + return 8; +} + +uint32_t TBinaryProtocol::writeI64(TTransport* out, + const int64_t i64) const { + int64_t net = (int64_t)htonll(i64); + out->write((uint8_t*)&net, 8); + return 8; +} + +uint32_t TBinaryProtocol::writeString(TTransport* out, + const string& str) const { + uint32_t result = writeU32(out, str.size()); + out->write((uint8_t*)str.data(), str.size()); + return result + str.size(); +} + +/** + * Reading functions + */ + +uint32_t TBinaryProtocol::readStructBegin(TTransport* in, + string& name) const { + name = ""; + return 0; +} + +uint32_t TBinaryProtocol::readStructEnd(TTransport* in) const { + return 0; +} + +uint32_t TBinaryProtocol::readFieldBegin(TTransport* in, + string& name, + TType& fieldType, + uint16_t& fieldId) const { + uint32_t result = 0; + uint8_t type; + result += readByte(in, type); + fieldType = (TType)type; + if (fieldType == T_STOP) { + fieldId = 0; + return result; } - uint32_t result = *(uint32_t*)buf.data; - buf.data += 4; - buf.len -= 4; + uint32_t id; + result += readU32(in, id); + fieldId = (uint16_t)id; + return result; +} + +uint32_t TBinaryProtocol::readFieldEnd(TTransport* in) const { + return 0; +} + +uint32_t TBinaryProtocol::readMapBegin(TTransport* in, + TType& keyType, + TType& valType, + uint32_t& size) const { + uint8_t k, v; + uint32_t result = 0; + result += readByte(in, k); + keyType = (TType)k; + result += readByte(in, v); + valType = (TType)v; + result += readU32(in, size); return result; } -int32_t TBinaryProtocol::readI32(TBuf& buf) const { - if (buf.len < 4) { - return 0; - } - int32_t result = *(int32_t*)buf.data; - buf.data += 4; - buf.len -= 4; - return result; +uint32_t TBinaryProtocol::readMapEnd(TTransport* in) const { + return 0; } -uint64_t TBinaryProtocol::readU64(TBuf& buf) const { - if (buf.len < 8) { - return 0; - } - uint64_t result = *(uint64_t*)buf.data; - buf.data += 8; - buf.len -= 8; +uint32_t TBinaryProtocol::readListBegin(TTransport* in, + TType& elemType, + uint32_t& size) const { + uint8_t e; + uint32_t result = 0; + result += readByte(in, e); + elemType = (TType)e; + result += readU32(in, size); return result; } -int64_t TBinaryProtocol::readI64(TBuf& buf) const { - if (buf.len < 8) { - return 0; - } - int64_t result = *(int64_t*)buf.data; - buf.data += 8; - buf.len -= 8; +uint32_t TBinaryProtocol::readListEnd(TTransport* in) const { + return 0; +} + +uint32_t TBinaryProtocol::readSetBegin(TTransport* in, + TType& elemType, + uint32_t& size) const { + uint8_t e; + uint32_t result = 0; + result += readByte(in, e); + elemType = (TType)e; + result += readU32(in, size); return result; } -string TBinaryProtocol::writeString(const string& str) const { - uint32_t size = str.size(); - string result = string((const char*)&size, 4); - return result + str; +uint32_t TBinaryProtocol::readSetEnd(TTransport* in) const { + return 0; +} + +uint32_t TBinaryProtocol::readByte(TTransport* in, + uint8_t& byte) const { + uint8_t b[1]; + in->readAll(b, 1); + byte = *(uint8_t*)b; + return 1; } -string TBinaryProtocol::writeByte(const uint8_t byte) const { - return string((const char*)&byte, 1); +uint32_t TBinaryProtocol::readU32(TTransport* in, + uint32_t& u32) const { + uint8_t b[4]; + in->readAll(b, 4); + u32 = *(uint32_t*)b; + u32 = (uint32_t)ntohl(u32); + return 4; } -string TBinaryProtocol::writeU32(const uint32_t u32) const { - return string((const char*)&u32, 4); +uint32_t TBinaryProtocol::readI32(TTransport* in, + int32_t& i32) const { + uint8_t b[4]; + in->readAll(b, 4); + i32 = *(int32_t*)b; + i32 = (int32_t)ntohl(i32); + return 4; } -string TBinaryProtocol::writeI32(int32_t i32) const { - return string((const char*)&i32, 4); +uint32_t TBinaryProtocol::readU64(TTransport* in, + uint64_t& u64) const { + uint8_t b[8]; + in->readAll(b, 8); + u64 = *(uint64_t*)b; + u64 = (uint64_t)ntohll(u64); + return 8; } -string TBinaryProtocol::writeU64(uint64_t u64) const { - return string((const char*)&u64, 8); +uint32_t TBinaryProtocol::readI64(TTransport* in, + int64_t& i64) const { + uint8_t b[8]; + in->readAll(b, 8); + i64 = *(int64_t*)b; + i64 = (int64_t)ntohll(i64); + return 8; } -string TBinaryProtocol::writeI64(int64_t i64) const { - return string((const char*)&i64, 8); +uint32_t TBinaryProtocol::readString(TTransport* in, + string& str) const { + uint32_t size, result; + result = readU32(in, size); + uint8_t b[size]; + in->readAll(b, size); + str = string((char*)b, size); + return result+size; } diff --git a/lib/cpp/protocol/TBinaryProtocol.h b/lib/cpp/protocol/TBinaryProtocol.h index 976c3835..e05bd9fc 100644 --- a/lib/cpp/protocol/TBinaryProtocol.h +++ b/lib/cpp/protocol/TBinaryProtocol.h @@ -14,29 +14,114 @@ class TBinaryProtocol : public TProtocol { TBinaryProtocol() {} ~TBinaryProtocol() {} - std::string - readFunction(TBuf& buf) const; - std::string - writeFunction(const std::string& name, const std::string& args) const; - - std::map - readStruct(TBuf& buf) const; - std::string - writeStruct(const std::map& s) const; - - std::string readString (TBuf& buf) const; - uint8_t readByte (TBuf& buf) const; - uint32_t readU32 (TBuf& buf) const; - int32_t readI32 (TBuf& buf) const; - uint64_t readU64 (TBuf& buf) const; - int64_t readI64 (TBuf& buf) const; - - std::string writeString (const std::string& str) const; - std::string writeByte (const uint8_t byte) const; - std::string writeU32 (const uint32_t u32) const; - std::string writeI32 (const int32_t i32) const; - std::string writeU64 (const uint64_t u64) const; - std::string writeI64 (const int64_t i64) const; + /** + * Writing functions. + */ + + uint32_t writeStructBegin (TTransport* out, + const std::string& name) const; + + uint32_t writeStructEnd (TTransport* out) const; + + uint32_t writeFieldBegin (TTransport* out, + const std::string& name, + const TType fieldType, + const uint16_t fieldId) const; + + uint32_t writeFieldEnd (TTransport* out) const; + + uint32_t writeFieldStop (TTransport* out) const; + + uint32_t writeMapBegin (TTransport* out, + const TType keyType, + const TType valType, + const uint32_t size) const; + + uint32_t writeMapEnd (TTransport* out) const; + + uint32_t writeListBegin (TTransport* out, + const TType elemType, + const uint32_t size) const; + + uint32_t writeListEnd (TTransport* out) const; + + uint32_t writeSetBegin (TTransport* out, + const TType elemType, + const uint32_t size) const; + + uint32_t writeSetEnd (TTransport* out) const; + + uint32_t writeByte (TTransport* out, + const uint8_t byte) const; + + uint32_t writeU32 (TTransport* out, + const uint32_t u32) const; + + uint32_t writeI32 (TTransport* out, + const int32_t i32) const; + + uint32_t writeU64 (TTransport* out, + const uint64_t u64) const; + + uint32_t writeI64 (TTransport* out, + const int64_t i64) const; + + uint32_t writeString (TTransport* out, + const std::string& str) const; + + /** + * Reading functions + */ + + uint32_t readStructBegin (TTransport* in, + std::string& name) const; + + uint32_t readStructEnd (TTransport* in) const; + + uint32_t readFieldBegin (TTransport* in, + std::string& name, + TType& fieldType, + uint16_t& fieldId) const; + + uint32_t readFieldEnd (TTransport* in) const; + + uint32_t readMapBegin (TTransport* in, + TType& keyType, + TType& valType, + uint32_t& size) const; + + uint32_t readMapEnd (TTransport* in) const; + + uint32_t readListBegin (TTransport* in, + TType& elemType, + uint32_t& size) const; + + uint32_t readListEnd (TTransport* in) const; + + uint32_t readSetBegin (TTransport* in, + TType& elemType, + uint32_t& size) const; + + uint32_t readSetEnd (TTransport* in) const; + + uint32_t readByte (TTransport* in, + uint8_t& byte) const; + + uint32_t readU32 (TTransport* in, + uint32_t& u32) const; + + uint32_t readI32 (TTransport* in, + int32_t& i32) const; + + uint32_t readU64 (TTransport* in, + uint64_t& u64) const; + + uint32_t readI64 (TTransport* in, + int64_t& i64) const; + + uint32_t readString (TTransport* in, + std::string& str) const; + }; #endif diff --git a/lib/cpp/protocol/TProtocol.h b/lib/cpp/protocol/TProtocol.h index 1f2e0c88..fe2d6ef1 100644 --- a/lib/cpp/protocol/TProtocol.h +++ b/lib/cpp/protocol/TProtocol.h @@ -1,13 +1,41 @@ #ifndef T_PROTOCOL_H #define T_PROTOCOL_H +#include #include #include #include +#include "transport/TTransport.h" + +#define ntohll(x) (((uint64_t)(ntohl((int)((x << 32) >> 32))) << 32) | (uint32_t)ntohl(((int)(x >> 32)))) + +#define htonll(x) ntohll(x) + /** Forward declaration for TProtocol */ struct TBuf; +/** + * Enumerated definition of the types that the Thrift protocol supports. + * Take special note of the T_END type which is used specifically to mark + * the end of a sequence of fields. + */ +enum TType { + T_STOP = 1, + T_BYTE = 2, + T_U16 = 3, + T_I16 = 4, + T_U32 = 5, + T_I32 = 6, + T_U64 = 7, + T_I64 = 8, + T_STRING = 9, + T_STRUCT = 10, + T_MAP = 11, + T_SET = 12, + T_LIST = 13 +}; + /** * Abstract class for a thrift protocol driver. These are all the methods that * a protocol must implement. Essentially, there must be some way of reading @@ -25,64 +53,211 @@ class TProtocol { virtual ~TProtocol() {} /** - * Function call serialization. + * Writing functions. */ - virtual std::string - readFunction(TBuf& buf) const = 0; - virtual std::string - writeFunction(const std::string& name, const std::string& args) const = 0; + virtual uint32_t writeStructBegin (TTransport* out, + const std::string& name) const = 0; - /** - * Struct serialization. - */ + virtual uint32_t writeStructEnd (TTransport* out) const = 0; + + virtual uint32_t writeFieldBegin (TTransport* out, + const std::string& name, + const TType fieldType, + const uint16_t fieldId) const = 0; + + virtual uint32_t writeFieldEnd (TTransport* out) const = 0; + + virtual uint32_t writeFieldStop (TTransport* out) const = 0; + + virtual uint32_t writeMapBegin (TTransport* out, + const TType keyType, + const TType valType, + const uint32_t size) const = 0; + + virtual uint32_t writeMapEnd (TTransport* out) const = 0; + + virtual uint32_t writeListBegin (TTransport* out, + const TType elemType, + const uint32_t size) const = 0; + + virtual uint32_t writeListEnd (TTransport* out) const = 0; + + virtual uint32_t writeSetBegin (TTransport* out, + const TType elemType, + const uint32_t size) const = 0; + + virtual uint32_t writeSetEnd (TTransport* out) const = 0; + + virtual uint32_t writeByte (TTransport* out, + const uint8_t byte) const = 0; + + virtual uint32_t writeU32 (TTransport* out, + const uint32_t u32) const = 0; + + virtual uint32_t writeI32 (TTransport* out, + const int32_t i32) const = 0; + + virtual uint32_t writeU64 (TTransport* out, + const uint64_t u64) const = 0; - virtual std::map - readStruct(TBuf& buf) const = 0; - virtual std::string - writeStruct(const std::map& s) const = 0; + virtual uint32_t writeI64 (TTransport* out, + const int64_t i64) const = 0; + + virtual uint32_t writeString (TTransport* out, + const std::string& str) const = 0; /** - * Basic data type deserialization. Note that these read methods do not - * take a const reference to the TBuf object. They SHOULD change the TBuf - * object so that it reflects the buffer AFTER the basic data type has - * been consumed such that data may continue being read serially from the - * buffer. + * Reading functions */ - virtual std::string readString (TBuf& buf) const = 0; - virtual uint8_t readByte (TBuf& buf) const = 0; - virtual uint32_t readU32 (TBuf& buf) const = 0; - virtual int32_t readI32 (TBuf& buf) const = 0; - virtual uint64_t readU64 (TBuf& buf) const = 0; - virtual int64_t readI64 (TBuf& buf) const = 0; + virtual uint32_t readStructBegin (TTransport* in, + std::string& name) const = 0; + + virtual uint32_t readStructEnd (TTransport* in) const = 0; + + virtual uint32_t readFieldBegin (TTransport* in, + std::string& name, + TType& fieldType, + uint16_t& fieldId) const = 0; + + virtual uint32_t readFieldEnd (TTransport* in) const = 0; + + virtual uint32_t readMapBegin (TTransport* in, + TType& keyType, + TType& valType, + uint32_t& size) const = 0; + + virtual uint32_t readMapEnd (TTransport* in) const = 0; + + virtual uint32_t readListBegin (TTransport* in, + TType& elemType, + uint32_t& size) const = 0; + + virtual uint32_t readListEnd (TTransport* in) const = 0; + + virtual uint32_t readSetBegin (TTransport* in, + TType& elemType, + uint32_t& size) const = 0; + + virtual uint32_t readSetEnd (TTransport* in) const = 0; + + virtual uint32_t readByte (TTransport* in, + uint8_t& byte) const = 0; + + virtual uint32_t readU32 (TTransport* in, + uint32_t& u32) const = 0; - virtual std::string writeString (const std::string& str) const = 0; - virtual std::string writeByte (const uint8_t byte) const = 0; - virtual std::string writeU32 (const uint32_t u32) const = 0; - virtual std::string writeI32 (const int32_t i32) const = 0; - virtual std::string writeU64 (const uint64_t u64) const = 0; - virtual std::string writeI64 (const int64_t i64) const = 0; + virtual uint32_t readI32 (TTransport* in, + int32_t& i32) const = 0; + + virtual uint32_t readU64 (TTransport* in, + uint64_t& u64) const = 0; + + virtual uint32_t readI64 (TTransport* in, + int64_t& i64) const = 0; + + virtual uint32_t readString (TTransport* in, + std::string& str) const = 0; + + /** + * Method to arbitrarily skip over data. + */ + uint32_t skip(TTransport* in, TType type) const { + switch (type) { + case T_BYTE: + { + uint8_t byte; + return readByte(in, byte); + } + case T_U32: + { + uint32_t u32; + return readU32(in, u32); + } + case T_I32: + { + int32_t i32; + return readI32(in, i32); + } + case T_U64: + { + uint64_t u64; + return readU64(in, u64); + } + case T_I64: + { + int64_t i64; + return readI64(in, i64); + } + case T_STRING: + { + std::string str; + return readString(in, str); + } + case T_STRUCT: + { + uint32_t result = 0; + std::string name; + uint16_t fid; + TType ftype; + result += readStructBegin(in, name); + while (true) { + result += readFieldBegin(in, name, ftype, fid); + if (ftype == T_STOP) { + break; + } + result += skip(in, ftype); + result += readFieldEnd(in); + } + result += readStructEnd(in); + return result; + } + case T_MAP: + { + uint32_t result = 0; + TType keyType; + TType valType; + uint32_t i, size; + result += readMapBegin(in, keyType, valType, size); + for (i = 0; i < size; i++) { + result += skip(in, keyType); + result += skip(in, valType); + } + result += readMapEnd(in); + return result; + } + case T_SET: + { + uint32_t result = 0; + TType elemType; + uint32_t i, size; + result += readSetBegin(in, elemType, size); + for (i = 0; i < size; i++) { + result += skip(in, elemType); + } + result += readSetEnd(in); + return result; + } + case T_LIST: + { + uint32_t result = 0; + TType elemType; + uint32_t i, size; + result += readListBegin(in, elemType, size); + for (i = 0; i < size; i++) { + result += skip(in, elemType); + } + result += readListEnd(in); + return result; + } + default: + return 0; + } + } protected: TProtocol() {} }; -/** - * Wrapper around raw data that allows us to track the length of a data - * buffer. It is the responsibility of a robust TProtocol implementation - * to ensure that any reads that are done from data do NOT overrun the - * memory address at data+len. It is also a convention that TBuf objects - * do NOT own the memory pointed to by data. They are merely wrappers - * around buffers that have been allocated elsewhere. Therefore, the user - * should never allocate memory before putting it into a TBuf nor should - * they free the data pointed to by a TBuf. - */ -struct TBuf { - TBuf(const TBuf& that) : data(that.data), len(that.len) {} - TBuf(const uint8_t* d, uint32_t l) : data(d), len(l) {} - const uint8_t* data; - uint32_t len; -}; - #endif diff --git a/lib/cpp/server/TServer.h b/lib/cpp/server/TServer.h index 9c4cc59e..0d275ba3 100644 --- a/lib/cpp/server/TServer.h +++ b/lib/cpp/server/TServer.h @@ -1,7 +1,7 @@ #ifndef T_SERVER_H #define T_SERVER_H -#include "TDispatcher.h" +#include "TProcessor.h" class TServerOptions; @@ -16,10 +16,10 @@ class TServer { virtual void run() = 0; protected: - TServer(TDispatcher* dispatcher, TServerOptions* options) : - dispatcher_(dispatcher), options_(options) {} + TServer(TProcessor* processor, TServerOptions* options) : + processor_(processor), options_(options) {} - TDispatcher* dispatcher_; + TProcessor* processor_; TServerOptions* options_; }; diff --git a/lib/cpp/server/TSimpleServer.cc b/lib/cpp/server/TSimpleServer.cc index 16f5006b..03069ae0 100644 --- a/lib/cpp/server/TSimpleServer.cc +++ b/lib/cpp/server/TSimpleServer.cc @@ -1,60 +1,54 @@ #include "server/TSimpleServer.h" +#include "transport/TBufferedTransport.h" +#include "transport/TTransportException.h" #include +#include using namespace std; +/** + * A simple single-threaded application server. Perfect for unit tests! + * + * @author Mark Slee + */ void TSimpleServer::run() { - TTransport* client; + TTransport* client = NULL; - // Start the server listening - if (serverTransport_->listen() == false) { - // TODO(mcslee): Log error here - fprintf(stderr, "TSimpleServer::run(): Call to listen failed\n"); + try { + // Start the server listening + serverTransport_->listen(); + } catch (TTransportException& ttx) { + cerr << "TSimpleServer::run() listen(): " << ttx.getMessage() << endl; return; } // Fetch client from server while (true) { - // fprintf(stderr, "Listening for connection\n"); - if ((client = serverTransport_->accept()) == NULL) { - // fprintf(stderr, "Got NULL connection, exiting.\n"); - break; - } - - while (true) { - // Read header from client - // fprintf(stderr, "Reading 4 byte header from client.\n"); - string in; - if (client->read(in, 4) <= 0) { - // fprintf(stderr, "Size header negative. Exception!\n"); - break; + try { + client = serverTransport_->accept(); + if (client != NULL) { + // Process for as long as we can keep the processor happy! + TBufferedTransport bufferedClient(client); + while (processor_->process(&bufferedClient)) {} } - - // Read payload from client - int32_t size = *(int32_t*)(in.data()); - // fprintf(stderr, "Reading %d byte payload from client.\n", size); - if (client->read(in, size) < size) { - // fprintf(stderr, "Didn't get enough data!!!\n"); - break; + } catch (TTransportException& ttx) { + if (client != NULL) { + cerr << "TSimpleServer client died: " << ttx.getMessage() << endl; } - - // Pass payload to dispatcher - // TODO(mcslee): Wrap this in try/catch and return exceptions - string out = dispatcher_->dispatch(in); - - size = out.size(); - - // Write size of response packet - client->write(string((char*)&size, 4)); - - // Write response payload - client->write(out); } - // Clean up that client - // fprintf(stderr, "Closing and cleaning up client\n"); - client->close(); - delete client; + // Clean up the client + if (client != NULL) { + + // Ensure no resource leaks + client->close(); + + // Ensure no memory leaks + delete client; + + // Ensure we don't try to double-free on the next pass + client = NULL; + } } - // TODO(mcslee): Is this a timeout case or the real thing? + // TODO(mcslee): Could this be a timeout case? Or always the real thing? } diff --git a/lib/cpp/server/TSimpleServer.h b/lib/cpp/server/TSimpleServer.h index 47ab69e8..9e0f79f9 100644 --- a/lib/cpp/server/TSimpleServer.h +++ b/lib/cpp/server/TSimpleServer.h @@ -14,10 +14,10 @@ */ class TSimpleServer : public TServer { public: - TSimpleServer(TDispatcher* dispatcher, + TSimpleServer(TProcessor* processor, TServerOptions* options, TServerTransport* serverTransport) : - TServer(dispatcher, options), serverTransport_(serverTransport) {} + TServer(processor, options), serverTransport_(serverTransport) {} ~TSimpleServer() {} diff --git a/lib/cpp/transport/TBufferedTransport.cc b/lib/cpp/transport/TBufferedTransport.cc new file mode 100644 index 00000000..3fccc580 --- /dev/null +++ b/lib/cpp/transport/TBufferedTransport.cc @@ -0,0 +1,60 @@ +#include "TBufferedTransport.h" +using std::string; + +uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) { + uint32_t need = len; + + // We don't have enough data yet + if (rLen_-rPos_ < need) { + // Copy out whatever we have + if (rLen_ > 0) { + memcpy(buf, rBuf_+rPos_, rLen_-rPos_); + need -= rLen_-rPos_; + buf += rLen_-rPos_; + } + // Get more from underlying transport up to buffer size + rLen_ = transport_->read(rBuf_, rBufSize_); + rPos_ = 0; + } + + // Hand over whatever we have + uint32_t give = need; + if (rLen_-rPos_ < give) { + give = rLen_-rPos_; + } + memcpy(buf, rBuf_+rPos_, give); + rPos_ += give; + need -= give; + return (len - need); +} + +void TBufferedTransport::write(const uint8_t* buf, uint32_t len) { + if (len == 0) { + return; + } + + if (len + wLen_ >= wBufSize_) { + uint32_t copy = wBufSize_ - wLen_; + memcpy(wBuf_ + wLen_, buf, copy); + transport_->write(wBuf_, wBufSize_); + + wLen_ = len - copy; + if (wLen_ > 0) { + memcpy(wBuf_, buf+copy, wLen_); + } + } else { + memcpy(wBuf_+wLen_, buf, len); + wLen_ += len; + } +} + +void TBufferedTransport::flush() { + // Write out any data waiting in the write buffer + if (wLen_ > 0) { + transport_->write(wBuf_, wLen_); + wLen_ = 0; + } + + // Flush the underlying transport + transport_->flush(); +} diff --git a/lib/cpp/transport/TBufferedTransport.h b/lib/cpp/transport/TBufferedTransport.h new file mode 100644 index 00000000..991b50c8 --- /dev/null +++ b/lib/cpp/transport/TBufferedTransport.h @@ -0,0 +1,75 @@ +#ifndef T_BUFFERED_TRANSPORT_H +#define T_BUFFERED_TRANSPORT_H + +#include "transport/TTransport.h" +#include + +/** + * Buffered transport. For reads it will read more data than is requested + * and will serve future data out of a local buffer. For writes, data is + * stored to an in memory buffer before being written out. + * + * @author Mark Slee + */ +class TBufferedTransport : public TTransport { + public: + TBufferedTransport(TTransport* transport) : + transport_(transport), + rBufSize_(512), rPos_(0), rLen_(0), + wBufSize_(512), wLen_(0) { + rBuf_ = new uint8_t[rBufSize_]; + wBuf_ = new uint8_t[wBufSize_]; + } + + TBufferedTransport(TTransport* transport, uint32_t sz) : + transport_(transport), + rBufSize_(sz), rPos_(0), rLen_(0), + wBufSize_(sz), wLen_(0) { + rBuf_ = new uint8_t[rBufSize_]; + wBuf_ = new uint8_t[wBufSize_]; + } + + TBufferedTransport(TTransport* transport, uint32_t rsz, uint32_t wsz) : + transport_(transport), + rBufSize_(rsz), rPos_(0), rLen_(0), + wBufSize_(wsz), wLen_(0) { + rBuf_ = new uint8_t[rBufSize_]; + wBuf_ = new uint8_t[wBufSize_]; + } + + ~TBufferedTransport() { + delete [] rBuf_; + delete [] wBuf_; + } + + bool isOpen() { + return transport_->isOpen(); + } + + void open() { + transport_->open(); + } + + void close() { + transport_->close(); + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void write(const uint8_t* buf, uint32_t len); + + void flush(); + + protected: + TTransport* transport_; + uint8_t* rBuf_; + uint32_t rBufSize_; + uint32_t rPos_; + uint32_t rLen_; + + uint8_t* wBuf_; + uint32_t wBufSize_; + uint32_t wLen_; +}; + +#endif diff --git a/lib/cpp/transport/TNullTransport.h b/lib/cpp/transport/TNullTransport.h new file mode 100644 index 00000000..9562d9fc --- /dev/null +++ b/lib/cpp/transport/TNullTransport.h @@ -0,0 +1,24 @@ +#ifndef T_NULL_TRANSPORT +#define T_NULL_TRANSPORT + +#include "transport/TTransport.h" + +/** + * The null transport is a dummy transport that doesn't actually do anything. + * It's sort of an analogy to /dev/null, you can never read anything from it + * and it will let you write anything you want to it, though it won't actually + * go anywhere. + * + * @author Mark Slee + */ +class TNullTransport : public TTransport { + public: + TNullTransport() {} + ~TNullTransport() {} + + bool isOpen() { return true; } + void open() { } + void write(const std::string& s) {} +}; + +#endif diff --git a/lib/cpp/transport/TServerSocket.cc b/lib/cpp/transport/TServerSocket.cc index 178de812..1cf4a320 100644 --- a/lib/cpp/transport/TServerSocket.cc +++ b/lib/cpp/transport/TServerSocket.cc @@ -1,5 +1,6 @@ #include #include +#include #include "transport/TSocket.h" #include "transport/TServerSocket.h" @@ -11,11 +12,12 @@ TServerSocket::~TServerSocket() { close(); } -bool TServerSocket::listen() { +void TServerSocket::listen() { serverSocket_ = socket(AF_INET, SOCK_STREAM, 0); if (serverSocket_ == -1) { + perror("TServerSocket::listen() socket"); close(); - return false; + throw TTransportException(TTX_NOT_OPEN, "Could not create server socket."); } // Set reusaddress to prevent 2MSL delay on accept @@ -24,16 +26,16 @@ bool TServerSocket::listen() { &one, sizeof(one))) { perror("TServerSocket::listen() SO_REUSEADDR"); close(); - return false; + throw TTransportException(TTX_NOT_OPEN, "Could not set SO_REUSEADDR"); } // Turn linger off, don't want to block on calls to close struct linger ling = {0, 0}; if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) { - perror("TServerSocket::listen() SO_LINGER"); close(); - return false; + perror("TServerSocket::listen() SO_LINGER"); + throw TTransportException(TTX_NOT_OPEN, "Could not set SO_LINGER"); } // Bind to a port @@ -47,24 +49,22 @@ bool TServerSocket::listen() { sprintf(errbuf, "TServerSocket::listen() BIND %d", port_); perror(errbuf); close(); - return false; + throw TTransportException(TTX_NOT_OPEN, "Could not bind"); } // Call listen if (-1 == ::listen(serverSocket_, acceptBacklog_)) { perror("TServerSocket::listen() LISTEN"); close(); - return false; + throw TTransportException(TTX_NOT_OPEN, "Could not listen"); } // The socket is now listening! - return true; } -TTransport* TServerSocket::accept() { +TTransport* TServerSocket::acceptImpl() { if (serverSocket_ <= 0) { - // TODO(mcslee): Log error with common logging tool - return NULL; + throw TTransportException(TTX_NOT_OPEN, "TServerSocket not listening"); } struct sockaddr_in clientAddress; @@ -75,7 +75,7 @@ TTransport* TServerSocket::accept() { if (clientSocket <= 0) { perror("TServerSocket::accept()"); - return NULL; + throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno); } return new TSocket(clientSocket); diff --git a/lib/cpp/transport/TServerSocket.h b/lib/cpp/transport/TServerSocket.h index 8ded4e20..ca30a034 100644 --- a/lib/cpp/transport/TServerSocket.h +++ b/lib/cpp/transport/TServerSocket.h @@ -16,11 +16,14 @@ class TServerSocket : public TServerTransport { TServerSocket(int port); ~TServerSocket(); - bool listen(); - TTransport* accept(); + void listen(); void close(); + protected: + TTransport* acceptImpl(); + private: + int port_; int serverSocket_; int acceptBacklog_; diff --git a/lib/cpp/transport/TServerTransport.h b/lib/cpp/transport/TServerTransport.h index 4d063fcb..9d71539d 100644 --- a/lib/cpp/transport/TServerTransport.h +++ b/lib/cpp/transport/TServerTransport.h @@ -1,7 +1,8 @@ #ifndef T_SERVER_TRANSPORT_H #define T_SERVER_TRANSPORT_H -#include "TTransport.h" +#include "transport/TTransport.h" +#include "transport/TTransportException.h" /** * Server transport framework. A server needs to have some facility for @@ -13,12 +14,48 @@ class TServerTransport { public: virtual ~TServerTransport() {} - virtual bool listen() = 0; - virtual TTransport* accept() = 0; + /** + * Starts the server transport listening for new connections. Prior to this + * call most transports will not return anything when accept is called. + * + * @throws TTransportException if we were unable to listen + */ + virtual void listen() {} + + /** + * Gets a new dynamically allocated transport object and passes it to the + * caller. Note that it is the explicit duty of the caller to free the + * allocated object. The returned TTransport object must always be in the + * opened state. NULL should never be returned, instead an Exception should + * always be thrown. + * + * @return A new TTransport object + * @throws TTransportException if there is an error + */ + TTransport* accept() { + TTransport* result = acceptImpl(); + if (result == NULL) { + throw TTransportException("accept() may not return NULL"); + } + return result; + } + + /** + * Closes this transport such that future calls to accept will do nothing. + */ virtual void close() = 0; protected: TServerTransport() {} + + /** + * Subclasses should implement this function for accept. + * + * @return A newly allocated TTransport object + * @throw TTransportException If an error occurs + */ + virtual TTransport* acceptImpl() = 0; + }; #endif diff --git a/lib/cpp/transport/TSocket.cc b/lib/cpp/transport/TSocket.cc index 1dfe431f..34717551 100644 --- a/lib/cpp/transport/TSocket.cc +++ b/lib/cpp/transport/TSocket.cc @@ -7,9 +7,18 @@ #include #include "transport/TSocket.h" +#include "transport/TTransportException.h" using namespace std; +uint32_t g_socket_syscalls = 0; + +/** + * TSocket implementation. + * + * @author Mark Slee + */ + // Mutex to protect syscalls to netdb pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -27,15 +36,20 @@ TSocket::~TSocket() { close(); } -bool TSocket::open() { +bool TSocket::isOpen() { + return (socket_ > 0); +} + +void TSocket::open() { // Create socket socket_ = socket(AF_INET, SOCK_STREAM, 0); if (socket_ == -1) { - socket_ = 0; - return false; + perror("TSocket::open() socket"); + close(); + throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno); } - // Lookup the host + // Lookup the hostname struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port_); @@ -54,7 +68,7 @@ bool TSocket::open() { if (host_entry == NULL) { // perror("dns error: failed call to gethostbyname.\n"); close(); - return false; + throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed"); } addr.sin_port = htons(port_); @@ -70,10 +84,10 @@ bool TSocket::open() { if (ret < 0) { perror("TSocket::open() connect"); close(); - return false; + throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno); } - return true; + // Connection was successful } void TSocket::close() { @@ -84,97 +98,127 @@ void TSocket::close() { socket_ = 0; } -int TSocket::read(string& s, uint32_t len) { - char buff[len]; - s = ""; +uint32_t TSocket::read(uint8_t* buf, uint32_t len) { + if (socket_ <= 0) { + throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket"); + } - uint32_t have = 0; uint32_t retries = 0; - - while (have < len) { - try_again: - // Read from the socket - int got = recv(socket_, buff+have, len-have, 0); - - // Check for error on read - if (got < 0) { - perror("TSocket::read()"); - - // If temporarily out of resources, sleep a bit and try again - if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) { - usleep(50); - goto try_again; - } - - // If interrupted, try again - if (errno == EINTR && retries++ < MAX_RECV_RETRIES) { - goto try_again; - } - - // If we disconnect with no linger time - if (errno == ECONNRESET) { - return 0; - } - - return 0; + + try_again: + // Read from the socket + int got = recv(socket_, buf, len, 0); + ++g_socket_syscalls; + + // Check for error on read + if (got < 0) { + perror("TSocket::read()"); + + // If temporarily out of resources, sleep a bit and try again + if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) { + usleep(50); + goto try_again; + } + + // If interrupted, try again + if (errno == EINTR && retries++ < MAX_RECV_RETRIES) { + goto try_again; + } + + // If we disconnect with no linger time + if (errno == ECONNRESET) { + throw TTransportException(TTX_NOT_OPEN, "ECONNRESET"); + } + + // This ish isn't open + if (errno == ENOTCONN) { + throw TTransportException(TTX_NOT_OPEN, "ENOTCONN"); } - // Check for empty read - if (got == 0) { - return 0; + // Timed out! + if (errno == ETIMEDOUT) { + throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT"); } - // Update the count - have += (uint32_t) got; + // Some other error, whatevz + throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno); + } + + // The remote host has closed the socket + if (got == 0) { + close(); + return 0; } // Pack data into string - s = string(buff, have); - return have; + return got; } -void TSocket::write(const string& s) { +void TSocket::write(const uint8_t* buf, uint32_t len) { + if (socket_ <= 0) { + throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket"); + } + uint32_t sent = 0; - while (sent < s.size()) { - int b = send(socket_, s.data() + sent, s.size() - sent, 0); - + while (sent < len) { + // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we + // check for the EPIPE return condition and close the socket in that case + int b = send(socket_, buf + sent, len - sent, MSG_NOSIGNAL); + ++g_socket_syscalls; + // Fail on a send error if (b < 0) { - // TODO(mcslee): Make the function return how many bytes it wrote or - // throw an exception - // throw_perror("send"); - return; + if (errno == EPIPE) { + close(); + throw TTransportException(TTX_NOT_OPEN, "EPIPE"); + } + + if (errno == ECONNRESET) { + close(); + throw TTransportException(TTX_NOT_OPEN, "ECONNRESET"); + } + + if (errno == ENOTCONN) { + close(); + throw TTransportException(TTX_NOT_OPEN, "ENOTCONN"); + } + + perror("TSocket::write() send < 0"); + throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno); } // Fail on blocked send if (b == 0) { - // TODO(mcslee): Make the function return how many bytes it wrote or - // throw string("couldn't send data.\n"); - return; + throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0."); } - sent += b; } } -bool TSocket::setLinger(bool on, int linger) { +void TSocket::setLinger(bool on, int linger) { + // TODO(mcslee): Store these options so they can be set pre-connect + if (socket_ <= 0) { + return; + } + struct linger ling = {(on ? 1 : 0), linger}; if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) { close(); perror("TSocket::setLinger()"); - return false; } - return true; } -bool TSocket::setNoDelay(bool noDelay) { +void TSocket::setNoDelay(bool noDelay) { + // TODO(mcslee): Store these options so they can be set pre-connect + if (socket_ <= 0) { + return; + } + // Set socket to NODELAY int val = (noDelay ? 1 : 0); if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) { close(); perror("TSocket::setNoDelay()"); - return false; } - return true; } diff --git a/lib/cpp/transport/TSocket.h b/lib/cpp/transport/TSocket.h index 1da74c60..18abfa71 100644 --- a/lib/cpp/transport/TSocket.h +++ b/lib/cpp/transport/TSocket.h @@ -6,33 +6,94 @@ #include "transport/TTransport.h" #include "transport/TServerSocket.h" -class TSocketOptions; - /** * TCP Socket implementation of the TTransport interface. * * @author Mark Slee */ class TSocket : public TTransport { - friend TTransport* TServerSocket::accept(); + /** + * We allow the TServerSocket acceptImpl() method to access the private + * members of a socket so that it can access the TSocket(int socket) + * constructor which creates a socket object from the raw UNIX socket + * handle. + */ + friend class TServerSocket; public: + /** + * Constructs a new socket. Note that this does NOT actually connect the + * socket. + * + * @param host An IP address or hostname to connect to + * @param port The port to connect on + */ TSocket(std::string host, int port); + + /** + * Destroyes the socket object, closing it if necessary. + */ ~TSocket(); - bool open(); + /** + * Whether the socket is alive. + * + * @return Is the socket alive? + */ + bool isOpen(); + + /** + * Creates and opens the UNIX socket. + * + * @throws TTransportException If the socket could not connect + */ + void open(); + + /** + * Shuts down communications on the socket. + */ void close(); - int read (std::string &s, uint32_t size); - void write(const std::string& s); - bool setLinger(bool on, int linger); - bool setNoDelay(bool noDelay); + /** + * Reads from the underlying socket. + */ + uint32_t read(uint8_t* buf, uint32_t len); + + /** + * Writes to the underlying socket. + */ + void write(const uint8_t* buf, uint32_t len); + + /** + * Controls whether the linger option is set on the socket. + * + * @param on Whether SO_LINGER is on + * @param linger If linger is active, the number of seconds to linger for + */ + void setLinger(bool on, int linger); + + /** + * Whether to enable/disable Nagle's algorithm. + * + * @param noDelay Whether or not to disable the algorithm. + * @return + */ + void setNoDelay(bool noDelay); private: + /** + * Constructor to create socket from raw UNIX handle. Never called directly + * but used by the TServerSocket class. + */ TSocket(int socket); - TSocketOptions *options_; + + /** Host to connect to */ std::string host_; + + /** Port number to connect on */ int port_; + + /** Underlying UNIX socket handle */ int socket_; }; diff --git a/lib/cpp/transport/TTransport.h b/lib/cpp/transport/TTransport.h index a1f43d4b..fcaece77 100644 --- a/lib/cpp/transport/TTransport.h +++ b/lib/cpp/transport/TTransport.h @@ -2,24 +2,96 @@ #define T_TRANSPORT_H #include +#include "transport/TTransportException.h" /** - * Generic interface for a method of transporting data. + * Generic interface for a method of transporting data. A TTransport may be + * capable of either reading or writing, but not necessarily both. * * @author Mark Slee */ class TTransport { public: - virtual ~TTransport() {}; + /** + * Virtual deconstructor. + */ + virtual ~TTransport() {} - virtual bool open() = 0; - virtual void close() = 0; + /** + * Whether this transport is open. + */ + virtual bool isOpen() { return false; } - virtual int read (std::string& s, uint32_t size) = 0; - virtual void write(const std::string& s) = 0; + /** + * Opens the transport for communications. + * + * @return bool Whether the transport was successfully opened + * @throws TTransportException if opening failed + */ + virtual void open() { + throw TTransportException(TTX_NOT_OPEN, "Cannot open base TTransport."); + } + + /** + * Closes the transport. + */ + virtual void close() { + throw TTransportException(TTX_NOT_OPEN, "Cannot close base TTransport."); + } + + /** + * Attempt to read up to the specified number of bytes into the string. + * + * @param s Reference to the location to append the read data + * @param len How many bytes to read + * @return How many bytes were actually read + * @throws TTransportException If an error occurs + */ + virtual uint32_t read(uint8_t* buf, uint32_t len) { + throw TTransportException(TTX_NOT_OPEN, "Base TTransport cannot read."); + } + + /** + * Reads the given amount of data in its entirety no matter what. + * + * @param s Reference to location for read data + * @param len How many bytes to read + * @return How many bytes read, which must be equal to size + * @throws TTransportException If insufficient data was read + */ + virtual uint32_t readAll(uint8_t* buf, uint32_t len) { + uint32_t have = 0; + + while (have < len) { + have += read(buf+have, len-have); + } + + return have; + } + + /** + * Writes the string in its entirety to the buffer. + * + * @param s The string to write out + * @throws TTransportException if an error occurs + */ + virtual void write(const uint8_t* buf, uint32_t len) { + throw TTransportException(TTX_NOT_OPEN, "Base TTransport cannot write."); + } + + /** + * Flushes any pending data to be written. Typically used with buffered + * transport mechanisms. + * + * @throws TTransportException if an error occurs + */ + virtual void flush() {} protected: - TTransport() {}; + /** + * Simple constructor. + */ + TTransport() {} }; #endif diff --git a/lib/cpp/transport/TTransportException.h b/lib/cpp/transport/TTransportException.h new file mode 100644 index 00000000..044e16d6 --- /dev/null +++ b/lib/cpp/transport/TTransportException.h @@ -0,0 +1,63 @@ +#ifndef T_TRANSPORT_EXCEPTION_H +#define T_TRANSPORT_EXCEPTION_H + +#include + +/** + * Error codes for the various types of exceptions. + */ +enum TTransportExceptionType { + TTX_UNKNOWN = 0, + TTX_NOT_OPEN = 1, + TTX_TIMED_OUT = 2, +}; + +/** + * Class to encapsulate all the possible types of transport errors that may + * occur in various transport systems. This provides a sort of generic + * wrapper around the shitty UNIX E_ error codes that lets a common code + * base of error handling to be used for various types of transports, i.e. + * pipes etc. + * + * @author Mark Slee + */ +class TTransportException { + public: + TTransportException() : + type_(TTX_UNKNOWN), message_() {} + + TTransportException(TTransportExceptionType type) : + type_(type), message_() {} + + TTransportException(std::string message) : + type_(TTX_UNKNOWN), message_(message) {} + + TTransportException(TTransportExceptionType type, std::string message) : + type_(type), message_(message) {} + + ~TTransportException() {} + + /** + * Returns an error code that provides information about the type of error + * that has occurred. + * + * @return Error code + */ + TTransportExceptionType getType() { return type_; } + + /** + * Returns an informative message about what caused this error. + * + * @return Error string + */ + const std::string& getMessage() { return message_; } + + protected: + /** Error code */ + TTransportExceptionType type_; + + /** Description */ + std::string message_; +}; + +#endif -- 2.17.1