THRIFT-1880 Make named pipes server work asynchronously (overlapped) to allow for clean server stops
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas
index 8af399e..e6ab7ac 100644
--- a/lib/delphi/src/Thrift.Server.pas
+++ b/lib/delphi/src/Thrift.Server.pas
@@ -266,16 +266,18 @@
end;
client := nil;
- InputTransport := nil;
- OutputTransport := nil;
- InputProtocol := nil;
- OutputProtocol := nil;
-
while (not FStop) do
begin
try
+ // clean up any old instances before waiting for clients
+ InputTransport := nil;
+ OutputTransport := nil;
+ InputProtocol := nil;
+ OutputProtocol := nil;
+
client := FServerTransport.Accept;
FLogDelegate( 'Client Connected!');
+
InputTransport := FInputTransportFactory.GetTransport( client );
OutputTransport := FOutputTransportFactory.GetTransport( client );
InputProtocol := FInputProtocolFactory.GetProtocol( InputTransport );
@@ -284,6 +286,7 @@
begin
if FStop then Break;
end;
+
except
on E: TTransportException do
begin
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index 54e00a4..bf07e1e 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -23,9 +23,8 @@
interface
uses
- Windows, SysUtils, Math, AccCtrl, AclAPI,
+ Windows, SysUtils, Math, AccCtrl, AclAPI, SyncObjs,
Thrift.Transport,
- Thrift.Console,
Thrift.Stream;
const
@@ -36,7 +35,7 @@
//--- Pipe Streams ---
- TPipeStreamBaseImpl = class( TThriftStreamImpl)
+ TPipeStreamBase = class( TThriftStreamImpl)
strict protected
FPipe : THandle;
FTimeout : DWORD;
@@ -55,7 +54,7 @@
end;
- TNamedPipeStreamImpl = class sealed( TPipeStreamBaseImpl)
+ TNamedPipeStreamImpl = class sealed( TPipeStreamBase)
private
FPipeName : string;
FShareMode : DWORD;
@@ -72,7 +71,7 @@
end;
- THandlePipeStreamImpl = class sealed( TPipeStreamBaseImpl)
+ THandlePipeStreamImpl = class sealed( TPipeStreamBase)
private
FSrcHandle : THandle;
@@ -88,12 +87,12 @@
//--- Pipe Transports ---
- IPipe = interface( IStreamTransport)
+ IPipeTransport = interface( IStreamTransport)
['{5E05CC85-434F-428F-BFB2-856A168B5558}']
end;
- TPipeTransportBaseImpl = class( TStreamTransportImpl, IPipe)
+ TPipeTransportBase = class( TStreamTransportImpl, IPipeTransport)
public
// ITransport
function GetIsOpen: Boolean; override;
@@ -102,7 +101,7 @@
end;
- TNamedPipeImpl = class( TPipeTransportBaseImpl)
+ TNamedPipeTransportClientEndImpl = class( TPipeTransportBase)
public
// Named pipe constructors
constructor Create( aPipe : THandle; aOwnsHandle : Boolean); overload;
@@ -113,7 +112,7 @@
end;
- TNamedPipeServerImpl = class( TNamedPipeImpl)
+ TNamedPipeTransportServerEndImpl = class( TNamedPipeTransportClientEndImpl)
strict private
FHandle : THandle;
public
@@ -123,7 +122,7 @@
end;
- TAnonymousPipeImpl = class( TPipeTransportBaseImpl)
+ TAnonymousPipeTransportImpl = class( TPipeTransportBase)
public
// Anonymous pipe constructor
constructor Create( const aPipeRead, aPipeWrite : THandle; aOwnsHandles : Boolean); overload;
@@ -133,7 +132,7 @@
//--- Server Transports ---
- IAnonymousServerPipe = interface( IServerTransport)
+ IAnonymousPipeServerTransport = interface( IServerTransport)
['{7AEE6793-47B9-4E49-981A-C39E9108E9AD}']
// Server side anonymous pipe ends
function ReadHandle : THandle;
@@ -144,19 +143,23 @@
end;
- INamedServerPipe = interface( IServerTransport)
+ INamedPipeServerTransport = interface( IServerTransport)
['{9DF9EE48-D065-40AF-8F67-D33037D3D960}']
function Handle : THandle;
end;
- TServerPipeBaseImpl = class( TServerTransportImpl)
+ TPipeServerTransportBase = class( TServerTransportImpl)
+ protected
+ FStopServer : Boolean;
+ procedure InternalClose; virtual; abstract;
public
procedure Listen; override;
+ procedure Close; override;
end;
- TAnonymousServerPipeImpl = class( TServerPipeBaseImpl, IAnonymousServerPipe)
+ TAnonymousPipeServerTransportImpl = class( TPipeServerTransportBase, IAnonymousPipeServerTransport)
private
FBufSize : DWORD;
@@ -173,41 +176,41 @@
function CreateAnonPipe : Boolean;
- // IAnonymousServerPipe
+ // IAnonymousPipeServerTransport
function ReadHandle : THandle;
function WriteHandle : THandle;
function ClientAnonRead : THandle;
function ClientAnonWrite : THandle;
+ procedure InternalClose; override;
+
public
constructor Create( aBufsize : Cardinal = 4096);
-
- procedure Close; override;
end;
- TNamedServerPipeImpl = class( TServerPipeBaseImpl, INamedServerPipe)
+ TNamedPipeServerTransportImpl = class( TPipeServerTransportBase, INamedPipeServerTransport)
private
FPipeName : string;
FMaxConns : DWORD;
FBufSize : DWORD;
FTimeout : DWORD;
-
- FHandle : THandle;
-
- protected
+ FHandle : THandle;
+ FConnected : Boolean;
+
+
protected
function AcceptImpl: ITransport; override;
- procedure CreateNamedPipe;
+ function CreateNamedPipe : THandle;
+ function CreateTransportInstance : ITransport;
- // INamedServerPipe
+ // INamedPipeServerTransport
function Handle : THandle;
+ procedure InternalClose; override;
public
constructor Create( aPipename : string; aBufsize : Cardinal = 4096;
aMaxConns : Cardinal = PIPE_UNLIMITED_INSTANCES;
aTimeOut : Cardinal = 0);
-
- procedure Close; override;
end;
@@ -236,10 +239,10 @@
-{ TPipeStreamBaseImpl }
+{ TPipeStreamBase }
-constructor TPipeStreamBaseImpl.Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
+constructor TPipeStreamBase.Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT);
begin
inherited Create;
FPipe := INVALID_HANDLE_VALUE;
@@ -247,7 +250,7 @@
end;
-destructor TPipeStreamBaseImpl.Destroy;
+destructor TPipeStreamBase.Destroy;
begin
try
Close;
@@ -257,25 +260,25 @@
end;
-procedure TPipeStreamBaseImpl.Close;
+procedure TPipeStreamBase.Close;
begin
ClosePipeHandle( FPipe);
end;
-procedure TPipeStreamBaseImpl.Flush;
+procedure TPipeStreamBase.Flush;
begin
// nothing to do
end;
-function TPipeStreamBaseImpl.IsOpen: Boolean;
+function TPipeStreamBase.IsOpen: Boolean;
begin
result := (FPipe <> INVALID_HANDLE_VALUE);
end;
-procedure TPipeStreamBaseImpl.Write(const buffer: TBytes; offset, count: Integer);
+procedure TPipeStreamBase.Write(const buffer: TBytes; offset, count: Integer);
var cbWritten : DWORD;
begin
if not IsOpen
@@ -288,7 +291,7 @@
end;
-function TPipeStreamBaseImpl.Read( var buffer: TBytes; offset, count: Integer): Integer;
+function TPipeStreamBase.Read( var buffer: TBytes; offset, count: Integer): Integer;
var cbRead, dwErr : DWORD;
bytes, retries : LongInt;
bOk : Boolean;
@@ -310,7 +313,8 @@
then Break; // there are data
dwErr := GetLastError;
- if (dwErr = ERROR_BROKEN_PIPE)
+ if (dwErr = ERROR_INVALID_HANDLE)
+ or (dwErr = ERROR_BROKEN_PIPE)
or (dwErr = ERROR_PIPE_NOT_CONNECTED)
then begin
result := 0; // other side closed the pipe
@@ -333,7 +337,7 @@
end;
-function TPipeStreamBaseImpl.ToArray: TBytes;
+function TPipeStreamBase.ToArray: TBytes;
var bytes : LongInt;
begin
SetLength( result, 0);
@@ -436,34 +440,34 @@
end;
-{ TPipeTransportBaseImpl }
+{ TPipeTransportBase }
-function TPipeTransportBaseImpl.GetIsOpen: Boolean;
+function TPipeTransportBase.GetIsOpen: Boolean;
begin
result := (FInputStream <> nil) and (FInputStream.IsOpen)
and (FOutputStream <> nil) and (FOutputStream.IsOpen);
end;
-procedure TPipeTransportBaseImpl.Open;
+procedure TPipeTransportBase.Open;
begin
FInputStream.Open;
FOutputStream.Open;
end;
-procedure TPipeTransportBaseImpl.Close;
+procedure TPipeTransportBase.Close;
begin
FInputStream.Close;
FOutputStream.Close;
end;
-{ TNamedPipeImpl }
+{ TNamedPipeTransportClientEndImpl }
-constructor TNamedPipeImpl.Create( const aPipeName : string; const aShareMode: DWORD;
+constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string; const aShareMode: DWORD;
const aSecurityAttributes: PSecurityAttributes;
const aTimeOut : DWORD);
// Named pipe constructor
@@ -474,7 +478,7 @@
end;
-constructor TNamedPipeImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
+constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
// Named pipe constructor
begin
inherited Create( nil, nil);
@@ -483,10 +487,10 @@
end;
-{ TNamedPipeServerImpl }
+{ TNamedPipeTransportServerEndImpl }
-constructor TNamedPipeServerImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
+constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean);
// Named pipe constructor
begin
FHandle := DuplicatePipeHandle( aPipe);
@@ -494,7 +498,7 @@
end;
-procedure TNamedPipeServerImpl.Close;
+procedure TNamedPipeTransportServerEndImpl.Close;
begin
FlushFileBuffers( FHandle);
DisconnectNamedPipe( FHandle); // force client off the pipe
@@ -504,10 +508,10 @@
end;
-{ TAnonymousPipeImpl }
+{ TAnonymousPipeTransportImpl }
-constructor TAnonymousPipeImpl.Create( const aPipeRead, aPipeWrite : THandle; aOwnsHandles : Boolean);
+constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : THandle; aOwnsHandles : Boolean);
// Anonymous pipe constructor
begin
inherited Create( nil, nil);
@@ -516,19 +520,26 @@
end;
-{ TServerPipeBaseImpl }
+{ TPipeServerTransportBase }
-procedure TServerPipeBaseImpl.Listen;
+procedure TPipeServerTransportBase.Listen;
begin
- // not much to do here
+ FStopServer := FALSE;
end;
-{ TAnonymousServerPipeImpl }
+procedure TPipeServerTransportBase.Close;
+begin
+ FStopServer := TRUE;
+ InternalClose;
+end;
-constructor TAnonymousServerPipeImpl.Create( aBufsize : Cardinal);
+{ TAnonymousPipeServerTransportImpl }
+
+
+constructor TAnonymousPipeServerTransportImpl.Create( aBufsize : Cardinal);
// Anonymous pipe CTOR
begin
inherited Create;
@@ -547,7 +558,7 @@
end;
-function TAnonymousServerPipeImpl.AcceptImpl: ITransport;
+function TAnonymousPipeServerTransportImpl.AcceptImpl: ITransport;
var buf : Byte;
br : DWORD;
begin
@@ -556,11 +567,13 @@
and (GetLastError() <> ERROR_MORE_DATA)
then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
'TServerPipe unable to initiate pipe communication');
- result := TAnonymousPipeImpl.Create( FReadHandle, FWriteHandle, FALSE);
+
+ // create the transport impl
+ result := TAnonymousPipeTransportImpl.Create( FReadHandle, FWriteHandle, FALSE);
end;
-procedure TAnonymousServerPipeImpl.Close;
+procedure TAnonymousPipeServerTransportImpl.InternalClose;
begin
ClosePipeHandle( FReadHandle);
ClosePipeHandle( FWriteHandle);
@@ -569,31 +582,31 @@
end;
-function TAnonymousServerPipeImpl.ReadHandle : THandle;
+function TAnonymousPipeServerTransportImpl.ReadHandle : THandle;
begin
result := FReadHandle;
end;
-function TAnonymousServerPipeImpl.WriteHandle : THandle;
+function TAnonymousPipeServerTransportImpl.WriteHandle : THandle;
begin
result := FWriteHandle;
end;
-function TAnonymousServerPipeImpl.ClientAnonRead : THandle;
+function TAnonymousPipeServerTransportImpl.ClientAnonRead : THandle;
begin
result := FClientAnonRead;
end;
-function TAnonymousServerPipeImpl.ClientAnonWrite : THandle;
+function TAnonymousPipeServerTransportImpl.ClientAnonWrite : THandle;
begin
result := FClientAnonWrite;
end;
-function TAnonymousServerPipeImpl.CreateAnonPipe : Boolean;
+function TAnonymousPipeServerTransportImpl.CreateAnonPipe : Boolean;
var sd : PSECURITY_DESCRIPTOR;
sa : SECURITY_ATTRIBUTES; //TSecurityAttributes;
hCAR, hPipeW, hCAW, hPipe : THandle;
@@ -610,14 +623,16 @@
sa.bInheritHandle := TRUE; //allow passing handle to child
if not CreatePipe( hCAR, hPipeW, @sa, FBufSize) then begin //create stdin pipe
- Console.WriteLine( 'TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
+ raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
Exit;
end;
if not CreatePipe( hPipe, hCAW, @sa, FBufSize) then begin //create stdout pipe
- Console.WriteLine( 'TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
CloseHandle( hCAR);
CloseHandle( hPipeW);
+ raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
Exit;
end;
@@ -634,72 +649,128 @@
end;
-{ TNamedServerPipeImpl }
+{ TNamedPipeServerTransportImpl }
-constructor TNamedServerPipeImpl.Create( aPipename : string; aBufsize, aMaxConns, aTimeOut : Cardinal);
+constructor TNamedPipeServerTransportImpl.Create( aPipename : string; aBufsize, aMaxConns, aTimeOut : Cardinal);
// Named Pipe CTOR
begin
inherited Create;
- FPipeName := aPipename;
- FBufsize := aBufSize;
- FMaxConns := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns));
- FHandle := INVALID_HANDLE_VALUE;
- FTimeout := aTimeOut;
+ FPipeName := aPipename;
+ FBufsize := aBufSize;
+ FMaxConns := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns));
+ FHandle := INVALID_HANDLE_VALUE;
+ FTimeout := aTimeOut;
+ FConnected := FALSE;
if Copy(FPipeName,1,2) <> '\\'
then FPipeName := '\\.\pipe\' + FPipeName; // assume localhost
end;
-function TNamedServerPipeImpl.AcceptImpl: ITransport;
-var connectRet : Boolean;
-begin
- CreateNamedPipe;
+function TNamedPipeServerTransportImpl.AcceptImpl: ITransport;
+var dwError, dwWait, dwDummy : DWORD;
+ overlapped : TOverlapped;
+ event : TEvent;
+begin
+ FillChar( overlapped, SizeOf(overlapped), 0);
+ event := TEvent.Create( nil, TRUE, FALSE, ''); // always ManualReset, see MSDN
+ try
+ overlapped.hEvent := event.Handle;
+
+ ASSERT( not FConnected);
+ while not FConnected do begin
+ InternalClose;
+ if FStopServer then Abort;
+ CreateNamedPipe;
- // Wait for the client to connect; if it succeeds, the
- // function returns a nonzero value. If the function returns
- // zero, GetLastError should return ERROR_PIPE_CONNECTED.
- if ConnectNamedPipe( FHandle,nil)
- then connectRet := TRUE
- else connectRet := (GetLastError() = ERROR_PIPE_CONNECTED);
+ // Wait for the client to connect; if it succeeds, the
+ // function returns a nonzero value. If the function returns
+ // zero, GetLastError should return ERROR_PIPE_CONNECTED.
+ if ConnectNamedPipe( Handle, @overlapped)
+ then FConnected := TRUE
+ else begin
+ // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
+ // We have to check GetLastError() explicitly to find out
+ dwError := GetLastError;
+ case dwError of
+ ERROR_PIPE_CONNECTED : begin
+ FConnected := TRUE; // special case: pipe immediately connected
+ end;
+
+ ERROR_IO_PENDING : begin
+ dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT);
+ FConnected := (dwWait = WAIT_OBJECT_0)
+ and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE);
+ end;
+
+ else
+ InternalClose;
+ raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'Client connection failed');
+ end;
+ end;
+ end;
- if not connectRet then begin
- Close;
- raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
- 'TServerPipe: client connection failed');
- end;
+ // create the transport impl
+ result := CreateTransportInstance;
- result := TNamedPipeServerImpl.Create( FHandle, TRUE);
-end;
-
-
-procedure TNamedServerPipeImpl.Close;
-begin
- if FHandle <> INVALID_HANDLE_VALUE
- then try
- FlushFileBuffers( FHandle);
- DisconnectNamedPipe( FHandle);
finally
- ClosePipeHandle( FHandle);
+ event.Free;
end;
end;
-function TNamedServerPipeImpl.Handle : THandle;
+function TNamedPipeServerTransportImpl.CreateTransportInstance : ITransport;
+// create the transport impl
+var hPipe : THandle;
begin
- result := FHandle;
+ hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
+ try
+ FConnected := FALSE;
+ result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE);
+ except
+ ClosePipeHandle(hPipe);
+ raise;
+ end;
end;
-procedure TNamedServerPipeImpl.CreateNamedPipe;
+procedure TNamedPipeServerTransportImpl.InternalClose;
+var hPipe : THandle;
+begin
+ hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
+ if hPipe = INVALID_HANDLE_VALUE then Exit;
+
+ try
+ if FConnected
+ then FlushFileBuffers( hPipe)
+ else CancelIo( hPipe);
+ DisconnectNamedPipe( hPipe);
+ finally
+ ClosePipeHandle( hPipe);
+ FConnected := FALSE;
+ end;
+end;
+
+
+function TNamedPipeServerTransportImpl.Handle : THandle;
+begin
+ {$IFDEF WIN64}
+ result := THandle( InterlockedExchangeAdd64( Integer(FHandle), 0));
+ {$ELSE}
+ result := THandle( InterlockedExchangeAdd( Integer(FHandle), 0));
+ {$ENDIF}
+end;
+
+
+function TNamedPipeServerTransportImpl.CreateNamedPipe : THandle;
var SIDAuthWorld : SID_IDENTIFIER_AUTHORITY ;
everyone_sid : PSID;
ea : EXPLICIT_ACCESS;
acl : PACL;
sd : PSECURITY_DESCRIPTOR;
sa : SECURITY_ATTRIBUTES;
- hPipe : THandle;
const
SECURITY_WORLD_SID_AUTHORITY : TSIDIdentifierAuthority = (Value : (0,0,0,0,0,1));
SECURITY_WORLD_RID = $00000000;
@@ -707,6 +778,8 @@
sd := nil;
everyone_sid := nil;
try
+ ASSERT( (FHandle = INVALID_HANDLE_VALUE) and not FConnected);
+
// Windows - set security to allow non-elevated apps
// to access pipes created by elevated apps.
SIDAuthWorld := SECURITY_WORLD_SID_AUTHORITY;
@@ -732,19 +805,20 @@
sa.bInheritHandle := FALSE;
// Create an instance of the named pipe
- hPipe := Windows.CreateNamedPipe( PChar( FPipeName), // pipe name
- PIPE_ACCESS_DUPLEX, // read/write access
- PIPE_TYPE_MESSAGE or // message type pipe
- PIPE_READMODE_MESSAGE, // message-read mode
- FMaxConns, // max. instances
- FBufSize, // output buffer size
- FBufSize, // input buffer size
- FTimeout, // time-out, see MSDN
- @sa); // default security attribute
+ result := Windows.CreateNamedPipe( PChar( FPipeName), // pipe name
+ PIPE_ACCESS_DUPLEX or // read/write access
+ FILE_FLAG_OVERLAPPED, // async mode
+ PIPE_TYPE_MESSAGE or // message type pipe
+ PIPE_READMODE_MESSAGE, // message-read mode
+ FMaxConns, // max. instances
+ FBufSize, // output buffer size
+ FBufSize, // input buffer size
+ FTimeout, // time-out, see MSDN
+ @sa); // default security attribute
- FHandle := hPipe;
- if( FHandle = INVALID_HANDLE_VALUE)
- then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ if( result <> INVALID_HANDLE_VALUE)
+ then InterlockedExchangePointer( Pointer(FHandle), Pointer(result))
+ else raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
'CreateNamedPipe() failed ' + IntToStr(GetLastError));
finally
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
index 2f77de8..37fe7d7 100644
--- a/lib/delphi/test/TestClient.pas
+++ b/lib/delphi/test/TestClient.pas
@@ -19,6 +19,9 @@
unit TestClient;
+{.$DEFINE StressTest} // activate to stress-test the server with frequent connects/disconnects
+{.$DEFINE PerfTest} // activate to activate the performance test
+
interface
uses
@@ -63,6 +66,7 @@
procedure ClientTest;
procedure JSONProtocolReadWriteTest;
+ procedure StressTest(const client : TThriftTest.Iface);
protected
procedure Execute; override;
public
@@ -238,11 +242,11 @@
begin
if sPipeName <> '' then begin
Console.WriteLine('Using named pipe ('+sPipeName+')');
- streamtrans := TNamedPipeImpl.Create( sPipeName, 0, nil, TIMEOUT);
+ streamtrans := TNamedPipeTransportClientEndImpl.Create( sPipeName, 0, nil, TIMEOUT);
end
else if bAnonPipe then begin
Console.WriteLine('Using anonymous pipes ('+IntToStr(Integer(hAnonRead))+' and '+IntToStr(Integer(hAnonWrite))+')');
- streamtrans := TAnonymousPipeImpl.Create( hAnonRead, hAnonWrite, FALSE);
+ streamtrans := TAnonymousPipeTransportImpl.Create( hAnonRead, hAnonWrite, FALSE);
end
else begin
Console.WriteLine('Using sockets ('+host+' port '+IntToStr(port)+')');
@@ -370,6 +374,10 @@
client := TThriftTest.TClient.Create( FProtocol);
FTransport.Open;
+ {$IFDEF StressTest}
+ StressTest( client);
+ {$ENDIF StressTest}
+
// in-depth exception test
// (1) do we get an exception at all?
// (2) do we get the right exception?
@@ -422,6 +430,11 @@
s := client.testString('Test');
Expect( s = 'Test', 'testString(''Test'') = "'+s+'"');
+ s := client.testString(HUGE_TEST_STRING);
+ Expect( length(s) = length(HUGE_TEST_STRING),
+ 'testString( lenght(HUGE_TEST_STRING) = '+IntToStr(Length(HUGE_TEST_STRING))+') '
+ +'=> length(result) = '+IntToStr(Length(s)));
+
i8 := client.testByte(1);
Expect( i8 = 1, 'testByte(1) = ' + IntToStr( i8 ));
@@ -831,6 +844,7 @@
Expect( TRUE, 'Test Oneway(1)'); // success := no exception
// call time
+ {$IFDEF PerfTest}
StartTestGroup( 'Test Calltime()');
StartTick := GetTIckCount;
for k := 0 to 1000 - 1 do
@@ -838,12 +852,31 @@
client.testVoid();
end;
Console.WriteLine(' = ' + FloatToStr( (GetTickCount - StartTick) / 1000 ) + ' ms a testVoid() call' );
+ {$ENDIF PerfTest}
// no more tests here
StartTestGroup( '');
end;
+procedure TClientThread.StressTest(const client : TThriftTest.Iface);
+begin
+ while TRUE do begin
+ try
+ if not FTransport.IsOpen then FTransport.Open; // re-open connection, server has already closed
+ try
+ client.testString('Test');
+ Write('.');
+ finally
+ if FTransport.IsOpen then FTransport.Close;
+ end;
+ except
+ on e:Exception do Writeln(#10+e.message);
+ end;
+ end;
+end;
+
+
procedure TClientThread.JSONProtocolReadWriteTest;
// Tests only then read/write procedures of the JSON protocol
// All tests succeed, if we can read what we wrote before
diff --git a/lib/delphi/test/TestConstants.pas b/lib/delphi/test/TestConstants.pas
index b6664ef..f21a4bb 100644
--- a/lib/delphi/test/TestConstants.pas
+++ b/lib/delphi/test/TestConstants.pas
@@ -33,6 +33,79 @@
BINARY_STRICT_READ = FALSE;
BINARY_STRICT_WRITE = FALSE;
+ HUGE_TEST_STRING = 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+ + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+ + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+ + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+ + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+ + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+ + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+ + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. '
+ + 'Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy '
+ + 'eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam '
+ + 'voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit '
+ + 'amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam '
+ + 'nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed '
+ + 'diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet '
+ + 'clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. ';
+
implementation
// nothing
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index 791468b..7b74e58 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -21,6 +21,8 @@
{$WARN SYMBOL_PLATFORM OFF}
+{.$DEFINE RunEndless} // activate to interactively stress-test the server stop routines via Ctrl+C
+
interface
uses
@@ -46,6 +48,7 @@
ITestHandler = interface( TThriftTest.Iface )
procedure SetServer( const AServer : IServer );
+ procedure TestStop;
end;
TTestHandlerImpl = class( TInterfacedObject, ITestHandler )
@@ -73,17 +76,45 @@
function testMultiException(const arg0: string; const arg1: string): IXtruct;
procedure testOneway(secondsToSleep: Integer);
- procedure testStop;
-
+ procedure TestStop;
procedure SetServer( const AServer : IServer );
end;
- class procedure LaunchAnonPipeChild( const app : string; const transport : IAnonymousServerPipe);
+ class procedure LaunchAnonPipeChild( const app : string; const transport : IAnonymousPipeServerTransport);
class procedure Execute( const args: array of string);
end;
implementation
+
+var g_Handler : TTestServer.ITestHandler = nil;
+
+
+function MyConsoleEventHandler( dwCtrlType : DWORD) : BOOL; stdcall;
+// Note that this Handler procedure is called from another thread
+var handler : TTestServer.ITestHandler;
+begin
+ result := TRUE;
+ try
+ case dwCtrlType of
+ CTRL_C_EVENT : Console.WriteLine( 'Ctrl+C pressed');
+ CTRL_BREAK_EVENT : Console.WriteLine( 'Ctrl+Break pressed');
+ CTRL_CLOSE_EVENT : Console.WriteLine( 'Received CloseTask signal');
+ CTRL_LOGOFF_EVENT : Console.WriteLine( 'Received LogOff signal');
+ CTRL_SHUTDOWN_EVENT : Console.WriteLine( 'Received Shutdown signal');
+ else
+ Console.WriteLine( 'Received console event #'+IntToStr(Integer(dwCtrlType)));
+ end;
+
+ handler := g_Handler;
+ if handler <> nil then handler.TestStop;
+
+ except
+ // catch all
+ end;
+end;
+
+
{ TTestServer.TTestHandlerImpl }
procedure TTestServer.TTestHandlerImpl.SetServer( const AServer: IServer);
@@ -405,7 +436,7 @@
{ TTestServer }
-class procedure TTestServer.LaunchAnonPipeChild( const app : string; const transport : IAnonymousServerPipe);
+class procedure TTestServer.LaunchAnonPipeChild( const app : string; const transport : IAnonymousPipeServerTransport);
//Launch child process and pass R/W anonymous pipe handles on cmd line.
//This is a simple example and does not include elevation or other
//advanced features.
@@ -457,8 +488,8 @@
testProcessor : IProcessor;
ServerTrans : IServerTransport;
ServerEngine : IServer;
- anonymouspipe : IAnonymousServerPipe;
- namedpipe : INamedServerPipe;
+ anonymouspipe : IAnonymousPipeServerTransport;
+ namedpipe : INamedPipeServerTransport;
TransportFactory : ITransportFactory;
ProtocolFactory : IProtocolFactory;
i : Integer;
@@ -542,12 +573,12 @@
if sPipeName <> '' then begin
Console.WriteLine('- named pipe ('+sPipeName+')');
- namedpipe := TNamedServerPipeImpl.Create( sPipeName, 4096, PIPE_UNLIMITED_INSTANCES, TIMEOUT);
+ namedpipe := TNamedPipeServerTransportImpl.Create( sPipeName, 4096, PIPE_UNLIMITED_INSTANCES, TIMEOUT);
servertrans := namedpipe;
end
else if AnonPipe then begin
Console.WriteLine('- anonymous pipes');
- anonymouspipe := TAnonymousServerPipeImpl.Create;
+ anonymouspipe := TAnonymousPipeServerTransportImpl.Create;
servertrans := anonymouspipe;
end
else begin
@@ -580,11 +611,18 @@
if AnonPipe
then LaunchAnonPipeChild( ExtractFilePath(ParamStr(0))+'client.exe', anonymouspipe);
+ // install Ctrl+C handler before the server starts
+ g_Handler := testHandler;
+ SetConsoleCtrlHandler( @MyConsoleEventHandler, TRUE);
Console.WriteLine('');
- Console.WriteLine('Starting the server ...');
- serverEngine.Serve;
+ repeat
+ Console.WriteLine('Starting the server ...');
+ serverEngine.Serve;
+ until {$IFDEF RunEndless} FALSE {$ELSE} TRUE {$ENDIF};
+
testHandler.SetServer( nil);
+ g_Handler := nil;
except
on E: Exception do
@@ -595,4 +633,5 @@
Console.WriteLine( 'done.');
end;
+
end.
diff --git a/lib/delphi/test/server.dpr b/lib/delphi/test/server.dpr
index 5fad6eb..ca485af 100644
--- a/lib/delphi/test/server.dpr
+++ b/lib/delphi/test/server.dpr
@@ -54,7 +54,7 @@
args[i-1] := arg;
end;
TTestServer.Execute( args );
- Readln;
+ Writeln('Press ENTER to close ... '); Readln;
except
on E: Exception do
Writeln(E.ClassName, ': ', E.Message);