THRIFT-2408 Named Pipe Transport Option for C#
authorJens Geyer <jensg@apache.org>
Wed, 19 Mar 2014 22:52:18 +0000 (00:52 +0200)
committerJens Geyer <jensg@apache.org>
Wed, 19 Mar 2014 22:52:55 +0000 (00:52 +0200)
Patch: Carl Yeksigian & Jens Geyer

lib/csharp/Makefile.am
lib/csharp/src/Thrift.csproj
lib/csharp/src/Transport/TNamedPipeClientTransport.cs [new file with mode: 0644]
lib/csharp/src/Transport/TNamedPipeServerTransport.cs [new file with mode: 0644]
lib/csharp/test/ThriftTest/TestClient.cs
lib/csharp/test/ThriftTest/TestServer.cs

index 71cc83b..e847237 100644 (file)
@@ -58,6 +58,8 @@ THRIFTCODE= \
             src/Transport/THttpClient.cs \
             src/Transport/THttpHandler.cs \
             src/Transport/TMemoryBuffer.cs \
+            src/Transport/TNamedPipeClientTransport.cs \
+            src/Transport/TNamedPipeServerTransport.cs \
             src/TProcessor.cs \
             src/TException.cs \
             src/TApplicationException.cs
index 58d3793..d475ed6 100644 (file)
     <Compile Include="Transport\TFramedTransport.cs" />
     <Compile Include="Transport\THttpClient.cs" />
     <Compile Include="Transport\THttpHandler.cs" />
+    <Compile Include="Transport\TNamedPipeClientTransport.cs" />
+    <Compile Include="Transport\TNamedPipeServerTransport.cs" />
     <Compile Include="Transport\TServerSocket.cs" />
     <Compile Include="Transport\TServerTransport.cs" />
     <Compile Include="Transport\TSocket.cs" />
