From 01640408045af154883bb420aa4e589195af7d12 Mon Sep 17 00:00:00 2001 From: Jens Geyer Date: Wed, 25 Sep 2013 21:12:21 +0200 Subject: [PATCH] THRIFT-2195 Delphi: Add event handlers for server and processing events Patch: Jens Geyer --- .../cpp/src/generate/t_delphi_generator.cc | 198 ++++++++++++------ lib/delphi/src/Thrift.Protocol.pas | 38 ++++ lib/delphi/src/Thrift.Server.pas | 79 ++++++- lib/delphi/src/Thrift.Transport.Pipes.pas | 91 ++++---- lib/delphi/src/Thrift.Transport.pas | 53 ++--- lib/delphi/src/Thrift.pas | 5 - lib/delphi/test/TestServer.pas | 16 +- lib/delphi/test/TestServerEvents.pas | 174 +++++++++++++++ .../codegen/run-Pascal-Codegen-Tests.bat.tmpl | 2 +- lib/delphi/test/server.dpr | 1 + 10 files changed, 498 insertions(+), 159 deletions(-) create mode 100644 lib/delphi/test/TestServerEvents.pas diff --git a/compiler/cpp/src/generate/t_delphi_generator.cc b/compiler/cpp/src/generate/t_delphi_generator.cc index f8f2a1ce..2ad9c70a 100644 --- a/compiler/cpp/src/generate/t_delphi_generator.cc +++ b/compiler/cpp/src/generate/t_delphi_generator.cc @@ -63,9 +63,11 @@ class t_delphi_generator : public t_oop_generator iter = parsed_options.find("ansistr_binary"); ansistr_binary_ = (iter != parsed_options.end()); iter = parsed_options.find("register_types"); - register_types_ = (iter != parsed_options.end()); + register_types_ = (iter != parsed_options.end()); iter = parsed_options.find("constprefix"); - constprefix_ = (iter != parsed_options.end()); + constprefix_ = (iter != parsed_options.end()); + iter = parsed_options.find("events"); + events_ = (iter != parsed_options.end()); out_dir_base_ = "gen-delphi"; @@ -116,9 +118,9 @@ class t_delphi_generator : public t_oop_generator void generate_delphi_struct(t_struct* tstruct, bool is_exception); void generate_delphi_struct_impl( ostream& out, std::string cls_prefix, t_struct* tstruct, bool is_exception, bool is_result = false, bool is_x_factory = false); - void print_delphi_struct_type_factory_func( ostream& out, t_struct* tstruct); - void generate_delphi_struct_type_factory( ostream& out, std::string cls_prefix, t_struct* tstruct, bool is_exception, bool is_result = false, bool is_x_factory = false); - void generate_delphi_struct_type_factory_registration( ostream& out, std::string cls_prefix, t_struct* tstruct, bool is_exception, bool is_result = false, bool is_x_factory = false); + void print_delphi_struct_type_factory_func( ostream& out, t_struct* tstruct); + void generate_delphi_struct_type_factory( ostream& out, std::string cls_prefix, t_struct* tstruct, bool is_exception, bool is_result = false, bool is_x_factory = false); + void generate_delphi_struct_type_factory_registration( ostream& out, std::string cls_prefix, t_struct* tstruct, bool is_exception, bool is_result = false, bool is_x_factory = false); void generate_delphi_struct_definition(std::ostream& out, t_struct* tstruct, bool is_xception=false, bool in_class=false, bool is_result=false, bool is_x_factory = false); void generate_delphi_struct_reader(std::ostream& out, t_struct* tstruct); void generate_delphi_struct_result_writer(std::ostream& out, t_struct* tstruct); @@ -200,8 +202,8 @@ class t_delphi_generator : public t_oop_generator std::ostringstream s_const_impl; std::ostringstream s_struct_impl; std::ostringstream s_service_impl; - std::ostringstream s_type_factory_registration; - std::ostringstream s_type_factory_funcs; + std::ostringstream s_type_factory_registration; + std::ostringstream s_type_factory_funcs; bool has_enum; bool has_const; std::string namespace_dir_; @@ -221,8 +223,9 @@ class t_delphi_generator : public t_oop_generator bool is_void( t_type* type ); int indent_impl_; bool ansistr_binary_; - bool register_types_; + bool register_types_; bool constprefix_; + bool events_; void indent_up_impl(){ ++indent_impl_; }; @@ -446,7 +449,7 @@ void t_delphi_generator::init_generator() { if (register_types_) { - add_delphi_uses_list("Thrift.TypeRegistry"); + add_delphi_uses_list("Thrift.TypeRegistry"); } init_known_types_list(); @@ -513,6 +516,9 @@ void t_delphi_generator::close_generator() { f_all << "const" << endl; indent_up(); indent(f_all) << "c" << tmp_unit << "_Option_AnsiStr_Binary = " << ( ansistr_binary_ ? "True" : "False") << ";" << endl; + indent(f_all) << "c" << tmp_unit << "_Option_Register_Types = " << ( register_types_ ? "True" : "False") << ";" << endl; + indent(f_all) << "c" << tmp_unit << "_Option_ConstPrefix = " << ( constprefix_ ? "True" : "False") << ";" << endl; + indent(f_all) << "c" << tmp_unit << "_Option_Events = " << ( events_ ? "True" : "False") << ";" << endl; indent_down(); f_all << "type" << endl; @@ -534,13 +540,13 @@ void t_delphi_generator::close_generator() { if (register_types_) { - f_all << endl; - f_all << "// Type factory methods and registration" << endl; - f_all << s_type_factory_funcs.str(); - f_all << "procedure RegisterTypeFactories;" << endl; - f_all << "begin" << endl; - f_all << s_type_factory_registration.str(); - f_all << "end;" << endl; + f_all << endl; + f_all << "// Type factory methods and registration" << endl; + f_all << s_type_factory_funcs.str(); + f_all << "procedure RegisterTypeFactories;" << endl; + f_all << "begin" << endl; + f_all << s_type_factory_registration.str(); + f_all << "end;" << endl; } f_all << endl; @@ -548,20 +554,20 @@ void t_delphi_generator::close_generator() { f_all << "initialization" << endl; if ( has_const ) { - f_all << "{$IF CompilerVersion < 21.0}" << endl; + f_all << "{$IF CompilerVersion < 21.0}" << endl; f_all << " " << constants_class.c_str() << "_Initialize;" << endl; - f_all << "{$IFEND}" << endl; + f_all << "{$IFEND}" << endl; } if (register_types_) { - f_all << " RegisterTypeFactories;" << endl; + f_all << " RegisterTypeFactories;" << endl; } f_all << endl; f_all << "finalization" << endl; if ( has_const ) { - f_all << "{$IF CompilerVersion < 21.0}" << endl; + f_all << "{$IF CompilerVersion < 21.0}" << endl; f_all << " " << constants_class.c_str() << "_Finalize;" << endl; - f_all << "{$IFEND}" << endl; + f_all << "{$IFEND}" << endl; } f_all << endl << endl; @@ -1008,8 +1014,8 @@ void t_delphi_generator::generate_delphi_struct(t_struct* tstruct, bool is_excep generate_delphi_struct_impl(s_struct_impl, "", tstruct, is_exception); if (register_types_) { - generate_delphi_struct_type_factory(s_type_factory_funcs, "", tstruct, is_exception); - generate_delphi_struct_type_factory_registration(s_type_factory_registration, "", tstruct, is_exception); + generate_delphi_struct_type_factory(s_type_factory_funcs, "", tstruct, is_exception); + generate_delphi_struct_type_factory_registration(s_type_factory_registration, "", tstruct, is_exception); } } @@ -1152,51 +1158,51 @@ void t_delphi_generator::generate_delphi_struct_impl( ostream& out, string cls_p } void t_delphi_generator::print_delphi_struct_type_factory_func( ostream& out, t_struct* tstruct) { - string struct_intf_name = type_name(tstruct); - out << "Create_"; - out << struct_intf_name; - out << "_Impl"; + string struct_intf_name = type_name(tstruct); + out << "Create_"; + out << struct_intf_name; + out << "_Impl"; } void t_delphi_generator::generate_delphi_struct_type_factory( ostream& out, string cls_prefix, t_struct* tstruct, bool is_exception, bool is_result, bool is_x_factory) { - - if (is_exception) - return; - if (is_result) - return; - if (is_x_factory) - return; - - string struct_intf_name = type_name(tstruct); - string cls_nm = type_name(tstruct,true,false); - - out << "function "; - print_delphi_struct_type_factory_func(out, tstruct); - out << ": "; - out << struct_intf_name; - out << ";" << endl; - out << "begin" << endl; - indent_up(); - indent(out) << "Result := " << cls_nm << ".Create;" << endl; - indent_down(); - out << "end;" << endl << endl; + + if (is_exception) + return; + if (is_result) + return; + if (is_x_factory) + return; + + string struct_intf_name = type_name(tstruct); + string cls_nm = type_name(tstruct,true,false); + + out << "function "; + print_delphi_struct_type_factory_func(out, tstruct); + out << ": "; + out << struct_intf_name; + out << ";" << endl; + out << "begin" << endl; + indent_up(); + indent(out) << "Result := " << cls_nm << ".Create;" << endl; + indent_down(); + out << "end;" << endl << endl; } void t_delphi_generator::generate_delphi_struct_type_factory_registration( ostream& out, string cls_prefix, t_struct* tstruct, bool is_exception, bool is_result, bool is_x_factory) { - if (is_exception) - return; - if (is_result) - return; - if (is_x_factory) - return; + if (is_exception) + return; + if (is_result) + return; + if (is_x_factory) + return; - string struct_intf_name = type_name(tstruct); + string struct_intf_name = type_name(tstruct); - indent(out) << " TypeRegistry.RegisterTypeFactory<" << struct_intf_name << ">("; - print_delphi_struct_type_factory_func(out, tstruct); - out << ");"; - out << endl; + indent(out) << " TypeRegistry.RegisterTypeFactory<" << struct_intf_name << ">("; + print_delphi_struct_type_factory_func(out, tstruct); + out << ");"; + out << endl; } void t_delphi_generator::generate_delphi_struct_definition(ostream &out, t_struct* tstruct, bool is_exception, bool in_class, bool is_result, bool is_x_factory) { @@ -1759,7 +1765,9 @@ void t_delphi_generator::generate_service_server(t_service* tservice) { indent_up(); indent(s_service) << "type" << endl; indent_up(); - indent(s_service) << "TProcessFunction = reference to procedure( seqid: Integer; const iprot: IProtocol; const oprot: IProtocol);" << endl; + indent(s_service) << "TProcessFunction = reference to procedure( seqid: Integer; const iprot: IProtocol; const oprot: IProtocol" << + (events_ ? "; const events : IRequestEvents" : "") << + ");" << endl; indent_down(); indent_down(); indent(s_service) << "protected" << endl; @@ -1771,17 +1779,20 @@ void t_delphi_generator::generate_service_server(t_service* tservice) { indent(s_service) << "public" << endl; indent_up(); if (extends.empty()) { - indent(s_service) << "function Process( const iprot: IProtocol; const oprot: IProtocol): Boolean;" << endl; + indent(s_service) << "function Process( const iprot: IProtocol; const oprot: IProtocol; const events : IProcessorEvents): Boolean;" << endl; } else { - indent(s_service) << "function Process( const iprot: IProtocol; const oprot: IProtocol): Boolean; reintroduce;" << endl; + indent(s_service) << "function Process( const iprot: IProtocol; const oprot: IProtocol; const events : IProcessorEvents): Boolean; reintroduce;" << endl; } - indent_impl(s_service_impl) << "function " << full_cls << ".Process( const iprot: IProtocol; const oprot: IProtocol): Boolean;" << endl;; + indent_impl(s_service_impl) << "function " << full_cls << ".Process( const iprot: IProtocol; const oprot: IProtocol; const events : IProcessorEvents): Boolean;" << endl;; indent_impl(s_service_impl) << "var" << endl; indent_up_impl(); indent_impl(s_service_impl) << "msg : IMessage;" << endl; indent_impl(s_service_impl) << "fn : TProcessFunction;" << endl; indent_impl(s_service_impl) << "x : TApplicationException;" << endl; + if( events_) { + indent_impl(s_service_impl) << "context : IRequestEvents;" << endl; + } indent_down_impl(); indent_impl(s_service_impl) << "begin" << endl; indent_up_impl(); @@ -1805,7 +1816,27 @@ void t_delphi_generator::generate_service_server(t_service* tservice) { indent_impl(s_service_impl) << "Exit;" << endl; indent_down_impl(); indent_impl(s_service_impl) << "end;" << endl; - indent_impl(s_service_impl) << "fn(msg.SeqID, iprot, oprot);" << endl; + if( events_) { + indent_impl(s_service_impl) << "if events <> nil" << endl; + indent_impl(s_service_impl) << "then context := events.CreateRequestContext(msg.Name)" << endl; + indent_impl(s_service_impl) << "else context := nil;" << endl; + indent_impl(s_service_impl) << "try" << endl; + indent_up_impl(); + indent_impl(s_service_impl) << "fn(msg.SeqID, iprot, oprot, context);" << endl; + indent_down_impl(); + indent_impl(s_service_impl) << "finally" << endl; + indent_up_impl(); + indent_impl(s_service_impl) << "if context <> nil then begin" << endl; + indent_up_impl(); + indent_impl(s_service_impl) << "context.CleanupContext;" << endl; + indent_impl(s_service_impl) << "context := nil;" << endl; + indent_down_impl(); + indent_impl(s_service_impl) << "end;" << endl; + indent_down_impl(); + indent_impl(s_service_impl) << "end;" << endl; + } else { + indent_impl(s_service_impl) << "fn(msg.SeqID, iprot, oprot);" << endl; + } indent_down_impl(); indent_impl(s_service_impl) << "except" << endl; indent_up_impl(); @@ -1863,7 +1894,9 @@ void t_delphi_generator::generate_process_function(t_service* tservice, t_functi string result_intfnm = normalize_clsnm(org_resultname, "I"); indent(s_service) << - "procedure " << funcname << "_Process( seqid: Integer; const iprot: IProtocol; const oprot: IProtocol);" << endl; + "procedure " << funcname << "_Process( seqid: Integer; const iprot: IProtocol; const oprot: IProtocol" << + (events_ ? "; const events : IRequestEvents" : "") << + ");" << endl; if (tfunction->is_oneway()) { indent_impl(s_service_impl) << "// one way processor" << endl; @@ -1872,7 +1905,9 @@ void t_delphi_generator::generate_process_function(t_service* tservice, t_functi } indent_impl(s_service_impl) << - "procedure " << full_cls << "." << funcname << "_Process( seqid: Integer; const iprot: IProtocol; const oprot: IProtocol);" << endl; + "procedure " << full_cls << "." << funcname << "_Process( seqid: Integer; const iprot: IProtocol; const oprot: IProtocol" << + (events_ ? "; const events : IRequestEvents" : "") << + ");" << endl; indent_impl(s_service_impl) << "var" << endl; indent_up_impl(); indent_impl(s_service_impl) << "args: " << args_intfnm << ";" << endl; @@ -1884,9 +1919,16 @@ void t_delphi_generator::generate_process_function(t_service* tservice, t_functi indent_down_impl(); indent_impl(s_service_impl) << "begin" << endl; indent_up_impl(); + + if( events_) { + indent_impl(s_service_impl) << "if events <> nil then events.PreRead;" << endl; + } indent_impl(s_service_impl) << "args := " << args_clsnm << "Impl.Create;" << endl; indent_impl(s_service_impl) << "args.Read(iprot);" << endl; indent_impl(s_service_impl) << "iprot.ReadMessageEnd();" << endl; + if( events_) { + indent_impl(s_service_impl) << "if events <> nil then events.PostRead;" << endl; + } t_struct* xs = tfunction->get_xceptions(); const std::vector& xceptions = xs->get_members(); @@ -1896,7 +1938,7 @@ void t_delphi_generator::generate_process_function(t_service* tservice, t_functi indent_impl(s_service_impl) << "ret := " << result_clsnm << "Impl.Create;" << endl; } - if (!tfunction->is_oneway() && xceptions.size() > 0) { + if (events_ || (!tfunction->is_oneway() && xceptions.size() > 0)) { indent_impl(s_service_impl) << "try" << endl; indent_up_impl(); } @@ -1926,7 +1968,7 @@ void t_delphi_generator::generate_process_function(t_service* tservice, t_functi "args." << prop_name(*f_iter) << " := " << empty_value((*f_iter)->get_type()) << ";" << endl; } - if (!tfunction->is_oneway() && xceptions.size() > 0) { + if (events_ || (!tfunction->is_oneway() && xceptions.size() > 0)) { indent_down_impl(); indent_impl(s_service_impl) << "except" << endl; indent_up_impl(); @@ -1942,16 +1984,33 @@ void t_delphi_generator::generate_process_function(t_service* tservice, t_functi indent_down_impl(); indent_impl(s_service_impl) << "end;" << endl; } + if( events_) { + indent_impl(s_service_impl) << "on E: Exception do" << endl; + indent_impl(s_service_impl) << "begin" << endl; + indent_up_impl(); + indent_impl(s_service_impl) << "if events <> nil then events.UnhandledError(E);" << endl; + indent_impl(s_service_impl) << "raise; // let it bubble up" << endl; + indent_down_impl(); + indent_impl(s_service_impl) << "end;" << endl; + } indent_down_impl(); indent_impl(s_service_impl) << "end;" << endl; } if (! tfunction->is_oneway()) { + if (events_) { + indent_impl(s_service_impl) << "if events <> nil then events.PreWrite;" << endl; + } indent_impl(s_service_impl) << "msg := TMessageImpl.Create('" << tfunction->get_name() << "', TMessageType.Reply, seqid); " << endl; indent_impl(s_service_impl) << "oprot.WriteMessageBegin( msg); " << endl; indent_impl(s_service_impl) << "ret.Write(oprot);" << endl; indent_impl(s_service_impl) << "oprot.WriteMessageEnd();" << endl; indent_impl(s_service_impl) << "oprot.Transport.Flush();" << endl; + if (events_) { + indent_impl(s_service_impl) << "if events <> nil then events.PostWrite;" << endl; + } + } else if (events_) { + indent_impl(s_service_impl) << "if events <> nil then events.OnewayComplete;" << endl; } indent_down_impl(); @@ -3178,5 +3237,6 @@ THRIFT_REGISTER_GENERATOR(delphi, "delphi", " ansistr_binary: Use AnsiString for binary datatype (default is TBytes).\n" " register_types: Enable TypeRegistry, allows for creation of struct, union\n" " and container instances by interface or TypeInfo()\n" -" constprefix: Name TConstants classes after IDL to reduce ambiguities\n"); +" constprefix: Name TConstants classes after IDL to reduce ambiguities\n" +" events: Enable and use processing events in the generated code.\n"); diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas index e88f1cfa..5618d6f8 100644 --- a/lib/delphi/src/Thrift.Protocol.pas +++ b/lib/delphi/src/Thrift.Protocol.pas @@ -497,6 +497,44 @@ type end; +type + IRequestEvents = interface + ['{F926A26A-5B00-4560-86FA-2CAE3BA73DAF}'] + // Called before reading arguments. + procedure PreRead; + // Called between reading arguments and calling the handler. + procedure PostRead; + // Called between calling the handler and writing the response. + procedure PreWrite; + // Called after writing the response. + procedure PostWrite; + // Called when an oneway (async) function call completes successfully. + procedure OnewayComplete; + // Called if the handler throws an undeclared exception. + procedure UnhandledError( const e : Exception); + // Called when a client has finished request-handling to clean up + procedure CleanupContext; + end; + + + IProcessorEvents = interface + ['{A8661119-657C-447D-93C5-512E36162A45}'] + // Called when a client is about to call the processor. + procedure Processing( const transport : ITransport); + // Called on any service function invocation + function CreateRequestContext( const aFunctionName : string) : IRequestEvents; + // Called when a client has finished request-handling to clean up + procedure CleanupContext; + end; + + + IProcessor = interface + ['{7BAE92A5-46DA-4F13-B6EA-0EABE233EE5F}'] + function Process( const iprot :IProtocol; const oprot: IProtocol; const events : IProcessorEvents): Boolean; + end; + + + implementation function ConvertInt64ToDouble( const n: Int64): Double; diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas index 6d3ff385..8237a47c 100644 --- a/lib/delphi/src/Thrift.Server.pas +++ b/lib/delphi/src/Thrift.Server.pas @@ -30,10 +30,26 @@ uses Thrift.Transport; type + IServerEvents = interface + ['{9E2A99C5-EE85-40B2-9A52-2D1722B18176}'] + // Called before the server begins. + procedure PreServe; + // Called when the server transport is ready to accept requests + procedure PreAccept; + // Called when a new client has connected and the server is about to being processing. + function CreateProcessingContext( const input, output : IProtocol) : IProcessorEvents; + end; + + IServer = interface - ['{CF9F56C6-BB39-4C7D-877B-43B416572CE6}'] + ['{ADC46F2D-8199-4D1C-96D2-87FD54351723}'] procedure Serve; procedure Stop; + + function GetServerEvents : IServerEvents; + procedure SetServerEvents( const value : IServerEvents); + + property ServerEvents : IServerEvents read GetServerEvents write SetServerEvents; end; TServerImpl = class abstract( TInterfacedObject, IServer ) @@ -48,9 +64,13 @@ type FInputProtocolFactory : IProtocolFactory; FOutputProtocolFactory : IProtocolFactory; FLogDelegate : TLogDelegate; + FServerEvents : IServerEvents; class procedure DefaultLogDelegate( const str: string); + function GetServerEvents : IServerEvents; + procedure SetServerEvents( const value : IServerEvents); + procedure Serve; virtual; abstract; procedure Stop; virtual; abstract; public @@ -64,7 +84,7 @@ type const ALogDelegate : TLogDelegate ); overload; - constructor Create( + constructor Create( const AProcessor :IProcessor; const AServerTransport: IServerTransport ); overload; @@ -122,7 +142,7 @@ begin InputTransFactory := TTransportFactoryImpl.Create; OutputTransFactory := TTransportFactoryImpl.Create; - //no inherited; + //no inherited; Create( AProcessor, AServerTransport, @@ -202,13 +222,27 @@ constructor TServerImpl.Create( const AProcessor: IProcessor; const AServerTransport: IServerTransport; const ATransportFactory: ITransportFactory; const AProtocolFactory: IProtocolFactory); begin - //no inherited; + //no inherited; Create( AProcessor, AServerTransport, ATransportFactory, ATransportFactory, AProtocolFactory, AProtocolFactory, DefaultLogDelegate); end; + +function TServerImpl.GetServerEvents : IServerEvents; +begin + result := FServerEvents; +end; + + +procedure TServerImpl.SetServerEvents( const value : IServerEvents); +begin + // if you need more than one, provide a specialized IServerEvents implementation + FServerEvents := value; +end; + + { TSimpleServer } constructor TSimpleServer.Create( const AProcessor: IProcessor; @@ -267,6 +301,7 @@ var OutputTransport : ITransport; InputProtocol : IProtocol; OutputProtocol : IProtocol; + context : IProcessorEvents; begin try FServerTransport.Listen; @@ -277,6 +312,9 @@ begin end; end; + if FServerEvents <> nil + then FServerEvents.PreServe; + client := nil; while (not FStop) do begin @@ -287,16 +325,34 @@ begin InputProtocol := nil; OutputProtocol := nil; - client := FServerTransport.Accept; + client := FServerTransport.Accept( procedure + begin + if FServerEvents <> nil + then FServerEvents.PreAccept; + end); + + if client = nil then begin + if FStop + then Abort // silent exception + else raise TTransportException.Create( 'ServerTransport.Accept() may not return NULL' ); + end; + FLogDelegate( 'Client Connected!'); InputTransport := FInputTransportFactory.GetTransport( client ); OutputTransport := FOutputTransportFactory.GetTransport( client ); InputProtocol := FInputProtocolFactory.GetProtocol( InputTransport ); OutputProtocol := FOutputProtocolFactory.GetProtocol( OutputTransport ); - while ( FProcessor.Process( InputProtocol, OutputProtocol )) do - begin - if FStop then Break; + + if FServerEvents <> nil + then context := FServerEvents.CreateProcessingContext( InputProtocol, OutputProtocol) + else context := nil; + + while not FStop do begin + if context <> nil + then context.Processing( client); + if not FProcessor.Process( InputProtocol, OutputProtocol, context) + then Break; end; except @@ -311,6 +367,13 @@ begin FLogDelegate( E.ToString); end; end; + + if context <> nil + then begin + context.CleanupContext; + context := nil; + end; + if InputTransport <> nil then begin InputTransport.Close; diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas index bf07e1e8..c2696f41 100644 --- a/lib/delphi/src/Thrift.Transport.Pipes.pas +++ b/lib/delphi/src/Thrift.Transport.Pipes.pas @@ -172,7 +172,7 @@ type FClientAnonWrite : THandle; protected - function AcceptImpl: ITransport; override; + function Accept(const fnAccepting: TProc): ITransport; override; function CreateAnonPipe : Boolean; @@ -197,9 +197,10 @@ type FTimeout : DWORD; FHandle : THandle; FConnected : Boolean; - - protected - function AcceptImpl: ITransport; override; + + + protected + function Accept(const fnAccepting: TProc): ITransport; override; function CreateNamedPipe : THandle; function CreateTransportInstance : ITransport; @@ -558,10 +559,13 @@ begin end; -function TAnonymousPipeServerTransportImpl.AcceptImpl: ITransport; +function TAnonymousPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport; var buf : Byte; br : DWORD; begin + if Assigned(fnAccepting) + then fnAccepting(); + // This 0-byte read serves merely as a blocking call. if not ReadFile( FReadHandle, buf, 0, br, nil) and (GetLastError() <> ERROR_MORE_DATA) @@ -668,55 +672,62 @@ begin end; -function TNamedPipeServerTransportImpl.AcceptImpl: ITransport; +function TNamedPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport; var dwError, dwWait, dwDummy : DWORD; overlapped : TOverlapped; - event : TEvent; -begin + event : TEvent; +begin FillChar( overlapped, SizeOf(overlapped), 0); event := TEvent.Create( nil, TRUE, FALSE, ''); // always ManualReset, see MSDN try - overlapped.hEvent := event.Handle; - - ASSERT( not FConnected); - while not FConnected do begin + overlapped.hEvent := event.Handle; + + ASSERT( not FConnected); + while not FConnected do begin InternalClose; if FStopServer then Abort; CreateNamedPipe; + if Assigned(fnAccepting) + then fnAccepting(); + // Wait for the client to connect; if it succeeds, the // function returns a nonzero value. If the function returns // zero, GetLastError should return ERROR_PIPE_CONNECTED. - if ConnectNamedPipe( Handle, @overlapped) - then FConnected := TRUE - else begin - // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected. - // We have to check GetLastError() explicitly to find out - dwError := GetLastError; - case dwError of - ERROR_PIPE_CONNECTED : begin - FConnected := TRUE; // special case: pipe immediately connected - end; - - ERROR_IO_PENDING : begin - dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT); - FConnected := (dwWait = WAIT_OBJECT_0) - and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE); - end; - - else - InternalClose; - raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, - 'Client connection failed'); - end; - end; - end; + if ConnectNamedPipe( Handle, @overlapped) then begin + FConnected := TRUE; + Break; + end; + + // ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected. + // We have to check GetLastError() explicitly to find out + dwError := GetLastError; + case dwError of + ERROR_PIPE_CONNECTED : begin + FConnected := not FStopServer; // special case: pipe immediately connected + end; + + ERROR_IO_PENDING : begin + repeat + dwWait := WaitForSingleObject( overlapped.hEvent, DEFAULT_THRIFT_PIPE_TIMEOUT); + until (dwWait <> WAIT_TIMEOUT) or FStopServer; + FConnected := (dwWait = WAIT_OBJECT_0) + and GetOverlappedResult( Handle, overlapped, dwDummy, TRUE) + and not FStopServer; + end; + + else + InternalClose; + raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, + 'Client connection failed'); + end; + end; // create the transport impl result := CreateTransportInstance; finally - event.Free; + event.Free; end; end; @@ -730,9 +741,9 @@ begin FConnected := FALSE; result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE); except - ClosePipeHandle(hPipe); - raise; - end; + ClosePipeHandle(hPipe); + raise; + end; end; diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas index 0d5b384d..b567aefd 100644 --- a/lib/delphi/src/Thrift.Transport.pas +++ b/lib/delphi/src/Thrift.Transport.pas @@ -129,19 +129,17 @@ type end; IServerTransport = interface - ['{BF6B7043-DA22-47BF-8B11-2B88EC55FE12}'] + ['{C43B87ED-69EA-47C4-B77C-15E288252900}'] procedure Listen; procedure Close; - function Accept: ITransport; + function Accept( const fnAccepting: TProc): ITransport; end; TServerTransportImpl = class( TInterfacedObject, IServerTransport) protected - function AcceptImpl: ITransport; virtual; abstract; - public procedure Listen; virtual; abstract; procedure Close; virtual; abstract; - function Accept: ITransport; + function Accept( const fnAccepting: TProc): ITransport; virtual; abstract; end; ITransportFactory = interface @@ -226,7 +224,7 @@ type FUseBufferedSocket : Boolean; FOwnsServer : Boolean; protected - function AcceptImpl: ITransport; override; + function Accept( const fnAccepting: TProc) : ITransport; override; public constructor Create( const AServer: TTcpServer ); overload; constructor Create( const AServer: TTcpServer; AClientTimeout: Integer); overload; @@ -518,17 +516,6 @@ begin inherited Create(msg); end; -{ TServerTransportImpl } - -function TServerTransportImpl.Accept: ITransport; -begin - Result := AcceptImpl; - if Result = nil then - begin - raise TTransportException.Create( 'accept() may not return NULL' ); - end; -end; - { TTransportFactoryImpl } function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport; @@ -557,11 +544,10 @@ begin Create( APort, 0 ); end; -function TServerSocketImpl.AcceptImpl: ITransport; +function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport; var - ret : TCustomIpClient; - ret2 : IStreamTransport; - ret3 : ITransport; + client : TCustomIpClient; + trans : IStreamTransport; begin if FServer = nil then begin @@ -570,29 +556,28 @@ begin end; try - ret := TCustomIpClient.Create(nil); - if ( not FServer.Accept( ret )) then + client := TCustomIpClient.Create(nil); + + if Assigned(fnAccepting) + then fnAccepting(); + + if ( not FServer.Accept( client)) then begin - ret.Free; + client.Free; Result := nil; Exit; end; - if ret = nil then + if client = nil then begin Result := nil; Exit; end; - ret2 := TSocketImpl.Create( ret ); - if FUseBufferedSocket then - begin - ret3 := TBufferedTransportImpl.Create(ret2); - Result := ret3; - end else - begin - Result := ret2; - end; + trans := TSocketImpl.Create( client); + if FUseBufferedSocket + then result := TBufferedTransportImpl.Create( trans) + else result := trans; except on E: Exception do diff --git a/lib/delphi/src/Thrift.pas b/lib/delphi/src/Thrift.pas index 44f12d78..f4b47edc 100644 --- a/lib/delphi/src/Thrift.pas +++ b/lib/delphi/src/Thrift.pas @@ -28,11 +28,6 @@ const Version = '1.0.0-dev'; type - IProcessor = interface - ['{B1538A07-6CAC-4406-8A4C-AFED07C70A89}'] - function Process( const iprot :IProtocol; const oprot: IProtocol): Boolean; - end; - TApplicationException = class( SysUtils.Exception ) public type diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas index 7b74e58a..656fa150 100644 --- a/lib/delphi/test/TestServer.pas +++ b/lib/delphi/test/TestServer.pas @@ -39,6 +39,7 @@ uses Thrift.Test, Thrift, TestConstants, + TestServerEvents, Contnrs; type @@ -482,7 +483,7 @@ var UseBufferedSockets : Boolean; UseFramed : Boolean; Port : Integer; - AnonPipe : Boolean; + AnonPipe, ServerEvents : Boolean; sPipeName : string; testHandler : ITestHandler; testProcessor : IProcessor; @@ -505,6 +506,7 @@ begin UseBufferedSockets := False; UseFramed := False; AnonPipe := FALSE; + ServerEvents := FALSE; protType := prot_Binary; Port := 9090; sPipeName := ''; @@ -549,8 +551,12 @@ begin Break; end; end; - end else + end + else if ( s = '-events' ) then begin + ServerEvents := True; + end + else begin // Fall back to the older boolean syntax UseBufferedSockets := StrToBoolDef( args[1], UseBufferedSockets); end @@ -607,6 +613,12 @@ begin testHandler.SetServer( ServerEngine); + // test events? + if ServerEvents then begin + Console.WriteLine('- server events test enabled'); + ServerEngine.ServerEvents := TServerEventsImpl.Create; + end; + // start the client now when we have the anon handles, but before the server starts if AnonPipe then LaunchAnonPipeChild( ExtractFilePath(ParamStr(0))+'client.exe', anonymouspipe); diff --git a/lib/delphi/test/TestServerEvents.pas b/lib/delphi/test/TestServerEvents.pas new file mode 100644 index 00000000..8e931c4c --- /dev/null +++ b/lib/delphi/test/TestServerEvents.pas @@ -0,0 +1,174 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit TestServerEvents; + +interface + +uses + SysUtils, + Thrift, + Thrift.Protocol, + Thrift.Transport, + Thrift.Server, + Thrift.Console; + +type + TRequestEventsImpl = class( TInterfacedObject, IRequestEvents) + protected + FStart : TDateTime; + // IRequestProcessingEvents + procedure PreRead; + procedure PostRead; + procedure PreWrite; + procedure PostWrite; + procedure OnewayComplete; + procedure UnhandledError( const e : Exception); + procedure CleanupContext; + public + constructor Create; + end; + + + TProcessorEventsImpl = class( TInterfacedObject, IProcessorEvents) + protected + FReqs : Integer; + // IProcessorEvents + procedure Processing( const transport : ITransport); + function CreateRequestContext( const aFunctionName : string) : IRequestEvents; + procedure CleanupContext; + public + constructor Create; + end; + + + TServerEventsImpl = class( TInterfacedObject, IServerEvents) + protected + // IServerEvents + procedure PreServe; + procedure PreAccept; + function CreateProcessingContext( const input, output : IProtocol) : IProcessorEvents; + end; + + +implementation + +{ TServerEventsImpl } + +procedure TServerEventsImpl.PreServe; +begin + Console.WriteLine('ServerEvents: Server starting to serve requests'); +end; + + +procedure TServerEventsImpl.PreAccept; +begin + Console.WriteLine('ServerEvents: Server transport is ready to accept incoming calls'); +end; + + +function TServerEventsImpl.CreateProcessingContext(const input, output: IProtocol): IProcessorEvents; +begin + result := TProcessorEventsImpl.Create; +end; + + +{ TProcessorEventsImpl } + +constructor TProcessorEventsImpl.Create; +begin + inherited Create; + FReqs := 0; + Console.WriteLine('ProcessorEvents: Client connected, processing begins'); +end; + +procedure TProcessorEventsImpl.Processing(const transport: ITransport); +begin + Console.WriteLine('ProcessorEvents: Processing of incoming request begins'); +end; + + +function TProcessorEventsImpl.CreateRequestContext( const aFunctionName: string): IRequestEvents; +begin + result := TRequestEventsImpl.Create; + Inc( FReqs); +end; + + +procedure TProcessorEventsImpl.CleanupContext; +begin + Console.WriteLine( 'ProcessorEvents: completed after handling '+IntToStr(FReqs)+' requests.'); +end; + + +{ TRequestEventsImpl } + + +constructor TRequestEventsImpl.Create; +begin + inherited Create; + FStart := Now; + Console.WriteLine('RequestEvents: New request'); +end; + + +procedure TRequestEventsImpl.PreRead; +begin + Console.WriteLine('RequestEvents: Reading request message ...'); +end; + + +procedure TRequestEventsImpl.PostRead; +begin + Console.WriteLine('RequestEvents: Reading request message completed'); +end; + +procedure TRequestEventsImpl.PreWrite; +begin + Console.WriteLine('RequestEvents: Writing response message ...'); +end; + + +procedure TRequestEventsImpl.PostWrite; +begin + Console.WriteLine('RequestEvents: Writing response message completed'); +end; + + +procedure TRequestEventsImpl.OnewayComplete; +begin + Console.WriteLine('RequestEvents: Oneway message processed'); +end; + + +procedure TRequestEventsImpl.UnhandledError(const e: Exception); +begin + Console.WriteLine('RequestEvents: Unhandled exception of type '+e.classname); +end; + + +procedure TRequestEventsImpl.CleanupContext; +var millis : Double; +begin + millis := (Now - FStart) * (24*60*60*1000); + Console.WriteLine( 'Request processing completed in '+IntToStr(Round(millis))+' ms'); +end; + + +end. diff --git a/lib/delphi/test/codegen/run-Pascal-Codegen-Tests.bat.tmpl b/lib/delphi/test/codegen/run-Pascal-Codegen-Tests.bat.tmpl index 6ccd2606..80074701 100644 --- a/lib/delphi/test/codegen/run-Pascal-Codegen-Tests.bat.tmpl +++ b/lib/delphi/test/codegen/run-Pascal-Codegen-Tests.bat.tmpl @@ -58,7 +58,7 @@ rem * compile all thrift files, generate PAS and C++ code echo. echo Generating code, please wait ... cd "%TARGET%" -for %%a in (*.thrift) do "%BIN%\thrift.exe" -v --gen delphi:ansistr_binary,register_types "%%a" 2>> "%LOGFILE%" +for %%a in (*.thrift) do "%BIN%\thrift.exe" -v --gen delphi:ansistr_binary,register_types,constprefix,events "%%a" 2>> "%LOGFILE%" REM * for %%a in (*.thrift) do "%BIN%\thrift.exe" -v --gen cpp "%%a" >> NUL: cmd /c start notepad "%LOGFILE%" cd .. diff --git a/lib/delphi/test/server.dpr b/lib/delphi/test/server.dpr index ca485afc..7cf26a46 100644 --- a/lib/delphi/test/server.dpr +++ b/lib/delphi/test/server.dpr @@ -24,6 +24,7 @@ program server; uses SysUtils, TestServer in 'TestServer.pas', + TestServerEvents in 'TestServerEvents.pas', Thrift.Test, // in gen-delphi folder Thrift in '..\src\Thrift.pas', Thrift.Transport in '..\src\Thrift.Transport.pas', -- 2.17.1