From: Roger Meier Date: Tue, 26 Mar 2013 21:14:03 +0000 (+0100) Subject: THRIFT-1902 C++: Support for Multiplexing Services on any Transport, Protocol and... X-Git-Tag: 0.9.1~152 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=3a931b50d906db8099d3819a4c8c89328bb92a61;p=common%2Fthrift.git THRIFT-1902 C++: Support for Multiplexing Services on any Transport, Protocol and Server Patch: Patrik Lindblom --- diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index f40cdb12..05caf8cd 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -63,6 +63,7 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \ src/thrift/protocol/TDenseProtocol.cpp \ src/thrift/protocol/TJSONProtocol.cpp \ src/thrift/protocol/TBase64Utils.cpp \ + src/thrift/protocol/TMultiplexedProtocol.cpp \ src/thrift/transport/TTransportException.cpp \ src/thrift/transport/TFDTransport.cpp \ src/thrift/transport/TFileTransport.cpp \ @@ -155,6 +156,8 @@ include_protocol_HEADERS = \ src/thrift/protocol/TDebugProtocol.h \ src/thrift/protocol/TBase64Utils.h \ src/thrift/protocol/TJSONProtocol.h \ + src/thrift/protocol/TMultiplexedProtocol.h \ + src/thrift/protocol/TProtocolDecorator.h \ src/thrift/protocol/TProtocolTap.h \ src/thrift/protocol/TProtocolException.h \ src/thrift/protocol/TVirtualProtocol.h \ @@ -195,7 +198,8 @@ include_server_HEADERS = \ include_processordir = $(include_thriftdir)/processor include_processor_HEADERS = \ src/thrift/processor/PeekProcessor.h \ - src/thrift/processor/StatsProcessor.h + src/thrift/processor/StatsProcessor.h \ + src/thrift/processor/TMultiplexedProcessor.h include_asyncdir = $(include_thriftdir)/async include_async_HEADERS = \ diff --git a/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h b/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h new file mode 100644 index 00000000..494ec10b --- /dev/null +++ b/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_ +#define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1 + +#include +#include +#include +#include + +namespace apache +{ + namespace thrift + { + using boost::shared_ptr; + + namespace protocol { + + /** + * To be able to work with any protocol, we needed + * to allow them to call readMessageBegin() and get a TMessage in exactly + * the standard format, without the service name prepended to TMessage.name. + */ + class StoredMessageProtocol : public TProtocolDecorator + { + public: + StoredMessageProtocol( shared_ptr _protocol, + const std::string& _name, const TMessageType _type, + const int32_t _seqid) : + TProtocolDecorator(_protocol), + name(_name), + type(_type), + seqid(_seqid) + { + } + + uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) + { + + _name = name; + _type = type; + _seqid = seqid; + + return 0; // (Normal TProtocol read functions return number of bytes read) + } + + std::string name; + TMessageType type; + int32_t seqid; + }; + } //namespace protocol + + /** + * TMultiplexedProcessor is a TProcessor allowing + * a single TServer to provide multiple services. + * + *

To do so, you instantiate the processor and then register additional + * processors with it, as shown in the following example:

