From: Jens Geyer Date: Thu, 20 Mar 2014 20:46:17 +0000 (+0200) Subject: THRIFT-2415 Named pipes server performance & message mode X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=e9651367c550a6dd72b5a67a3e5c487bd299eac8;p=common%2Fthrift.git THRIFT-2415 Named pipes server performance & message mode Patch: Jens Geyer --- diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas index c2696f41..d2816c90 100644 --- a/lib/delphi/src/Thrift.Transport.Pipes.pas +++ b/lib/delphi/src/Thrift.Transport.Pipes.pas @@ -25,12 +25,14 @@ interface uses Windows, SysUtils, Math, AccCtrl, AclAPI, SyncObjs, Thrift.Transport, + Thrift.Utils, Thrift.Stream; const DEFAULT_THRIFT_PIPE_TIMEOUT = 5 * 1000; // ms + type //--- Pipe Streams --- @@ -39,6 +41,7 @@ type strict protected FPipe : THandle; FTimeout : DWORD; + FOverlapped : Boolean; procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override; function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override; @@ -46,25 +49,31 @@ type procedure Close; override; procedure Flush; override; + function ReadDirect( var buffer: TBytes; offset: Integer; count: Integer): Integer; + function ReadOverlapped( var buffer: TBytes; offset: Integer; count: Integer): Integer; + procedure WriteDirect( const buffer: TBytes; offset: Integer; count: Integer); + procedure WriteOverlapped( const buffer: TBytes; offset: Integer; count: Integer); + function IsOpen: Boolean; override; function ToArray: TBytes; override; public - constructor Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); + constructor Create( aEnableOverlapped : Boolean; const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); destructor Destroy; override; end; TNamedPipeStreamImpl = class sealed( TPipeStreamBase) - private + strict private FPipeName : string; FShareMode : DWORD; FSecurityAttribs : PSecurityAttributes; - protected + strict protected procedure Open; override; public constructor Create( const aPipeName : string; + const aEnableOverlapped : Boolean; const aShareMode: DWORD = 0; const aSecurityAttributes: PSecurityAttributes = nil; const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); overload; @@ -72,14 +81,16 @@ type THandlePipeStreamImpl = class sealed( TPipeStreamBase) - private + strict private FSrcHandle : THandle; - protected + strict protected procedure Open; override; public - constructor Create( const aPipeHandle : THandle; aOwnsHandle : Boolean); overload; + constructor Create( const aPipeHandle : THandle; + const aOwnsHandle, aEnableOverlapped : Boolean; + const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); overload; destructor Destroy; override; end; @@ -104,7 +115,8 @@ type TNamedPipeTransportClientEndImpl = class( TPipeTransportBase) public // Named pipe constructors - constructor Create( aPipe : THandle; aOwnsHandle : Boolean); overload; + constructor Create( aPipe : THandle; aOwnsHandle : Boolean; + const aTimeOut : DWORD); overload; constructor Create( const aPipeName : string; const aShareMode: DWORD = 0; const aSecurityAttributes: PSecurityAttributes = nil; @@ -118,7 +130,8 @@ type public // ITransport procedure Close; override; - constructor Create( aPipe : THandle; aOwnsHandle : Boolean); reintroduce; + constructor Create( aPipe : THandle; aOwnsHandle : Boolean; + const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); reintroduce; end; @@ -150,17 +163,20 @@ type TPipeServerTransportBase = class( TServerTransportImpl) - protected - FStopServer : Boolean; + strict protected + FStopServer : TEvent; procedure InternalClose; virtual; abstract; + function QueryStopServer : Boolean; public + constructor Create; + destructor Destroy; override; procedure Listen; override; procedure Close; override; end; TAnonymousPipeServerTransportImpl = class( TPipeServerTransportBase, IAnonymousPipeServerTransport) - private + strict private FBufSize : DWORD; // Server side anonymous pipe handles @@ -190,7 +206,7 @@ type TNamedPipeServerTransportImpl = class( TPipeServerTransportBase, INamedPipeServerTransport) - private + strict private FPipeName : string; FMaxConns : DWORD; FBufSize : DWORD; @@ -199,7 +215,7 @@ type FConnected : Boolean; - protected + strict protected function Accept(const fnAccepting: TProc): ITransport; override; function CreateNamedPipe : THandle; function CreateTransportInstance : ITransport; @@ -243,11 +259,14 @@ end; { TPipeStreamBase } -constructor TPipeStreamBase.Create( const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); +constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean; + const aTimeOut : DWORD = DEFAULT_THRIFT_PIPE_TIMEOUT); begin inherited Create; - FPipe := INVALID_HANDLE_VALUE; - FTimeout := aTimeOut; + ASSERT( aTimeout > 0); + FPipe := INVALID_HANDLE_VALUE; + FTimeout := aTimeOut; + FOverlapped := aEnableOverlapped; end; @@ -280,6 +299,22 @@ end; procedure TPipeStreamBase.Write(const buffer: TBytes; offset, count: Integer); +begin + if FOverlapped + then WriteOverlapped( buffer, offset, count) + else WriteDirect( buffer, offset, count); +end; + + +function TPipeStreamBase.Read( var buffer: TBytes; offset, count: Integer): Integer; +begin + if FOverlapped + then result := ReadOverlapped( buffer, offset, count) + else result := ReadDirect( buffer, offset, count); +end; + + +procedure TPipeStreamBase.WriteDirect(const buffer: TBytes; offset, count: Integer); var cbWritten : DWORD; begin if not IsOpen @@ -292,7 +327,7 @@ begin end; -function TPipeStreamBase.Read( var buffer: TBytes; offset, count: Integer): Integer; +function TPipeStreamBase.ReadDirect( var buffer: TBytes; offset, count: Integer): Integer; var cbRead, dwErr : DWORD; bytes, retries : LongInt; bOk : Boolean; @@ -338,6 +373,84 @@ begin end; +procedure TPipeStreamBase.WriteOverlapped(const buffer: TBytes; offset, count: Integer); +var cbWritten, dwWait, dwError : DWORD; + overlapped : IOverlappedHelper; +begin + if not IsOpen + then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, + 'Called write on non-open pipe'); + + overlapped := TOverlappedHelperImpl.Create; + + if not WriteFile( FPipe, buffer[offset], count, cbWritten, overlapped.OverlappedPtr) + then begin + dwError := GetLastError; + case dwError of + ERROR_IO_PENDING : begin + dwWait := overlapped.WaitFor(FTimeout); + + if (dwWait = WAIT_TIMEOUT) + then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut, + 'Pipe write timed out'); + + if (dwWait <> WAIT_OBJECT_0) + or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE) + then raise TTransportException.Create( TTransportException.TExceptionType.Unknown, + 'Pipe write error'); + end; + + else + raise TTransportException.Create( TTransportException.TExceptionType.Unknown, + SysErrorMessage(dwError)); + end; + end; + + ASSERT( DWORD(count) = cbWritten); +end; + + +function TPipeStreamBase.ReadOverlapped( var buffer: TBytes; offset, count: Integer): Integer; +var cbRead, dwWait, dwError : DWORD; + bOk : Boolean; + overlapped : IOverlappedHelper; +begin + if not IsOpen + then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, + 'Called read on non-open pipe'); + + overlapped := TOverlappedHelperImpl.Create; + + // read the data + bOk := ReadFile( FPipe, buffer[offset], count, cbRead, overlapped.OverlappedPtr); + if not bOk then begin + dwError := GetLastError; + case dwError of + ERROR_IO_PENDING : begin + dwWait := overlapped.WaitFor(FTimeout); + + if (dwWait = WAIT_TIMEOUT) + then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut, + 'Pipe read timed out'); + + if (dwWait <> WAIT_OBJECT_0) + or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE) + then raise TTransportException.Create( TTransportException.TExceptionType.Unknown, + 'Pipe read error'); + end; + + else + raise TTransportException.Create( TTransportException.TExceptionType.Unknown, + SysErrorMessage(dwError)); + end; + end; + + ASSERT( cbRead > 0); // see TTransportImpl.ReadAll() + ASSERT( cbRead = DWORD(count)); + result := cbRead; +end; + + function TPipeStreamBase.ToArray: TBytes; var bytes : LongInt; begin @@ -357,11 +470,13 @@ end; { TNamedPipeStreamImpl } -constructor TNamedPipeStreamImpl.Create( const aPipeName : string; const aShareMode: DWORD; +constructor TNamedPipeStreamImpl.Create( const aPipeName : string; + const aEnableOverlapped : Boolean; + const aShareMode: DWORD; const aSecurityAttributes: PSecurityAttributes; const aTimeOut : DWORD); begin - inherited Create( aTimeout); + inherited Create( aEnableOverlapped, aTimeout); FPipeName := aPipeName; FShareMode := aShareMode; @@ -374,7 +489,6 @@ end; procedure TNamedPipeStreamImpl.Open; var hPipe : THandle; - dwMode : DWORD; begin if IsOpen then Exit; @@ -389,21 +503,13 @@ begin FShareMode, // sharing FSecurityAttribs, // security attributes OPEN_EXISTING, // opens existing pipe - 0, // default attributes + FILE_FLAG_OVERLAPPED or FILE_FLAG_WRITE_THROUGH, // async+fast, please 0); // no template file if hPipe = INVALID_HANDLE_VALUE then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Unable to open pipe, '+SysErrorMessage(GetLastError)); - // pipe connected; change to message-read mode. - dwMode := PIPE_READMODE_MESSAGE; - if not SetNamedPipeHandleState( hPipe, dwMode, nil, nil) then begin - Close; - raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, - 'SetNamedPipeHandleState failed'); - end; - // everything fine FPipe := hPipe; end; @@ -412,9 +518,11 @@ end; { THandlePipeStreamImpl } -constructor THandlePipeStreamImpl.Create( const aPipeHandle : THandle; aOwnsHandle : Boolean); +constructor THandlePipeStreamImpl.Create( const aPipeHandle : THandle; + const aOwnsHandle, aEnableOverlapped : Boolean; + const aTimeOut : DWORD); begin - inherited Create( DEFAULT_THRIFT_PIPE_TIMEOUT); + inherited Create( aEnableOverlapped, aTimeOut); if aOwnsHandle then FSrcHandle := aPipeHandle @@ -474,16 +582,17 @@ constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string; c // Named pipe constructor begin inherited Create( nil, nil); - FInputStream := TNamedPipeStreamImpl.Create( aPipeName, aShareMode, aSecurityAttributes, aTimeOut); + FInputStream := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut); FOutputStream := FInputStream; // true for named pipes end; -constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean); +constructor TNamedPipeTransportClientEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean; + const aTimeOut : DWORD); // Named pipe constructor begin inherited Create( nil, nil); - FInputStream := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle); + FInputStream := THandlePipeStreamImpl.Create( aPipe, TRUE, aOwnsHandle, aTimeOut); FOutputStream := FInputStream; // true for named pipes end; @@ -491,11 +600,12 @@ end; { TNamedPipeTransportServerEndImpl } -constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean); +constructor TNamedPipeTransportServerEndImpl.Create( aPipe : THandle; aOwnsHandle : Boolean; + const aTimeOut : DWORD); // Named pipe constructor begin FHandle := DuplicatePipeHandle( aPipe); - inherited Create( aPipe, aOwnsHandle); + inherited Create( aPipe, aOwnsHandle, aTimeOut); end; @@ -516,23 +626,48 @@ constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : TH // Anonymous pipe constructor begin inherited Create( nil, nil); - FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles); - FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles); + // overlapped is not supported with AnonPipes, see MSDN + FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE); + FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE); end; { TPipeServerTransportBase } +constructor TPipeServerTransportBase.Create; +begin + inherited Create; + FStopServer := TEvent.Create(nil,TRUE,FALSE,''); // manual reset +end; + + +destructor TPipeServerTransportBase.Destroy; +begin + try + FreeAndNil( FStopServer); + finally + inherited Destroy; + end; +end; + + +function TPipeServerTransportBase.QueryStopServer : Boolean; +begin + result := (FStopServer = nil) + or (FStopServer.WaitFor(0) <> wrTimeout); +end; + + procedure TPipeServerTransportBase.Listen; begin - FStopServer := FALSE; + FStopServer.ResetEvent; end; procedure TPipeServerTransportBase.Close; begin - FStopServer := TRUE; + FStopServer.SetEvent; InternalClose; end; @@ -660,6 +795,7 @@ constructor TNamedPipeServerTransportImpl.Create( aPipename : string; aBufsize, // Named Pipe CTOR begin inherited Create; + ASSERT( aTimeout > 0); FPipeName := aPipename; FBufsize := aBufSize; FMaxConns := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns)); @@ -674,61 +810,54 @@ end; function TNamedPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport; var dwError, dwWait, dwDummy : DWORD; - overlapped : TOverlapped; - event : TEvent; -begin - FillChar( overlapped, SizeOf(overlapped), 0); - event := TEvent.Create( nil, TRUE, FALSE, ''); // always ManualReset, see MSDN - try - overlapped.hEvent := event.Handle; + overlapped : IOverlappedHelper; + handles : array[0..1] of THandle; +begin + overlapped := TOverlappedHelperImpl.Create; + + ASSERT( not FConnected); + while not FConnected do begin + InternalClose; + if QueryStopServer then Abort; + CreateNamedPipe; + + if Assigned(fnAccepting) + then fnAccepting(); + + // Wait for the client to connect; if it succeeds, the + // function returns a nonzero value. If the function returns + // zero, GetLastError should return ERROR_PIPE_CONNECTED. + if ConnectNamedPipe( Handle, overlapped.OverlappedPtr) then begin + FConnected := TRUE; + Break; + end; - ASSERT( not FConnected); - while not FConnected do begin - InternalClose; - if FStopServer then Abort; - CreateNamedPipe; - - if Assigned(fnAccepting) - then fnAccepting(); - - // Wait for the client to connect; if it succeeds, the - // function returns a nonzero value. If the function returns - // zero, GetLastError should return ERROR_PIPE_CONNECTED. - if ConnectNamedPipe( Handle, @overlapped) then begin - FConnected := TRUE; - Break; + // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected. + // We have to check GetLastError() explicitly to find out + dwError := GetLastError; + case dwError of + ERROR_PIPE_CONNECTED : begin + FConnected := not QueryStopServer; // special case: pipe immediately connected end; - // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected. - // We have to check GetLastError() explicitly to find out - dwError := GetLastError; - case dwError of - ERROR_PIPE_CONNECTED : begin - FConnected := not FStopServer; // special case: pipe immediately connected - end; - - ERROR_IO_PENDING : begin - repeat - dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT); - until (dwWait <> WAIT_TIMEOUT) or FStopServer; - FConnected := (dwWait = WAIT_OBJECT_0) - and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE) - and not FStopServer; - end; - - else - InternalClose; - raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, - 'Client connection failed'); + ERROR_IO_PENDING : begin + handles[0] := overlapped.WaitHandle; + handles[1] := FStopServer.Handle; + dwWait := WaitForMultipleObjects( 2, @handles, FALSE, FTimeout); + FConnected := (dwWait = WAIT_OBJECT_0) + and GetOverlappedResult( Handle, overlapped.Overlapped, dwDummy, TRUE) + and not QueryStopServer; end; - end; - - // create the transport impl - result := CreateTransportInstance; - finally - event.Free; + else + InternalClose; + raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, + 'Client connection failed'); + end; end; + + // create the transport impl + result := CreateTransportInstance; end; @@ -739,7 +868,7 @@ begin hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE))); try FConnected := FALSE; - result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE); + result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE, FTimeout); except ClosePipeHandle(hPipe); raise; @@ -819,8 +948,8 @@ begin result := Windows.CreateNamedPipe( PChar( FPipeName), // pipe name PIPE_ACCESS_DUPLEX or // read/write access FILE_FLAG_OVERLAPPED, // async mode - PIPE_TYPE_MESSAGE or // message type pipe - PIPE_READMODE_MESSAGE, // message-read mode + PIPE_TYPE_BYTE or // byte type pipe + PIPE_READMODE_BYTE, // byte read mode FMaxConns, // max. instances FBufSize, // output buffer size FBufSize, // input buffer size diff --git a/lib/delphi/src/Thrift.Utils.pas b/lib/delphi/src/Thrift.Utils.pas index 72c0dc10..ec8190c4 100644 --- a/lib/delphi/src/Thrift.Utils.pas +++ b/lib/delphi/src/Thrift.Utils.pas @@ -21,10 +21,40 @@ unit Thrift.Utils; interface +uses + Classes, Windows, SysUtils, SyncObjs; + +type + IOverlappedHelper = interface + ['{A1832EFA-2E02-4884-8F09-F0A0277157FA}'] + function Overlapped : TOverlapped; + function OverlappedPtr : POverlapped; + function WaitHandle : THandle; + function WaitFor(dwTimeout: DWORD) : DWORD; + end; + + TOverlappedHelperImpl = class( TInterfacedObject, IOverlappedHelper) + strict protected + FOverlapped : TOverlapped; + FEvent : TEvent; + + // IOverlappedHelper + function Overlapped : TOverlapped; + function OverlappedPtr : POverlapped; + function WaitHandle : THandle; + function WaitFor(dwTimeout: DWORD) : DWORD; + public + constructor Create; + destructor Destroy; override; + end; + + + function IfValue(B: Boolean; const TrueValue, FalseValue: WideString): string; implementation + function IfValue(B: Boolean; const TrueValue, FalseValue: WideString): string; begin if B then @@ -33,4 +63,56 @@ begin Result := FalseValue; end; + +{ TOverlappedHelperImpl } + +constructor TOverlappedHelperImpl.Create; +begin + inherited Create; + FillChar( FOverlapped, SizeOf(FOverlapped), 0); + FEvent := TEvent.Create( nil, TRUE, FALSE, ''); // always ManualReset, see MSDN + FOverlapped.hEvent := FEvent.Handle; +end; + + + +destructor TOverlappedHelperImpl.Destroy; +begin + try + FOverlapped.hEvent := 0; + FreeAndNil( FEvent); + + finally + inherited Destroy; + end; + +end; + + +function TOverlappedHelperImpl.Overlapped : TOverlapped; +begin + result := FOverlapped; +end; + + +function TOverlappedHelperImpl.OverlappedPtr : POverlapped; +begin + result := @FOverlapped; +end; + + +function TOverlappedHelperImpl.WaitHandle : THandle; +begin + result := FOverlapped.hEvent; +end; + + +function TOverlappedHelperImpl.WaitFor( dwTimeout : DWORD) : DWORD; +begin + result := WaitForSingleObject( FOverlapped.hEvent, dwTimeout); +end; + + + + end. diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas index 656fa150..6aa2dafe 100644 --- a/lib/delphi/test/TestServer.pas +++ b/lib/delphi/test/TestServer.pas @@ -499,7 +499,7 @@ var const // pipe timeouts to be used DEBUG_TIMEOUT = 30 * 1000; - RELEASE_TIMEOUT = 0; // server-side default + RELEASE_TIMEOUT = DEFAULT_THRIFT_PIPE_TIMEOUT; // server-side default TIMEOUT = RELEASE_TIMEOUT; begin try