From: David Reiss Date: Wed, 6 Oct 2010 17:09:42 +0000 (+0000) Subject: THRIFT-928. cpp: Include request/response size in processor callbacks X-Git-Tag: 0.6.0~131 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=ef7200f6e825db54edfa4736192446c96b2ae1d4;p=common%2Fthrift.git THRIFT-928. cpp: Include request/response size in processor callbacks Required updating transport interface. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005129 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc index c2f9e2e7..e372b974 100644 --- a/compiler/cpp/src/generate/t_cpp_generator.cc +++ b/compiler/cpp/src/generate/t_cpp_generator.cc @@ -2351,9 +2351,9 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << argsname << " args;" << endl << indent() << "args.read(iprot);" << endl << indent() << "iprot->readMessageEnd();" << endl << - indent() << "iprot->getTransport()->readEnd();" << endl << endl << + indent() << "uint32_t bytes = iprot->getTransport()->readEnd();" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl << indent() << "}" << endl << endl; @@ -2460,10 +2460,10 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << indent() << "result.write(oprot);" << endl << indent() << "oprot->writeMessageEnd();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl << + indent() << "bytes = oprot->getTransport()->writeEnd();" << endl << indent() << "oprot->getTransport()->flush();" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl << indent() << "}" << endl; // Close function diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index 896f5ae6..7858166e 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -59,7 +59,7 @@ class TProcessorEventHandler { /** * Called between reading arguments and calling the handler. */ - virtual void postRead(void* ctx, const char* fn_name) {} + virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {} /** * Called between calling the handler and writing the response. @@ -69,7 +69,7 @@ class TProcessorEventHandler { /** * Called after writing the response. */ - virtual void postWrite(void* ctx, const char* fn_name) {} + virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {} /** * Called when an async function call completes successfully. diff --git a/lib/cpp/src/transport/TBufferTransports.cpp b/lib/cpp/src/transport/TBufferTransports.cpp index 6097130b..c76f661e 100644 --- a/lib/cpp/src/transport/TBufferTransports.cpp +++ b/lib/cpp/src/transport/TBufferTransports.cpp @@ -262,6 +262,10 @@ void TFramedTransport::flush() { transport_->flush(); } +uint32_t TFramedTransport::writeEnd() { + return wBase_ - wBuf_.get(); +} + const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { // Don't try to be clever with shifting buffers. // If the fast path failed let the protocol use its slow path. @@ -269,6 +273,10 @@ const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { return NULL; } +uint32_t TFramedTransport::readEnd() { + // include framing bytes + return rBound_ - rBuf_.get() + sizeof(uint32_t); +} void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) { // Correct rBound_ so we can use the fast path in the future. diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h index b542fd5e..f81a6a0d 100644 --- a/lib/cpp/src/transport/TBufferTransports.h +++ b/lib/cpp/src/transport/TBufferTransports.h @@ -348,6 +348,10 @@ class TFramedTransport : public TBufferBase { virtual void flush(); + uint32_t readEnd(); + + uint32_t writeEnd(); + const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); boost::shared_ptr getUnderlyingTransport() { @@ -612,10 +616,18 @@ class TMemoryBuffer : public TBufferBase { uint32_t readAppendToString(std::string& str, uint32_t len); - void readEnd() { + // return number of bytes read + uint32_t readEnd() { + uint32_t bytes = rBase_ - buffer_; if (rBase_ == wBase_) { resetBuffer(); } + return bytes; + } + + // Return number of bytes written + uint32_t writeEnd() { + return wBase_ - buffer_; } uint32_t available_read() const { diff --git a/lib/cpp/src/transport/THttpTransport.cpp b/lib/cpp/src/transport/THttpTransport.cpp index 4010d6b8..0934f1b6 100644 --- a/lib/cpp/src/transport/THttpTransport.cpp +++ b/lib/cpp/src/transport/THttpTransport.cpp @@ -66,13 +66,14 @@ uint32_t THttpTransport::read(uint8_t* buf, uint32_t len) { return readBuffer_.read(buf, len); } -void THttpTransport::readEnd() { +uint32_t THttpTransport::readEnd() { // Read any pending chunked data (footers etc.) if (chunked_) { while (!chunkedDone_) { readChunked(); } } + return 0; } uint32_t THttpTransport::readMoreData() { diff --git a/lib/cpp/src/transport/THttpTransport.h b/lib/cpp/src/transport/THttpTransport.h index e71dcbd1..cd58bcb7 100644 --- a/lib/cpp/src/transport/THttpTransport.h +++ b/lib/cpp/src/transport/THttpTransport.h @@ -55,7 +55,7 @@ class THttpTransport : public TTransport { uint32_t read(uint8_t* buf, uint32_t len); - void readEnd(); + uint32_t readEnd(); void write(const uint8_t* buf, uint32_t len); diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h index f9e20cea..b9c35f0b 100644 --- a/lib/cpp/src/transport/TTransport.h +++ b/lib/cpp/src/transport/TTransport.h @@ -116,10 +116,11 @@ class TTransport { * This can be over-ridden to perform a transport-specific action * e.g. logging the request to a file * + * @return number of bytes read if available, 0 otherwise. */ - virtual void readEnd() { + virtual uint32_t readEnd() { // default behaviour is to do nothing - return; + return 0; } /** @@ -137,10 +138,11 @@ class TTransport { * This can be over-ridden to perform a transport-specific action * at the end of a request. * + * @return number of bytes written if available, 0 otherwise */ - virtual void writeEnd() { + virtual uint32_t writeEnd() { // default behaviour is to do nothing - return; + return 0; } /** @@ -149,7 +151,9 @@ class TTransport { * * @throws TTransportException if an error occurs */ - virtual void flush() {} + virtual void flush() { + // default behaviour is to do nothing + } /** * Attempts to return a pointer to \c len bytes, possibly copied into \c buf. diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp index a840fa6c..72289bca 100644 --- a/lib/cpp/src/transport/TTransportUtils.cpp +++ b/lib/cpp/src/transport/TTransportUtils.cpp @@ -135,16 +135,16 @@ uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) { return have; } -void TPipedFileReaderTransport::readEnd() { - TPipedTransport::readEnd(); +uint32_t TPipedFileReaderTransport::readEnd() { + return TPipedTransport::readEnd(); } void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) { TPipedTransport::write(buf, len); } -void TPipedFileReaderTransport::writeEnd() { - TPipedTransport::writeEnd(); +uint32_t TPipedFileReaderTransport::writeEnd() { + return TPipedTransport::writeEnd(); } void TPipedFileReaderTransport::flush() { diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index d65c9167..8b0c076f 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -136,7 +136,7 @@ class TPipedTransport : virtual public TTransport { uint32_t read(uint8_t* buf, uint32_t len); - void readEnd() { + uint32_t readEnd() { if (pipeOnRead_) { dstTrans_->write(rBuf_, rPos_); @@ -148,18 +148,22 @@ class TPipedTransport : virtual public TTransport { // If requests are being pipelined, copy down our read-ahead data, // then reset our state. int read_ahead = rLen_ - rPos_; + uint32_t bytes = rPos_; memcpy(rBuf_, rBuf_ + rPos_, read_ahead); rPos_ = 0; rLen_ = read_ahead; + + return bytes; } void write(const uint8_t* buf, uint32_t len); - void writeEnd() { + uint32_t writeEnd() { if (pipeOnWrite_) { dstTrans_->write(wBuf_, wLen_); dstTrans_->flush(); } + return wLen_; } void flush(); @@ -237,9 +241,9 @@ class TPipedFileReaderTransport : public TPipedTransport, void close(); uint32_t read(uint8_t* buf, uint32_t len); uint32_t readAll(uint8_t* buf, uint32_t len); - void readEnd(); + uint32_t readEnd(); void write(const uint8_t* buf, uint32_t len); - void writeEnd(); + uint32_t writeEnd(); void flush(); // TFileReaderTransport functions diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index 18bdc548..369237e5 100644 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -298,13 +298,13 @@ class TestProcessorEventHandler : public TProcessorEventHandler { virtual void preRead(void* ctx, const char* fn_name) { communicate("preRead", ctx, fn_name); } - virtual void postRead(void* ctx, const char* fn_name) { + virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) { communicate("postRead", ctx, fn_name); } virtual void preWrite(void* ctx, const char* fn_name) { communicate("preWrite", ctx, fn_name); } - virtual void postWrite(void* ctx, const char* fn_name) { + virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) { communicate("postWrite", ctx, fn_name); } virtual void asyncComplete(void* ctx, const char* fn_name) {