diff --git a/lib/csharp/src/Transport/TNamedPipeClientTransport.cs b/lib/csharp/src/Transport/TNamedPipeClientTransport.cs
new file mode 100644 (file)
index 0000000..4c320e6
--- /dev/null
@@ -0,0 +1,72 @@
+\feffusing System.IO.Pipes;
+
+namespace Thrift.Transport
+{
+       public class TNamedPipeClientTransport : TTransport
+       {
+               private NamedPipeClientStream client;
+               private string ServerName;
+               private string PipeName;
+
+               public TNamedPipeClientTransport(string pipe)
+               {
+                       ServerName = ".";
+                       PipeName = pipe;
+               }
+
+               public TNamedPipeClientTransport(string server, string pipe)
+               {
+                       ServerName = (server != "") ? server : ".";
+                       PipeName = pipe;
+               }
+
+               public override bool IsOpen
+               {
+                       get { return client != null && client.IsConnected; }
+               }
+
+               public override void Open()
+               {
+                       if (IsOpen)
+                       {
+                               throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen);
+                       }
+                       client = new NamedPipeClientStream(ServerName, PipeName, PipeDirection.InOut, PipeOptions.None);
+                       client.Connect();
+               }
+
+               public override void Close()
+               {
+                       if (client != null)
+                       {
+                               client.Close();
+                               client = null;
+                       }
+               }
+
+               public override int Read(byte[] buf, int off, int len)
+               {
+                       if (client == null)
+                       {
+                               throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+                       }
+
+                       return client.Read(buf, off, len);
+               }
+
+               public override void Write(byte[] buf, int off, int len)
+               {
+                       if (client == null)
+                       {
+                               throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+                       }
+
+                       client.Write(buf, off, len);
+               }
+
+               protected override void Dispose(bool disposing)
+               {
+                       client.Dispose();
+               }
+       }
+}
\ No newline at end of file
diff --git a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
new file mode 100644 (file)
index 0000000..b87898e
--- /dev/null
@@ -0,0 +1,113 @@
+\feffusing System;
+using System.Collections.Generic;
+using System.IO.Pipes;
+
+namespace Thrift.Transport
+{
+       public class TNamedPipeServerTransport : TServerTransport
+       {
+               /// <summary>
+               /// This is the address of the Pipe on the localhost.
+               /// </summary>
+               private readonly string pipeAddress;
+               NamedPipeServerStream stream = null;
+
+               public TNamedPipeServerTransport(string pipeAddress)
+               {
+                       this.pipeAddress = pipeAddress;
+               }
+
+               public override void Listen()
+               {
+                       // nothing to do here
+               }
+
+               public override void Close()
+               {
+                       if (stream != null)
+                       {
+                               try
+                               {
+                                       stream.Close();
+                                       stream.Dispose();
+                               }
+                               finally
+                               {
+                                       stream = null;
+                               }
+                       }
+               }
+
+               private void EnsurePipeInstance()
+               {
+                       if( stream == null)
+                               stream = new NamedPipeServerStream(
+                                       pipeAddress, PipeDirection.InOut, 254,
+                                       PipeTransmissionMode.Byte,
+                                       PipeOptions.None, 4096, 4096 /*TODO: security*/);
+               }
+
+               protected override TTransport AcceptImpl()
+               {
+                       try
+                       {
+                               EnsurePipeInstance();
+                               stream.WaitForConnection();
+                               var trans = new ServerTransport(stream);
+                               stream = null;  // pass ownership to ServerTransport
+                               return trans;
+                       }
+                       catch (Exception e)
+                       {
+                               Close();
+                               throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message);
+                       }
+               }
+
+               private class ServerTransport : TTransport
+               {
+                       private NamedPipeServerStream server;
+                       public ServerTransport(NamedPipeServerStream server)
+                       {
+                               this.server = server;
+                       }
+
+                       public override bool IsOpen
+                       {
+                               get { return server != null && server.IsConnected; }
+                       }
+
+                       public override void Open()
+                       {
+                       }
+
+                       public override void Close()
+                       {
+                               if (server != null) server.Close();
+                       }
+
+                       public override int Read(byte[] buf, int off, int len)
+                       {
+                               if (server == null)
+                               {
+                                       throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+                               }
+                               return server.Read(buf, off, len);
+                       }
+
+                       public override void Write(byte[] buf, int off, int len)
+                       {
+                               if (server == null)
+                               {
+                                       throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+                               }
+                               server.Write(buf, off, len);
+                       }
+
+                       protected override void Dispose(bool disposing)
+                       {
+                               server.Dispose();
+                       }
+               }
+       }
+}
\ No newline at end of file
index c7b81b4..ba2d4d0 100644 (file)
@@ -37,7 +37,7 @@ namespace Test
                        {
                                string host = "localhost";
                                int port = 9090;
-                               string url = null;
+                               string url = null, pipe = null;
                                int numThreads = 1;
                                bool buffered = false, framed = false;
 
@@ -72,6 +72,11 @@ namespace Test
                                                        framed = true;
                                                        Console.WriteLine("Using framed transport");
                                                }
