From 3661867febfa3a303ae8f5df2ed1a29e882821c3 Mon Sep 17 00:00:00 2001 From: Jens Geyer Date: Sun, 24 Mar 2013 11:53:31 +0200 Subject: [PATCH] THRIFT-563 Support for Multiplexing Services on any Transport, Protocol and Server Patch: Rob Slifka --- .../apache/thrift/TMultiplexedProcessor.java | 124 +++++++++++ .../thrift/protocol/TMultiplexedProtocol.java | 72 +++++++ .../thrift/protocol/TProtocolDecorator.java | 192 ++++++++++++++++++ 3 files changed, 388 insertions(+) create mode 100644 lib/java/src/org/apache/thrift/TMultiplexedProcessor.java create mode 100644 lib/java/src/org/apache/thrift/protocol/TMultiplexedProtocol.java create mode 100644 lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java diff --git a/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java b/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java new file mode 100644 index 00000000..8547cf0c --- /dev/null +++ b/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java @@ -0,0 +1,124 @@ +package org.apache.thrift; + +import org.apache.thrift.protocol.*; + +import java.util.Map; +import java.util.HashMap; + +/** + * TMultiplexedProcessor is a TProcessor allowing + * a single TServer to provide multiple services. + * + *

To do so, you instantiate the processor and then register additional + * processors with it, as shown in the following example:

+ * + *
+ * TMultiplexedProcessor processor = new TMultiplexedProcessor(); + * + * processor.registerProcessor( + * "Calculator", + * new Calculator.Processor(new CalculatorHandler())); + * + * processor.registerProcessor( + * "WeatherReport", + * new WeatherReport.Processor(new WeatherReportHandler())); + * + * TServerTransport t = new TServerSocket(9090); + * TSimpleServer server = new TSimpleServer(processor, t); + * + * server.serve(); + *
+ */ +public class TMultiplexedProcessor implements TProcessor { + + private final Map SERVICE_PROCESSOR_MAP + = new HashMap(); + + /** + * 'Register' a service with this TMultiplexedProcessor. This + * allows us to broker requests to individual services by using the service + * name to select them at request time. + * + * @param serviceName Name of a service, has to be identical to the name + * declared in the Thrift IDL, e.g. "WeatherReport". + * @param processor Implementation of a service, ususally referred to + * as "handlers", e.g. WeatherReportHandler implementing WeatherReport.Iface. + */ + public void registerProcessor(String serviceName, TProcessor processor) { + SERVICE_PROCESSOR_MAP.put(serviceName, processor); + } + + /** + * This implementation of process performs the following steps: + * + *
    + *
  1. Read the beginning of the message.
  2. + *
  3. Extract the service name from the message.
  4. + *
  5. Using the service name to locate the appropriate processor.
  6. + *
  7. Dispatch to the processor, with a decorated instance of TProtocol + * that allows readMessageBegin() to return the original TMessage.
  8. + *
+ * + * @throws TException If the message type is not CALL or ONEWAY, if + * the service name was not found in the message, or if the service + * name was not found in the service map. You called {@link #registerProcessor(String, TProcessor) registerProcessor} + * during initialization, right? :) + */ + public boolean process(TProtocol iprot, TProtocol oprot) throws TException { + /* + Use the actual underlying protocol (e.g. TBinaryProtocol) to read the + message header. This pulls the message "off the wire", which we'll + deal with at the end of this method. + */ + TMessage message = iprot.readMessageBegin(); + + if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) { + // TODO Apache Guys - Can the server ever get an EXCEPTION or REPLY? + // TODO Should we check for this here? + throw new TException("This should not have happened!?"); + } + + // Extract the service name + int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR); + if (index < 0) { + throw new TException("Service name not found in message name: " + message.name + ". Did you " + + "forget to use a TMultiplexProtocol in your client?"); + } + + // Create a new TMessage, something that can be consumed by any TProtocol + String serviceName = message.name.substring(0, index); + TProcessor actualProcessor = SERVICE_PROCESSOR_MAP.get(serviceName); + if (actualProcessor == null) { + throw new TException("Service name not found: " + serviceName + ". Did you forget " + + "to call registerProcessor()?"); + } + + // Create a new TMessage, removing the service name + TMessage standardMessage = new TMessage( + message.name.substring(serviceName.length()+TMultiplexedProtocol.SEPARATOR.length()), + message.type, + message.seqid + ); + + // Dispatch processing to the stored processor + return actualProcessor.process(new StoredMessageProtocol(iprot, standardMessage), oprot); + } + + /** + * Our goal was to work with any protocol. In order to do that, we needed + * to allow them to call readMessageBegin() and get a TMessage in exactly + * the standard format, without the service name prepended to TMessage.name. + */ + private class StoredMessageProtocol extends TProtocolDecorator { + TMessage messageBegin; + public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin) { + super(protocol); + this.messageBegin = messageBegin; + } + @Override + public TMessage readMessageBegin() throws TException { + return messageBegin; + } + } + +} diff --git a/lib/java/src/org/apache/thrift/protocol/TMultiplexedProtocol.java b/lib/java/src/org/apache/thrift/protocol/TMultiplexedProtocol.java new file mode 100644 index 00000000..4b5e6718 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TMultiplexedProtocol.java @@ -0,0 +1,72 @@ +package org.apache.thrift.protocol; + +import org.apache.thrift.TException; + +/** + * TMultiplexedProtocol is a protocol-independent concrete decorator + * that allows a Thrift client to communicate with a multiplexing Thrift server, + * by prepending the service name to the function name during function calls. + * + *

NOTE: THIS IS NOT USED BY SERVERS. On the server, use {@link org.apache.thrift.TMultiplexedProcessor TMultiplexedProcessor} to handle requests + * from a multiplexing client. + * + *

This example uses a single socket transport to invoke two services: + * + *

+ * TSocket transport = new TSocket("localhost", 9090);
+ * transport.open();
+ *
+ * TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ *
+ * TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
+ * Calculator.Client service = new Calculator.Client(mp);
+ *
+ * TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
+ * WeatherReport.Client service2 = new WeatherReport.Client(mp2);
+ *
+ * System.out.println(service.add(2,2));
+ * System.out.println(service2.getTemperature());
+ *
+ * + * @see org.apache.thrift.protocol.TProtocolDecorator + */ +public class TMultiplexedProtocol extends TProtocolDecorator { + + /** Used to delimit the service name from the function name */ + public static final String SEPARATOR = ":"; + + private final String SERVICE_NAME; + + /** + * Wrap the specified protocol, allowing it to be used to communicate with a + * multiplexing server. The serviceName is required as it is + * prepended to the message header so that the multiplexing server can broker + * the function call to the proper service. + * + * @param protocol Your communication protocol of choice, e.g. TBinaryProtocol. + * @param serviceName The service name of the service communicating via this protocol. + */ + public TMultiplexedProtocol(TProtocol protocol, String serviceName) { + super(protocol); + SERVICE_NAME = serviceName; + } + + /** + * Prepends the service name to the function name, separated by TMultiplexedProtocol.SEPARATOR. + * + * @param tMessage The original message. + * @throws TException Passed through from wrapped TProtocol instance. + */ + @Override + public void writeMessageBegin(TMessage tMessage) throws TException { + if (tMessage.type == TMessageType.CALL || tMessage.type == TMessageType.ONEWAY) { + super.writeMessageBegin(new TMessage( + SERVICE_NAME + SEPARATOR + tMessage.name, + tMessage.type, + tMessage.seqid + )); + } else { + super.writeMessageBegin(tMessage); + } + } +} diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java b/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java new file mode 100644 index 00000000..6190d135 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java @@ -0,0 +1,192 @@ +package org.apache.thrift.protocol; + +import org.apache.thrift.TException; + +/** + * TProtocolDecorator forwards all requests to an enclosed + * TProtocol instance, providing a way to author concise + * concrete decorator subclasses. While it has no abstract methods, it + * is marked abstract as a reminder that by itself, it does not modify + * the behaviour of the enclosed TProtocol. + * + *

See p.175 of Design Patterns (by Gamma et al.)

+ * + * @see org.apache.thrift.protocol.TMultiplexedProtocol + */ +public abstract class TProtocolDecorator extends TProtocol { + + private final TProtocol concreteProtocol; + + /** + * Encloses the specified protocol. + * @param protocol All operations will be forward to this protocol. Must be non-null. + */ + public TProtocolDecorator(TProtocol protocol) { + super(protocol.getTransport()); + concreteProtocol = protocol; + } + + public void writeMessageBegin(TMessage tMessage) throws TException { + concreteProtocol.writeMessageBegin(tMessage); + } + + public void writeMessageEnd() throws TException { + concreteProtocol.writeMessageEnd(); + } + + public void writeStructBegin(TStruct tStruct) throws TException { + concreteProtocol.writeStructBegin(tStruct); + } + + public void writeStructEnd() throws TException { + concreteProtocol.writeStructEnd(); + } + + public void writeFieldBegin(TField tField) throws TException { + concreteProtocol.writeFieldBegin(tField); + } + + public void writeFieldEnd() throws TException { + concreteProtocol.writeFieldEnd(); + } + + public void writeFieldStop() throws TException { + concreteProtocol.writeFieldStop(); + } + + public void writeMapBegin(TMap tMap) throws TException { + concreteProtocol.writeMapBegin(tMap); + } + + public void writeMapEnd() throws TException { + concreteProtocol.writeMapEnd(); + } + + public void writeListBegin(TList tList) throws TException { + concreteProtocol.writeListBegin(tList); + } + + public void writeListEnd() throws TException { + concreteProtocol.writeListEnd(); + } + + public void writeSetBegin(TSet tSet) throws TException { + concreteProtocol.writeSetBegin(tSet); + } + + public void writeSetEnd() throws TException { + concreteProtocol.writeSetEnd(); + } + + public void writeBool(boolean b) throws TException { + concreteProtocol.writeBool(b); + } + + public void writeByte(byte b) throws TException { + concreteProtocol.writeByte(b); + } + + public void writeI16(short i) throws TException { + concreteProtocol.writeI16(i); + } + + public void writeI32(int i) throws TException { + concreteProtocol.writeI32(i); + } + + public void writeI64(long l) throws TException { + concreteProtocol.writeI64(l); + } + + public void writeDouble(double v) throws TException { + concreteProtocol.writeDouble(v); + } + + public void writeString(String s) throws TException { + concreteProtocol.writeString(s); + } + + public void writeBinary(byte[] bytes) throws TException { + concreteProtocol.writeBinary(bytes); + } + + public TMessage readMessageBegin() throws TException { + return concreteProtocol.readMessageBegin(); + } + + public void readMessageEnd() throws TException { + concreteProtocol.readMessageEnd(); + } + + public TStruct readStructBegin() throws TException { + return concreteProtocol.readStructBegin(); + } + + public void readStructEnd() throws TException { + concreteProtocol.readStructEnd(); + } + + public TField readFieldBegin() throws TException { + return concreteProtocol.readFieldBegin(); + } + + public void readFieldEnd() throws TException { + concreteProtocol.readFieldEnd(); + } + + public TMap readMapBegin() throws TException { + return concreteProtocol.readMapBegin(); + } + + public void readMapEnd() throws TException { + concreteProtocol.readMapEnd(); + } + + public TList readListBegin() throws TException { + return concreteProtocol.readListBegin(); + } + + public void readListEnd() throws TException { + concreteProtocol.readListEnd(); + } + + public TSet readSetBegin() throws TException { + return concreteProtocol.readSetBegin(); + } + + public void readSetEnd() throws TException { + concreteProtocol.readSetEnd(); + } + + public boolean readBool() throws TException { + return concreteProtocol.readBool(); + } + + public byte readByte() throws TException { + return concreteProtocol.readByte(); + } + + public short readI16() throws TException { + return concreteProtocol.readI16(); + } + + public int readI32() throws TException { + return concreteProtocol.readI32(); + } + + public long readI64() throws TException { + return concreteProtocol.readI64(); + } + + public double readDouble() throws TException { + return concreteProtocol.readDouble(); + } + + public String readString() throws TException { + return concreteProtocol.readString(); + } + + public byte[] readBinary() throws TException { + return concreteProtocol.readBinary(); + } +} -- 2.17.1