From 8a70196d00b667b5e890cb0974373d2a2af4f82c Mon Sep 17 00:00:00 2001 From: Jens Geyer Date: Mon, 25 Mar 2013 01:28:12 +0200 Subject: [PATCH] THRIFT-1899 Delphi: Support for Multiplexing Services on any Transport, Protocol and Server Patch: Jens Geyer --- lib/delphi/src/Thrift.Processor.Multiplex.pas | 182 ++++++++++ lib/delphi/src/Thrift.Protocol.Multiplex.pas | 107 ++++++ lib/delphi/src/Thrift.Protocol.pas | 333 ++++++++++++++++++ .../multiplexed/Multiplex.Client.Main.pas | 130 +++++++ .../multiplexed/Multiplex.Server.Main.pas | 201 +++++++++++ .../multiplexed/Multiplex.Test.Client.dpr | 65 ++++ .../multiplexed/Multiplex.Test.Common.pas | 35 ++ .../multiplexed/Multiplex.Test.Server.dpr | 65 ++++ 8 files changed, 1118 insertions(+) create mode 100644 lib/delphi/src/Thrift.Processor.Multiplex.pas create mode 100644 lib/delphi/src/Thrift.Protocol.Multiplex.pas create mode 100644 lib/delphi/test/multiplexed/Multiplex.Client.Main.pas create mode 100644 lib/delphi/test/multiplexed/Multiplex.Server.Main.pas create mode 100644 lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr create mode 100644 lib/delphi/test/multiplexed/Multiplex.Test.Common.pas create mode 100644 lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr diff --git a/lib/delphi/src/Thrift.Processor.Multiplex.pas b/lib/delphi/src/Thrift.Processor.Multiplex.pas new file mode 100644 index 00000000..b771d437 --- /dev/null +++ b/lib/delphi/src/Thrift.Processor.Multiplex.pas @@ -0,0 +1,182 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit Thrift.Processor.Multiplex; + + +interface + +uses + SysUtils, + Generics.Collections, + Thrift, + Thrift.Protocol, + Thrift.Protocol.Multiplex; + +{ TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services. + To do so, you instantiate the processor and then register additional processors with it, + as shown in the following example: + + + TMultiplexedProcessor processor = new TMultiplexedProcessor(); + + processor.registerProcessor( + "Calculator", + new Calculator.Processor(new CalculatorHandler())); + + processor.registerProcessor( + "WeatherReport", + new WeatherReport.Processor(new WeatherReportHandler())); + + TServerTransport t = new TServerSocket(9090); + TSimpleServer server = new TSimpleServer(processor, t); + + server.serve(); +} + + +type + IMultiplexedProcessor = interface( IProcessor) + ['{810FF32D-22A2-4D58-B129-B0590703ECEC}'] + // Register a service with this TMultiplexedProcessor. This allows us + // to broker requests to individual services by using the service name + // to select them at request time. + procedure RegisterProcessor( const serviceName : String; const processor : IProcessor); + end; + + + TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor) + private type + // Our goal was to work with any protocol. In order to do that, we needed + // to allow them to call readMessageBegin() and get a TMessage in exactly + // the standard format, without the service name prepended to TMessage.name. + TStoredMessageProtocol = class( TProtocolDecorator) + private + FMessageBegin : IMessage; + public + constructor Create( const protocol : IProtocol; const aMsgBegin : IMessage); + function ReadMessageBegin: IMessage; override; + end; + + private + FServiceProcessorMap : TDictionary; + + public + constructor Create; + destructor Destroy; override; + + // Register a service with this TMultiplexedProcessorImpl. This allows us + // to broker requests to individual services by using the service name + // to select them at request time. + procedure RegisterProcessor( const serviceName : String; const processor : IProcessor); + + { This implementation of process performs the following steps: + - Read the beginning of the message. + - Extract the service name from the message. + - Using the service name to locate the appropriate processor. + - Dispatch to the processor, with a decorated instance of TProtocol + that allows readMessageBegin() to return the original TMessage. + + An exception is thrown if the message type is not CALL or ONEWAY + or if the service is unknown (or not properly registered). + } + function Process(const iprot, oprot : IProtocol) : Boolean; + end; + + +implementation + +constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : IMessage); +begin + inherited Create( protocol); + FMessageBegin := aMsgBegin; +end; + + +function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: IMessage; +begin + result := FMessageBegin; +end; + + +constructor TMultiplexedProcessorImpl.Create; +begin + inherited Create; + FServiceProcessorMap := TDictionary.Create; +end; + + +destructor TMultiplexedProcessorImpl.Destroy; +begin + try + FreeAndNil( FServiceProcessorMap); + finally + inherited Destroy; + end; +end; + + +procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor); +begin + FServiceProcessorMap.Add( serviceName, processor); +end; + + +function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol) : Boolean; +var msg, newMsg : IMessage; + idx : Integer; + sService : string; + processor : IProcessor; + protocol : IProtocol; +const + ERROR_INVALID_MSGTYPE = 'Message must be "call" or "oneway"'; + ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.'; + ERROR_UNKNOWN_SERVICE = 'Service "%s" is not registered with MultiplexedProcessor'; +begin + // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header. + // This pulls the message "off the wire", which we'll deal with at the end of this method. + msg := iprot.readMessageBegin(); + if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway]) + then raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidMessageType, + ERROR_INVALID_MSGTYPE); + + // Extract the service name + idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name); + if idx < 1 + then raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidProtocol, + Format(ERROR_INCOMPATIBLE_PROT,[msg.Name])); + + // Create a new TMessage, something that can be consumed by any TProtocol + sService := Copy( msg.Name, 1, idx-1); + if not FServiceProcessorMap.TryGetValue( sService, processor) + then raise TApplicationException.Create( TApplicationException.TExceptionType.InternalError, + Format(ERROR_UNKNOWN_SERVICE,[sService])); + + // Create a new TMessage, removing the service name + Inc( idx, Length(TMultiplexedProtocol.SEPARATOR)); + newMsg := TMessageImpl.Create( Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID); + + // Dispatch processing to the stored processor + protocol := TStoredMessageProtocol.Create( iprot, newMsg); + result := processor.process( protocol, oprot); +end; + + +end. + diff --git a/lib/delphi/src/Thrift.Protocol.Multiplex.pas b/lib/delphi/src/Thrift.Protocol.Multiplex.pas new file mode 100644 index 00000000..2cd24018 --- /dev/null +++ b/lib/delphi/src/Thrift.Protocol.Multiplex.pas @@ -0,0 +1,107 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit Thrift.Protocol.Multiplex; + +interface + +uses Thrift.Protocol; + +{ TMultiplexedProtocol is a protocol-independent concrete decorator + that allows a Thrift client to communicate with a multiplexing Thrift server, + by prepending the service name to the function name during function calls. + + NOTE: THIS IS NOT USED BY SERVERS. + On the server, use TMultiplexedProcessor to handle requests from a multiplexing client. + + This example uses a single socket transport to invoke two services: + + TSocket transport = new TSocket("localhost", 9090); + transport.open(); + + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator"); + Calculator.Client service = new Calculator.Client(mp); + + TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport"); + WeatherReport.Client service2 = new WeatherReport.Client(mp2); + + System.out.println(service.add(2,2)); + System.out.println(service2.getTemperature()); + +} + +type + TMultiplexedProtocol = class( TProtocolDecorator) + public const + { Used to delimit the service name from the function name } + SEPARATOR = ':'; + + private + FServiceName : String; + + public + { Wrap the specified protocol, allowing it to be used to communicate with a multiplexing server. + The serviceName is required as it is prepended to the message header so that the multiplexing + server can broker the function call to the proper service. + + Args: + protocol ....... Your communication protocol of choice, e.g. TBinaryProtocol. + serviceName .... The service name of the service communicating via this protocol. + } + constructor Create( const aProtocol : IProtocol; const aServiceName : string); + + { Prepends the service name to the function name, separated by SEPARATOR. + Args: The original message. + } + procedure WriteMessageBegin( const msg: IMessage); override; + end; + + +implementation + + +constructor TMultiplexedProtocol.Create(const aProtocol: IProtocol; const aServiceName: string); +begin + ASSERT( aServiceName <> ''); + inherited Create(aProtocol); + FServiceName := aServiceName; +end; + + +procedure TMultiplexedProtocol.WriteMessageBegin( const msg: IMessage); +// Prepends the service name to the function name, separated by TMultiplexedProtocol.SEPARATOR. +var newMsg : IMessage; +begin + case msg.Type_ of + TMessageType.Call, + TMessageType.Oneway : begin + newMsg := TMessageImpl.Create( FServiceName + SEPARATOR + msg.Name, msg.Type_, msg.SeqID); + inherited WriteMessageBegin( newMsg); + end; + + else + inherited WriteMessageBegin( msg); + end; +end; + + +end. + diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas index 4c1954c9..b08458aa 100644 --- a/lib/delphi/src/Thrift.Protocol.pas +++ b/lib/delphi/src/Thrift.Protocol.pas @@ -437,6 +437,69 @@ type procedure SetReadLength( readLength: Integer ); end; + + { TProtocolDecorator forwards all requests to an enclosed TProtocol instance, + providing a way to author concise concrete decorator subclasses. The decorator + does not (and should not) modify the behaviour of the enclosed TProtocol + + See p.175 of Design Patterns (by Gamma et al.) + } + TProtocolDecorator = class( TProtocolImpl) + private + FWrappedProtocol : IProtocol; + + public + // Encloses the specified protocol. + // All operations will be forward to the given protocol. Must be non-null. + constructor Create( const aProtocol : IProtocol); + + procedure WriteMessageBegin( const msg: IMessage); override; + procedure WriteMessageEnd; override; + procedure WriteStructBegin( const struc: IStruct); override; + procedure WriteStructEnd; override; + procedure WriteFieldBegin( const field: IField); override; + procedure WriteFieldEnd; override; + procedure WriteFieldStop; override; + procedure WriteMapBegin( const map: IMap); override; + procedure WriteMapEnd; override; + procedure WriteListBegin( const list: IList); override; + procedure WriteListEnd(); override; + procedure WriteSetBegin( const set_: ISet ); override; + procedure WriteSetEnd(); override; + procedure WriteBool( b: Boolean); override; + procedure WriteByte( b: ShortInt); override; + procedure WriteI16( i16: SmallInt); override; + procedure WriteI32( i32: Integer); override; + procedure WriteI64( const i64: Int64); override; + procedure WriteDouble( const d: Double); override; + procedure WriteString( const s: string ); override; + procedure WriteAnsiString( const s: AnsiString); override; + procedure WriteBinary( const b: TBytes); override; + + function ReadMessageBegin: IMessage; override; + procedure ReadMessageEnd(); override; + function ReadStructBegin: IStruct; override; + procedure ReadStructEnd; override; + function ReadFieldBegin: IField; override; + procedure ReadFieldEnd(); override; + function ReadMapBegin: IMap; override; + procedure ReadMapEnd(); override; + function ReadListBegin: IList; override; + procedure ReadListEnd(); override; + function ReadSetBegin: ISet; override; + procedure ReadSetEnd(); override; + function ReadBool: Boolean; override; + function ReadByte: ShortInt; override; + function ReadI16: SmallInt; override; + function ReadI32: Integer; override; + function ReadI64: Int64; override; + function ReadDouble:Double; override; + function ReadBinary: TBytes; override; + function ReadString: string; override; + function ReadAnsiString: AnsiString; override; + end; + + implementation function ConvertInt64ToDouble( const n: Int64): Double; @@ -1228,5 +1291,275 @@ begin Result := TBinaryProtocolImpl.Create( trans, FStrictRead, FStrictWrite); end; + +{ TProtocolDecorator } + +constructor TProtocolDecorator.Create( const aProtocol : IProtocol); +begin + ASSERT( aProtocol <> nil); + inherited Create( aProtocol.Transport); + FWrappedProtocol := aProtocol; +end; + + +procedure TProtocolDecorator.WriteMessageBegin( const msg: IMessage); +begin + FWrappedProtocol.WriteMessageBegin( msg); +end; + + +procedure TProtocolDecorator.WriteMessageEnd; +begin + FWrappedProtocol.WriteMessageEnd; +end; + + +procedure TProtocolDecorator.WriteStructBegin( const struc: IStruct); +begin + FWrappedProtocol.WriteStructBegin( struc); +end; + + +procedure TProtocolDecorator.WriteStructEnd; +begin + FWrappedProtocol.WriteStructEnd; +end; + + +procedure TProtocolDecorator.WriteFieldBegin( const field: IField); +begin + FWrappedProtocol.WriteFieldBegin( field); +end; + + +procedure TProtocolDecorator.WriteFieldEnd; +begin + FWrappedProtocol.WriteFieldEnd; +end; + + +procedure TProtocolDecorator.WriteFieldStop; +begin + FWrappedProtocol.WriteFieldStop; +end; + + +procedure TProtocolDecorator.WriteMapBegin( const map: IMap); +begin + FWrappedProtocol.WriteMapBegin( map); +end; + + +procedure TProtocolDecorator.WriteMapEnd; +begin + FWrappedProtocol.WriteMapEnd; +end; + + +procedure TProtocolDecorator.WriteListBegin( const list: IList); +begin + FWrappedProtocol.WriteListBegin( list); +end; + + +procedure TProtocolDecorator.WriteListEnd(); +begin + FWrappedProtocol.WriteListEnd(); +end; + + +procedure TProtocolDecorator.WriteSetBegin( const set_: ISet ); +begin + FWrappedProtocol.WriteSetBegin( set_); +end; + + +procedure TProtocolDecorator.WriteSetEnd(); +begin + FWrappedProtocol.WriteSetEnd(); +end; + + +procedure TProtocolDecorator.WriteBool( b: Boolean); +begin + FWrappedProtocol.WriteBool( b); +end; + + +procedure TProtocolDecorator.WriteByte( b: ShortInt); +begin + FWrappedProtocol.WriteByte( b); +end; + + +procedure TProtocolDecorator.WriteI16( i16: SmallInt); +begin + FWrappedProtocol.WriteI16( i16); +end; + + +procedure TProtocolDecorator.WriteI32( i32: Integer); +begin + FWrappedProtocol.WriteI32( i32); +end; + + +procedure TProtocolDecorator.WriteI64( const i64: Int64); +begin + FWrappedProtocol.WriteI64( i64); +end; + + +procedure TProtocolDecorator.WriteDouble( const d: Double); +begin + FWrappedProtocol.WriteDouble( d); +end; + + +procedure TProtocolDecorator.WriteString( const s: string ); +begin + FWrappedProtocol.WriteString( s); +end; + + +procedure TProtocolDecorator.WriteAnsiString( const s: AnsiString); +begin + FWrappedProtocol.WriteAnsiString( s); +end; + + +procedure TProtocolDecorator.WriteBinary( const b: TBytes); +begin + FWrappedProtocol.WriteBinary( b); +end; + + +function TProtocolDecorator.ReadMessageBegin: IMessage; +begin + result := FWrappedProtocol.ReadMessageBegin; +end; + + +procedure TProtocolDecorator.ReadMessageEnd(); +begin + FWrappedProtocol.ReadMessageEnd(); +end; + + +function TProtocolDecorator.ReadStructBegin: IStruct; +begin + result := FWrappedProtocol.ReadStructBegin; +end; + + +procedure TProtocolDecorator.ReadStructEnd; +begin + FWrappedProtocol.ReadStructEnd; +end; + + +function TProtocolDecorator.ReadFieldBegin: IField; +begin + result := FWrappedProtocol.ReadFieldBegin; +end; + + +procedure TProtocolDecorator.ReadFieldEnd(); +begin + FWrappedProtocol.ReadFieldEnd(); +end; + + +function TProtocolDecorator.ReadMapBegin: IMap; +begin + result := FWrappedProtocol.ReadMapBegin; +end; + + +procedure TProtocolDecorator.ReadMapEnd(); +begin + FWrappedProtocol.ReadMapEnd(); +end; + + +function TProtocolDecorator.ReadListBegin: IList; +begin + result := FWrappedProtocol.ReadListBegin; +end; + + +procedure TProtocolDecorator.ReadListEnd(); +begin + FWrappedProtocol.ReadListEnd(); +end; + + +function TProtocolDecorator.ReadSetBegin: ISet; +begin + result := FWrappedProtocol.ReadSetBegin; +end; + + +procedure TProtocolDecorator.ReadSetEnd(); +begin + FWrappedProtocol.ReadSetEnd(); +end; + + +function TProtocolDecorator.ReadBool: Boolean; +begin + result := FWrappedProtocol.ReadBool; +end; + + +function TProtocolDecorator.ReadByte: ShortInt; +begin + result := FWrappedProtocol.ReadByte; +end; + + +function TProtocolDecorator.ReadI16: SmallInt; +begin + result := FWrappedProtocol.ReadI16; +end; + + +function TProtocolDecorator.ReadI32: Integer; +begin + result := FWrappedProtocol.ReadI32; +end; + + +function TProtocolDecorator.ReadI64: Int64; +begin + result := FWrappedProtocol.ReadI64; +end; + + +function TProtocolDecorator.ReadDouble:Double; +begin + result := FWrappedProtocol.ReadDouble; +end; + + +function TProtocolDecorator.ReadBinary: TBytes; +begin + result := FWrappedProtocol.ReadBinary; +end; + + +function TProtocolDecorator.ReadString: string; +begin + result := FWrappedProtocol.ReadString; +end; + + +function TProtocolDecorator.ReadAnsiString: AnsiString; +begin + result := FWrappedProtocol.ReadAnsiString; +end; + + + end. diff --git a/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas new file mode 100644 index 00000000..2cc7ab03 --- /dev/null +++ b/lib/delphi/test/multiplexed/Multiplex.Client.Main.pas @@ -0,0 +1,130 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit Multiplex.Client.Main; + +{.$DEFINE StressTest} // activate to stress-test the server with frequent connects/disconnects +{.$DEFINE PerfTest} // activate to activate the performance test + +interface + +uses + Windows, SysUtils, Classes, + DateUtils, + Generics.Collections, + Thrift, + Thrift.Protocol, + Thrift.Protocol.Multiplex, + Thrift.Transport.Pipes, + Thrift.Transport, + Thrift.Stream, + Thrift.Collections, + Benchmark, // in gen-delphi folder + Aggr, // in gen-delphi folder + Multiplex.Test.Common; + +type + TTestClient = class + protected + FProtocol : IProtocol; + + procedure ParseArgs( const args: array of string); + procedure Setup; + procedure Run; + public + constructor Create( const args: array of string); + class procedure Execute( const args: array of string); + end; + +implementation + + +type + IServiceClient = interface + ['{7745C1C2-AB20-43BA-B6F0-08BF92DE0BAC}'] + procedure Test; + end; + +//--- TTestClient ------------------------------------- + + +class procedure TTestClient.Execute( const args: array of string); +var client : TTestClient; +begin + client := TTestClient.Create(args); + try + client.Run; + finally + client.Free; + end; +end; + + +constructor TTestClient.Create( const args: array of string); +begin + ParseArgs(args); + Setup; +end; + + +procedure TTestClient.ParseArgs( const args: array of string); +begin + if Length(args) <> 0 + then raise Exception.Create('No args accepted so far'); +end; + + +procedure TTestClient.Setup; +var trans : ITransport; +begin + trans := TSocketImpl.Create( 'localhost', 9090); + trans := TFramedTransportImpl.Create( trans); + trans.Open; + FProtocol := TBinaryProtocolImpl.Create( trans, TRUE, TRUE); +end; + + +procedure TTestClient.Run; +var bench : TBenchmarkService.Iface; + aggr : TAggr.Iface; + multiplex : IProtocol; + i : Integer; +begin + try + multiplex := TMultiplexedProtocol.Create( FProtocol, NAME_BENCHMARKSERVICE); + bench := TBenchmarkService.TClient.Create( multiplex); + + multiplex := TMultiplexedProtocol.Create( FProtocol, NAME_AGGR); + aggr := TAggr.TClient.Create( multiplex); + + for i := 1 to 10 + do aggr.addValue( bench.fibonacci(i)); + + for i in aggr.getValues + do Write(IntToStr(i)+' '); + WriteLn; + except + on e:Exception do Writeln(#10+e.Message); + end; +end; + + +end. + + diff --git a/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas new file mode 100644 index 00000000..4f5cd13e --- /dev/null +++ b/lib/delphi/test/multiplexed/Multiplex.Server.Main.pas @@ -0,0 +1,201 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit Multiplex.Server.Main; + +{$WARN SYMBOL_PLATFORM OFF} + +{.$DEFINE RunEndless} // activate to interactively stress-test the server stop routines via Ctrl+C + +interface + +uses + Windows, SysUtils, + Generics.Collections, + Thrift.Console, + Thrift.Server, + Thrift.Transport, + Thrift.Transport.Pipes, + Thrift.Protocol, + Thrift.Protocol.Multiplex, + Thrift.Processor.Multiplex, + Thrift.Collections, + Thrift.Utils, + Thrift, + Benchmark, // in gen-delphi folder + Aggr, // in gen-delphi folder + Multiplex.Test.Common, + Contnrs; + +type + TTestServer = class + public type + ITestHandler = interface + ['{CAE09AAB-80FB-48E9-B3A8-7F9B96F5419A}'] + procedure SetServer( const AServer : IServer ); + end; + + protected type + TTestHandlerImpl = class( TInterfacedObject, ITestHandler) + private + FServer : IServer; + protected + // ITestHandler + procedure SetServer( const AServer : IServer ); + + property Server : IServer read FServer write SetServer; + end; + + TBenchmarkServiceImpl = class( TTestHandlerImpl, TBenchmarkService.Iface) + protected + // TBenchmarkService.Iface + function fibonacci(n: ShortInt): Integer; + end; + + + TAggrImpl = class( TTestHandlerImpl, TAggr.Iface) + protected + FList : IThriftList; + + // TAggr.Iface + procedure addValue(value: Integer); + function getValues(): IThriftList; + public + constructor Create; + destructor Destroy; override; + end; + + public + class procedure Execute( const args: array of string); + end; + + +implementation + + +{ TTestServer.TTestHandlerImpl } + +procedure TTestServer.TTestHandlerImpl.SetServer( const AServer: IServer); +begin + FServer := AServer; +end; + + +{ TTestServer.TBenchmarkServiceImpl } + +function TTestServer.TBenchmarkServiceImpl.fibonacci(n: ShortInt): Integer; +var prev, next : Integer; +begin + prev := 0; + result := 1; + while n > 0 do begin + next := result + prev; + prev := result; + result := next; + Dec(n); + end; +end; + +{ TTestServer.TAggrImpl } + +constructor TTestServer.TAggrImpl.Create; +begin + inherited Create; + FList := TThriftListImpl.Create; +end; + + +destructor TTestServer.TAggrImpl.Destroy; +begin + try + FreeAndNil( FList); + finally + inherited Destroy; + end; +end; + + +procedure TTestServer.TAggrImpl.addValue(value: Integer); +begin + FList.Add( value); +end; + + +function TTestServer.TAggrImpl.getValues(): IThriftList; +begin + result := FList; +end; + + +{ TTestServer } + +class procedure TTestServer.Execute( const args: array of string); +var + TransportFactory : ITransportFactory; + ProtocolFactory : IProtocolFactory; + ServerTrans : IServerTransport; + benchHandler : TBenchmarkService.Iface; + aggrHandler : TAggr.Iface; + benchProcessor : IProcessor; + aggrProcessor : IProcessor; + multiplex : IMultiplexedProcessor; + ServerEngine : IServer; +begin + try + // create protocol factory, default to BinaryProtocol + ProtocolFactory := TBinaryProtocolImpl.TFactory.Create( TRUE, TRUE); + servertrans := TServerSocketImpl.Create( 9090, 0, FALSE); + TransportFactory := TFramedTransportImpl.TFactory.Create; + + benchHandler := TBenchmarkServiceImpl.Create; + benchProcessor := TBenchmarkService.TProcessorImpl.Create( benchHandler); + + aggrHandler := TAggrImpl.Create; + aggrProcessor := TAggr.TProcessorImpl.Create( aggrHandler); + + multiplex := TMultiplexedProcessorImpl.Create; + multiplex.RegisterProcessor( NAME_BENCHMARKSERVICE, benchProcessor); + multiplex.RegisterProcessor( NAME_AGGR, aggrProcessor); + + ServerEngine := TSimpleServer.Create( multiplex, + ServerTrans, + TransportFactory, + ProtocolFactory); + + (benchHandler as ITestHandler).SetServer( ServerEngine); + (aggrHandler as ITestHandler).SetServer( ServerEngine); + + Console.WriteLine('Starting the server ...'); + ServerEngine.serve(); + + (benchHandler as ITestHandler).SetServer( nil); + (aggrHandler as ITestHandler).SetServer( nil); + + except + on E: Exception do + begin + Console.Write( E.Message); + end; + end; + Console.WriteLine( 'done.'); +end; + + +end. + diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr new file mode 100644 index 00000000..23e296a3 --- /dev/null +++ b/lib/delphi/test/multiplexed/Multiplex.Test.Client.dpr @@ -0,0 +1,65 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + + +program Multiplex.Test.Client; + +{$APPTYPE CONSOLE} + +uses + SysUtils, + Multiplex.Client.Main in 'Multiplex.Client.Main.pas', + Thrift in '..\..\src\Thrift.pas', + Thrift.Transport in '..\..\src\Thrift.Transport.pas', + Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas', + Thrift.Protocol in '..\..\src\Thrift.Protocol.pas', + Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas', + Thrift.Collections in '..\..\src\Thrift.Collections.pas', + Thrift.Server in '..\..\src\Thrift.Server.pas', + Thrift.Stream in '..\..\src\Thrift.Stream.pas', + Thrift.Console in '..\..\src\Thrift.Console.pas', + Thrift.Utils in '..\..\src\Thrift.Utils.pas'; + +var + nParamCount : Integer; + args : array of string; + i : Integer; + arg : string; + s : string; + +begin + try + Writeln( 'Multiplex TestClient '+Thrift.Version); + nParamCount := ParamCount; + SetLength( args, nParamCount); + for i := 1 to nParamCount do + begin + arg := ParamStr( i ); + args[i-1] := arg; + end; + TTestClient.Execute( args ); + Readln; + except + on E: Exception do begin + Writeln(E.ClassName, ': ', E.Message); + ExitCode := $FFFF; + end; + end; +end. + diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas b/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas new file mode 100644 index 00000000..231c3adb --- /dev/null +++ b/lib/delphi/test/multiplexed/Multiplex.Test.Common.pas @@ -0,0 +1,35 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +unit Multiplex.Test.Common; + +interface + +const + NAME_BENCHMARKSERVICE = 'BenchmarkService'; + NAME_AGGR = 'Aggr'; + + +implementation + +// nix + +end. + + diff --git a/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr new file mode 100644 index 00000000..9da1cdc9 --- /dev/null +++ b/lib/delphi/test/multiplexed/Multiplex.Test.Server.dpr @@ -0,0 +1,65 @@ +(* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *) + +program Multiplex.Test.Server; + +{$APPTYPE CONSOLE} + +uses + SysUtils, + Multiplex.Server.Main in 'Multiplex.Server.Main.pas', + Thrift in '..\..\src\Thrift.pas', + Thrift.Transport in '..\..\src\Thrift.Transport.pas', + Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas', + Thrift.Protocol in '..\..\src\Thrift.Protocol.pas', + Thrift.Protocol.Multiplex in '..\..\src\Thrift.Protocol.Multiplex.pas', + Thrift.Processor.Multiplex in '..\..\src\Thrift.Processor.Multiplex.pas', + Thrift.Collections in '..\..\src\Thrift.Collections.pas', + Thrift.Server in '..\..\src\Thrift.Server.pas', + Thrift.Console in '..\..\src\Thrift.Console.pas', + Thrift.Utils in '..\..\src\Thrift.Utils.pas', + Thrift.Stream in '..\..\src\Thrift.Stream.pas'; + +var + nParamCount : Integer; + args : array of string; + i : Integer; + arg : string; + s : string; + +begin + try + Writeln( 'Multiplex TestServer '+Thrift.Version); + nParamCount := ParamCount; + SetLength( args, nParamCount); + for i := 1 to nParamCount do + begin + arg := ParamStr( i ); + args[i-1] := arg; + end; + TTestServer.Execute( args ); + Writeln('Press ENTER to close ... '); Readln; + except + on E: Exception do + Writeln(E.ClassName, ': ', E.Message); + end; +end. + + + -- 2.17.1