From b2501a71a79304fa27dfd6d2e55b75d8eacf0cef Mon Sep 17 00:00:00 2001 From: Ben Craig Date: Fri, 13 Sep 2013 12:29:43 -0500 Subject: [PATCH] THRIFT-2069: TPipeServer creates overlapped pipes, then uses synchronous I/O on them with TPipe Client: cpp Patch: Ben Craig --- lib/cpp/src/thrift/transport/TPipe.cpp | 338 +++++++++--- lib/cpp/src/thrift/transport/TPipe.h | 18 +- lib/cpp/src/thrift/transport/TPipeServer.cpp | 508 ++++++++++-------- lib/cpp/src/thrift/transport/TPipeServer.h | 44 +- .../windows/OverlappedSubmissionThread.cpp | 156 ++++++ .../windows/OverlappedSubmissionThread.h | 129 +++++ lib/cpp/src/thrift/windows/Sync.h | 102 ++++ 7 files changed, 970 insertions(+), 325 deletions(-) create mode 100644 lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp create mode 100644 lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h create mode 100644 lib/cpp/src/thrift/windows/Sync.h diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp index 92e2912a..3bb3dac2 100644 --- a/lib/cpp/src/thrift/transport/TPipe.cpp +++ b/lib/cpp/src/thrift/transport/TPipe.cpp @@ -19,6 +19,10 @@ #include #include +#ifdef _WIN32 + #include + #include +#endif namespace apache { namespace thrift { namespace transport { @@ -29,123 +33,301 @@ using namespace std; */ #ifdef _WIN32 + +uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len); +void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len); + +uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len); +void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len); + +class TPipeImpl : boost::noncopyable { +public: + TPipeImpl() {} + virtual ~TPipeImpl() = 0 {} + virtual uint32_t read(uint8_t* buf, uint32_t len) = 0; + virtual void write(const uint8_t* buf, uint32_t len) = 0; + virtual HANDLE getPipeHandle() = 0; //doubles as the read handle for anon pipe + virtual void setPipeHandle(HANDLE pipehandle) = 0; + virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;} + virtual void setWrtPipeHandle(HANDLE) {} + virtual bool isBufferedDataAvailable() { return false; } + virtual HANDLE getNativeWaitHandle() { return INVALID_HANDLE_VALUE; } +}; + +class TNamedPipeImpl : public TPipeImpl { +public: + explicit TNamedPipeImpl(HANDLE pipehandle) : Pipe_(pipehandle) {} + virtual ~TNamedPipeImpl() {} + virtual uint32_t read(uint8_t* buf, uint32_t len) { + return pseudo_sync_read (Pipe_.h, read_event_.h, buf, len); + } + virtual void write(const uint8_t* buf, uint32_t len) { + pseudo_sync_write(Pipe_.h, write_event_.h, buf, len); + } + + virtual HANDLE getPipeHandle() {return Pipe_.h;} + virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);} +private: + TManualResetEvent read_event_; + TManualResetEvent write_event_; + TAutoHandle Pipe_; +}; + +class TAnonPipeImpl : public TPipeImpl { +public: + TAnonPipeImpl(HANDLE PipeRd, HANDLE PipeWrt) : PipeRd_(PipeRd), PipeWrt_(PipeWrt) {} + virtual ~TAnonPipeImpl() {} + virtual uint32_t read(uint8_t* buf, uint32_t len) {return pipe_read (PipeRd_.h, buf, len);} + virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len);} + + virtual HANDLE getPipeHandle() {return PipeRd_.h;} + virtual void setPipeHandle(HANDLE PipeRd) {PipeRd_.reset(PipeRd);} + virtual HANDLE getWrtPipeHandle() {return PipeWrt_.h;} + virtual void setWrtPipeHandle(HANDLE PipeWrt) {PipeWrt_.reset(PipeWrt);} +private: + TAutoHandle PipeRd_; + TAutoHandle PipeWrt_; +}; + +// If you want a select-like loop to work, use this subclass. Be warned... +// the read implementation has several context switches, so this is slower +// than using the regular named pipe implementation +class TWaitableNamedPipeImpl : public TPipeImpl { +public: + explicit TWaitableNamedPipeImpl(HANDLE pipehandle) : + Pipe_(pipehandle), + begin_unread_idx_(0), + end_unread_idx_(0) + { + readOverlap_.action = TOverlappedWorkItem::READ; + readOverlap_.h = Pipe_.h; + cancelOverlap_.action = TOverlappedWorkItem::CANCELIO; + cancelOverlap_.h = Pipe_.h; + buffer_.resize(1024 /*arbitrary buffer size*/, '\0'); + beginAsyncRead(&buffer_[0], static_cast(buffer_.size())); + } + virtual ~TWaitableNamedPipeImpl() { + // see if there is an outstanding read request + if(begin_unread_idx_ == end_unread_idx_) { + // if so, cancel it, and wait for the dead completion + thread_->addWorkItem(&cancelOverlap_); + readOverlap_.overlappedResults(false /*ignore errors*/); + } + } + virtual uint32_t read(uint8_t* buf, uint32_t len); + virtual void write(const uint8_t* buf, uint32_t len) { + pseudo_sync_write(Pipe_.h, write_event_.h, buf, len); + } + + virtual HANDLE getPipeHandle() {return Pipe_.h;} + virtual void setPipeHandle(HANDLE pipehandle) {Pipe_.reset(pipehandle);} + virtual bool isBufferedDataAvailable() {return begin_unread_idx_ < end_unread_idx_;} + virtual HANDLE getNativeWaitHandle() { return ready_event_.h; } +private: + void beginAsyncRead(uint8_t* buf, uint32_t len); + uint32_t endAsyncRead(); + + TAutoOverlapThread thread_; + TAutoHandle Pipe_; + TOverlappedWorkItem readOverlap_; + TOverlappedWorkItem cancelOverlap_; + TManualResetEvent ready_event_; + TManualResetEvent write_event_; + std::vector buffer_; + uint32_t begin_unread_idx_; + uint32_t end_unread_idx_; +}; + +void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len) +{ + begin_unread_idx_ = end_unread_idx_ = 0; + readOverlap_.reset(buf, len, ready_event_.h); + thread_->addWorkItem(&readOverlap_); + if(readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING) + { + GlobalOutput.perror("TPipe ::ReadFile errored GLE=", readOverlap_.last_error); + throw TTransportException(TTransportException::UNKNOWN, "TPipe: ReadFile failed"); + } +} + +uint32_t TWaitableNamedPipeImpl::endAsyncRead() +{ + return readOverlap_.overlappedResults(); +} + +uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len) +{ + if(begin_unread_idx_ == end_unread_idx_) { + end_unread_idx_ = endAsyncRead(); + } + + uint32_t bytes_to_copy = (std::min)(len, end_unread_idx_-begin_unread_idx_); + memcpy(buf, &buffer_[begin_unread_idx_], bytes_to_copy); + begin_unread_idx_ += bytes_to_copy; + if(begin_unread_idx_ != end_unread_idx_) + { + assert(len == bytes_to_copy); + // we were able to fulfill the read with just the bytes in our + // buffer, and we still have buffer left + return bytes_to_copy; + } + uint32_t bytes_copied = bytes_to_copy; + + //all of the requested data has been read. Kick off an async read for the next round. + beginAsyncRead(&buffer_[0], static_cast(buffer_.size())); + + return bytes_copied; +} + +void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len) +{ + OVERLAPPED tempOverlap; + memset( &tempOverlap, 0, sizeof(tempOverlap)); + tempOverlap.hEvent = event; + + uint32_t written = 0; + while(written < len) + { + BOOL result = ::WriteFile(pipe, buf+written, len-written, NULL, &tempOverlap); + + if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING) + { + GlobalOutput.perror("TPipe ::WriteFile errored GLE=", ::GetLastError()); + throw TTransportException(TTransportException::UNKNOWN, "TPipe: write failed"); + } + + DWORD bytes = 0; + result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE); + if(!result) + { + GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError()); + throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed"); + } + written += bytes; + } +} + +uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len) +{ + OVERLAPPED tempOverlap; + memset( &tempOverlap, 0, sizeof(tempOverlap)); + tempOverlap.hEvent = event; + + BOOL result = ::ReadFile(pipe, buf, len, NULL, &tempOverlap); + + if(result == FALSE && ::GetLastError() != ERROR_IO_PENDING) + { + GlobalOutput.perror("TPipe ::ReadFile errored GLE=", ::GetLastError()); + throw TTransportException(TTransportException::UNKNOWN, "TPipe: read failed"); + } + + DWORD bytes = 0; + result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE); + if(!result) + { + GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError()); + throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed"); + } + return bytes; +} + //---- Constructors ---- TPipe::TPipe(HANDLE Pipe) : - Pipe_(Pipe), + impl_(new TWaitableNamedPipeImpl(Pipe)), TimeoutSeconds_(3), - isAnonymous(false) + isAnonymous_(false) {} TPipe::TPipe(const char *pipename) : - Pipe_(INVALID_HANDLE_VALUE), TimeoutSeconds_(3), - isAnonymous(false) + isAnonymous_(false) { setPipename(pipename); } TPipe::TPipe(const std::string &pipename) : - Pipe_(INVALID_HANDLE_VALUE), TimeoutSeconds_(3), - isAnonymous(false) + isAnonymous_(false) { setPipename(pipename); } TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt) : - Pipe_(PipeRd), - PipeWrt_(PipeWrt), + impl_(new TAnonPipeImpl(PipeRd, PipeWrt)), TimeoutSeconds_(3), - isAnonymous(true) + isAnonymous_(true) {} TPipe::TPipe() : - Pipe_(INVALID_HANDLE_VALUE), - TimeoutSeconds_(3) + TimeoutSeconds_(3), + isAnonymous_(false) {} -//---- Destructor ---- -TPipe::~TPipe() { - close(); -} - +TPipe::~TPipe() {} //--------------------------------------------------------- // Transport callbacks //--------------------------------------------------------- - bool TPipe::isOpen() { - return (Pipe_ != INVALID_HANDLE_VALUE); + return impl_.get() != NULL; } bool TPipe::peek() { - if (!isOpen()) { - return false; - } - DWORD bytesavail = 0; - int PeekRet = 0; - PeekRet = PeekNamedPipe(Pipe_, NULL, 0, NULL, &bytesavail, NULL); - return (PeekRet != 0 && bytesavail > 0); + return isOpen(); } void TPipe::open() { - if (isOpen()) { + if (isOpen()) return; - } - int SleepInterval = 500; //ms - int retries = TimeoutSeconds_ * 1000 / SleepInterval; - HANDLE hPipe_; - for(int i=0; iread(buf, len); +} +uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len) +{ DWORD cbRead; int fSuccess = ReadFile( - Pipe_, // pipe handle + pipe, // pipe handle buf, // buffer to receive reply len, // size of buffer &cbRead, // number of bytes read @@ -160,11 +342,14 @@ uint32_t TPipe::read(uint8_t* buf, uint32_t len) { void TPipe::write(const uint8_t* buf, uint32_t len) { if (!isOpen()) throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open pipe"); + impl_->write(buf, len); +} - HANDLE WritePipe = isAnonymous? PipeWrt_: Pipe_; +void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len) +{ DWORD cbWritten; int fSuccess = WriteFile( - WritePipe, // pipe handle + pipe, // pipe handle buf, // message len, // message length &cbWritten, // bytes written @@ -190,19 +375,29 @@ void TPipe::setPipename(const std::string &pipename) { } HANDLE TPipe::getPipeHandle() { - return Pipe_; + if(impl_) return impl_->getPipeHandle(); + return INVALID_HANDLE_VALUE; } void TPipe::setPipeHandle(HANDLE pipehandle) { - Pipe_ = pipehandle; + if(isAnonymous_) + impl_->setPipeHandle(pipehandle); + else + impl_.reset(new TNamedPipeImpl(pipehandle)); } HANDLE TPipe::getWrtPipeHandle() { - return PipeWrt_; + if(impl_) return impl_->getWrtPipeHandle(); + return INVALID_HANDLE_VALUE; } void TPipe::setWrtPipeHandle(HANDLE pipehandle) { - PipeWrt_ = pipehandle; + if(impl_) impl_->setWrtPipeHandle(pipehandle); +} + +HANDLE TPipe::getNativeWaitHandle() { + if(impl_) return impl_->getNativeWaitHandle(); + return INVALID_HANDLE_VALUE; } long TPipe::getConnectTimeout() { @@ -212,6 +407,7 @@ long TPipe::getConnectTimeout() { void TPipe::setConnectTimeout(long seconds) { TimeoutSeconds_ = seconds; } + #endif //_WIN32 }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/transport/TPipe.h b/lib/cpp/src/thrift/transport/TPipe.h index 3c1755ba..2e4539c2 100644 --- a/lib/cpp/src/thrift/transport/TPipe.h +++ b/lib/cpp/src/thrift/transport/TPipe.h @@ -25,17 +25,21 @@ #ifndef _WIN32 # include #endif +#include namespace apache { namespace thrift { namespace transport { /** * Windows Pipes implementation of the TTransport interface. - * + * Don't destroy a TPipe at global scope, as that will cause a thread join + * during DLLMain. That also means that client objects using TPipe shouldn't be at global + * scope. */ #ifdef _WIN32 +class TPipeImpl; + class TPipe : public TVirtualTransport { public: - // Constructs a new pipe object. TPipe(); // Named pipe constructors - @@ -78,14 +82,18 @@ class TPipe : public TVirtualTransport { long getConnectTimeout(); void setConnectTimeout(long seconds); + //this function is intended to be used in generic / template situations, + //so its name needs to be the same as TPipeServer's + HANDLE getNativeWaitHandle(); private: + boost::shared_ptr impl_; + std::string pipename_; - //Named pipe handles are R/W, while anonymous pipes are one or the other (half duplex). - HANDLE Pipe_, PipeWrt_; long TimeoutSeconds_; - bool isAnonymous; + bool isAnonymous_; }; + #else typedef TSocket TPipe; #endif diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp index 10fc69b2..e14a94ad 100644 --- a/lib/cpp/src/thrift/transport/TPipeServer.cpp +++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp @@ -23,7 +23,10 @@ #include #include #include +#include + #ifdef _WIN32 +# include # include # include #endif //_WIN32 @@ -35,230 +38,295 @@ namespace apache { namespace thrift { namespace transport { using namespace std; using boost::shared_ptr; +class TPipeServerImpl : boost::noncopyable { +public: + TPipeServerImpl() {} + virtual ~TPipeServerImpl() = 0 {} + virtual void interrupt() = 0; + virtual void close() = 0; + virtual boost::shared_ptr acceptImpl() = 0; + + virtual HANDLE getPipeHandle() = 0; + virtual HANDLE getWrtPipeHandle() = 0; + virtual HANDLE getClientRdPipeHandle()= 0; + virtual HANDLE getClientWrtPipeHandle()= 0; + virtual HANDLE getNativeWaitHandle() {return NULL;} +}; + +class TAnonPipeServer : public TPipeServerImpl { +public: + TAnonPipeServer() + { + //The anonymous pipe needs to be created first so that the server can + //pass the handles on to the client before the serve (acceptImpl) + //blocking call. + if (!createAnonPipe()) { + GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); + throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); + } + } + + virtual ~TAnonPipeServer() {} + + virtual void interrupt() {} //not currently implemented + virtual void close() { + PipeR_.reset(); + PipeW_.reset(); + ClientAnonRead_.reset(); + ClientAnonWrite_.reset(); + } + + virtual boost::shared_ptr acceptImpl(); + + virtual HANDLE getPipeHandle() {return PipeR_.h;} + virtual HANDLE getWrtPipeHandle() {return PipeW_.h;} + virtual HANDLE getClientRdPipeHandle() {return ClientAnonRead_.h;} + virtual HANDLE getClientWrtPipeHandle() {return ClientAnonWrite_.h;} +private: + bool createAnonPipe(); + + TAutoHandle PipeR_; // Anonymous Pipe (R) + TAutoHandle PipeW_; // Anonymous Pipe (W) + + //Client side anonymous pipe handles + //? Do we need duplicates to send to client? + TAutoHandle ClientAnonRead_; + TAutoHandle ClientAnonWrite_; +}; + +class TNamedPipeServer : public TPipeServerImpl { +public: + TNamedPipeServer( + const std::string &pipename, + uint32_t bufsize, + uint32_t maxconnections) : + stopping_(false), + pipename_(pipename), + bufsize_(bufsize), + maxconns_(maxconnections) + { + connectOverlap_.action = TOverlappedWorkItem::CONNECT; + cancelOverlap_.action = TOverlappedWorkItem::CANCELIO; + initiateNamedConnect(); + } + virtual ~TNamedPipeServer() {} + + virtual void interrupt() + { + TAutoCrit lock(pipe_protect_); + cached_client_.reset(); + if(Pipe_.h != INVALID_HANDLE_VALUE) { + stopping_ = true; + cancelOverlap_.h = Pipe_.h; + // This should wake up GetOverlappedResult + thread_->addWorkItem(&cancelOverlap_); + close(); + } + } + + virtual void close() { + Pipe_.reset(); + } + + virtual boost::shared_ptr acceptImpl(); + + virtual HANDLE getPipeHandle() {return Pipe_.h;} + virtual HANDLE getWrtPipeHandle() {return INVALID_HANDLE_VALUE;} + virtual HANDLE getClientRdPipeHandle() {return INVALID_HANDLE_VALUE;} + virtual HANDLE getClientWrtPipeHandle() {return INVALID_HANDLE_VALUE;} + virtual HANDLE getNativeWaitHandle() {return listen_event_.h;} +private: + bool createNamedPipe(); + void initiateNamedConnect(); + + TAutoOverlapThread thread_; + TOverlappedWorkItem connectOverlap_; + TOverlappedWorkItem cancelOverlap_; + + bool stopping_; + std::string pipename_; + uint32_t bufsize_; + uint32_t maxconns_; + TManualResetEvent listen_event_; + boost::shared_ptr cached_client_; + TAutoHandle Pipe_; + TCriticalSection pipe_protect_; +}; + +HANDLE TPipeServer::getNativeWaitHandle() +{ + if(impl_) return impl_->getNativeWaitHandle(); + return NULL; +} + //---- Constructors ---- TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize) : - pipename_(pipename), bufsize_(bufsize), - Pipe_(INVALID_HANDLE_VALUE), - wakeup(INVALID_HANDLE_VALUE), - maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT), - isAnonymous(false), - stop_(false) - { - setPipename(pipename); - createWakeupEvent(); - } + isAnonymous_(false) +{ + setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT); + setPipename(pipename); +} TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) : - pipename_(pipename), bufsize_(bufsize), - Pipe_(INVALID_HANDLE_VALUE), - wakeup(INVALID_HANDLE_VALUE), - isAnonymous(false), - stop_(false) - { //Restrict maxconns_ to 1-PIPE_UNLIMITED_INSTANCES - if(maxconnections == 0) - maxconns_ = 1; - else if (maxconnections > PIPE_UNLIMITED_INSTANCES) - maxconns_ = PIPE_UNLIMITED_INSTANCES; - else - maxconns_ = maxconnections; - - setPipename(pipename); - createWakeupEvent(); - } + isAnonymous_(false) +{ + setMaxConnections(maxconnections); + setPipename(pipename); +} TPipeServer::TPipeServer(const std::string &pipename) : - pipename_(pipename), bufsize_(1024), - Pipe_(INVALID_HANDLE_VALUE), - wakeup(INVALID_HANDLE_VALUE), - maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT), - isAnonymous(false), - stop_(false) - { - setPipename(pipename); - createWakeupEvent(); - } + isAnonymous_(false) +{ + setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT); + setPipename(pipename); +} TPipeServer::TPipeServer(int bufsize) : - pipename_(""), bufsize_(bufsize), - Pipe_(INVALID_HANDLE_VALUE), - wakeup(INVALID_HANDLE_VALUE), - maxconns_(1), - isAnonymous(true), - stop_(false) - { - //The anonymous pipe needs to be created first so that the server can - //pass the handles on to the client before the serve (acceptImpl) - //blocking call. - if (!TCreateAnonPipe()) { - GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); - throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); - } - createWakeupEvent(); + isAnonymous_(true) +{ + setMaxConnections(1); + impl_.reset(new TAnonPipeServer); } TPipeServer::TPipeServer() : - pipename_(""), bufsize_(1024), - Pipe_(INVALID_HANDLE_VALUE), - wakeup(INVALID_HANDLE_VALUE), - maxconns_(1), - isAnonymous(true), - stop_(false) + isAnonymous_(true) { - if (!TCreateAnonPipe()) { - GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); - throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); - } - createWakeupEvent(); + setMaxConnections(1); + impl_.reset(new TAnonPipeServer); } //---- Destructor ---- -TPipeServer::~TPipeServer() { - close(); - CloseHandle( wakeup); - wakeup = INVALID_HANDLE_VALUE; -} +TPipeServer::~TPipeServer() {} //--------------------------------------------------------- // Transport callbacks //--------------------------------------------------------- +void TPipeServer::listen() { + if(isAnonymous_) return; + impl_.reset(new TNamedPipeServer(pipename_, bufsize_, maxconns_)); +} shared_ptr TPipeServer::acceptImpl() { - shared_ptr client; - - stop_ = FALSE; - - if(isAnonymous) - { //Anonymous Pipe - //This 0-byte read serves merely as a blocking call. - byte buf; - DWORD br; - int fSuccess = ReadFile( - Pipe_, // pipe handle - &buf, // buffer to receive reply - 0, // size of buffer - &br, // number of bytes read - NULL); // not overlapped - - if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) { - GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError()); - throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms"); - } - client.reset(new TPipe(Pipe_, PipeW_)); + return impl_->acceptImpl(); +} + +shared_ptr TAnonPipeServer::acceptImpl() { + //This 0-byte read serves merely as a blocking call. + byte buf; + DWORD br; + int fSuccess = ReadFile( + PipeR_.h, // pipe handle + &buf, // buffer to receive reply + 0, // size of buffer + &br, // number of bytes read + NULL); // not overlapped + + if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) { + GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError()); + throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms"); } - else - { //Named Pipe - if (!TCreateNamedPipe()) { - GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError()); - throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed"); - } + shared_ptr client(new TPipe(PipeR_.h, PipeW_.h)); + return client; +} - struct TEventCleaner { - HANDLE hEvent; - ~TEventCleaner() {CloseHandle(hEvent);} - }; +void TNamedPipeServer::initiateNamedConnect() { + if (stopping_) return; + if (!createNamedPipe()) { + GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError()); + throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed"); + } - OVERLAPPED overlapped; - memset( &overlapped, 0, sizeof(overlapped)); - overlapped.hEvent = CreateEvent( NULL, TRUE, FALSE, NULL); - { - TEventCleaner cleaner = {overlapped.hEvent}; - while( ! stop_) - { - // Wait for the client to connect; if it succeeds, the - // function returns a nonzero value. If the function returns - // zero, GetLastError should return ERROR_PIPE_CONNECTED. - if( ConnectNamedPipe(Pipe_, &overlapped)) - { - GlobalOutput.printf("Client connected."); - client.reset(new TPipe(Pipe_)); - return client; - } - - DWORD dwErr = GetLastError(); - HANDLE events[2] = {overlapped.hEvent, wakeup}; - switch( dwErr) - { - case ERROR_PIPE_CONNECTED: - GlobalOutput.printf("Client connected."); - client.reset(new TPipe(Pipe_)); - return client; - - case ERROR_IO_PENDING: - DWORD dwWait, dwDummy; - dwWait = WaitForMultipleObjects( 2, events, FALSE, 3000); - switch(dwWait) - { - case WAIT_OBJECT_0: - if(GetOverlappedResult(Pipe_, &overlapped, &dwDummy, TRUE)) - { - GlobalOutput.printf("Client connected."); - client.reset(new TPipe(Pipe_)); - return client; - } - break; - case WAIT_OBJECT_0 + 1: - stop_ = TRUE; - break; - default: - break; - } - break; - - default: - break; - } - - CancelIo(Pipe_); - DisconnectNamedPipe(Pipe_); - } + // The prior connection has been handled, so close the gate + ResetEvent(listen_event_.h); + connectOverlap_.reset(NULL, 0, listen_event_.h); + connectOverlap_.h = Pipe_.h; + thread_->addWorkItem(&connectOverlap_); - close(); - GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", GetLastError()); - throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed"); - } + // Wait for the client to connect; if it succeeds, the + // function returns a nonzero value. If the function returns + // zero, GetLastError should return ERROR_PIPE_CONNECTED. + if( connectOverlap_.success ) + { + GlobalOutput.printf("Client connected."); + cached_client_.reset(new TPipe(Pipe_.h)); + Pipe_.release(); + // make sure people know that a connection is ready + SetEvent(listen_event_.h); + return; } - return client; -} - -void TPipeServer::interrupt() { - if(Pipe_ != INVALID_HANDLE_VALUE) { - stop_ = TRUE; - CancelIo(Pipe_); - SetEvent(wakeup); + DWORD dwErr = connectOverlap_.last_error; + switch( dwErr) + { + case ERROR_PIPE_CONNECTED: + GlobalOutput.printf("Client connected."); + cached_client_.reset(new TPipe(Pipe_.h)); + Pipe_.release(); + // make sure people know that a connection is ready + SetEvent(listen_event_.h); + return; + case ERROR_IO_PENDING: + return; //acceptImpl will do the appropriate WaitForMultipleObjects + default: + GlobalOutput.perror("TPipeServer ConnectNamedPipe failed, GLE=", dwErr); + throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer ConnectNamedPipe failed"); } } -void TPipeServer::close() { - if(!isAnonymous) +shared_ptr TNamedPipeServer::acceptImpl() { { - if(Pipe_ != INVALID_HANDLE_VALUE) { - DisconnectNamedPipe(Pipe_); - CloseHandle(Pipe_); - Pipe_ = INVALID_HANDLE_VALUE; + TAutoCrit lock(pipe_protect_); + if(cached_client_.get() != NULL) + { + shared_ptr client; + //zero out cached_client, since we are about to return it. + client.swap(cached_client_); + + //kick off the next connection before returning + initiateNamedConnect(); + return client; //success! } } - else + + if(Pipe_.h == INVALID_HANDLE_VALUE) { + throw TTransportException( + TTransportException::NOT_OPEN, + "TNamedPipeServer: someone called accept on a closed pipe server"); + } + + DWORD dwDummy = 0; + if(GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE)) { - try { - CloseHandle(Pipe_); - CloseHandle(PipeW_); - CloseHandle(ClientAnonRead); - CloseHandle(ClientAnonWrite); - } - catch(...) { - GlobalOutput.perror("TPipeServer anon close GLE=", GetLastError()); - } + TAutoCrit lock(pipe_protect_); + GlobalOutput.printf("Client connected."); + shared_ptr client(new TPipe(Pipe_.h)); + Pipe_.release(); + //kick off the next connection before returning + initiateNamedConnect(); + return client; //success! } + //if we got here, then we are in an error / shutdown case + DWORD gle = GetLastError(); //save error before doing cleanup + close(); + GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", gle); + throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed"); +} + +void TPipeServer::interrupt() { + if(impl_) impl_->interrupt(); +} + +void TPipeServer::close() { + if(impl_) impl_->close(); } -bool TPipeServer::TCreateNamedPipe() { +bool TNamedPipeServer::createNamedPipe() { //Windows - set security to allow non-elevated apps //to access pipes created by elevated apps. @@ -288,31 +356,31 @@ bool TPipeServer::TCreateNamedPipe() { sa.bInheritHandle = FALSE; // Create an instance of the named pipe - HANDLE hPipe_ = CreateNamedPipe( + TAutoHandle hPipe(CreateNamedPipe( pipename_.c_str(), // pipe name PIPE_ACCESS_DUPLEX | // read/write access FILE_FLAG_OVERLAPPED, // async mode - PIPE_TYPE_MESSAGE | // message type pipe - PIPE_READMODE_MESSAGE, // message-read mode + PIPE_TYPE_BYTE | // byte type pipe + PIPE_READMODE_BYTE, // byte read mode maxconns_, // max. instances bufsize_, // output buffer size bufsize_, // input buffer size 0, // client time-out - &sa); // default security attribute + &sa)); // security attributes - if(hPipe_ == INVALID_HANDLE_VALUE) + if(hPipe.h == INVALID_HANDLE_VALUE) { - Pipe_ = INVALID_HANDLE_VALUE; + Pipe_.reset(); GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed", GetLastError()); return false; } - Pipe_ = hPipe_; + Pipe_.reset(hPipe.release()); return true; } -bool TPipeServer::TCreateAnonPipe() { +bool TAnonPipeServer::createAnonPipe() { SECURITY_ATTRIBUTES sa; SECURITY_DESCRIPTOR sd; //security information for pipes @@ -335,26 +403,19 @@ bool TPipeServer::TCreateAnonPipe() { CloseHandle(PipeW_H); return false; } - ClientAnonRead = ClientAnonReadH; - ClientAnonWrite = ClientAnonWriteH; - Pipe_ = Pipe_H; - PipeW_ = PipeW_H; - return true; -} + ClientAnonRead_.reset(ClientAnonReadH); + ClientAnonWrite_.reset(ClientAnonWriteH); + PipeR_.reset(Pipe_H); + PipeW_.reset(PipeW_H); -void TPipeServer::createWakeupEvent() { - wakeup = CreateEvent( NULL, TRUE, FALSE, NULL); + return true; } - //--------------------------------------------------------- // Accessors //--------------------------------------------------------- - -string TPipeServer::getPipename() { - return pipename_; -} +string TPipeServer::getPipename() {return pipename_;} void TPipeServer::setPipename(const std::string &pipename) { if(pipename.find("\\\\") == -1) @@ -363,40 +424,27 @@ void TPipeServer::setPipename(const std::string &pipename) { pipename_ = pipename; } -int TPipeServer::getBufferSize() { - return bufsize_; -} +int TPipeServer::getBufferSize() {return bufsize_;} +void TPipeServer::setBufferSize(int bufsize) {bufsize_ = bufsize;} -void TPipeServer::setBufferSize(int bufsize) { - bufsize_ = bufsize; -} +HANDLE TPipeServer::getPipeHandle() {return impl_?impl_->getPipeHandle() :INVALID_HANDLE_VALUE;} +HANDLE TPipeServer::getWrtPipeHandle() {return impl_?impl_->getWrtPipeHandle() :INVALID_HANDLE_VALUE;} +HANDLE TPipeServer::getClientRdPipeHandle() {return impl_?impl_->getClientRdPipeHandle() :INVALID_HANDLE_VALUE;} +HANDLE TPipeServer::getClientWrtPipeHandle() {return impl_?impl_->getClientWrtPipeHandle():INVALID_HANDLE_VALUE;} -HANDLE TPipeServer::getPipeHandle() { - return Pipe_; -} +bool TPipeServer::getAnonymous() { return isAnonymous_; } +void TPipeServer::setAnonymous(bool anon) { isAnonymous_ = anon;} -HANDLE TPipeServer::getWrtPipeHandle() +void TPipeServer::setMaxConnections(uint32_t maxconnections) { - return PipeW_; -} - -HANDLE TPipeServer::getClientRdPipeHandle() -{ - return ClientAnonRead; -} - -HANDLE TPipeServer::getClientWrtPipeHandle() -{ - return ClientAnonWrite; -} - -bool TPipeServer::getAnonymous() { - return isAnonymous; + if(maxconnections == 0) + maxconns_ = 1; + else if (maxconnections > PIPE_UNLIMITED_INSTANCES) + maxconns_ = PIPE_UNLIMITED_INSTANCES; + else + maxconns_ = maxconnections; } -void TPipeServer::setAnonymous(bool anon) { - isAnonymous = anon; -} #endif //_WIN32 }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h index 88a8b6b9..98ecde0f 100755 --- a/lib/cpp/src/thrift/transport/TPipeServer.h +++ b/lib/cpp/src/thrift/transport/TPipeServer.h @@ -23,17 +23,26 @@ #include #include #ifndef _WIN32 -# include "TServerSocket.h" +# include +#endif +#ifdef _WIN32 +# include #endif -#define TPIPE_SERVER_MAX_CONNS_DEFAULT 10 +#define TPIPE_SERVER_MAX_CONNS_DEFAULT PIPE_UNLIMITED_INSTANCES namespace apache { namespace thrift { namespace transport { /** * Windows Pipes implementation of TServerTransport. + * Don't destroy a TPipeServer at global scope, as that will cause a thread join + * during DLLMain. That also means that TServer's using TPipeServer shouldn't be at global + * scope. */ #ifdef _WIN32 +class TPipeServerImpl; +class TPipe; + class TPipeServer : public TServerTransport { public: //Constructors @@ -46,19 +55,13 @@ class TPipeServer : public TServerTransport { TPipeServer(); //Destructor - ~TPipeServer(); + virtual ~TPipeServer(); //Standard transport callbacks - void interrupt(); - void close(); - protected: - boost::shared_ptr acceptImpl(); - - bool TCreateNamedPipe(); - bool TCreateAnonPipe(); - void createWakeupEvent(); + virtual void interrupt(); + virtual void close(); + virtual void listen(); - public: //Accessors std::string getPipename(); void setPipename(const std::string &pipename); @@ -70,18 +73,21 @@ class TPipeServer : public TServerTransport { HANDLE getClientWrtPipeHandle(); bool getAnonymous(); void setAnonymous(bool anon); + void setMaxConnections(uint32_t maxconnections); + + //this function is intended to be used in generic / template situations, + //so its name needs to be the same as TPipe's + HANDLE getNativeWaitHandle(); +protected: + virtual boost::shared_ptr acceptImpl(); private: + boost::shared_ptr impl_; + std::string pipename_; uint32_t bufsize_; - HANDLE Pipe_; //Named Pipe (R/W) or Anonymous Pipe (R) uint32_t maxconns_; - HANDLE PipeW_; //Anonymous Pipe (W) - HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles - HANDLE wakeup; // wake up event - //? Do we need duplicates to send to client? - bool isAnonymous; - bool stop_; // stop flag + bool isAnonymous_; }; #else //_WIN32 //*NIX named pipe implementation uses domain socket diff --git a/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp new file mode 100644 index 00000000..5dec390f --- /dev/null +++ b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.cpp @@ -0,0 +1,156 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include +#include +#include +#include +#include + +namespace apache { namespace thrift { namespace transport { + +TOverlappedWorkItem::TOverlappedWorkItem() : + SLIST_ENTRY(), + action(UNKNOWN), + h(INVALID_HANDLE_VALUE), + buffer(NULL), + buffer_len(0), + overlap(), + last_error(0), + success(TRUE) +{} + +void TOverlappedWorkItem::reset(uint8_t *buf, uint32_t len, HANDLE event) { + memset( &overlap, 0, sizeof(overlap)); + overlap.hEvent = event; + buffer = buf; + buffer_len = len; + last_error = 0; + success = FALSE; +} + +uint32_t TOverlappedWorkItem::overlappedResults(bool signal_failure) { + DWORD bytes = 0; + BOOL result = ::GetOverlappedResult(h, &overlap, &bytes, TRUE); + if(signal_failure && !result) //get overlapped error case + { + GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError()); + throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed"); + } + return bytes; +} + +bool TOverlappedWorkItem::process() { + BOOST_SCOPE_EXIT( (&doneSubmittingEvent) ) { + SetEvent(doneSubmittingEvent.h); + } BOOST_SCOPE_EXIT_END + + switch(action) { + case(CONNECT): + success = ::ConnectNamedPipe(h, &overlap); + if(success == FALSE) + last_error = ::GetLastError(); + return true; + case(READ): + success = ::ReadFile(h, buffer, buffer_len, NULL, &overlap); + if(success == FALSE) + last_error = ::GetLastError(); + return true; + case(CANCELIO): + success = ::CancelIo(h); + if(success == FALSE) + last_error = ::GetLastError(); + return true; + case(STOP): + default: + return false; + } +} + +void TOverlappedSubmissionThread::addWorkItem(TOverlappedWorkItem *item) { + InterlockedPushEntrySList(&workList_, item); + SetEvent(workAvailableEvent_.h); + WaitForSingleObject(item->doneSubmittingEvent.h, INFINITE); +} + +TOverlappedSubmissionThread *TOverlappedSubmissionThread::acquire_instance() { + TAutoCrit lock(instanceGuard_); + if(instance_ == NULL) + { + assert(instanceRefCount_ == 0); + instance_ = new TOverlappedSubmissionThread; + } + ++instanceRefCount_; + return instance_; +} +void TOverlappedSubmissionThread::release_instance() { + TAutoCrit lock(instanceGuard_); + if(--instanceRefCount_ == 0) + { + delete instance_; + instance_ = NULL; + } +} + +TOverlappedSubmissionThread::TOverlappedSubmissionThread() { + stopItem_.action = TOverlappedWorkItem::STOP; + + InitializeSListHead(&workList_); + thread_ = (HANDLE)_beginthreadex( + NULL, + 0, + thread_proc, + this, + 0, + NULL); + if(thread_ == 0) { + GlobalOutput.perror("TOverlappedSubmissionThread unable to create thread, errno=", errno); + throw TTransportException(TTransportException::NOT_OPEN, " TOverlappedSubmissionThread unable to create thread"); + } +} + +TOverlappedSubmissionThread::~TOverlappedSubmissionThread() { + addWorkItem(&stopItem_); + ::WaitForSingleObject(thread_, INFINITE); + CloseHandle(thread_); +} + +void TOverlappedSubmissionThread::run() { + for(;;) { + WaitForSingleObject(workAvailableEvent_.h, INFINITE); + //todo check result + SLIST_ENTRY *entry = NULL; + while( (entry = InterlockedPopEntrySList(&workList_)) != NULL) { + TOverlappedWorkItem &item = *static_cast(entry); + if(!item.process()) + return; + } + } +} + +unsigned __stdcall TOverlappedSubmissionThread::thread_proc(void *addr) { + static_cast(addr)->run(); + return 0; +} + +TCriticalSection TOverlappedSubmissionThread::instanceGuard_; +TOverlappedSubmissionThread* TOverlappedSubmissionThread::instance_; +uint32_t TOverlappedSubmissionThread::instanceRefCount_=0; + +}}} //apach::thrift::transport diff --git a/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h new file mode 100644 index 00000000..16b7e24b --- /dev/null +++ b/lib/cpp/src/thrift/windows/OverlappedSubmissionThread.h @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_WINDOWS_OverlappedSubmissionThread_H_ +#define _THRIFT_WINDOWS_OverlappedSubmissionThread_H_ 1 + +#ifndef _WIN32 +#error "OverlappedSubmissionThread.h is only usable on Windows" +#endif + +#include +#include +#include + +/* + *** Why does this class exist? + In short, because we want to enable something similar to a "select" loop, on Windows, with + named pipes. The core of the "select" loop is a call to WaitForMultipleObjects. So that means + we need a signalable object that indicates when data is available. + + A pipe handle doesn't do that. A pipe handle is signaled when a read or write completes, and if + no one has called read or write, then the pipe handle is useless in WaitForMultipleObjects. So + instead, we use overlapped I/O. With overlapped I/O, you call read, and associate an event with + the read. When the read finishes, the event is signaled. This means that when you create a pipe, + you start a read. When the customer calls read on your transport object, you wait for the last + read to finish, and then kick off another. + + There is one big caveat to this though. The thread that initiated the read must stay alive. If + the thread that initiated the read exits, then the read completes in an error state. To ensure + that the initiating thread stays alive, we create a singleton thread whose sole responsibility is + to manage this overlapped I/O requests. This introduces some overhead, but it is overhead that + is necessary for correct behavior. + + This thread currently supports connect, read, and cancel io. So far, I haven't needed to put any + writes on this thread, but if needed, it could be done. The client write buffer would need to be + copied to ensure that it doesn't get invalidated. + + *** How does one use this class? + Create a TOverlappedWorkItem, and fill in the action and "h", then call reset(). Your work item + is now ready to be submitted to the overlapped submission thread. Create a TAutoOverlapThread, + and call thread->addWorkItem with your work item. After addWorkItem completes, you may inspect + last_error and success. At some point in the future, call workItem.overlappedResults to wait + until the operation has completed. +*/ + +namespace apache { namespace thrift { namespace transport { + +DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) struct TOverlappedWorkItem : public SLIST_ENTRY { + TOverlappedWorkItem(); + + enum action_t { + UNKNOWN = 3000, + CONNECT, + READ, + CANCELIO, + STOP, + }; + + TAutoResetEvent doneSubmittingEvent; + action_t action; + HANDLE h; + uint8_t *buffer; + uint32_t buffer_len; + OVERLAPPED overlap; + + DWORD last_error; + BOOL success; + + void reset(uint8_t *buf, uint32_t len, HANDLE event); + uint32_t overlappedResults(bool signal_failure = true); + bool process(); +}; + +class TOverlappedSubmissionThread : boost::noncopyable +{ +public: + void addWorkItem(TOverlappedWorkItem *item); + +//singleton stuff +public: + static TOverlappedSubmissionThread *acquire_instance(); + static void release_instance(); +private: + static TCriticalSection instanceGuard_; + static TOverlappedSubmissionThread *instance_; + static uint32_t instanceRefCount_; + +//thread details +private: + TOverlappedSubmissionThread(); + ~TOverlappedSubmissionThread(); + void run(); + static unsigned __stdcall thread_proc(void *addr); + +private: + DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) SLIST_HEADER workList_; + TOverlappedWorkItem stopItem_; + TAutoResetEvent workAvailableEvent_; + HANDLE thread_; +}; + +class TAutoOverlapThread : boost::noncopyable { +private: + TOverlappedSubmissionThread *p; +public: + TAutoOverlapThread() : p(TOverlappedSubmissionThread::acquire_instance()) {} + ~TAutoOverlapThread() {TOverlappedSubmissionThread::release_instance();} + TOverlappedSubmissionThread *operator->() {return p;} +}; + +}}} //apache::thrift::transport + +#endif diff --git a/lib/cpp/src/thrift/windows/Sync.h b/lib/cpp/src/thrift/windows/Sync.h new file mode 100644 index 00000000..ded6ea34 --- /dev/null +++ b/lib/cpp/src/thrift/windows/Sync.h @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_WINDOWS_Sync_H_ +#define _THRIFT_WINDOWS_Sync_H_ 1 + +#ifndef _WIN32 +#error "windows/Sync.h is only usable on Windows" +#endif + +#include +#include +#include + +/* + Lightweight synchronization objects that only make sense on Windows. For cross-platform + code, use the classes found in the concurrency namespace +*/ + +namespace apache { namespace thrift { + +struct TCriticalSection : boost::noncopyable { + CRITICAL_SECTION cs; + TCriticalSection() {InitializeCriticalSection(&cs);} + ~TCriticalSection() {DeleteCriticalSection(&cs);} +}; + +class TAutoCrit : boost::noncopyable { +private: + CRITICAL_SECTION *cs_; +public: + explicit TAutoCrit(TCriticalSection &cs) : cs_(&cs.cs) {EnterCriticalSection(cs_);} + ~TAutoCrit() {LeaveCriticalSection(cs_);} +}; + +struct TAutoResetEvent : boost::noncopyable { + HANDLE h; + + TAutoResetEvent() { + h = CreateEvent( NULL, FALSE, FALSE, NULL); + if(h == NULL) { + GlobalOutput.perror("TAutoResetEvent unable to create event, GLE=", GetLastError()); + throw apache::thrift::concurrency::SystemResourceException("CreateEvent failed"); + } + } + ~TAutoResetEvent() {CloseHandle(h);} +}; + +struct TManualResetEvent : boost::noncopyable { + HANDLE h; + + TManualResetEvent() { + h = CreateEvent( NULL, TRUE, FALSE, NULL); + if(h == NULL) { + GlobalOutput.perror("TManualResetEvent unable to create event, GLE=", GetLastError()); + throw apache::thrift::concurrency::SystemResourceException("CreateEvent failed"); + } + } + ~TManualResetEvent() {CloseHandle(h);} +}; + +struct TAutoHandle : boost::noncopyable { + HANDLE h; + explicit TAutoHandle(HANDLE h_ = INVALID_HANDLE_VALUE) : h(h_) {} + ~TAutoHandle() { + if(h != INVALID_HANDLE_VALUE) + CloseHandle(h); + } + + HANDLE release() { + HANDLE retval = h; + h = INVALID_HANDLE_VALUE; + return retval; + } + void reset(HANDLE h_ = INVALID_HANDLE_VALUE) { + if(h_ == h) + return; + if(h != INVALID_HANDLE_VALUE) + CloseHandle(h); + h = h_; + } +}; + +}} //apache::thrift + +#endif -- 2.17.1