+ * + *
+ * shared_ptr processor(new TMultiplexedProcessor()); + * + * processor->registerProcessor( + * "Calculator", + * shared_ptr( new CalculatorProcessor( + * shared_ptr( new CalculatorHandler())))); + * + * processor->registerProcessor( + * "WeatherReport", + * shared_ptr( new WeatherReportProcessor( + * shared_ptr( new WeatherReportHandler())))); + * + * shared_ptr transport(new TServerSocket(9090)); + * TSimpleServer server(processor, transport); + * + * server.serve(); + *
+ */ + class TMultiplexedProcessor : public TProcessor + { + public: + typedef std::map< std::string, shared_ptr > services_t; + + /** + * 'Register' a service with this TMultiplexedProcessor. This + * allows us to broker requests to individual services by using the service + * name to select them at request time. + * + * \param [in] serviceName Name of a service, has to be identical to the name + * declared in the Thrift IDL, e.g. "WeatherReport". + * \param [in] processor Implementation of a service, ususally referred to + * as "handlers", e.g. WeatherReportHandler, + * implementing WeatherReportIf interface. + */ + void registerProcessor( const std::string & serviceName, + shared_ptr processor ) + { + services[serviceName] = processor; + } + + /** + * This implementation of process performs the following steps: + * + *
    + *
  1. Read the beginning of the message.
  2. + *
  3. Extract the service name from the message.
  4. + *
  5. Using the service name to locate the appropriate processor.
  6. + *
  7. Dispatch to the processor, with a decorated instance of TProtocol + * that allows readMessageBegin() to return the original TMessage.
  8. + *
+ * + * \throws TException If the message type is not T_CALL or T_ONEWAY, if + * the service name was not found in the message, or if the service + * name was not found in the service map. + */ + bool process( shared_ptr in, + shared_ptr out, + void *connectionContext) + { + std::string name; + protocol::TMessageType type; + int32_t seqid; + + // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the + // message header. This pulls the message "off the wire", which we'll + // deal with at the end of this method. + in->readMessageBegin(name, type, seqid); + + if( type != protocol::T_CALL && type != protocol::T_ONEWAY ) { + // Unexpected message type. + in->skip(::apache::thrift::protocol::T_STRUCT); + in->readMessageEnd(); + in->getTransport()->readEnd(); + const std::string msg("TMultiplexedProcessor: Unexpected message type"); + ::apache::thrift::TApplicationException x( + ::apache::thrift::TApplicationException::PROTOCOL_ERROR, + msg); + out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(out.get()); + out->writeMessageEnd(); + out->getTransport()->writeEnd(); + out->getTransport()->flush(); + throw TException(msg); + } + + // Extract the service name + + boost::tokenizer > tok( name, boost::char_separator(":") ); + + std::vector tokens; + std::copy( tok.begin(), tok.end(), std::back_inserter(tokens) ); + + // A valid message should consist of two tokens: the service + // name and the name of the method to call. + if( tokens.size() == 2 ) + { + // Search for a processor associated with this service name. + services_t::iterator it = services.find(tokens[0]); + + if( it != services.end() ) + { + shared_ptr processor = it->second; + // Let the processor registered for this service name + // process the message. + return processor->process( + shared_ptr( + new protocol::StoredMessageProtocol( in, tokens[1], type, seqid ) ), + out, connectionContext ); + } + else + { + // Unknown service. + in->skip(::apache::thrift::protocol::T_STRUCT); + in->readMessageEnd(); + in->getTransport()->readEnd(); + + std::string msg("TMultiplexedProcessor: Unknown service: "); + msg += tokens[0]; + ::apache::thrift::TApplicationException x( + ::apache::thrift::TApplicationException::PROTOCOL_ERROR, + msg); + out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(out.get()); + out->writeMessageEnd(); + out->getTransport()->writeEnd(); + out->getTransport()->flush(); + msg += ". Did you forget to call registerProcessor()?"; + throw TException(msg); + } + } + return false; + } + + private: + /** Map of service processor objects, indexed by service names. */ + services_t services; + }; + } +} + +#endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_ diff --git a/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp new file mode 100644 index 00000000..a17eacc8 --- /dev/null +++ b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.cpp @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +namespace apache +{ + namespace thrift + { + namespace protocol + { + uint32_t TMultiplexedProtocol::writeMessageBegin_virt( + const std::string& _name, + const TMessageType _type, + const int32_t _seqid) + { + if( _type == T_CALL || _type == T_ONEWAY ) + { + return TProtocolDecorator::writeMessageBegin_virt( serviceName + separator + _name, _type, _seqid ); + } + else + { + return TProtocolDecorator::writeMessageBegin_virt(_name, _type, _seqid); + } + } + } + } +} + diff --git a/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h new file mode 100644 index 00000000..a59c7b48 --- /dev/null +++ b/lib/cpp/src/thrift/protocol/TMultiplexedProtocol.h @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef THRIFT_TMULTIPLEXEDPROTOCOL_H_ +#define THRIFT_TMULTIPLEXEDPROTOCOL_H_ 1 + +#include + +namespace apache +{ + namespace thrift + { + namespace protocol + { + using boost::shared_ptr; + + /** + * TMultiplexedProtocol is a protocol-independent concrete decorator + * that allows a Thrift client to communicate with a multiplexing Thrift server, + * by prepending the service name to the function name during function calls. + * + * \note THIS IS NOT USED BY SERVERS. On the server, use + * {@link apache::thrift::TMultiplexedProcessor TMultiplexedProcessor} to handle requests + * from a multiplexing client. + * + * This example uses a single socket transport to invoke two services: + * + *
+ * shared_ptr transport(new TSocket("localhost", 9090)); + * transport->open(); + * + * shared_ptr protocol(new TBinaryProtocol(transport)); + * + * shared_ptr mp1(new TMultiplexedProtocol(protocol, "Calculator")); + * shared_ptr service1(new CalculatorClient(mp1)); + * + * shared_ptr mp2(new TMultiplexedProtocol(protocol, "WeatherReport")); + * shared_ptr service2(new WeatherReportClient(mp2)); + * + * service1->add(2,2); + * int temp = service2->getTemperature(); + *
+ * + * @see apache::thrift::protocol::TProtocolDecorator + */ + class TMultiplexedProtocol : public TProtocolDecorator + { + public: + /** + * Wrap the specified protocol, allowing it to be used to communicate with a + * multiplexing server. The serviceName is required as it is + * prepended to the message header so that the multiplexing server can broker + * the function call to the proper service. + * + * \param _protocol Your communication protocol of choice, e.g. TBinaryProtocol. + * \param _serviceName The service name of the service communicating via this protocol. + */ + TMultiplexedProtocol( shared_ptr _protocol, const std::string& _serviceName ) + : TProtocolDecorator(_protocol), + serviceName(_serviceName), + separator(":") + { } + virtual ~TMultiplexedProtocol() {} + + /** + * Prepends the service name to the function name, separated by TMultiplexedProtocol::SEPARATOR. + * + * \param [in] _name The name of the method to be called in the service. + * \param [in] _type The type of message + * \param [in] _name The sequential id of the message + * + * \throws TException Passed through from wrapped TProtocol instance. + */ + uint32_t writeMessageBegin_virt( + const std::string& _name, + const TMessageType _type, + const int32_t _seqid); + private: + const std::string serviceName; + const std::string separator; + }; + + } + } +} + +#endif // THRIFT_TMULTIPLEXEDPROTOCOL_H_ diff --git a/lib/cpp/src/thrift/protocol/TProtocolDecorator.h b/lib/cpp/src/thrift/protocol/TProtocolDecorator.h new file mode 100644 index 00000000..7850bc54 --- /dev/null +++ b/lib/cpp/src/thrift/protocol/TProtocolDecorator.h @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef THRIFT_TPROTOCOLDECORATOR_H_ +#define THRIFT_TPROTOCOLDECORATOR_H_ 1 + +#include +#include + +namespace apache +{ + namespace thrift + { + namespace protocol + { + using boost::shared_ptr; + + /** + * TProtocolDecorator forwards all requests to an enclosed + * TProtocol instance, providing a way to author concise + * concrete decorator subclasses. + * + *

