From: David Reiss Date: Tue, 31 Aug 2010 16:58:41 +0000 (+0000) Subject: THRIFT-812. contrib: Add a demo of using Thrift over ZeroMQ X-Git-Tag: 0.5.0~83 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=9f3296bca00927ec5bac7ccdecdf2fbd68be9744;p=common%2Fthrift.git THRIFT-812. contrib: Add a demo of using Thrift over ZeroMQ git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@991260 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/contrib/zeromq/Makefile b/contrib/zeromq/Makefile new file mode 100644 index 00000000..a1d71568 --- /dev/null +++ b/contrib/zeromq/Makefile @@ -0,0 +1,37 @@ +THRIFT = thrift +CXXFLAGS = `pkg-config --cflags libzmq thrift` +LDLIBS = `pkg-config --libs libzmq thrift` + +CXXFLAGS += -g3 -O0 + +GENNAMES = Storage storage_types +GENHEADERS = $(addsuffix .h, $(GENNAMES)) +GENSRCS = $(addsuffix .cpp, $(GENNAMES)) +GENOBJS = $(addsuffix .o, $(GENNAMES)) + +PYLIBS = storage/__init__.py + +PROGS = test-client test-server test-sender test-receiver + +all: $(PYLIBS) $(PROGS) + +test-client: test-client.o TZmqClient.o $(GENOBJS) +test-server: test-server.o TZmqServer.o $(GENOBJS) +test-sender: test-sender.o TZmqClient.o $(GENOBJS) +test-receiver: test-receiver.o TZmqServer.o $(GENOBJS) + +test-client.o test-server.o test-sender.o test-receiver.o: $(GENSRCS) + +storage/__init__.py: storage.thrift + $(RM) $(dir $@) + $(THRIFT) --gen py:newstyle $< + mv gen-py/$(dir $@) . + +$(GENSRCS): storage.thrift + $(THRIFT) --gen cpp $< + mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) . + +clean: + $(RM) -r *.o $(PROGS) storage $(GENSRCS) $(GENHEADERS) + +.PHONY: clean diff --git a/contrib/zeromq/README b/contrib/zeromq/README new file mode 100644 index 00000000..9e0b5bd3 --- /dev/null +++ b/contrib/zeromq/README @@ -0,0 +1,30 @@ +This directory contains some glue code to allow Thrift RPCs to be sent over +ZeroMQ. Included are client and server implementations for Python and C++, +along with a simple demo interface (with a working client and server for +each language). + +Thrift was designed for stream-based interfaces like TCP, but ZeroMQ is +message-based, so there is a small impedance mismatch. Most of issues are +hidden from developers, but one cannot be: oneway methods have to be handled +differently from normal ones. ZeroMQ requires the messaging pattern to be +declared at socket creation time, so an application cannot decide on a +message-by-message basis whether to send a reply. Therefore, this +implementation makes it the client's responsibility to ensure that ZMQ_REQ +sockets are used for normal methods and ZMQ_DOWNSTREAM sockets are used for +oneway methods. In addition, services that expose both types of methods +have to expose two servers (on two ports), but the TZmqMultiServer makes it +easy to run the two together in the same thread. + +This code was tested with ZeroMQ 2.0.7 and pyzmq afabbb5b9bd3. + +To build, simply install Thrift and ZeroMQ, then run "make". If you install +in a non-standard location, make sure to set THRIFT to the location of the +Thrift code generator on the make command line and PKG_CONFIG_PATH to a path +that includes the pkgconfig files for both Thrift and ZeroMQ. The test +servers take no arguments. Run the test clients with no arguments to +retrieve the stored value or with an integer argument to increment it by +that amount. + +This code is not quite what I would consider production-ready. It doesn't +support all of the normal hooks into Thrift, and its performance is +sub-optimal because it does some unnecessary copying. diff --git a/contrib/zeromq/TZmqClient.cpp b/contrib/zeromq/TZmqClient.cpp new file mode 100644 index 00000000..133204e3 --- /dev/null +++ b/contrib/zeromq/TZmqClient.cpp @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "TZmqClient.h" +#include + +namespace apache { namespace thrift { namespace transport { + +uint32_t TZmqClient::read(uint8_t* buf, uint32_t len) { + if (rbuf_.available_read() == 0) { + (void)sock_.recv(&msg_); + rbuf_.resetBuffer((uint8_t*)msg_.data(), msg_.size()); + } + return rbuf_.read(buf, len); +} + +void TZmqClient::write(const uint8_t* buf, uint32_t len) { + return wbuf_.write(buf, len); +} + +void TZmqClient::writeEnd() { + uint8_t* buf; + uint32_t size; + wbuf_.getBuffer(&buf, &size); + zmq::message_t msg(size); + std::memcpy(msg.data(), buf, size); + (void)sock_.send(msg); + wbuf_.resetBuffer(true); +} + +}}} // apache::thrift::transport diff --git a/contrib/zeromq/TZmqClient.h b/contrib/zeromq/TZmqClient.h new file mode 100644 index 00000000..9544503a --- /dev/null +++ b/contrib/zeromq/TZmqClient.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_ +#define _THRIFT_TRANSPORT_TZMQCLIENT_H_ 1 + +#include +#include + +namespace apache { namespace thrift { namespace transport { + +class TZmqClient : public TTransport { + public: + TZmqClient(zmq::context_t& ctx, const std::string& endpoint, int type) + : sock_(ctx, type) + , endpoint_(endpoint) + , wbuf_() + , rbuf_() + , msg_() + , zmq_type_(type) + {} + + void open() { + if(zmq_type_ == ZMQ_PUB) { + sock_.bind(endpoint_.c_str()); + } + else { + sock_.connect(endpoint_.c_str()); + } + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void write(const uint8_t* buf, uint32_t len); + + void writeEnd(); + + protected: + std::string endpoint_; + zmq::socket_t sock_; + TMemoryBuffer wbuf_; + TMemoryBuffer rbuf_; + zmq::message_t msg_; + int zmq_type_; +}; + +}}} // apache::thrift::transport + +#endif // #ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_ diff --git a/contrib/zeromq/TZmqClient.py b/contrib/zeromq/TZmqClient.py new file mode 100644 index 00000000..d5606973 --- /dev/null +++ b/contrib/zeromq/TZmqClient.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import zmq +from cStringIO import StringIO +from thrift.transport.TTransport import TTransportBase, CReadableTransport + +class TZmqClient(TTransportBase, CReadableTransport): + def __init__(self, ctx, endpoint, sock_type): + self._sock = ctx.socket(sock_type) + self._endpoint = endpoint + self._wbuf = StringIO() + self._rbuf = StringIO() + + def open(self): + self._sock.connect(self._endpoint) + + def read(self, size): + ret = self._rbuf.read(size) + if len(ret) != 0: + return ret + self._read_message() + return self._rbuf.read(size) + + def _read_message(self): + msg = self._sock.recv() + self._rbuf = StringIO(msg) + + def write(self, buf): + self._wbuf.write(buf) + + def flush(self): + msg = self._wbuf.getvalue() + self._wbuf = StringIO() + self._sock.send(msg) + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self._rbuf + + # NOTE: This will probably not actually work. + def cstringio_refill(self, prefix, reqlen): + while len(prefix) < reqlen: + self.read_message() + prefix += self._rbuf.getvalue() + self._rbuf = StringIO(prefix) + return self._rbuf diff --git a/contrib/zeromq/TZmqServer.cpp b/contrib/zeromq/TZmqServer.cpp new file mode 100644 index 00000000..c6142d7c --- /dev/null +++ b/contrib/zeromq/TZmqServer.cpp @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "TZmqServer.h" +#include +#include + +using boost::shared_ptr; +using apache::thrift::transport::TMemoryBuffer; +using apache::thrift::protocol::TProtocol; + +namespace apache { namespace thrift { namespace server { + + +bool TZmqServer::serveOne(int recv_flags) { + zmq::message_t msg; + bool received = sock_.recv(&msg, recv_flags); + if (!received) { + return false; + } + shared_ptr inputTransport(new TMemoryBuffer((uint8_t*)msg.data(), msg.size())); + shared_ptr outputTransport(new TMemoryBuffer()); + shared_ptr inputProtocol( + inputProtocolFactory_->getProtocol(inputTransport)); + shared_ptr outputProtocol( + outputProtocolFactory_->getProtocol(outputTransport)); + + processor_->process(inputProtocol, outputProtocol); + + if (zmq_type_ == ZMQ_REP) { + uint8_t* buf; + uint32_t size; + outputTransport->getBuffer(&buf, &size); + msg.rebuild(size); + std::memcpy(msg.data(), buf, size); + (void)sock_.send(msg); + } + + return true; +} + + +void TZmqMultiServer::serveOne(long timeout) { + boost::scoped_ptr items(setupPoll()); + serveActive(items.get(), timeout); +} + + +void TZmqMultiServer::serveForever() { + boost::scoped_ptr items(setupPoll()); + while (true) { + serveActive(items.get(), -1); + } +} + + +zmq::pollitem_t* TZmqMultiServer::setupPoll() { + zmq::pollitem_t* items = new zmq::pollitem_t[servers_.size()]; + for (int i = 0; i < servers_.size(); ++i) { + items[i].socket = servers_[i]->getSocket(); + items[i].events = ZMQ_POLLIN; + } + return items; +} + +void TZmqMultiServer::serveActive(zmq::pollitem_t* items, long timeout) { + int rc = zmq::poll(items, servers_.size(), timeout); + if (rc == 0) { + return; + } + for (int i = 0; i < servers_.size(); ++i) { + if ((items[i].revents & ZMQ_POLLIN) != 0) { + // Should we pass ZMQ_NOBLOCK here to be safe? + servers_[i]->serveOne(); + } + } +} + + +}}} // apache::thrift::server diff --git a/contrib/zeromq/TZmqServer.h b/contrib/zeromq/TZmqServer.h new file mode 100644 index 00000000..1603eacf --- /dev/null +++ b/contrib/zeromq/TZmqServer.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_SERVER_TZMQSERVER_H_ +#define _THRIFT_SERVER_TZMQSERVER_H_ 1 + +#include +#include + +namespace apache { namespace thrift { namespace server { + +class TZmqServer : public TServer { + public: + TZmqServer( + boost::shared_ptr processor, + zmq::context_t& ctx, const std::string& endpoint, int type) + : TServer(processor) + , zmq_type_(type) + , sock_(ctx, type) + { + if(zmq_type_ == ZMQ_SUB) { + sock_.setsockopt(ZMQ_SUBSCRIBE, "", 0) ; // listen to all messages + sock_.connect(endpoint.c_str()) ; + } + else { + sock_.bind(endpoint.c_str()); + } + } + + bool serveOne(int recv_flags = 0); + void serve() { + while (true) { + serveOne(); + } + } + + zmq::socket_t& getSocket() { + return sock_; + } + + private: + int zmq_type_; + zmq::socket_t sock_; +}; + + +class TZmqMultiServer { + public: + void serveOne(long timeout = -1); + void serveForever(); + + std::vector& servers() { + return servers_; + } + + private: + zmq::pollitem_t* setupPoll(); + void serveActive(zmq::pollitem_t* items, long timeout); + std::vector servers_; +}; + + +}}} // apache::thrift::server + +#endif // #ifndef _THRIFT_SERVER_TZMQSERVER_H_ diff --git a/contrib/zeromq/TZmqServer.py b/contrib/zeromq/TZmqServer.py new file mode 100644 index 00000000..c83cc8d5 --- /dev/null +++ b/contrib/zeromq/TZmqServer.py @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import logging +import zmq +import thrift.server.TServer +import thrift.transport.TTransport + +class TZmqServer(thrift.server.TServer.TServer): + def __init__(self, processor, ctx, endpoint, sock_type): + thrift.server.TServer.TServer.__init__(self, processor, None) + self.zmq_type = sock_type + self.socket = ctx.socket(sock_type) + self.socket.bind(endpoint) + + def serveOne(self): + msg = self.socket.recv() + itrans = thrift.transport.TTransport.TMemoryBuffer(msg) + otrans = thrift.transport.TTransport.TMemoryBuffer() + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + self.processor.process(iprot, oprot) + except Exception: + logging.exception("Exception while processing request") + # Fall through and send back a response, even if empty or incomplete. + + if self.zmq_type == zmq.REP: + msg = otrans.getvalue() + self.socket.send(msg) + + def serve(self): + while True: + self.serveOne() + + +class TZmqMultiServer(object): + def __init__(self): + self.servers = [] + + def serveOne(self, timeout = -1): + self._serveActive(self._setupPoll(), timeout) + + def serveForever(self): + poll_info = self._setupPoll() + while True: + self._serveActive(poll_info, -1) + + def _setupPoll(self): + server_map = {} + poller = zmq.Poller() + for server in self.servers: + server_map[server.socket] = server + poller.register(server.socket, zmq.POLLIN) + return (server_map, poller) + + def _serveActive(self, poll_info, timeout): + (server_map, poller) = poll_info + ready = dict(poller.poll()) + for sock, state in ready.items(): + assert (state & zmq.POLLIN) != 0 + server_map[sock].serveOne() diff --git a/contrib/zeromq/storage.thrift b/contrib/zeromq/storage.thrift new file mode 100644 index 00000000..a1ea9675 --- /dev/null +++ b/contrib/zeromq/storage.thrift @@ -0,0 +1,4 @@ +service Storage { + oneway void incr(1: i32 amount); + i32 get(); +} diff --git a/contrib/zeromq/test-client.cpp b/contrib/zeromq/test-client.cpp new file mode 100644 index 00000000..64e20f69 --- /dev/null +++ b/contrib/zeromq/test-client.cpp @@ -0,0 +1,40 @@ +#include +#include +#include + +#include "zmq.hpp" +#include "TZmqClient.h" +#include "Storage.h" + +using boost::shared_ptr; +using apache::thrift::transport::TZmqClient; +using apache::thrift::protocol::TBinaryProtocol; + +int main(int argc, char** argv) { + const char* endpoint = "tcp://127.0.0.1:9090"; + int socktype = ZMQ_REQ; + int incr = 0; + if (argc > 1) { + incr = atoi(argv[1]); + if (incr) { + socktype = ZMQ_DOWNSTREAM; + endpoint = "tcp://127.0.0.1:9091"; + } + } + + zmq::context_t ctx(1); + shared_ptr transport(new TZmqClient(ctx, endpoint, socktype)); + shared_ptr protocol(new TBinaryProtocol(transport)); + StorageClient client(protocol); + transport->open(); + + if (incr) { + client.incr(incr); + usleep(50000); + } else { + int value = client.get(); + std::cout << value << std::endl; + } + + return 0; +} diff --git a/contrib/zeromq/test-client.py b/contrib/zeromq/test-client.py new file mode 100755 index 00000000..1886d9ca --- /dev/null +++ b/contrib/zeromq/test-client.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +import sys +import time +import zmq +import TZmqClient +import thrift.protocol.TBinaryProtocol +import storage.ttypes +import storage.Storage + + +def main(args): + endpoint = "tcp://127.0.0.1:9090" + socktype = zmq.REQ + incr = 0 + if len(args) > 1: + incr = int(args[1]) + if incr: + socktype = zmq.DOWNSTREAM + endpoint = "tcp://127.0.0.1:9091" + + ctx = zmq.Context() + transport = TZmqClient.TZmqClient(ctx, endpoint, socktype) + protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocolAccelerated(transport) + client = storage.Storage.Client(protocol) + transport.open() + + if incr: + client.incr(incr) + time.sleep(0.05) + else: + value = client.get() + print value + + +if __name__ == "__main__": + main(sys.argv) diff --git a/contrib/zeromq/test-receiver.cpp b/contrib/zeromq/test-receiver.cpp new file mode 100644 index 00000000..8fe69da9 --- /dev/null +++ b/contrib/zeromq/test-receiver.cpp @@ -0,0 +1,40 @@ +#include "zmq.hpp" +#include "TZmqServer.h" +#include "Storage.h" + +using boost::shared_ptr; +using apache::thrift::TProcessor; +using apache::thrift::server::TZmqServer; +using apache::thrift::server::TZmqMultiServer; + +class StorageHandler : virtual public StorageIf { + public: + StorageHandler() + : value_(0) + {} + + void incr(const int32_t amount) { + value_ += amount; + printf("value_: %i\n", value_) ; + } + + int32_t get() { + return value_; + } + + private: + int32_t value_; + +}; + + +int main(int argc, char *argv[]) { + shared_ptr handler(new StorageHandler()); + shared_ptr processor(new StorageProcessor(handler)); + + zmq::context_t ctx(1); + TZmqServer oneway_server(processor, ctx, "epgm://eth0;239.192.1.1:5555", ZMQ_SUB); + oneway_server.serve(); + + return 0; +} diff --git a/contrib/zeromq/test-sender.cpp b/contrib/zeromq/test-sender.cpp new file mode 100644 index 00000000..ca05709d --- /dev/null +++ b/contrib/zeromq/test-sender.cpp @@ -0,0 +1,32 @@ +#include +#include +#include + +#include "zmq.hpp" +#include "TZmqClient.h" +#include "Storage.h" + +using boost::shared_ptr; +using apache::thrift::transport::TZmqClient; +using apache::thrift::protocol::TBinaryProtocol; + +int main(int argc, char** argv) { + const char* endpoint = "epgm://eth0;239.192.1.1:5555"; + int socktype = ZMQ_PUB; + int incr = 1; + if (argc > 1) { + incr = atoi(argv[1]); + } + + zmq::context_t ctx(1); + shared_ptr transport(new TZmqClient(ctx, endpoint, socktype)); + shared_ptr protocol(new TBinaryProtocol(transport)); + StorageClient client(protocol); + + transport->open(); + + client.incr(incr); + usleep(50000); + + return 0; +} diff --git a/contrib/zeromq/test-server.cpp b/contrib/zeromq/test-server.cpp new file mode 100644 index 00000000..c624b0d8 --- /dev/null +++ b/contrib/zeromq/test-server.cpp @@ -0,0 +1,43 @@ +#include "zmq.hpp" +#include "TZmqServer.h" +#include "Storage.h" + +using boost::shared_ptr; +using apache::thrift::TProcessor; +using apache::thrift::server::TZmqServer; +using apache::thrift::server::TZmqMultiServer; + +class StorageHandler : virtual public StorageIf { + public: + StorageHandler() + : value_(0) + {} + + void incr(const int32_t amount) { + value_ += amount; + } + + int32_t get() { + return value_; + } + + private: + int32_t value_; + +}; + + +int main(int argc, char *argv[]) { + shared_ptr handler(new StorageHandler()); + shared_ptr processor(new StorageProcessor(handler)); + + zmq::context_t ctx(1); + TZmqServer reqrep_server(processor, ctx, "tcp://0.0.0.0:9090", ZMQ_REP); + TZmqServer oneway_server(processor, ctx, "tcp://0.0.0.0:9091", ZMQ_UPSTREAM); + TZmqMultiServer multiserver; + multiserver.servers().push_back(&reqrep_server); + multiserver.servers().push_back(&oneway_server); + multiserver.serveForever(); + + return 0; +} diff --git a/contrib/zeromq/test-server.py b/contrib/zeromq/test-server.py new file mode 100755 index 00000000..5767b71f --- /dev/null +++ b/contrib/zeromq/test-server.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +import zmq +import TZmqServer +import storage.ttypes +import storage.Storage + + +class StorageHandler(storage.Storage.Iface): + def __init__(self): + self.value = 0 + + def incr(self, amount): + self.value += amount + + def get(self): + return self.value + + +def main(): + handler = StorageHandler() + processor = storage.Storage.Processor(handler) + + ctx = zmq.Context() + reqrep_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9090", zmq.REP) + oneway_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9091", zmq.UPSTREAM) + multiserver = TZmqServer.TZmqMultiServer() + multiserver.servers.append(reqrep_server) + multiserver.servers.append(oneway_server) + multiserver.serveForever() + + +if __name__ == "__main__": + main()