THRIFT-2195 Delphi: Add event handlers for server and processing events

Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index e88f1cf..5618d6f 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -497,6 +497,44 @@
   end;

 

 

+type

+  IRequestEvents = interface

+    ['{F926A26A-5B00-4560-86FA-2CAE3BA73DAF}']
+    // Called before reading arguments.
+    procedure PreRead;
+    // Called between reading arguments and calling the handler.
+    procedure PostRead;
+    // Called between calling the handler and writing the response.
+    procedure PreWrite;
+    // Called after writing the response.
+    procedure PostWrite;
+    // Called when an oneway (async) function call completes successfully.
+    procedure OnewayComplete;
+    // Called if the handler throws an undeclared exception.
+    procedure UnhandledError( const e : Exception);
+    // Called when a client has finished request-handling to clean up
+    procedure CleanupContext;
+  end;
+
+
+  IProcessorEvents = interface
+    ['{A8661119-657C-447D-93C5-512E36162A45}']
+    // Called when a client is about to call the processor.
+    procedure Processing( const transport : ITransport);
+    // Called on any service function invocation
+    function  CreateRequestContext( const aFunctionName : string) : IRequestEvents;
+    // Called when a client has finished request-handling to clean up
+    procedure CleanupContext;
+  end;
+
+
+  IProcessor = interface
+    ['{7BAE92A5-46DA-4F13-B6EA-0EABE233EE5F}']
+    function Process( const iprot :IProtocol; const oprot: IProtocol; const events : IProcessorEvents): Boolean;
+  end;
+
+

+

 implementation

 

 function ConvertInt64ToDouble( const n: Int64): Double;

diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas
index 6d3ff38..8237a47 100644
--- a/lib/delphi/src/Thrift.Server.pas
+++ b/lib/delphi/src/Thrift.Server.pas
@@ -30,10 +30,26 @@
   Thrift.Transport;
 
 type
+  IServerEvents = interface
+    ['{9E2A99C5-EE85-40B2-9A52-2D1722B18176}']
+    // Called before the server begins.
+    procedure PreServe;
+    // Called when the server transport is ready to accept requests
+    procedure PreAccept;
+    // Called when a new client has connected and the server is about to being processing.
+    function  CreateProcessingContext( const input, output : IProtocol) : IProcessorEvents;
+  end;
+
+
   IServer = interface
-    ['{CF9F56C6-BB39-4C7D-877B-43B416572CE6}']
+    ['{ADC46F2D-8199-4D1C-96D2-87FD54351723}']
     procedure Serve;
     procedure Stop;
+
+    function GetServerEvents : IServerEvents;
+    procedure SetServerEvents( const value : IServerEvents);
+
+    property ServerEvents : IServerEvents read GetServerEvents write SetServerEvents;
   end;
 
   TServerImpl = class abstract( TInterfacedObject, IServer )
@@ -48,9 +64,13 @@
     FInputProtocolFactory : IProtocolFactory;
     FOutputProtocolFactory : IProtocolFactory;
     FLogDelegate : TLogDelegate;
+    FServerEvents : IServerEvents;
 
     class procedure DefaultLogDelegate( const str: string);
 
+    function GetServerEvents : IServerEvents;
+    procedure SetServerEvents( const value : IServerEvents);
+
     procedure Serve; virtual; abstract;
     procedure Stop; virtual; abstract;
   public
@@ -64,7 +84,7 @@
       const ALogDelegate : TLogDelegate
       ); overload;
 