+                                               else if (args[i] == "-pipe")  // -pipe <name>
+                                               {
+                                                       pipe = args[++i];
+                                                       Console.WriteLine("Using named pipes transport");
+                                               }
                                                else if (args[i] == "-t")
                                                {
                                                        numThreads = Convert.ToInt32(args[++i]);
@@ -94,7 +99,14 @@ namespace Test
                                        threads[test] = t;
                                        if (url == null)
                                        {
-                                               TTransport trans = new TSocket(host, port);
+                                               // endpoint transport
+                                               TTransport trans = null;
+                                               if( pipe != null)
+                                                       trans = new TNamedPipeClientTransport(pipe);
+                                               else
+                                                       trans = new TSocket(host, port);
+                                               
+                                               // layered transport
                                                if (buffered)
                                                        trans = new TBufferedTransport(trans as TStreamTransport);
                                                if (framed)
index 8a4e605..965a7de 100644 (file)
@@ -117,25 +117,25 @@ namespace Test
                                return thing;
                        }
 
-            public Dictionary<string, string> testStringMap(Dictionary<string, string> thing)
-            {
-                Console.WriteLine("testStringMap({");
-                bool first = true;
-                foreach (string key in thing.Keys)
-                {
-                    if (first)
-                    {
-                        first = false;
-                    }
-                    else
-                    {
-                        Console.WriteLine(", ");
-                    }
-                    Console.WriteLine(key + " => " + thing[key]);
-                }
-                Console.WriteLine("})");
-                return thing;
-            }
+                       public Dictionary<string, string> testStringMap(Dictionary<string, string> thing)
+                       {
+                               Console.WriteLine("testStringMap({");
+                               bool first = true;
+                               foreach (string key in thing.Keys)
+                               {
+                                       if (first)
+                                       {
+                                               first = false;
+                                       }
+                                       else
+                                       {
+                                               Console.WriteLine(", ");
+                                       }
+                                       Console.WriteLine(key + " => " + thing[key]);
+                               }
+                               Console.WriteLine("})");
+                               return thing;
+                       }
 
                        public THashSet<int> testSet(THashSet<int> thing)
                        {
@@ -322,29 +322,39 @@ namespace Test
                        try
                        {
                                bool useBufferedSockets = false, useFramed = false;
-                               int port = 9090;
+                               int port = 9090, i = 0;
+                               string pipe = null;
                                if (args.Length > 0)
                                {
-                                       port = int.Parse(args[0]);
+                                       i = 0;
+                                       if (args[i] == "-pipe")  // -pipe name
+                                       {
+                                               pipe = args[++i];
+                                       }
+                                       else  // default to port number (compatibility)
+                                       {
+                                               port = int.Parse(args[i]);
+                                       }
 
-                                       if (args.Length > 1)
+                                       ++i;
+                                       if (args.Length > i)
                                        {
-                                               if ( args[1] == "raw" )
+                                               if ( args[i] == "raw" )
                                                {
                                                        // as default
                                                }
-                                               else if ( args[1] == "buffered" )
+                                               else if ( args[i] == "buffered" )
                                                {
                                                        useBufferedSockets = true;
                                                }
-                                               else if ( args[1] == "framed" )
+                                               else if (args[i] == "framed")
                                                {
                                                        useFramed = true;
                                                }
                                                else
                                                {
                                                        // Fall back to the older boolean syntax
-                                                       bool.TryParse(args[1], out useBufferedSockets);
+                                                       bool.TryParse(args[i], out useBufferedSockets);
                                                }
                                        }
                                }
@@ -354,14 +364,22 @@ namespace Test
                                ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler);
 
                                // Transport
-                               TServerSocket tServerSocket = new TServerSocket(port, 0, useBufferedSockets);
+                               TServerTransport trans;
+                               if( pipe != null)
+                               {
+                                       trans = new TNamedPipeServerTransport(pipe);
+                               }
+                               else
+                               {
+                                       trans = new TServerSocket(port, 0, useBufferedSockets);
+                               }
 
                                // Simple Server
                                TServer serverEngine;
                                if ( useFramed )
-                                       serverEngine = new TSimpleServer(testProcessor, tServerSocket, new TFramedTransport.Factory());
+                                       serverEngine = new TSimpleServer(testProcessor, trans, new TFramedTransport.Factory());
                                else
-                                       serverEngine = new TSimpleServer(testProcessor, tServerSocket);
+                                       serverEngine = new TSimpleServer(testProcessor, trans);
 
                                // ThreadPool Server
                                // serverEngine = new TThreadPoolServer(testProcessor, tServerSocket);
@@ -372,7 +390,8 @@ namespace Test
                                testHandler.server = serverEngine;
 
                                // Run it
-                               Console.WriteLine("Starting the server on port " + port + 
+                               string where = ( pipe != null ? "on pipe "+pipe : "on port " + port);
+                               Console.WriteLine("Starting the server " +where+
                                        (useBufferedSockets ? " with buffered socket" : "") + 
                                        (useFramed ? " with framed transport" : "") + 
                                        "...");