From 7070aaa23bce996f9c40f75903d72fe427072713 Mon Sep 17 00:00:00 2001 From: T Jake Luciani Date: Thu, 27 Jan 2011 02:51:51 +0000 Subject: [PATCH] THRIFT-904: disable nagle and linger git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1063966 13f79535-47bb-0310-9956-ffa450edef68 --- configure.ac | 2 +- lib/csharp/src/Transport/TFramedTransport.cs | 47 +++++++++++++++----- test/csharp/ThriftTest/TestClient.cs | 28 +++++++----- test/csharp/ThriftTest/TestServer.cs | 33 +++++++++++--- 4 files changed, 81 insertions(+), 29 deletions(-) diff --git a/configure.ac b/configure.ac index b1ab0a7c..6f18c0e0 100644 --- a/configure.ac +++ b/configure.ac @@ -17,7 +17,7 @@ # under the License. # -AC_PREREQ(2.65) +#AC_PREREQ(2.65) AC_INIT([thrift], [0.7.0-dev]) diff --git a/lib/csharp/src/Transport/TFramedTransport.cs b/lib/csharp/src/Transport/TFramedTransport.cs index b7ad5f24..e259f5a5 100644 --- a/lib/csharp/src/Transport/TFramedTransport.cs +++ b/lib/csharp/src/Transport/TFramedTransport.cs @@ -24,9 +24,12 @@ namespace Thrift.Transport public class TFramedTransport : TTransport { protected TTransport transport = null; - protected MemoryStream writeBuffer = new MemoryStream(1024); + protected MemoryStream writeBuffer; protected MemoryStream readBuffer = null; + private const int header_size = 4; + private static byte[] header_dummy = new byte[header_size]; // used as header placeholder while initilizing new write buffer + public class Factory : TTransportFactory { public override TTransport GetTransport(TTransport trans) @@ -35,7 +38,12 @@ namespace Thrift.Transport } } - public TFramedTransport(TTransport transport) + public TFramedTransport() + { + InitWriteBuffer(); + } + + public TFramedTransport(TTransport transport) : this() { this.transport = transport; } @@ -77,8 +85,8 @@ namespace Thrift.Transport private void ReadFrame() { - byte[] i32rd = new byte[4]; - transport.ReadAll(i32rd, 0, 4); + byte[] i32rd = new byte[header_size]; + transport.ReadAll(i32rd, 0, header_size); int size = ((i32rd[0] & 0xff) << 24) | ((i32rd[1] & 0xff) << 16) | @@ -99,16 +107,31 @@ namespace Thrift.Transport { byte[] buf = writeBuffer.GetBuffer(); int len = (int)writeBuffer.Length; - writeBuffer = new MemoryStream(writeBuffer.Capacity); - - byte[] i32out = new byte[4]; - i32out[0] = (byte)(0xff & (len >> 24)); - i32out[1] = (byte)(0xff & (len >> 16)); - i32out[2] = (byte)(0xff & (len >> 8)); - i32out[3] = (byte)(0xff & (len)); - transport.Write(i32out, 0, 4); + int data_len = len - header_size; + if ( data_len < 0 ) + throw new System.InvalidOperationException (); // logic error actually + + InitWriteBuffer(); + + // Inject message header into the reserved buffer space + buf[0] = (byte)(0xff & (data_len >> 24)); + buf[1] = (byte)(0xff & (data_len >> 16)); + buf[2] = (byte)(0xff & (data_len >> 8)); + buf[3] = (byte)(0xff & (data_len)); + + // Send the entire message at once transport.Write(buf, 0, len); + transport.Flush(); } + + private void InitWriteBuffer () + { + // Create new buffer instance + writeBuffer = new MemoryStream(1024); + + // Reserve space for message header to be put right before sending it out + writeBuffer.Write ( header_dummy, 0, header_size ); + } } } diff --git a/test/csharp/ThriftTest/TestClient.cs b/test/csharp/ThriftTest/TestClient.cs index 1d7c75ea..60fc995e 100644 --- a/test/csharp/ThriftTest/TestClient.cs +++ b/test/csharp/ThriftTest/TestClient.cs @@ -39,7 +39,7 @@ namespace Test int port = 9090; string url = null; int numThreads = 1; - bool buffered = false; + bool buffered = false, framed = false; try { @@ -67,6 +67,11 @@ namespace Test buffered = true; Console.WriteLine("Using buffered sockets"); } + else if (args[i] == "-f" || args[i] == "-framed") + { + framed = true; + Console.WriteLine("Using framed transport"); + } else if (args[i] == "-t") { numThreads = Convert.ToInt32(args[++i]); @@ -89,16 +94,13 @@ namespace Test threads[test] = t; if (url == null) { - TSocket socket = new TSocket(host, port); + TTransport trans = new TSocket(host, port); if (buffered) - { - TBufferedTransport buffer = new TBufferedTransport(socket); - t.Start(buffer); - } - else - { - t.Start(socket); - } + trans = new TBufferedTransport(trans as TStreamTransport); + if (framed) + trans = new TFramedTransport(trans); + + t.Start(trans); } else { @@ -428,6 +430,12 @@ namespace Test Console.WriteLine("Test Oneway(1)"); client.testOneway(1); + + Console.Write("Test Calltime()"); + var startt = DateTime.UtcNow; + for ( int k=0; k<1000; ++k ) + client.testVoid(); + Console.WriteLine(" = " + (DateTime.UtcNow - startt).TotalSeconds.ToString() + " ms a testVoid() call" ); } } } diff --git a/test/csharp/ThriftTest/TestServer.cs b/test/csharp/ThriftTest/TestServer.cs index e3706404..894ec9c2 100644 --- a/test/csharp/ThriftTest/TestServer.cs +++ b/test/csharp/ThriftTest/TestServer.cs @@ -301,7 +301,7 @@ namespace Test { try { - bool useBufferedSockets = false; + bool useBufferedSockets = false, useFramed = false; int port = 9090; if (args.Length > 0) { @@ -309,7 +309,23 @@ namespace Test if (args.Length > 1) { - bool.TryParse(args[1], out useBufferedSockets); + if ( args[1] == "raw" ) + { + // as default + } + else if ( args[1] == "buffered" ) + { + useBufferedSockets = true; + } + else if ( args[1] == "framed" ) + { + useFramed = true; + } + else + { + // Fall back to the older boolean syntax + bool.TryParse(args[1], out useBufferedSockets); + } } } @@ -320,10 +336,12 @@ namespace Test // Transport TServerSocket tServerSocket = new TServerSocket(port, 0, useBufferedSockets); - TServer serverEngine; - // Simple Server - serverEngine = new TSimpleServer(testProcessor, tServerSocket); + TServer serverEngine; + if ( useFramed ) + serverEngine = new TSimpleServer(testProcessor, tServerSocket, new TFramedTransport.Factory()); + else + serverEngine = new TSimpleServer(testProcessor, tServerSocket); // ThreadPool Server // serverEngine = new TThreadPoolServer(testProcessor, tServerSocket); @@ -334,7 +352,10 @@ namespace Test testHandler.server = serverEngine; // Run it - Console.WriteLine("Starting the server on port " + port + (useBufferedSockets ? " with buffered socket" : "") + "..."); + Console.WriteLine("Starting the server on port " + port + + (useBufferedSockets ? " with buffered socket" : "") + + (useFramed ? " with framed transport" : "") + + "..."); serverEngine.Serve(); } -- 2.17.1