From 552440e6e522499d974800c98c5f4dd869dc29c7 Mon Sep 17 00:00:00 2001 From: Jens Geyer Date: Thu, 21 Mar 2013 19:55:27 +0200 Subject: [PATCH] THRIFT-1890 C++: Make named pipes server work asynchronously Patch: Jens Geyer & Ben Craig --- lib/cpp/src/thrift/transport/TPipeServer.cpp | 127 ++++++++++++++----- lib/cpp/src/thrift/transport/TPipeServer.h | 3 + 2 files changed, 101 insertions(+), 29 deletions(-) diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp index f4d47048..b11d22f3 100644 --- a/lib/cpp/src/thrift/transport/TPipeServer.cpp +++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp @@ -42,17 +42,22 @@ TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize) : pipename_(pipename), bufsize_(bufsize), Pipe_(INVALID_HANDLE_VALUE), + wakeup(INVALID_HANDLE_VALUE), maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT), - isAnonymous(false) + isAnonymous(false), + stop_(false) { setPipename(pipename); + createWakeupEvent(); } TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) : pipename_(pipename), bufsize_(bufsize), Pipe_(INVALID_HANDLE_VALUE), - isAnonymous(false) + wakeup(INVALID_HANDLE_VALUE), + isAnonymous(false), + stop_(false) { //Restrict maxconns_ to 1-PIPE_UNLIMITED_INSTANCES if(maxconnections == 0) maxconns_ = 1; @@ -62,24 +67,30 @@ TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconns_ = maxconnections; setPipename(pipename); + createWakeupEvent(); } TPipeServer::TPipeServer(const std::string &pipename) : pipename_(pipename), bufsize_(1024), Pipe_(INVALID_HANDLE_VALUE), + wakeup(INVALID_HANDLE_VALUE), maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT), - isAnonymous(false) + isAnonymous(false), + stop_(false) { setPipename(pipename); + createWakeupEvent(); } TPipeServer::TPipeServer(int bufsize) : pipename_(""), bufsize_(bufsize), Pipe_(INVALID_HANDLE_VALUE), + wakeup(INVALID_HANDLE_VALUE), maxconns_(1), - isAnonymous(true) + isAnonymous(true), + stop_(false) { //The anonymous pipe needs to be created first so that the server can //pass the handles on to the client before the serve (acceptImpl) @@ -88,24 +99,30 @@ TPipeServer::TPipeServer(int bufsize) : GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); } + createWakeupEvent(); } TPipeServer::TPipeServer() : pipename_(""), bufsize_(1024), Pipe_(INVALID_HANDLE_VALUE), + wakeup(INVALID_HANDLE_VALUE), maxconns_(1), - isAnonymous(true) + isAnonymous(true), + stop_(false) { if (!TCreateAnonPipe()) { GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); } + createWakeupEvent(); } //---- Destructor ---- TPipeServer::~TPipeServer() { close(); + CloseHandle( wakeup); + wakeup = INVALID_HANDLE_VALUE; } //--------------------------------------------------------- @@ -115,6 +132,8 @@ TPipeServer::~TPipeServer() { shared_ptr TPipeServer::acceptImpl() { shared_ptr client; + stop_ = FALSE; + if(isAnonymous) { //Anonymous Pipe //This 0-byte read serves merely as a blocking call. @@ -131,37 +150,79 @@ shared_ptr TPipeServer::acceptImpl() { GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms"); } - client.reset(new TPipe(Pipe_, PipeW_)); + client.reset(new TPipe(Pipe_, PipeW_)); } else { //Named Pipe - int ConnectRet; - while (true) - { - if (!TCreateNamedPipe()) { - GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError()); - throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed"); - } + if (!TCreateNamedPipe()) { + GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError()); + throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed"); + } - // Wait for the client to connect; if it succeeds, the - // function returns a nonzero value. If the function returns - // zero, GetLastError should return ERROR_PIPE_CONNECTED. - ConnectRet = ConnectNamedPipe(Pipe_, NULL) ? - TRUE : (GetLastError() == ERROR_PIPE_CONNECTED); + struct TEventCleaner { + HANDLE hEvent; + ~TEventCleaner() {CloseHandle(hEvent);} + }; - if (ConnectRet == TRUE) - { - GlobalOutput.printf("Client connected."); - break; - } - else + OVERLAPPED overlapped; + memset( &overlapped, 0, sizeof(overlapped)); + overlapped.hEvent = CreateEvent( NULL, TRUE, FALSE, NULL); + { + TEventCleaner cleaner = {overlapped.hEvent}; + while( ! stop_) { - close(); - GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError()); - throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed"); + // Wait for the client to connect; if it succeeds, the + // function returns a nonzero value. If the function returns + // zero, GetLastError should return ERROR_PIPE_CONNECTED. + if( ConnectNamedPipe(Pipe_, &overlapped)) + { + GlobalOutput.printf("Client connected."); + client.reset(new TPipe(Pipe_)); + return client; + } + + DWORD dwErr = GetLastError(); + HANDLE events[2] = {overlapped.hEvent, wakeup}; + switch( dwErr) + { + case ERROR_PIPE_CONNECTED: + GlobalOutput.printf("Client connected."); + client.reset(new TPipe(Pipe_)); + return client; + + case ERROR_IO_PENDING: + DWORD dwWait, dwDummy; + dwWait = WaitForMultipleObjects( 2, events, FALSE, 3000); + switch(dwWait) + { + case WAIT_OBJECT_0: + if(GetOverlappedResult(Pipe_, &overlapped, &dwDummy, TRUE)) + { + GlobalOutput.printf("Client connected."); + client.reset(new TPipe(Pipe_)); + return client; + } + break; + case WAIT_OBJECT_0 + 1: + stop_ = TRUE; + break; + default: + break; + } + break; + + default: + break; + } + + CancelIo(Pipe_); + DisconnectNamedPipe(Pipe_); } + + close(); + GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError()); + throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed"); } - client.reset(new TPipe(Pipe_)); } return client; @@ -169,7 +230,9 @@ shared_ptr TPipeServer::acceptImpl() { void TPipeServer::interrupt() { if(Pipe_ != INVALID_HANDLE_VALUE) { + stop_ = TRUE; CancelIo(Pipe_); + SetEvent(wakeup); } } @@ -229,7 +292,8 @@ bool TPipeServer::TCreateNamedPipe() { // Create an instance of the named pipe HANDLE hPipe_ = CreateNamedPipe( pipename_.c_str(), // pipe name - PIPE_ACCESS_DUPLEX, // read/write access + PIPE_ACCESS_DUPLEX | // read/write access + FILE_FLAG_OVERLAPPED, // async mode PIPE_TYPE_MESSAGE | // message type pipe PIPE_READMODE_MESSAGE, // message-read mode maxconns_, // max. instances @@ -281,6 +345,11 @@ bool TPipeServer::TCreateAnonPipe() { return true; } +void TPipeServer::createWakeupEvent() { + wakeup = CreateEvent( NULL, TRUE, FALSE, NULL); +} + + //--------------------------------------------------------- // Accessors //--------------------------------------------------------- diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h index 624a30a5..4c211a0b 100755 --- a/lib/cpp/src/thrift/transport/TPipeServer.h +++ b/lib/cpp/src/thrift/transport/TPipeServer.h @@ -56,6 +56,7 @@ class TPipeServer : public TServerTransport { bool TCreateNamedPipe(); bool TCreateAnonPipe(); + void createWakeupEvent(); public: //Accessors @@ -77,8 +78,10 @@ class TPipeServer : public TServerTransport { uint32_t maxconns_; HANDLE PipeW_; //Anonymous Pipe (W) HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles + HANDLE wakeup; // wake up event //? Do we need duplicates to send to client? bool isAnonymous; + bool stop_; // stop flag }; #else //_WIN32 //*NIX named pipe implementation uses domain socket -- 2.17.1