| #include <tr1/functional> | 
 | #include "protocol/TBinaryProtocol.h" | 
 | #include "async/TAsyncProtocolProcessor.h" | 
 | #include "async/TEvhttpServer.h" | 
 | #include "async/TEvhttpClientChannel.h" | 
 | #include "Aggr.h" | 
 |  | 
 | using std::tr1::bind; | 
 | using std::tr1::placeholders::_1; | 
 |  | 
 | using apache::thrift::TException; | 
 | using apache::thrift::protocol::TBinaryProtocolFactory; | 
 | using apache::thrift::protocol::TProtocolFactory; | 
 | using apache::thrift::async::TEvhttpServer; | 
 | using apache::thrift::async::TAsyncProcessor; | 
 | using apache::thrift::async::TAsyncBufferProcessor; | 
 | using apache::thrift::async::TAsyncProtocolProcessor; | 
 | using apache::thrift::async::TAsyncChannel; | 
 | using apache::thrift::async::TEvhttpClientChannel; | 
 |  | 
 | class AggrAsyncHandler : public AggrCobSvIf { | 
 |  protected: | 
 |   struct RequestContext { | 
 |     std::tr1::function<void(std::vector<int32_t> const& _return)> cob; | 
 |     std::vector<int32_t> ret; | 
 |     int pending_calls; | 
 |   }; | 
 |  | 
 |  public: | 
 |   AggrAsyncHandler() | 
 |     : eb_(NULL) | 
 |     , pfact_(new TBinaryProtocolFactory()) | 
 |   { | 
 |     leaf_ports_.push_back(8081); | 
 |     leaf_ports_.push_back(8082); | 
 |   } | 
 |  | 
 |   void addValue(std::tr1::function<void()> cob, const int32_t value) { | 
 |     // Silently drop writes to the aggrgator. | 
 |     return cob(); | 
 |   } | 
 |  | 
 |   void getValues(std::tr1::function<void( | 
 |         std::vector<int32_t> const& _return)> cob, | 
 |       std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) { | 
 |     RequestContext* ctx = new RequestContext(); | 
 |     ctx->cob = cob; | 
 |     ctx->pending_calls = leaf_ports_.size(); | 
 |     for (std::vector<int>::iterator it = leaf_ports_.begin(); | 
 |         it != leaf_ports_.end(); ++it) { | 
 |       boost::shared_ptr<TAsyncChannel> channel( | 
 |           new TEvhttpClientChannel( | 
 |             "localhost", "/", "127.0.0.1", *it, eb_)); | 
 |       AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); | 
 |       client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); | 
 |     } | 
 |   } | 
 |  | 
 |   void setEventBase(struct event_base* eb) { | 
 |     eb_ = eb; | 
 |   } | 
 |  | 
 |   void clientReturn(RequestContext* ctx, AggrCobClient* client) { | 
 |     ctx->pending_calls -= 1; | 
 |  | 
 |     try { | 
 |       std::vector<int32_t> subret; | 
 |       client->recv_getValues(subret); | 
 |       ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); | 
 |     } catch (TException& exn) { | 
 |       // TODO: Log error | 
 |     } | 
 |  | 
 |     delete client; | 
 |  | 
 |     if (ctx->pending_calls == 0) { | 
 |       ctx->cob(ctx->ret); | 
 |       delete ctx; | 
 |     } | 
 |   } | 
 |  | 
 |  protected: | 
 |   struct event_base* eb_; | 
 |   std::vector<int> leaf_ports_; | 
 |   boost::shared_ptr<TProtocolFactory> pfact_; | 
 | }; | 
 |  | 
 |  | 
 | int main() { | 
 |   boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler()); | 
 |   boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler)); | 
 |   boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory()); | 
 |   boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact)); | 
 |   boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080)); | 
 |   handler->setEventBase(server->getEventBase()); | 
 |   server->serve(); | 
 | } |