-    constructor Create( 
+    constructor Create(
       const AProcessor :IProcessor;
       const AServerTransport: IServerTransport
 	  ); overload;
@@ -122,7 +142,7 @@
   InputTransFactory := TTransportFactoryImpl.Create;
   OutputTransFactory := TTransportFactoryImpl.Create;
 
-  //no inherited;  
+  //no inherited;
   Create(
     AProcessor,
     AServerTransport,
@@ -202,13 +222,27 @@
   const AServerTransport: IServerTransport; const ATransportFactory: ITransportFactory;
   const AProtocolFactory: IProtocolFactory);
 begin
-  //no inherited;  
+  //no inherited;
   Create( AProcessor, AServerTransport,
           ATransportFactory, ATransportFactory,
           AProtocolFactory, AProtocolFactory,
           DefaultLogDelegate);
 end;
 
+
+function TServerImpl.GetServerEvents : IServerEvents;
+begin
+  result := FServerEvents;
+end;
+
+
+procedure TServerImpl.SetServerEvents( const value : IServerEvents);
+begin
+  // if you need more than one, provide a specialized IServerEvents implementation
+  FServerEvents := value;
+end;
+
+
 { TSimpleServer }
 
 constructor TSimpleServer.Create( const AProcessor: IProcessor;
@@ -267,6 +301,7 @@
   OutputTransport : ITransport;
   InputProtocol : IProtocol;
   OutputProtocol : IProtocol;
+  context : IProcessorEvents;
 begin
   try
     FServerTransport.Listen;
@@ -277,6 +312,9 @@
     end;
   end;
 
+  if FServerEvents <> nil
+  then FServerEvents.PreServe;
+
   client := nil;
   while (not FStop) do
   begin
@@ -287,16 +325,34 @@
       InputProtocol := nil;
       OutputProtocol := nil;
 
-      client := FServerTransport.Accept;
+      client := FServerTransport.Accept( procedure
+                                         begin
+                                           if FServerEvents <> nil

+                                           then FServerEvents.PreAccept;
+                                         end);

+

+      if client = nil then begin

+        if FStop

+        then Abort  // silent exception

+        else raise TTransportException.Create( 'ServerTransport.Accept() may not return NULL' );

+      end;

+

       FLogDelegate( 'Client Connected!');
 
       InputTransport := FInputTransportFactory.GetTransport( client );
       OutputTransport := FOutputTransportFactory.GetTransport( client );
       InputProtocol := FInputProtocolFactory.GetProtocol( InputTransport );
       OutputProtocol := FOutputProtocolFactory.GetProtocol( OutputTransport );
-      while ( FProcessor.Process( InputProtocol, OutputProtocol )) do
-      begin
-        if FStop then Break;
+
+      if FServerEvents <> nil
+      then context := FServerEvents.CreateProcessingContext( InputProtocol, OutputProtocol)
+      else context := nil;
+
+      while not FStop do begin
+        if context <> nil
+        then context.Processing( client);
+        if not FProcessor.Process( InputProtocol, OutputProtocol, context)
+        then Break;
       end;
 
     except
@@ -311,6 +367,13 @@
         FLogDelegate( E.ToString);
       end;
     end;
+
+    if context <> nil
+    then begin
+      context.CleanupContext;
+      context := nil;
+    end;
+
     if InputTransport <> nil then
     begin
       InputTransport.Close;
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index bf07e1e..c2696f4 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -172,7 +172,7 @@
     FClientAnonWrite  : THandle;
 
   protected
-    function AcceptImpl: ITransport; override;
+    function Accept(const fnAccepting: TProc): ITransport; override;
 
     function CreateAnonPipe : Boolean;
 
@@ -197,9 +197,10 @@
     FTimeout      : DWORD;
     FHandle       : THandle;
     FConnected    : Boolean;
-

-
  protected
-    function AcceptImpl: ITransport; override;
+
+
+  protected
+    function Accept(const fnAccepting: TProc): ITransport; override;
     function CreateNamedPipe : THandle;
     function CreateTransportInstance : ITransport;
 
@@ -558,10 +559,13 @@
 end;
 
 
-function TAnonymousPipeServerTransportImpl.AcceptImpl: ITransport;
+function TAnonymousPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport;
 var buf    : Byte;
     br     : DWORD;
 begin
+  if Assigned(fnAccepting)
+  then fnAccepting();
+
   // This 0-byte read serves merely as a blocking call.
   if not ReadFile( FReadHandle, buf, 0, br, nil)
   and (GetLastError() <> ERROR_MORE_DATA)
@@ -668,55 +672,62 @@
 end;
 
 
-function TNamedPipeServerTransportImpl.AcceptImpl: ITransport;
+function TNamedPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport;
 var dwError, dwWait, dwDummy : DWORD;
     overlapped : TOverlapped;
-    event      : TEvent;

-begin

+    event      : TEvent;
+begin
   FillChar( overlapped, SizeOf(overlapped), 0);
   event := TEvent.Create( nil, TRUE, FALSE, '');  // always ManualReset, see MSDN
   try
-    overlapped.hEvent := event.Handle;

-

-    ASSERT( not FConnected);

-    while not FConnected do begin

+    overlapped.hEvent := event.Handle;
+
+    ASSERT( not FConnected);
+    while not FConnected do begin
       InternalClose;
       if FStopServer then Abort;
       CreateNamedPipe;
 
+      if Assigned(fnAccepting)
+      then fnAccepting();
+
       // Wait for the client to connect; if it succeeds, the
       // function returns a nonzero value. If the function returns
       // zero, GetLastError should return ERROR_PIPE_CONNECTED.
-      if ConnectNamedPipe( Handle, @overlapped)
-      then FConnected := TRUE
-      else begin
-        // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
-        // We have to check GetLastError() explicitly to find out

-        dwError := GetLastError;

-        case dwError of

-          ERROR_PIPE_CONNECTED : begin

-            FConnected := TRUE;  // special case: pipe immediately connected

-          end;

-

-          ERROR_IO_PENDING : begin

-            dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT);

-            FConnected := (dwWait = WAIT_OBJECT_0)

-                      and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE);

-          end;

-

-        else

-          InternalClose;

-          raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,

-                                            'Client connection failed');
-        end;

-      end;

-    end;

