THRIFT-1901 C#: Support for Multiplexing Services on any Transport, Protocol and Server
Patch: Jens Geyer
diff --git a/lib/csharp/src/Protocol/TMultiplexedProcessor.cs b/lib/csharp/src/Protocol/TMultiplexedProcessor.cs
new file mode 100644
index 0000000..29fac9e
--- /dev/null
+++ b/lib/csharp/src/Protocol/TMultiplexedProcessor.cs
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Contains some contributions under the Thrift Software License.
+ * Please see doc/old-thrift-license.txt in the Thrift distribution for
+ * details.
+ */
+
+using System;
+using System.Text;
+using Thrift.Transport;
+using System.Collections.Generic;
+
+namespace Thrift.Protocol
+{
+
+ /**
+ * TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
+ * To do so, you instantiate the processor and then register additional processors with it,
+ * as shown in the following example:
+ *
+ * TMultiplexedProcessor processor = new TMultiplexedProcessor();
+ *
+ * processor.registerProcessor(
+ * "Calculator",
+ * new Calculator.Processor(new CalculatorHandler()));
+ *
+ * processor.registerProcessor(
+ * "WeatherReport",
+ * new WeatherReport.Processor(new WeatherReportHandler()));
+ *
+ * TServerTransport t = new TServerSocket(9090);
+ * TSimpleServer server = new TSimpleServer(processor, t);
+ *
+ * server.serve();
+ */
+ public class TMultiplexedProcessor : TProcessor
+ {
+ private Dictionary<String,TProcessor> ServiceProcessorMap = new Dictionary<String,TProcessor>();
+
+ /**
+ * 'Register' a service with this TMultiplexedProcessor. This allows us to broker
+ * requests to individual services by using the service name to select them at request time.
+ *
+ * Args:
+ * - serviceName Name of a service, has to be identical to the name
+ * declared in the Thrift IDL, e.g. "WeatherReport".
+ * - processor Implementation of a service, ususally referred to as "handlers",
+ * e.g. WeatherReportHandler implementing WeatherReport.Iface.
+ */
+ public void RegisterProcessor(String serviceName, TProcessor processor)
+ {
+ ServiceProcessorMap.Add(serviceName, processor);
+ }
+
+
+ private void Fail( TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype, string etxt)
+ {
+ TApplicationException appex = new TApplicationException( extype, etxt);
+
+ TMessage newMessage = new TMessage(message.Name, TMessageType.Exception, message.SeqID);
+
+ oprot.WriteMessageBegin(newMessage);
+ appex.Write( oprot);
+ oprot.WriteMessageEnd();
+ oprot.Transport.Flush();
+ }
+
+
+ /**
+ * This implementation of process performs the following steps:
+ *
+ * - Read the beginning of the message.
+ * - Extract the service name from the message.
+ * - Using the service name to locate the appropriate processor.
+ * - Dispatch to the processor, with a decorated instance of TProtocol
+ * that allows readMessageBegin() to return the original TMessage.
+ *
+ * Throws an exception if
+ * - the message type is not CALL or ONEWAY,
+ * - the service name was not found in the message, or
+ * - the service name has not been RegisterProcessor()ed.
+ */
+ public bool Process(TProtocol iprot, TProtocol oprot)
+ {
+ /* Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+ message header. This pulls the message "off the wire", which we'll
+ deal with at the end of this method. */
+
+ TMessage message = iprot.ReadMessageBegin();
+
+ if ((message.Type != TMessageType.Call) && (message.Type != TMessageType.Oneway))
+ {
+ Fail( oprot, message,
+ TApplicationException.ExceptionType.InvalidMessageType,
+ "Message type CALL or ONEWAY expected");
+ return false;
+ }
+
+ // Extract the service name
+ int index = message.Name.IndexOf(TMultiplexedProtocol.SEPARATOR);
+ if (index < 0) {
+ Fail( oprot, message,
+ TApplicationException.ExceptionType.InvalidProtocol,
+ "Service name not found in message name: " + message.Name + ". "+
+ "Did you forget to use a TMultiplexProtocol in your client?");
+ return false;
+ }
+
+ // Create a new TMessage, something that can be consumed by any TProtocol
+ string serviceName = message.Name.Substring(0, index);
+ TProcessor actualProcessor;
+ if( ! ServiceProcessorMap.TryGetValue(serviceName, out actualProcessor))
+ {
+ Fail( oprot, message,
+ TApplicationException.ExceptionType.InternalError,
+ "Service name not found: " + serviceName + ". "+
+ "Did you forget to call RegisterProcessor()?");
+ return false;
+ }
+
+ // Create a new TMessage, removing the service name
+ TMessage newMessage = new TMessage(
+ message.Name.Substring(serviceName.Length + TMultiplexedProtocol.SEPARATOR.Length),
+ message.Type,
+ message.SeqID);
+
+ // Dispatch processing to the stored processor
+ return actualProcessor.Process(new StoredMessageProtocol(iprot, newMessage), oprot);
+ }
+
+ /**
+ * Our goal was to work with any protocol. In order to do that, we needed
+ * to allow them to call readMessageBegin() and get a TMessage in exactly
+ * the standard format, without the service name prepended to TMessage.name.
+ */
+ private class StoredMessageProtocol : TProtocolDecorator
+ {
+ TMessage MsgBegin;
+
+ public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin)
+ :base(protocol)
+ {
+ this.MsgBegin = messageBegin;
+ }
+
+ public override TMessage ReadMessageBegin()
+ {
+ return MsgBegin;
+ }
+ }
+
+ }
+}