From: Roger Meier Date: Tue, 5 Jun 2012 19:57:10 +0000 (+0000) Subject: Thrift-1558 _xplatform_pipe_6-5-2012.patch X-Git-Tag: 0.9.1~348 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=7d0a0401534e0a02206d7ddb734e6a7dfa41bb42;p=common%2Fthrift.git Thrift-1558 _xplatform_pipe_6-5-2012.patch Patch: Peace C git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1346555 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 4834e9f0..4a7ef836 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -71,6 +71,8 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \ src/thrift/transport/THttpClient.cpp \ src/thrift/transport/THttpServer.cpp \ src/thrift/transport/TSocket.cpp \ + src/thrift/transport/TPipe.cpp \ + src/thrift/transport/TPipeServer.cpp \ src/thrift/transport/TSSLSocket.cpp \ src/thrift/transport/TSocketPool.cpp \ src/thrift/transport/TServerSocket.cpp \ @@ -169,6 +171,8 @@ include_transport_HEADERS = \ src/thrift/transport/THttpClient.h \ src/thrift/transport/THttpServer.h \ src/thrift/transport/TSocket.h \ + src/thrift/transport/TPipe.h \ + src/thrift/transport/TPipeServer.h \ src/thrift/transport/TSSLSocket.h \ src/thrift/transport/TSocketPool.h \ src/thrift/transport/TVirtualTransport.h \ diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp index 2c7cf56d..ae98a478 100644 --- a/lib/cpp/src/thrift/transport/TPipe.cpp +++ b/lib/cpp/src/thrift/transport/TPipe.cpp @@ -17,8 +17,6 @@ * under the License. */ -#ifdef _WIN32 - #include "TTransportException.h" #include "TPipe.h" @@ -31,33 +29,56 @@ using namespace std; */ //---- Constructors ---- -TPipe::TPipe(HANDLE hpipe) : +TPipe::TPipe(int Pipe) : pipename_(""), - hPipe_(hpipe), + Pipe_(Pipe), TimeoutSeconds_(3), isAnonymous(false) -{} +{ +#ifndef _WIN32 + GlobalOutput.perror("TPipe: constructor using a pipe handle is not supported under *NIX", -99); + throw TTransportException(TTransportException::NOT_OPEN, " constructor using a pipe handle is not supported under *NIX"); +#endif +} TPipe::TPipe(string pipename) : pipename_(pipename), - hPipe_(INVALID_HANDLE_VALUE), + Pipe_(-1), TimeoutSeconds_(3), isAnonymous(false) -{} +{ +#ifdef _WIN32 + if(pipename_.find("\\\\") == -1) { + pipename_ = "\\\\.\\pipe\\" + pipename_; + } +#else + dsocket.reset(new TSocket(pipename)); +#endif +} -TPipe::TPipe(HANDLE hPipeRd, HANDLE hPipeWrt) : +TPipe::TPipe(int PipeRd, int PipeWrt) : pipename_(""), - hPipe_(hPipeRd), - hPipeWrt_(hPipeWrt), + Pipe_(PipeRd), + PipeWrt_(PipeWrt), TimeoutSeconds_(3), isAnonymous(true) -{} +{ +#ifndef _WIN32 + GlobalOutput.perror("TPipe: Anonymous pipes not yet supported under *NIX", -99); + throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX"); +#endif +} TPipe::TPipe() : pipename_(""), - hPipe_(INVALID_HANDLE_VALUE), + Pipe_(-1), TimeoutSeconds_(3) -{} +{ +#ifndef _WIN32 + GlobalOutput.perror("TPipe: Anonymous pipes not yet supported under *NIX", -99); + throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX"); +#endif +} //---- Destructor ---- TPipe::~TPipe() { @@ -66,20 +87,22 @@ TPipe::~TPipe() { bool TPipe::isOpen() { - return (hPipe_ != INVALID_HANDLE_VALUE); + return (Pipe_ != -1); } //--------------------------------------------------------- // Transport callbacks //--------------------------------------------------------- +#ifdef _WIN32 //Windows callbacks + bool TPipe::peek() { if (!isOpen()) { return false; } DWORD bytesavail = 0; int PeekRet = 0; - PeekRet = PeekNamedPipe(hPipe_, NULL, 0, NULL, &bytesavail, NULL); + PeekRet = PeekNamedPipe((HANDLE)Pipe_, NULL, 0, NULL, &bytesavail, NULL); return (PeekRet != 0 && bytesavail > 0); } @@ -90,6 +113,7 @@ void TPipe::open() { int SleepInterval = 500; //ms int retries = TimeoutSeconds_ * 1000 / SleepInterval; + HANDLE hPipe_; for(int i=0; ipeek(); +} + +void TPipe::open() { + dsocket->open(); +} + +void TPipe::close() { + dsocket->close(); +} + +uint32_t TPipe::read(uint8_t* buf, uint32_t len) { + return dsocket->read(buf, len); +} + +void TPipe::write(const uint8_t* buf, uint32_t len) { + dsocket->write(buf, len); +} +#endif //callbacks + //--------------------------------------------------------- // Accessors @@ -180,20 +227,20 @@ void TPipe::setPipename(std::string pipename) { pipename_ = pipename; } -HANDLE TPipe::getPipeHandle() { - return hPipe_; +int TPipe::getPipeHandle() { + return Pipe_; } -void TPipe::setPipeHandle(HANDLE pipehandle) { - hPipe_ = pipehandle; +void TPipe::setPipeHandle(int pipehandle) { + Pipe_ = pipehandle; } -HANDLE TPipe::getWrtPipeHandle() { - return hPipeWrt_; +int TPipe::getWrtPipeHandle() { + return PipeWrt_; } -void TPipe::setWrtPipeHandle(HANDLE pipehandle) { - hPipeWrt_ = pipehandle; +void TPipe::setWrtPipeHandle(int pipehandle) { + PipeWrt_ = pipehandle; } long TPipe::getConnectTimeout() { @@ -205,5 +252,3 @@ void TPipe::setConnectTimeout(long seconds) { } }}} // apache::thrift::transport - -#endif //_WIN32 diff --git a/lib/cpp/src/thrift/transport/TPipe.h b/lib/cpp/src/thrift/transport/TPipe.h index bca3e270..942f54f7 100644 --- a/lib/cpp/src/thrift/transport/TPipe.h +++ b/lib/cpp/src/thrift/transport/TPipe.h @@ -19,10 +19,12 @@ #ifndef _THRIFT_TRANSPORT_TPIPE_H_ #define _THRIFT_TRANSPORT_TPIPE_H_ 1 -#ifdef _WIN32 -#include "TTransport.h" -#include "TVirtualTransport.h" +#include +#include +#ifndef _WIN32 +# include "TSocket.h" +#endif namespace apache { namespace thrift { namespace transport { @@ -36,10 +38,10 @@ class TPipe : public TVirtualTransport { // Constructs a new pipe object. TPipe(); // Named pipe constructors - - TPipe(HANDLE hPipe); - TPipe(std::string path); + TPipe(int Pipe); + TPipe(std::string pipename); // Anonymous pipe - - TPipe(HANDLE hPipeRd, HANDLE hPipeWrt); + TPipe(int PipeRd, int PipeWrt); // Destroys the pipe object, closing it if necessary. virtual ~TPipe(); @@ -66,24 +68,27 @@ class TPipe : public TVirtualTransport { //Accessors std::string getPipename(); void setPipename(std::string pipename); - HANDLE getPipeHandle(); //doubles as the read handle for anon pipe - void setPipeHandle(HANDLE pipehandle); - HANDLE getWrtPipeHandle(); - void setWrtPipeHandle(HANDLE pipehandle); + int getPipeHandle(); //doubles as the read handle for anon pipe + void setPipeHandle(int pipehandle); + int getWrtPipeHandle(); + void setWrtPipeHandle(int pipehandle); long getConnectTimeout(); void setConnectTimeout(long seconds); private: std::string pipename_; //Named pipe handles are R/W, while anonymous pipes are one or the other (half duplex). - HANDLE hPipe_, hPipeWrt_; + int Pipe_, PipeWrt_; long TimeoutSeconds_; bool isAnonymous; +#ifndef _WIN32 + //*NIX named pipe implementation uses domain socket + boost::shared_ptr dsocket; +#endif }; }}} // apache::thrift::transport -#endif //_WIN32 #endif // #ifndef _THRIFT_TRANSPORT_TPIPE_H_ diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp index 6f2f73db..73a52196 100644 --- a/lib/cpp/src/thrift/transport/TPipeServer.cpp +++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp @@ -17,8 +17,6 @@ * under the License. */ -#ifdef _WIN32 - #ifdef HAVE_CONFIG_H #include #endif @@ -27,8 +25,10 @@ #include "TPipe.h" #include "TPipeServer.h" #include -#include -#include +#ifdef _WIN32 +# include +# include +#endif //_WIN32 namespace apache { namespace thrift { namespace transport { @@ -39,15 +39,23 @@ using boost::shared_ptr; TPipeServer::TPipeServer(string pipename, uint32_t bufsize) : pipename_(pipename), bufsize_(bufsize), - hPipe_(INVALID_HANDLE_VALUE), - isAnonymous(false), - maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT) - {} + Pipe_(-1), + maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT), + isAnonymous(false) + { +#ifdef _WIN32 + if(pipename_.find("\\\\") == 0) { + pipename_ = "\\\\.\\pipe\\" + pipename_; + } +#else + dsrvsocket.reset(new TServerSocket(pipename)); +#endif + } TPipeServer::TPipeServer(string pipename, uint32_t bufsize, uint32_t maxconnections) : pipename_(pipename), bufsize_(bufsize), - hPipe_(INVALID_HANDLE_VALUE), + Pipe_(-1), isAnonymous(false) { //Restrict maxconns_ to 1-255 if(maxconnections == 0) @@ -56,23 +64,40 @@ TPipeServer::TPipeServer(string pipename, uint32_t bufsize, uint32_t maxconnecti maxconns_ = 255; else maxconns_ = maxconnections; + +#ifdef _WIN32 + if(pipename_.find("\\\\") == -1) { + pipename_ = "\\\\.\\pipe\\" + pipename_; + } +#else + dsrvsocket.reset(new TServerSocket(pipename)); +#endif } TPipeServer::TPipeServer(string pipename) : pipename_(pipename), bufsize_(1024), - hPipe_(INVALID_HANDLE_VALUE), - isAnonymous(false), - maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT) - {} + Pipe_(-1), + maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT), + isAnonymous(false) + { +#ifdef _WIN32 + if(pipename_.find("\\\\") == 0) { + pipename_ = "\\\\.\\pipe\\" + pipename_; + } +#else + dsrvsocket.reset(new TServerSocket(pipename)); +#endif + } TPipeServer::TPipeServer(int bufsize) : pipename_(""), bufsize_(bufsize), - hPipe_(INVALID_HANDLE_VALUE), - isAnonymous(true), - maxconns_(1) + Pipe_(-1), + maxconns_(1), + isAnonymous(true) { +#ifdef _WIN32 //The anonymous pipe needs to be created first so that the server can //pass the handles on to the client before the serve (acceptImpl) //blocking call. @@ -80,19 +105,28 @@ TPipeServer::TPipeServer(int bufsize) : GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); } +#else + GlobalOutput.perror("TPipeServer: Anonymous pipes not yet supported under *NIX", -99); + throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX"); +#endif } TPipeServer::TPipeServer() : pipename_(""), bufsize_(1024), - hPipe_(INVALID_HANDLE_VALUE), - isAnonymous(true), - maxconns_(1) + Pipe_(-1), + maxconns_(1), + isAnonymous(true) { +#ifdef _WIN32 if (!TCreateAnonPipe()) { GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); } +#else + GlobalOutput.perror("TPipeServer: Anonymous pipes not yet supported under *NIX", -99); + throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX"); +#endif } //---- Destructor ---- @@ -104,6 +138,8 @@ TPipeServer::~TPipeServer() { // Transport callbacks //--------------------------------------------------------- +#ifdef _WIN32 + shared_ptr TPipeServer::acceptImpl() { shared_ptr client; @@ -113,7 +149,7 @@ shared_ptr TPipeServer::acceptImpl() { byte buf; DWORD br; int fSuccess = ReadFile( - hPipe_, // pipe handle + (HANDLE)Pipe_, // pipe handle &buf, // buffer to receive reply 0, // size of buffer &br, // number of bytes read @@ -123,7 +159,7 @@ shared_ptr TPipeServer::acceptImpl() { GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer unable to initiate pipe comms"); } - client.reset(new TPipe(hPipe_, hPipeW_)); + client.reset(new TPipe(Pipe_, PipeW_)); } else { //Named Pipe @@ -138,7 +174,7 @@ shared_ptr TPipeServer::acceptImpl() { // Wait for the client to connect; if it succeeds, the // function returns a nonzero value. If the function returns // zero, GetLastError should return ERROR_PIPE_CONNECTED. - ConnectRet = ConnectNamedPipe(hPipe_, NULL) ? + ConnectRet = ConnectNamedPipe((HANDLE)Pipe_, NULL) ? TRUE : (GetLastError() == ERROR_PIPE_CONNECTED); if (ConnectRet == TRUE) @@ -153,34 +189,34 @@ shared_ptr TPipeServer::acceptImpl() { throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed"); } } - client.reset(new TPipe(hPipe_)); + client.reset(new TPipe(Pipe_)); } return client; } void TPipeServer::interrupt() { - if(hPipe_ != INVALID_HANDLE_VALUE) { - CancelIo(hPipe_); + if(Pipe_ != -1) { + CancelIo((HANDLE)Pipe_); } } void TPipeServer::close() { if(!isAnonymous) { - if(hPipe_ != INVALID_HANDLE_VALUE) { - DisconnectNamedPipe(hPipe_); - CloseHandle(hPipe_); - hPipe_ = INVALID_HANDLE_VALUE; + if(Pipe_ != -1) { + DisconnectNamedPipe((HANDLE)Pipe_); + CloseHandle((HANDLE)Pipe_); + Pipe_ = -1; } } else { try { - CloseHandle(hPipe_); - CloseHandle(hPipeW_); - CloseHandle(ClientAnonRead); - CloseHandle(ClientAnonWrite); + CloseHandle((HANDLE)Pipe_); + CloseHandle((HANDLE)PipeW_); + CloseHandle((HANDLE)ClientAnonRead); + CloseHandle((HANDLE)ClientAnonWrite); } catch(...) { GlobalOutput.perror("TPipeServer anon close GLE=", GetLastError()); @@ -219,7 +255,7 @@ bool TPipeServer::TCreateNamedPipe() { sa.bInheritHandle = FALSE; // Create an instance of the named pipe - hPipe_ = CreateNamedPipe( + HANDLE hPipe_ = CreateNamedPipe( pipename_.c_str(), // pipe name PIPE_ACCESS_DUPLEX, // read/write access PIPE_TYPE_MESSAGE | // message type pipe @@ -230,7 +266,16 @@ bool TPipeServer::TCreateNamedPipe() { 0, // client time-out &sa); // default security attribute - return (hPipe_ != INVALID_HANDLE_VALUE); + if(hPipe_ == INVALID_HANDLE_VALUE) + { + Pipe_ = -1; + GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", GetLastError()); + throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed", GetLastError()); + return false; + } + + Pipe_ = (int)hPipe_; + return true; } bool TPipeServer::TCreateAnonPipe() { @@ -244,22 +289,55 @@ bool TPipeServer::TCreateAnonPipe() { sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.bInheritHandle = true; //allow passing handle to child - if (!CreatePipe(&ClientAnonRead,&hPipeW_,&sa,0)) //create stdin pipe + HANDLE ClientAnonReadH, PipeW_H, ClientAnonWriteH, Pipe_H; + if (!CreatePipe(&ClientAnonReadH,&PipeW_H,&sa,0)) //create stdin pipe { GlobalOutput.perror("TPipeServer CreatePipe (anon) failed, GLE=", GetLastError()); return false; } - if (!CreatePipe(&hPipe_,&ClientAnonWrite,&sa,0)) //create stdout pipe + if (!CreatePipe(&Pipe_H,&ClientAnonWriteH,&sa,0)) //create stdout pipe { GlobalOutput.perror("TPipeServer CreatePipe (anon) failed, GLE=", GetLastError()); - CloseHandle(ClientAnonRead); - CloseHandle(hPipeW_); + CloseHandle(ClientAnonReadH); + CloseHandle(PipeW_H); return false; } + ClientAnonRead = (int)ClientAnonReadH; + ClientAnonWrite = (int)ClientAnonWriteH; + Pipe_ = (int)Pipe_H; + PipeW_ = (int)PipeW_H; return true; } +#else +//*NIX implementation uses Unix Domain Sockets. +void TPipeServer::listen() { + dsrvsocket->listen(); +} + +shared_ptr TPipeServer::acceptImpl() { +// return boost::shared_dynamic_cast(dsrvsocket)->accept(); + return dsrvsocket->accept(); +} + +void TPipeServer::interrupt() { + dsrvsocket->interrupt(); +} + +void TPipeServer::close() { + dsrvsocket->close(); +} + +bool TPipeServer::TCreateNamedPipe() { + return false; //placeholder +} + +bool TPipeServer::TCreateAnonPipe() { + return false; //currently unimplemented +} +#endif //_WIN32 + //--------------------------------------------------------- // Accessors @@ -281,21 +359,21 @@ void TPipeServer::setBufferSize(int bufsize) { bufsize_ = bufsize; } -HANDLE TPipeServer::getPipeHandle() { - return hPipe_; +int TPipeServer::getPipeHandle() { + return Pipe_; } -HANDLE TPipeServer::getWrtPipeHandle() +int TPipeServer::getWrtPipeHandle() { - return hPipeW_; + return PipeW_; } -HANDLE TPipeServer::getClientRdPipeHandle() +int TPipeServer::getClientRdPipeHandle() { return ClientAnonRead; } -HANDLE TPipeServer::getClientWrtPipeHandle() +int TPipeServer::getClientWrtPipeHandle() { return ClientAnonWrite; } @@ -309,5 +387,3 @@ void TPipeServer::setAnonymous(bool anon) { } }}} // apache::thrift::transport - -#endif //_WIN32 diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h index 1732546a..a5c4528b 100644 --- a/lib/cpp/src/thrift/transport/TPipeServer.h +++ b/lib/cpp/src/thrift/transport/TPipeServer.h @@ -19,10 +19,12 @@ #ifndef _THRIFT_TRANSPORT_TSERVERWINPIPES_H_ #define _THRIFT_TRANSPORT_TSERVERWINPIPES_H_ 1 -#ifdef _WIN32 #include "TServerTransport.h" #include +#ifndef _WIN32 +# include "TServerSocket.h" +#endif #define TPIPE_SERVER_MAX_CONNS_DEFAULT 10 @@ -46,7 +48,6 @@ class TPipeServer : public TServerTransport { ~TPipeServer(); //Standard transport callbacks - //void listen(); //Unnecessary for Windows pipes void interrupt(); void close(); protected: @@ -61,10 +62,10 @@ class TPipeServer : public TServerTransport { void setPipename(std::string pipename); int getBufferSize(); void setBufferSize(int bufsize); - HANDLE getPipeHandle(); //Named Pipe R/W -or- Anonymous pipe Read handle - HANDLE getWrtPipeHandle(); - HANDLE getClientRdPipeHandle(); - HANDLE getClientWrtPipeHandle(); + int getPipeHandle(); //Named Pipe R/W -or- Anonymous pipe Read handle + int getWrtPipeHandle(); + int getClientRdPipeHandle(); + int getClientWrtPipeHandle(); bool getAnonymous(); void setAnonymous(bool anon); @@ -72,14 +73,20 @@ class TPipeServer : public TServerTransport { std::string pipename_; uint32_t bufsize_; uint32_t maxconns_; - HANDLE hPipe_; //Named Pipe (R/W) or Anonymous Pipe (R) - HANDLE hPipeW_; //Anonymous Pipe (W) - HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles + int Pipe_; //Named Pipe (R/W) or Anonymous Pipe (R) + int PipeW_; //Anonymous Pipe (W) + int ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles //? Do we need duplicates to send to client? bool isAnonymous; + +public: +#ifndef _WIN32 + //*NIX named pipe implementation uses domain socket + void listen(); //Only needed for domain sockets + boost::shared_ptr dsrvsocket; +#endif }; }}} // apache::thrift::transport -#endif //_WIN32 #endif // #ifndef _THRIFT_TRANSPORT_TSERVERWINPIPES_H_