From: Jens Geyer Date: Tue, 22 Apr 2014 21:36:27 +0000 (+0200) Subject: THRIFT-2132 Go: Support for Multiplexing Services on any Transport, Protocol and... X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=751c97c2df228ab7d73f7add3a7b3f187aa5326a;p=common%2Fthrift.git THRIFT-2132 Go: Support for Multiplexing Services on any Transport, Protocol and Server Client: Go Patch: Aleksey Pesternikov --- diff --git a/lib/go/test/Makefile.am b/lib/go/test/Makefile.am index 30559421..ca8fa640 100644 --- a/lib/go/test/Makefile.am +++ b/lib/go/test/Makefile.am @@ -24,11 +24,13 @@ THRIFTTEST = $(top_srcdir)/test/ThriftTest.thrift gopath: $(top_srcdir)/compiler/cpp/thrift $(THRIFTTEST) \ IncludesTest.thrift \ NamespacedTest.thrift \ + MultiplexedProtocolTest.thrift \ OnewayTest.thrift mkdir -p gopath/src grep -v list.*map.*list.*map $(THRIFTTEST) > ThriftTest.thrift $(THRIFT) -r IncludesTest.thrift $(THRIFT) BinaryKeyTest.thrift + $(THRIFT) MultiplexedProtocolTest.thrift $(THRIFT) OnewayTest.thrift ln -nfs ../../../thrift gopath/src/thrift ln -nfs ../../tests gopath/src/tests diff --git a/lib/go/test/MultiplexedProtocolTest.thrift b/lib/go/test/MultiplexedProtocolTest.thrift new file mode 100644 index 00000000..0e21061c --- /dev/null +++ b/lib/go/test/MultiplexedProtocolTest.thrift @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +service First { + i64 returnOne(); +} + +service Second { + i64 returnTwo(); +} + diff --git a/lib/go/test/tests/multiplexed_protocol_test.go b/lib/go/test/tests/multiplexed_protocol_test.go new file mode 100644 index 00000000..ee14ee89 --- /dev/null +++ b/lib/go/test/tests/multiplexed_protocol_test.go @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package tests + +import ( + "MuliplexedProtocolTest" + "net" + "testing" + "thrift" + "time" +) + +func FindAvailableTCPServerPort() net.Addr { + if l, err := net.Listen("tcp", "127.0.0.1:0"); err != nil { + panic("Could not find available server port") + } else { + defer l.Close() + return l.Addr() + } +} + +type FirstImpl struct{} + +func (f *FirstImpl) ReturnOne() (r int64, err error) { + return 1, nil +} + +type SecondImpl struct{} + +func (s *SecondImpl) ReturnTwo() (r int64, err error) { + return 2, nil +} + +const TIMEOUT = time.Second + +var addr net.Addr +var server *thrift.TSimpleServer + +var processor = thrift.NewTMultiplexedProcessor() + +func TestInitTwoServers(t *testing.T) { + var err error + protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() + transportFactory := thrift.NewTTransportFactory() + transportFactory = thrift.NewTFramedTransportFactory(transportFactory) + addr = FindAvailableTCPServerPort() + serverTransport, err := thrift.NewTServerSocketTimeout(addr.String(), TIMEOUT) + if err != nil { + t.Fatal("Unable to create server socket", err) + } + server = thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory) + + firstProcessor := MuliplexedProtocolTest.NewFirstProcessor(&FirstImpl{}) + processor.RegisterProcessor("FirstService", firstProcessor) + + secondProcessor := MuliplexedProtocolTest.NewSecondProcessor(&SecondImpl{}) + processor.RegisterProcessor("SecondService", secondProcessor) + + go server.Serve() +} + +var firstClient *MuliplexedProtocolTest.FirstClient + +func TestInitClient1(t *testing.T) { + socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT) + transport := thrift.NewTFramedTransport(socket) + var protocol thrift.TProtocol = thrift.NewTBinaryProtocolTransport(transport) + protocol = thrift.NewTMultiplexedProtocol(protocol, "FirstService") + firstClient = MuliplexedProtocolTest.NewFirstClientProtocol(transport, protocol, protocol) + err := transport.Open() + if err != nil { + t.Fatal("Unable to open client socket", err) + } +} + +var secondClient *MuliplexedProtocolTest.SecondClient + +func TestInitClient2(t *testing.T) { + socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT) + transport := thrift.NewTFramedTransport(socket) + var protocol thrift.TProtocol = thrift.NewTBinaryProtocolTransport(transport) + protocol = thrift.NewTMultiplexedProtocol(protocol, "SecondService") + secondClient = MuliplexedProtocolTest.NewSecondClientProtocol(transport, protocol, protocol) + err := transport.Open() + if err != nil { + t.Fatal("Unable to open client socket", err) + } +} + +//create client without service prefix +func createLegacyClient(t *testing.T) *MuliplexedProtocolTest.SecondClient { + socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT) + transport := thrift.NewTFramedTransport(socket) + var protocol thrift.TProtocol = thrift.NewTBinaryProtocolTransport(transport) + legacyClient := MuliplexedProtocolTest.NewSecondClientProtocol(transport, protocol, protocol) + err := transport.Open() + if err != nil { + t.Fatal("Unable to open client socket", err) + } + return legacyClient +} + +func TestCallFirst(t *testing.T) { + ret, err := firstClient.ReturnOne() + if err != nil { + t.Fatal("Unable to call first server:", err) + } + if ret != 1 { + t.Fatal("Unexpected result from server: ", ret) + } +} + +func TestCallSecond(t *testing.T) { + ret, err := secondClient.ReturnTwo() + if err != nil { + t.Fatal("Unable to call second server:", err) + } + if ret != 2 { + t.Fatal("Unexpected result from server: ", ret) + } +} + +func TestCallLegacy(t *testing.T) { + legacyClient := createLegacyClient(t) + ret, err := legacyClient.ReturnTwo() + //expect error since default processor is not registered + if err == nil { + t.Fatal("Expecting error") + } + //register default processor and call again + processor.RegisterDefault(MuliplexedProtocolTest.NewSecondProcessor(&SecondImpl{})) + legacyClient = createLegacyClient(t) + ret, err = legacyClient.ReturnTwo() + if err != nil { + t.Fatal("Unable to call legacy server:", err) + } + if ret != 2 { + t.Fatal("Unexpected result from server: ", ret) + } +} + +func TestShutdownServerAndClients(t *testing.T) { + firstClient.Transport.Close() + secondClient.Transport.Close() + server.Stop() +} diff --git a/lib/go/thrift/multiplexed_protocol.go b/lib/go/thrift/multiplexed_protocol.go new file mode 100644 index 00000000..3157e0d5 --- /dev/null +++ b/lib/go/thrift/multiplexed_protocol.go @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package thrift + +import ( + "fmt" + "strings" +) + +/* +TMultiplexedProtocol is a protocol-independent concrete decorator +that allows a Thrift client to communicate with a multiplexing Thrift server, +by prepending the service name to the function name during function calls. + +NOTE: THIS IS NOT USED BY SERVERS. On the server, use TMultiplexedProcessor to handle request +from a multiplexing client. + +This example uses a single socket transport to invoke two services: + +socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT) +transport := thrift.NewTFramedTransport(socket) +protocol := thrift.NewTBinaryProtocolTransport(transport) + +mp := thrift.NewTMultiplexedProtocol(protocol, "Calculator") +service := Calculator.NewCalculatorClient(mp) + +mp2 := thrift.NewTMultiplexedProtocol(protocol, "WeatherReport") +service2 := WeatherReport.NewWeatherReportClient(mp2) + +err := transport.Open() +if err != nil { + t.Fatal("Unable to open client socket", err) +} + +fmt.Println(service.Add(2,2)) +fmt.Println(service2.GetTemperature()) +*/ + +type TMultiplexedProtocol struct { + TProtocol + serviceName string +} + +const MULTIPLEXED_SEPARATOR = ":" + +func NewTMultiplexedProtocol(protocol TProtocol, serviceName string) *TMultiplexedProtocol { + return &TMultiplexedProtocol{ + TProtocol: protocol, + serviceName: serviceName, + } +} + +func (t *TMultiplexedProtocol) WriteMessageBegin(name string, typeId TMessageType, seqid int32) error { + if typeId == CALL || typeId == ONEWAY { + return t.TProtocol.WriteMessageBegin(t.serviceName+MULTIPLEXED_SEPARATOR+name, typeId, seqid) + } else { + return t.TProtocol.WriteMessageBegin(name, typeId, seqid) + } +} + +/* +TMultiplexedProcessor is a TProcessor allowing +a single TServer to provide multiple services. + +To do so, you instantiate the processor and then register additional +processors with it, as shown in the following example: + +var processor = thrift.NewTMultiplexedProcessor() + +firstProcessor := +processor.RegisterProcessor("FirstService", firstProcessor) + +processor.registerProcessor( + "Calculator", + Calculator.NewCalculatorProcessor(&CalculatorHandler{}), +) + +processor.registerProcessor( + "WeatherReport", + WeatherReport.NewWeatherReportProcessor(&WeatherReportHandler{}), +) + +serverTransport, err := thrift.NewTServerSocketTimeout(addr, TIMEOUT) +if err != nil { + t.Fatal("Unable to create server socket", err) +} +server := thrift.NewTSimpleServer2(processor, serverTransport) +server.Serve(); +*/ + +type TMultiplexedProcessor struct { + serviceProcessorMap map[string]TProcessor + DefaultProcessor TProcessor +} + +func NewTMultiplexedProcessor() *TMultiplexedProcessor { + return &TMultiplexedProcessor{ + serviceProcessorMap: make(map[string]TProcessor), + } +} + +func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) { + t.DefaultProcessor = processor +} + +func (t *TMultiplexedProcessor) RegisterProcessor(name string, processor TProcessor) { + if t.serviceProcessorMap == nil { + t.serviceProcessorMap = make(map[string]TProcessor) + } + t.serviceProcessorMap[name] = processor +} + +func (t *TMultiplexedProcessor) Process(in, out TProtocol) (bool, TException) { + name, typeId, seqid, err := in.ReadMessageBegin() + if err != nil { + return false, err + } + if typeId != CALL && typeId != ONEWAY { + return false, fmt.Errorf("Unexpected message type %v", typeId) + } + //extract the service name + v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2) + if len(v) != 2 { + if t.DefaultProcessor != nil { + smb := NewStoredMessageProtocol(in, name, typeId, seqid) + return t.DefaultProcessor.Process(smb, out) + } + return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name) + } + actualProcessor, ok := t.serviceProcessorMap[v[0]] + if !ok { + return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0]) + } + smb := NewStoredMessageProtocol(in, v[1], typeId, seqid) + return actualProcessor.Process(smb, out) +} + +//Protocol that use stored message for ReadMessageBegin +type storedMessageProtocol struct { + TProtocol + name string + typeId TMessageType + seqid int32 +} + +func NewStoredMessageProtocol(protocol TProtocol, name string, typeId TMessageType, seqid int32) *storedMessageProtocol { + return &storedMessageProtocol{protocol, name, typeId, seqid} +} + +func (s *storedMessageProtocol) ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error) { + return s.name, s.typeId, s.seqid, nil +}