THRIFT-904: disable nagle and linger
authorT Jake Luciani <jake@apache.org>
Thu, 27 Jan 2011 02:51:51 +0000 (02:51 +0000)
committerT Jake Luciani <jake@apache.org>
Thu, 27 Jan 2011 02:51:51 +0000 (02:51 +0000)
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1063966 13f79535-47bb-0310-9956-ffa450edef68

configure.ac
lib/csharp/src/Transport/TFramedTransport.cs
test/csharp/ThriftTest/TestClient.cs
test/csharp/ThriftTest/TestServer.cs

index b1ab0a7..6f18c0e 100644 (file)
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-AC_PREREQ(2.65)
+#AC_PREREQ(2.65)
 
 AC_INIT([thrift], [0.7.0-dev])
 
index b7ad5f2..e259f5a 100644 (file)
@@ -24,9 +24,12 @@ namespace Thrift.Transport
        public class TFramedTransport : TTransport
        {
                protected TTransport transport = null;
-               protected MemoryStream writeBuffer = new MemoryStream(1024);
+               protected MemoryStream writeBuffer;
                protected MemoryStream readBuffer = null;
 
+               private const int header_size = 4;
+               private static byte[] header_dummy = new byte[header_size]; // used as header placeholder while initilizing new write buffer
+
                public class Factory : TTransportFactory
                {
                        public override TTransport GetTransport(TTransport trans)
@@ -35,7 +38,12 @@ namespace Thrift.Transport
                        }
                }
 
-               public TFramedTransport(TTransport transport)
+               public TFramedTransport()
+               {
+                       InitWriteBuffer();
+               }
+
+               public TFramedTransport(TTransport transport) : this()
                {
                        this.transport = transport;
                }
@@ -77,8 +85,8 @@ namespace Thrift.Transport
 
                private void ReadFrame()
                {
-                       byte[] i32rd = new byte[4];
-                       transport.ReadAll(i32rd, 0, 4);
+                       byte[] i32rd = new byte[header_size];
+                       transport.ReadAll(i32rd, 0, header_size);
                        int size =
                                ((i32rd[0] & 0xff) << 24) |
                                ((i32rd[1] & 0xff) << 16) |
@@ -99,16 +107,31 @@ namespace Thrift.Transport
                {
                        byte[] buf = writeBuffer.GetBuffer();
                        int len = (int)writeBuffer.Length;
-                       writeBuffer = new MemoryStream(writeBuffer.Capacity);
-
-                       byte[] i32out = new byte[4];
-                       i32out[0] = (byte)(0xff & (len >> 24));
-                       i32out[1] = (byte)(0xff & (len >> 16));
-                       i32out[2] = (byte)(0xff & (len >> 8));
-                       i32out[3] = (byte)(0xff & (len));
-                       transport.Write(i32out, 0, 4);
+                       int data_len = len - header_size;
+                       if ( data_len < 0 )
+                               throw new System.InvalidOperationException (); // logic error actually
+
+                       InitWriteBuffer();
+
+                       // Inject message header into the reserved buffer space
+                       buf[0] = (byte)(0xff & (data_len >> 24));
+                       buf[1] = (byte)(0xff & (data_len >> 16));
+                       buf[2] = (byte)(0xff & (data_len >> 8));
+                       buf[3] = (byte)(0xff & (data_len));
+
+                       // Send the entire message at once
                        transport.Write(buf, 0, len);
+
                        transport.Flush();
                }
+
+               private void InitWriteBuffer ()
+               {
+                       // Create new buffer instance
+                       writeBuffer = new MemoryStream(1024);
+
+                       // Reserve space for message header to be put right before sending it out
+                       writeBuffer.Write ( header_dummy, 0, header_size );
+               }
        }
 }
index 1d7c75e..60fc995 100644 (file)
@@ -39,7 +39,7 @@ namespace Test
                                int port = 9090;
                                string url = null;
                                int numThreads = 1;