See p.175 of Design Patterns (by Gamma et al.)

+ * + * @see apache::thrift::protocol::TMultiplexedProtocol + */ + class TProtocolDecorator : public TProtocol + { + public: + virtual ~TProtocolDecorator() {} + + // Desc: Initializes the protocol decorator object. + TProtocolDecorator( shared_ptr proto ) + : TProtocol(proto->getTransport()), protocol(proto) + { + } + + virtual uint32_t writeMessageBegin_virt( + const std::string& name, + const TMessageType messageType, + const int32_t seqid) + { + return protocol->writeMessageBegin(name, messageType, seqid); + } + virtual uint32_t writeMessageEnd_virt() { return protocol->writeMessageEnd(); } + virtual uint32_t writeStructBegin_virt(const char* name) { return protocol->writeStructBegin(name); } + virtual uint32_t writeStructEnd_virt() { return protocol->writeStructEnd(); } + + virtual uint32_t writeFieldBegin_virt(const char* name, + const TType fieldType, + const int16_t fieldId) { return protocol->writeFieldBegin(name,fieldType,fieldId); } + + virtual uint32_t writeFieldEnd_virt() { return protocol->writeFieldEnd(); } + virtual uint32_t writeFieldStop_virt() { return protocol->writeFieldStop(); } + + virtual uint32_t writeMapBegin_virt(const TType keyType, + const TType valType, + const uint32_t size) { return protocol->writeMapBegin(keyType,valType,size); } + + virtual uint32_t writeMapEnd_virt() { return protocol->writeMapEnd(); } + + virtual uint32_t writeListBegin_virt(const TType elemType, const uint32_t size) { return protocol->writeListBegin(elemType,size); } + virtual uint32_t writeListEnd_virt() { return protocol->writeListEnd(); } + + virtual uint32_t writeSetBegin_virt(const TType elemType, const uint32_t size) { return protocol->writeSetBegin(elemType,size); } + virtual uint32_t writeSetEnd_virt() { return protocol->writeSetEnd(); } + + virtual uint32_t writeBool_virt(const bool value) { return protocol->writeBool(value); } + virtual uint32_t writeByte_virt(const int8_t byte) { return protocol->writeByte(byte); } + virtual uint32_t writeI16_virt(const int16_t i16) { return protocol->writeI16(i16); } + virtual uint32_t writeI32_virt(const int32_t i32) { return protocol->writeI32(i32); } + virtual uint32_t writeI64_virt(const int64_t i64) { return protocol->writeI64(i64); } + + virtual uint32_t writeDouble_virt(const double dub) { return protocol->writeDouble(dub); } + virtual uint32_t writeString_virt(const std::string& str) { return protocol->writeString(str); } + virtual uint32_t writeBinary_virt(const std::string& str) { return protocol->writeBinary(str); } + + virtual uint32_t readMessageBegin_virt(std::string& name, TMessageType& messageType, int32_t& seqid) { return protocol->readMessageBegin(name,messageType,seqid); } + virtual uint32_t readMessageEnd_virt() { return protocol->readMessageEnd(); } + + virtual uint32_t readStructBegin_virt(std::string& name) { return protocol->readStructBegin(name); } + virtual uint32_t readStructEnd_virt() { return protocol->readStructEnd(); } + + virtual uint32_t readFieldBegin_virt(std::string& name, TType& fieldType, int16_t& fieldId) { return protocol->readFieldBegin(name, fieldType, fieldId); } + virtual uint32_t readFieldEnd_virt() { return protocol->readFieldEnd(); } + + virtual uint32_t readMapBegin_virt(TType& keyType, TType& valType, uint32_t& size) { return protocol->readMapBegin(keyType,valType,size); } + virtual uint32_t readMapEnd_virt() { return protocol->readMapEnd(); } + + virtual uint32_t readListBegin_virt(TType& elemType, uint32_t& size) { return protocol->readListBegin(elemType,size); } + virtual uint32_t readListEnd_virt() { return protocol->readListEnd(); } + + virtual uint32_t readSetBegin_virt(TType& elemType, uint32_t& size) { return protocol->readSetBegin(elemType,size); } + virtual uint32_t readSetEnd_virt() { return protocol->readSetEnd(); } + + virtual uint32_t readBool_virt(bool& value) { return protocol->readBool(value); } + virtual uint32_t readBool_virt(std::vector::reference value) { return protocol->readBool(value); } + + virtual uint32_t readByte_virt(int8_t& byte) { return protocol->readByte(byte); } + + virtual uint32_t readI16_virt(int16_t& i16) { return protocol->readI16(i16); } + virtual uint32_t readI32_virt(int32_t& i32) { return protocol->readI32(i32); } + virtual uint32_t readI64_virt(int64_t& i64) { return protocol->readI64(i64); } + + virtual uint32_t readDouble_virt(double& dub) { return protocol->readDouble(dub); } + + virtual uint32_t readString_virt(std::string& str) { return protocol->readString(str); } + virtual uint32_t readBinary_virt(std::string& str) { return protocol->readBinary(str); } + + private: + shared_ptr protocol; + }; + } + } +} + +#endif // THRIFT_TPROTOCOLDECORATOR_H_