From: Jens Geyer Date: Fri, 7 Mar 2014 18:42:28 +0000 (+0100) Subject: THRIFT-2383 contrib: sample for connecting Thrift with Rebus X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=b39b5ea95d9b1851c437eaa80bc73d0acb643fa6;p=common%2Fthrift.git THRIFT-2383 contrib: sample for connecting Thrift with Rebus Patch: Jens Geyer --- diff --git a/contrib/Rebus/App.config b/contrib/Rebus/App.config new file mode 100644 index 00000000..4208af6b --- /dev/null +++ b/contrib/Rebus/App.config @@ -0,0 +1,33 @@ + + + + + +
+ + + + + + + + + + diff --git a/contrib/Rebus/Program.cs b/contrib/Rebus/Program.cs new file mode 100644 index 00000000..563c62ad --- /dev/null +++ b/contrib/Rebus/Program.cs @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Rebus.Configuration; +using Rebus.RabbitMQ; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using RebusSample.Client; +using RebusSample.Server; + +namespace RebusSample +{ + class Program + { + static BuiltinContainerAdapter StartRequestServer(string server) + { + // client Rebus configuration + var adapter = new BuiltinContainerAdapter(); + Configure.With(adapter) + .Transport(t => t.UseRabbitMq("amqp://" + server, "MathRequests", "MathRequestErrors")) + .MessageOwnership(o => o.FromRebusConfigurationSection()) + .CreateBus().Start(); + + // register all relevant message handlers + adapter.Register(typeof(MathRequestCallHandler)); + return adapter; + } + + + static BuiltinContainerAdapter StartResponseServer(string server) + { + // client Rebus configuration + var adapter = new BuiltinContainerAdapter(); + Configure.With(adapter) + .Transport(t => t.UseRabbitMq("amqp://" + server, "MathResponses", "MathResponseErrors")) + .MessageOwnership(o => o.FromRebusConfigurationSection()) + .CreateBus().Start(); + + // register all relevant message handlers + adapter.Register(typeof(MathResponseCallHandler)); + return adapter; + } + + static void Main(string[] args) + { + string server = "localhost"; + + // start all servers + var req = StartRequestServer(server); + var rsp = StartResponseServer(server); + + // send the first message + var random = new Random(); + var client = new MathRequestClient(server); + client.DoTheMath(random.Next(), random.Next()); + + // now what? + Console.Write("Hit to stop ... "); + Console.ReadLine(); + } + } +} diff --git a/contrib/Rebus/Properties/AssemblyInfo.cs b/contrib/Rebus/Properties/AssemblyInfo.cs new file mode 100644 index 00000000..e476eab7 --- /dev/null +++ b/contrib/Rebus/Properties/AssemblyInfo.cs @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("RebusSample")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("RebusSample")] +[assembly: AssemblyCopyright("Copyright © 2014")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +[assembly: ComVisible(false)] + +[assembly: Guid("0af10984-40d3-453d-b1e5-421529e8c7e2")] + +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/contrib/Rebus/README b/contrib/Rebus/README new file mode 100644 index 00000000..bbb9c496 --- /dev/null +++ b/contrib/Rebus/README @@ -0,0 +1,21 @@ +Sample code for the combination of Thrift with Rebus. + +Rebus is a .NET service bus, similar to NServiceBus, but more lightweight. +It ihas been mainly written by Mogens Heller Grabe and is currently hosted +on GitHub (https://github.com/rebus-org/Rebus) + +As with all ServiceBus or MQ scenarios, due to the highly asynchronous +operations it is recommended to do all calls as "oneway void" calls. + +The configuration can be done via App.Config, via code or even mixed from +both locations. Refer to the Rebus documentation for further details. For +this example, since we are effectively implementing two queue listeners in +only one single process, we do configuration of incoming and error queues +in the code. + +If you want to communicate with non-NET languages, you may need a customized +serializer as well, in order to override Rebus' default wire format. Please +refer to the Rebus docs on how to do that (it's not that hard, really). + +Additional requirements: +- RabbitMQ .NET client (see nuget) diff --git a/contrib/Rebus/RebusSample.csproj b/contrib/Rebus/RebusSample.csproj new file mode 100644 index 00000000..4058a6da --- /dev/null +++ b/contrib/Rebus/RebusSample.csproj @@ -0,0 +1,102 @@ + + + + + + Debug + AnyCPU + {264E2126-EDE0-4B47-89C1-B397B25BB13D} + Exe + Properties + RebusSample + RebusSample + v4.5 + 512 + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\..\..\..\..\Toolbox\ServiceBus\3rdparty\rabbitmq-dotnet-client-3.2.1-dotnet-3.0\bin\RabbitMQ.Client.dll + + + ..\..\..\..\..\Toolbox\ServiceBus\3rdparty\Rebus-master\deploy\NET40\Rebus.dll + + + ..\..\..\..\..\Toolbox\ServiceBus\3rdparty\Rebus-master\deploy\NET40\Rebus.RabbitMQ.dll + + + + + + + + + + + + + + + + + + + + + + + + + {499eb63c-d74c-47e8-ae48-a2fc94538e9d} + Thrift + + + + + cd $(ProjectDir) +if not exist gen-csharp\*.cs thrift -gen csharp sample.thrift + + + + \ No newline at end of file diff --git a/contrib/Rebus/RebusSample.sln b/contrib/Rebus/RebusSample.sln new file mode 100644 index 00000000..284ef36a --- /dev/null +++ b/contrib/Rebus/RebusSample.sln @@ -0,0 +1,28 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 2013 +VisualStudioVersion = 12.0.30110.0 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RebusSample", "RebusSample.csproj", "{264E2126-EDE0-4B47-89C1-B397B25BB13D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thrift", "..\..\lib\csharp\src\Thrift.csproj", "{499EB63C-D74C-47E8-AE48-A2FC94538E9D}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {264E2126-EDE0-4B47-89C1-B397B25BB13D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {264E2126-EDE0-4B47-89C1-B397B25BB13D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {264E2126-EDE0-4B47-89C1-B397B25BB13D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {264E2126-EDE0-4B47-89C1-B397B25BB13D}.Release|Any CPU.Build.0 = Release|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/contrib/Rebus/ServiceImpl/Both.cs b/contrib/Rebus/ServiceImpl/Both.cs new file mode 100644 index 00000000..fba67ec1 --- /dev/null +++ b/contrib/Rebus/ServiceImpl/Both.cs @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; + + +namespace RebusSample +{ + // generic data container for serialized Thrift calls + public class GenericThriftServiceCall + { + public byte[] rawBytes; + } + + // specific containers (one per Thrift service) to leverage Rebus' handler routing + public class MathRequestCall : GenericThriftServiceCall { } + public class MathResponseCall : GenericThriftServiceCall { } + +} \ No newline at end of file diff --git a/contrib/Rebus/ServiceImpl/Client.cs b/contrib/Rebus/ServiceImpl/Client.cs new file mode 100644 index 00000000..2408041a --- /dev/null +++ b/contrib/Rebus/ServiceImpl/Client.cs @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Rebus; +using Rebus.Configuration; +using Rebus.Messages; +using Rebus.RabbitMQ; +using System; +using System.Collections.Generic; +using System.IO; +using Thrift.Protocol; +using Thrift.Transport; + +/* + * The client emits calls to BasicMathServers + * + * The client implements the BasicMathClient service. + * If the server has processed our request, we get the results back through this service + */ + +namespace RebusSample.Client +{ + + // handler to be registered with Rebus + class MathResponseCallHandler : IHandleMessages + { + public void Handle(MathResponseCall message) + { + // Thrift protocol/transport stack + var stm = new MemoryStream(message.rawBytes); + var trns = new TStreamTransport(stm, null); + var prot = new TBinaryProtocol(trns); + + // create a processor and let him handle the call + var hndl = new MathResponsesHandler(); + var proc = new BasicMathClient.Processor(hndl); + proc.Process(prot, null); // oneway only + } + } + + + // serves incoming responses with calculation results + internal class MathResponsesHandler : BasicMathClient.Iface + { + public void FourResults(int added, int multiplied, int subtracted, int divided) + { + Console.WriteLine("added = {0}", added); + Console.WriteLine("multiplied= {0}", multiplied); + Console.WriteLine("subtracted = {0}", subtracted); + Console.WriteLine("divided = {0}", divided); + + PingAndDoAnotherCalculation(); + } + + + public void ThreeResults(int added, int multiplied, int subtracted) + { + Console.WriteLine("added = {0}", added); + Console.WriteLine("multiplied= {0}", multiplied); + Console.WriteLine("subtracted = {0}", subtracted); + Console.WriteLine("DIV/0 error during division"); + + PingAndDoAnotherCalculation(); + } + + + public void Pong(long value) + { + var latency = DateTime.Now.Ticks - value; + Console.WriteLine("Ping took {0} ms", new DateTime(latency).Millisecond); + } + + + private void PingAndDoAnotherCalculation() + { + var random = new Random(); + var client = new MathRequestClient("localhost"); + client.Ping(DateTime.Now.Ticks); + client.DoTheMath(random.Next(), random.Next()); + } + } + + + // provides the client-side interface for calculation requests + internal class MathRequestClient : BasicMathServer.Iface + { + private BuiltinContainerAdapter MQAdapter; + + + public MathRequestClient(string server) + { + MQAdapter = new BuiltinContainerAdapter(); + Configure.With(MQAdapter) + .Transport(t => t.UseRabbitMqInOneWayMode("amqp://" + server)) // we need send only + .MessageOwnership(o => o.FromRebusConfigurationSection()) + .CreateBus().Start(); + } + + + public void SerializeThriftCall(Action action) + { + // Thrift protocol/transport stack + var stm = new MemoryStream(); + var trns = new TStreamTransport(null, stm); + var prot = new TBinaryProtocol(trns); + + // serialize the call into a bunch of bytes + var client = new BasicMathServer.Client(prot); + if( action != null) + action(client); + else + throw new ArgumentException("action must not be null"); + + // make sure everything is written to the MemoryStream + trns.Flush(); + + // send the message + var msg = new MathRequestCall() { rawBytes = stm.ToArray() }; + MQAdapter.Bus.Send(msg); + } + + + public void Ping(long value) + { + SerializeThriftCall(client => + { + client.Ping(value); + }); + } + + + public void DoTheMath( int arg1, int arg2) + { + SerializeThriftCall(client => + { + client.DoTheMath(arg1, arg2); + }); + } + } +} + diff --git a/contrib/Rebus/ServiceImpl/Server.cs b/contrib/Rebus/ServiceImpl/Server.cs new file mode 100644 index 00000000..149d513c --- /dev/null +++ b/contrib/Rebus/ServiceImpl/Server.cs @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Rebus; +using Rebus.Configuration; +using Rebus.Messages; +using Rebus.RabbitMQ; +using System; +using System.Collections.Generic; +using System.IO; +using Thrift.Protocol; +using Thrift.Transport; + +/* + * The server implements the BasicMathServer service . + * All results are sent back to the client via the BasicMathClient service + */ + + +namespace RebusSample.Server +{ + // handler to be registered with Rebus + class MathRequestCallHandler : IHandleMessages + { + public void Handle(MathRequestCall message) + { + // Thrift protocol/transport stack + var stm = new MemoryStream(message.rawBytes); + var trns = new TStreamTransport(stm, null); + var prot = new TBinaryProtocol(trns); + + // create a processor and let him handle the call + var hndl = new MathRequestsHandler(); + var proc = new BasicMathServer.Processor(hndl); + proc.Process(prot, null); // oneway only + } + } + + + // serves incoming calculation requests + internal class MathRequestsHandler : BasicMathServer.Iface + { + public void Ping(long value) + { + var client = new MathResponseClient("localhost"); + client.Pong(value); + } + + + public void DoTheMath(int arg1, int arg2) + { + var client = new MathResponseClient("localhost"); + if( arg2 != 0) + client.FourResults( arg1+arg2, arg1*arg2, arg1-arg2, arg1/arg2); + else + client.ThreeResults( arg1+arg2, arg1*arg2, arg1-arg2); + } + } + + + // provides the client-side interface for calculation responses + internal class MathResponseClient : BasicMathClient.Iface + { + private BuiltinContainerAdapter MQAdapter; + + + public MathResponseClient(string server) + { + MQAdapter = new BuiltinContainerAdapter(); + Configure.With(MQAdapter) + .Transport(t => t.UseRabbitMqInOneWayMode("amqp://" + server)) // we need send only + .MessageOwnership(o => o.FromRebusConfigurationSection()) + .CreateBus().Start(); + } + + + public void SerializeThriftCall(Action action) + { + // Thrift protocol/transport stack + var stm = new MemoryStream(); + var trns = new TStreamTransport(null, stm); + var prot = new TBinaryProtocol(trns); + + // serialize the call into a bunch of bytes + var client = new BasicMathClient.Client(prot); + if (action != null) + action(client); + else + throw new ArgumentException("action must not be null"); + + // make sure everything is written to the MemoryStream + trns.Flush(); + + // send the message + var msg = new MathResponseCall() { rawBytes = stm.ToArray() }; + MQAdapter.Bus.Send(msg); + } + + + public void Pong(long value) + { + SerializeThriftCall(client => + { + client.Pong(value); + }); + } + + + public void ThreeResults(int added, int multiplied, int suctracted) + { + SerializeThriftCall(client => + { + client.ThreeResults(added, multiplied, suctracted); + }); + } + + + public void FourResults(int added, int multiplied, int suctracted, int divided) + { + SerializeThriftCall(client => + { + client.FourResults(added, multiplied, suctracted, divided); + }); + } + } +} + diff --git a/contrib/Rebus/sample.thrift b/contrib/Rebus/sample.thrift new file mode 100644 index 00000000..fe1d21a8 --- /dev/null +++ b/contrib/Rebus/sample.thrift @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +service BasicMathServer { + oneway void DoTheMath( 1: i32 arg1, 2: i32 arg2) + oneway void Ping(1: i64 value) +} + +service BasicMathClient { + oneway void ThreeResults( 1 : i32 added, 2 : i32 multiplied, 3 : i32 subtracted); + oneway void FourResults( 1 : i32 added, 2 : i32 multiplied, 3 : i32 subtracted, 4 : i32 divided); + oneway void Pong(1: i64 value) +}