-                               bool buffered = false;
+                               bool buffered = false, framed = false;
 
                                try
                                {
@@ -67,6 +67,11 @@ namespace Test
                                                        buffered = true;
                                                        Console.WriteLine("Using buffered sockets");
                                                }
+                                               else if (args[i] == "-f" || args[i] == "-framed")
+                                               {
+                                                       framed = true;
+                                                       Console.WriteLine("Using framed transport");
+                                               }
                                                else if (args[i] == "-t")
                                                {
                                                        numThreads = Convert.ToInt32(args[++i]);
@@ -89,16 +94,13 @@ namespace Test
                                        threads[test] = t;
                                        if (url == null)
                                        {
-                                               TSocket socket = new TSocket(host, port);
+                                               TTransport trans = new TSocket(host, port);
                                                if (buffered)
-                                               {
-                                                       TBufferedTransport buffer = new TBufferedTransport(socket);
-                                                       t.Start(buffer);
-                                               }
-                                               else
-                                               {
-                                                       t.Start(socket);
-                                               }
+                                                       trans = new TBufferedTransport(trans as TStreamTransport);
+                                               if (framed)
+                                                       trans = new TFramedTransport(trans);
+                                                       
+                                               t.Start(trans);
                                        }
                                        else
                                        {
@@ -428,6 +430,12 @@ namespace Test
 
                        Console.WriteLine("Test Oneway(1)");
                        client.testOneway(1);
+
+                       Console.Write("Test Calltime()");
+                       var startt = DateTime.UtcNow;
+                       for ( int k=0; k<1000; ++k )
+                               client.testVoid();
+                       Console.WriteLine(" = " + (DateTime.UtcNow - startt).TotalSeconds.ToString() + " ms a testVoid() call" );
                }
        }
 }
index e370640..894ec9c 100644 (file)
@@ -301,7 +301,7 @@ namespace Test
                {
                        try
                        {
-                               bool useBufferedSockets = false;
+                               bool useBufferedSockets = false, useFramed = false;
                                int port = 9090;
                                if (args.Length > 0)
                                {
@@ -309,7 +309,23 @@ namespace Test
 
                                        if (args.Length > 1)
                                        {
-                                               bool.TryParse(args[1], out useBufferedSockets);
+                                               if ( args[1] == "raw" )
+                                               {
+                                                       // as default
+                                               }
+                                               else if ( args[1] == "buffered" )
+                                               {
+                                                       useBufferedSockets = true;
+                                               }
+                                               else if ( args[1] == "framed" )
+                                               {
+                                                       useFramed = true;
+                                               }
+                                               else
+                                               {
+                                                       // Fall back to the older boolean syntax
+                                                       bool.TryParse(args[1], out useBufferedSockets);
+                                               }
                                        }
                                }
 
@@ -320,10 +336,12 @@ namespace Test
                                // Transport
                                TServerSocket tServerSocket = new TServerSocket(port, 0, useBufferedSockets);
 
-                               TServer serverEngine;
-
                                // Simple Server
-                               serverEngine = new TSimpleServer(testProcessor, tServerSocket);
+                               TServer serverEngine;
+                               if ( useFramed )
+                                       serverEngine = new TSimpleServer(testProcessor, tServerSocket, new TFramedTransport.Factory());
+                               else
+                                       serverEngine = new TSimpleServer(testProcessor, tServerSocket);
 
                                // ThreadPool Server
                                // serverEngine = new TThreadPoolServer(testProcessor, tServerSocket);
@@ -334,7 +352,10 @@ namespace Test
                                testHandler.server = serverEngine;
 
                                // Run it
-                               Console.WriteLine("Starting the server on port " + port + (useBufferedSockets ? " with buffered socket" : "") + "...");
+                               Console.WriteLine("Starting the server on port " + port + 
+                                       (useBufferedSockets ? " with buffered socket" : "") + 
+                                       (useFramed ? " with framed transport" : "") + 
+                                       "...");
                                serverEngine.Serve();
 
                        }