+      if ConnectNamedPipe( Handle, @overlapped) then begin
+        FConnected := TRUE;
+        Break;
+      end;
+
+      // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
+      // We have to check GetLastError() explicitly to find out
+      dwError := GetLastError;
+      case dwError of
+        ERROR_PIPE_CONNECTED : begin
+          FConnected := not FStopServer;  // special case: pipe immediately connected
+        end;
+
+        ERROR_IO_PENDING : begin
+          repeat
+            dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT);
+          until (dwWait <> WAIT_TIMEOUT) or FStopServer;
+          FConnected := (dwWait = WAIT_OBJECT_0)
+                    and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE)
+                    and not FStopServer;
+        end;
+
+      else
+        InternalClose;
+        raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+                                          'Client connection failed');
+      end;
+    end;
 
     // create the transport impl
     result := CreateTransportInstance;
 
   finally
-    event.Free;

+    event.Free;
   end;
 end;
 
@@ -730,9 +741,9 @@
     FConnected := FALSE;
     result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE);
   except
-    ClosePipeHandle(hPipe);

-    raise;

-  end;

+    ClosePipeHandle(hPipe);
+    raise;
+  end;
 end;
 
 
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 0d5b384..b567aef 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -129,19 +129,17 @@
   end;

 

   IServerTransport = interface

-    ['{BF6B7043-DA22-47BF-8B11-2B88EC55FE12}']

+    ['{C43B87ED-69EA-47C4-B77C-15E288252900}']

     procedure Listen;

     procedure Close;

-    function Accept: ITransport;

+    function Accept( const fnAccepting: TProc): ITransport;

   end;

 

   TServerTransportImpl = class( TInterfacedObject, IServerTransport)

   protected

-    function AcceptImpl: ITransport; virtual; abstract;

-  public

     procedure Listen; virtual; abstract;

     procedure Close; virtual; abstract;

-    function Accept: ITransport;

+    function Accept( const fnAccepting: TProc): ITransport;  virtual; abstract;

   end;

 

   ITransportFactory = interface

@@ -226,7 +224,7 @@
     FUseBufferedSocket : Boolean;

     FOwnsServer : Boolean;

   protected

-    function AcceptImpl: ITransport; override;

+    function Accept( const fnAccepting: TProc) : ITransport; override;

   public

     constructor Create( const AServer: TTcpServer ); overload;

     constructor Create( const AServer: TTcpServer; AClientTimeout: Integer); overload;

@@ -518,17 +516,6 @@
   inherited Create(msg);

 end;

 

-{ TServerTransportImpl }

-

-function TServerTransportImpl.Accept: ITransport;

-begin

-  Result := AcceptImpl;

-  if Result = nil then

-  begin

-    raise TTransportException.Create( 'accept() may not return NULL' );

-  end;

-end;

-

 { TTransportFactoryImpl }

 

 function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;

@@ -557,11 +544,10 @@
   Create( APort, 0 );

 end;

 

-function TServerSocketImpl.AcceptImpl: ITransport;

+function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;

 var

-  ret : TCustomIpClient;

-  ret2 : IStreamTransport;

-  ret3 : ITransport;

+  client : TCustomIpClient;

+  trans  : IStreamTransport;

 begin

   if FServer = nil then

   begin

@@ -570,29 +556,28 @@
   end;

 

   try

-    ret := TCustomIpClient.Create(nil);

-    if ( not FServer.Accept( ret )) then

+    client := TCustomIpClient.Create(nil);

+

+    if Assigned(fnAccepting)

+    then fnAccepting();

+

+    if ( not FServer.Accept( client)) then

     begin

-      ret.Free;

+      client.Free;

       Result := nil;

       Exit;

     end;

 

-    if ret = nil then

+    if client = nil then

     begin

       Result := nil;

       Exit;

     end;

 

-    ret2 := TSocketImpl.Create( ret );

-    if FUseBufferedSocket then

-    begin

-      ret3 := TBufferedTransportImpl.Create(ret2);

-      Result := ret3;

-    end else

-    begin

-      Result := ret2;

-    end;

+    trans := TSocketImpl.Create( client);

+    if FUseBufferedSocket

+    then result := TBufferedTransportImpl.Create( trans)

+    else result := trans;

 

   except

     on E: Exception do

diff --git a/lib/delphi/src/Thrift.pas b/lib/delphi/src/Thrift.pas
index 44f12d7..f4b47ed 100644
--- a/lib/delphi/src/Thrift.pas
+++ b/lib/delphi/src/Thrift.pas
@@ -28,11 +28,6 @@
   Version = '1.0.0-dev';
 
 type
-  IProcessor = interface
-    ['{B1538A07-6CAC-4406-8A4C-AFED07C70A89}']
-    function Process( const iprot :IProtocol; const oprot: IProtocol): Boolean;
-  end;
-
   TApplicationException = class( SysUtils.Exception )
   public
     type