THRIFT-2195 Delphi: Add event handlers for server and processing events
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index e88f1cf..5618d6f 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -497,6 +497,44 @@
end;
+type
+ IRequestEvents = interface
+ ['{F926A26A-5B00-4560-86FA-2CAE3BA73DAF}']
+ // Called before reading arguments.
+ procedure PreRead;
+ // Called between reading arguments and calling the handler.
+ procedure PostRead;
+ // Called between calling the handler and writing the response.
+ procedure PreWrite;
+ // Called after writing the response.
+ procedure PostWrite;
+ // Called when an oneway (async) function call completes successfully.
+ procedure OnewayComplete;
+ // Called if the handler throws an undeclared exception.
+ procedure UnhandledError( const e : Exception);
+ // Called when a client has finished request-handling to clean up
+ procedure CleanupContext;
+ end;
+
+
+ IProcessorEvents = interface
+ ['{A8661119-657C-447D-93C5-512E36162A45}']
+ // Called when a client is about to call the processor.
+ procedure Processing( const transport : ITransport);
+ // Called on any service function invocation
+ function CreateRequestContext( const aFunctionName : string) : IRequestEvents;
+ // Called when a client has finished request-handling to clean up
+ procedure CleanupContext;
+ end;
+
+
+ IProcessor = interface
+ ['{7BAE92A5-46DA-4F13-B6EA-0EABE233EE5F}']
+ function Process( const iprot :IProtocol; const oprot: IProtocol; const events : IProcessorEvents): Boolean;
+ end;
+
+
+
implementation
function ConvertInt64ToDouble( const n: Int64): Double;
diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas
index 6d3ff38..8237a47 100644
--- a/lib/delphi/src/Thrift.Server.pas
+++ b/lib/delphi/src/Thrift.Server.pas
@@ -30,10 +30,26 @@
Thrift.Transport;
type
+ IServerEvents = interface
+ ['{9E2A99C5-EE85-40B2-9A52-2D1722B18176}']
+ // Called before the server begins.
+ procedure PreServe;
+ // Called when the server transport is ready to accept requests
+ procedure PreAccept;
+ // Called when a new client has connected and the server is about to being processing.
+ function CreateProcessingContext( const input, output : IProtocol) : IProcessorEvents;
+ end;
+
+
IServer = interface
- ['{CF9F56C6-BB39-4C7D-877B-43B416572CE6}']
+ ['{ADC46F2D-8199-4D1C-96D2-87FD54351723}']
procedure Serve;
procedure Stop;
+
+ function GetServerEvents : IServerEvents;
+ procedure SetServerEvents( const value : IServerEvents);
+
+ property ServerEvents : IServerEvents read GetServerEvents write SetServerEvents;
end;
TServerImpl = class abstract( TInterfacedObject, IServer )
@@ -48,9 +64,13 @@
FInputProtocolFactory : IProtocolFactory;
FOutputProtocolFactory : IProtocolFactory;
FLogDelegate : TLogDelegate;
+ FServerEvents : IServerEvents;
class procedure DefaultLogDelegate( const str: string);
+ function GetServerEvents : IServerEvents;
+ procedure SetServerEvents( const value : IServerEvents);
+
procedure Serve; virtual; abstract;
procedure Stop; virtual; abstract;
public
@@ -64,7 +84,7 @@
const ALogDelegate : TLogDelegate
); overload;
- constructor Create(
+ constructor Create(
const AProcessor :IProcessor;
const AServerTransport: IServerTransport
); overload;
@@ -122,7 +142,7 @@
InputTransFactory := TTransportFactoryImpl.Create;
OutputTransFactory := TTransportFactoryImpl.Create;
- //no inherited;
+ //no inherited;
Create(
AProcessor,
AServerTransport,
@@ -202,13 +222,27 @@
const AServerTransport: IServerTransport; const ATransportFactory: ITransportFactory;
const AProtocolFactory: IProtocolFactory);
begin
- //no inherited;
+ //no inherited;
Create( AProcessor, AServerTransport,
ATransportFactory, ATransportFactory,
AProtocolFactory, AProtocolFactory,
DefaultLogDelegate);
end;
+
+function TServerImpl.GetServerEvents : IServerEvents;
+begin
+ result := FServerEvents;
+end;
+
+
+procedure TServerImpl.SetServerEvents( const value : IServerEvents);
+begin
+ // if you need more than one, provide a specialized IServerEvents implementation
+ FServerEvents := value;
+end;
+
+
{ TSimpleServer }
constructor TSimpleServer.Create( const AProcessor: IProcessor;
@@ -267,6 +301,7 @@
OutputTransport : ITransport;
InputProtocol : IProtocol;
OutputProtocol : IProtocol;
+ context : IProcessorEvents;
begin
try
FServerTransport.Listen;
@@ -277,6 +312,9 @@
end;
end;
+ if FServerEvents <> nil
+ then FServerEvents.PreServe;
+
client := nil;
while (not FStop) do
begin
@@ -287,16 +325,34 @@
InputProtocol := nil;
OutputProtocol := nil;
- client := FServerTransport.Accept;
+ client := FServerTransport.Accept( procedure
+ begin
+ if FServerEvents <> nil
+ then FServerEvents.PreAccept;
+ end);
+
+ if client = nil then begin
+ if FStop
+ then Abort // silent exception
+ else raise TTransportException.Create( 'ServerTransport.Accept() may not return NULL' );
+ end;
+
FLogDelegate( 'Client Connected!');
InputTransport := FInputTransportFactory.GetTransport( client );
OutputTransport := FOutputTransportFactory.GetTransport( client );
InputProtocol := FInputProtocolFactory.GetProtocol( InputTransport );
OutputProtocol := FOutputProtocolFactory.GetProtocol( OutputTransport );
- while ( FProcessor.Process( InputProtocol, OutputProtocol )) do
- begin
- if FStop then Break;
+
+ if FServerEvents <> nil
+ then context := FServerEvents.CreateProcessingContext( InputProtocol, OutputProtocol)
+ else context := nil;
+
+ while not FStop do begin
+ if context <> nil
+ then context.Processing( client);
+ if not FProcessor.Process( InputProtocol, OutputProtocol, context)
+ then Break;
end;
except
@@ -311,6 +367,13 @@
FLogDelegate( E.ToString);
end;
end;
+
+ if context <> nil
+ then begin
+ context.CleanupContext;
+ context := nil;
+ end;
+
if InputTransport <> nil then
begin
InputTransport.Close;
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index bf07e1e..c2696f4 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -172,7 +172,7 @@
FClientAnonWrite : THandle;
protected
- function AcceptImpl: ITransport; override;
+ function Accept(const fnAccepting: TProc): ITransport; override;
function CreateAnonPipe : Boolean;
@@ -197,9 +197,10 @@
FTimeout : DWORD;
FHandle : THandle;
FConnected : Boolean;
-
-
protected
- function AcceptImpl: ITransport; override;
+
+
+ protected
+ function Accept(const fnAccepting: TProc): ITransport; override;
function CreateNamedPipe : THandle;
function CreateTransportInstance : ITransport;
@@ -558,10 +559,13 @@
end;
-function TAnonymousPipeServerTransportImpl.AcceptImpl: ITransport;
+function TAnonymousPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport;
var buf : Byte;
br : DWORD;
begin
+ if Assigned(fnAccepting)
+ then fnAccepting();
+
// This 0-byte read serves merely as a blocking call.
if not ReadFile( FReadHandle, buf, 0, br, nil)
and (GetLastError() <> ERROR_MORE_DATA)
@@ -668,55 +672,62 @@
end;
-function TNamedPipeServerTransportImpl.AcceptImpl: ITransport;
+function TNamedPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport;
var dwError, dwWait, dwDummy : DWORD;
overlapped : TOverlapped;
- event : TEvent;
-begin
+ event : TEvent;
+begin
FillChar( overlapped, SizeOf(overlapped), 0);
event := TEvent.Create( nil, TRUE, FALSE, ''); // always ManualReset, see MSDN
try
- overlapped.hEvent := event.Handle;
-
- ASSERT( not FConnected);
- while not FConnected do begin
+ overlapped.hEvent := event.Handle;
+
+ ASSERT( not FConnected);
+ while not FConnected do begin
InternalClose;
if FStopServer then Abort;
CreateNamedPipe;
+ if Assigned(fnAccepting)
+ then fnAccepting();
+
// Wait for the client to connect; if it succeeds, the
// function returns a nonzero value. If the function returns
// zero, GetLastError should return ERROR_PIPE_CONNECTED.
- if ConnectNamedPipe( Handle, @overlapped)
- then FConnected := TRUE
- else begin
- // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
- // We have to check GetLastError() explicitly to find out
- dwError := GetLastError;
- case dwError of
- ERROR_PIPE_CONNECTED : begin
- FConnected := TRUE; // special case: pipe immediately connected
- end;
-
- ERROR_IO_PENDING : begin
- dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT);
- FConnected := (dwWait = WAIT_OBJECT_0)
- and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE);
- end;
-
- else
- InternalClose;
- raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
- 'Client connection failed');
- end;
- end;
- end;
+ if ConnectNamedPipe( Handle, @overlapped) then begin
+ FConnected := TRUE;
+ Break;
+ end;
+
+ // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
+ // We have to check GetLastError() explicitly to find out
+ dwError := GetLastError;
+ case dwError of
+ ERROR_PIPE_CONNECTED : begin
+ FConnected := not FStopServer; // special case: pipe immediately connected
+ end;
+
+ ERROR_IO_PENDING : begin
+ repeat
+ dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT);
+ until (dwWait <> WAIT_TIMEOUT) or FStopServer;
+ FConnected := (dwWait = WAIT_OBJECT_0)
+ and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE)
+ and not FStopServer;
+ end;
+
+ else
+ InternalClose;
+ raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+ 'Client connection failed');
+ end;
+ end;
// create the transport impl
result := CreateTransportInstance;
finally
- event.Free;
+ event.Free;
end;
end;
@@ -730,9 +741,9 @@
FConnected := FALSE;
result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE);
except
- ClosePipeHandle(hPipe);
- raise;
- end;
+ ClosePipeHandle(hPipe);
+ raise;
+ end;
end;
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 0d5b384..b567aef 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -129,19 +129,17 @@
end;
IServerTransport = interface
- ['{BF6B7043-DA22-47BF-8B11-2B88EC55FE12}']
+ ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
procedure Listen;
procedure Close;
- function Accept: ITransport;
+ function Accept( const fnAccepting: TProc): ITransport;
end;
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
protected
- function AcceptImpl: ITransport; virtual; abstract;
- public
procedure Listen; virtual; abstract;
procedure Close; virtual; abstract;
- function Accept: ITransport;
+ function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
end;
ITransportFactory = interface
@@ -226,7 +224,7 @@
FUseBufferedSocket : Boolean;
FOwnsServer : Boolean;
protected
- function AcceptImpl: ITransport; override;
+ function Accept( const fnAccepting: TProc) : ITransport; override;
public
constructor Create( const AServer: TTcpServer ); overload;
constructor Create( const AServer: TTcpServer; AClientTimeout: Integer); overload;
@@ -518,17 +516,6 @@
inherited Create(msg);
end;
-{ TServerTransportImpl }
-
-function TServerTransportImpl.Accept: ITransport;
-begin
- Result := AcceptImpl;
- if Result = nil then
- begin
- raise TTransportException.Create( 'accept() may not return NULL' );
- end;
-end;
-
{ TTransportFactoryImpl }
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
@@ -557,11 +544,10 @@
Create( APort, 0 );
end;
-function TServerSocketImpl.AcceptImpl: ITransport;
+function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
- ret : TCustomIpClient;
- ret2 : IStreamTransport;
- ret3 : ITransport;
+ client : TCustomIpClient;
+ trans : IStreamTransport;
begin
if FServer = nil then
begin
@@ -570,29 +556,28 @@
end;
try
- ret := TCustomIpClient.Create(nil);
- if ( not FServer.Accept( ret )) then
+ client := TCustomIpClient.Create(nil);
+
+ if Assigned(fnAccepting)
+ then fnAccepting();
+
+ if ( not FServer.Accept( client)) then
begin
- ret.Free;
+ client.Free;
Result := nil;
Exit;
end;
- if ret = nil then
+ if client = nil then
begin
Result := nil;
Exit;
end;
- ret2 := TSocketImpl.Create( ret );
- if FUseBufferedSocket then
- begin
- ret3 := TBufferedTransportImpl.Create(ret2);
- Result := ret3;
- end else
- begin
- Result := ret2;
- end;
+ trans := TSocketImpl.Create( client);
+ if FUseBufferedSocket
+ then result := TBufferedTransportImpl.Create( trans)
+ else result := trans;
except
on E: Exception do
diff --git a/lib/delphi/src/Thrift.pas b/lib/delphi/src/Thrift.pas
index 44f12d7..f4b47ed 100644
--- a/lib/delphi/src/Thrift.pas
+++ b/lib/delphi/src/Thrift.pas
@@ -28,11 +28,6 @@
Version = '1.0.0-dev';
type
- IProcessor = interface
- ['{B1538A07-6CAC-4406-8A4C-AFED07C70A89}']
- function Process( const iprot :IProtocol; const oprot: IProtocol): Boolean;
- end;
-
TApplicationException = class( SysUtils.Exception )
public
type