From 7d95246f16234deba11f82c830d1970f6ab14286 Mon Sep 17 00:00:00 2001 From: Jens Geyer Date: Fri, 26 Jul 2013 01:01:11 +0200 Subject: [PATCH] THRIFT-2083 Improve the go lib: buffered Transport, save memory allocation, handle concurrent request Patch: Feng Shen --- lib/go/thrift/binary_protocol.go | 48 +++++------------ lib/go/thrift/buffed_transport.go | 87 +++++++++++++++++++++++++++++++ lib/go/thrift/simple_server.go | 10 ++-- 3 files changed, 107 insertions(+), 38 deletions(-) create mode 100644 lib/go/thrift/buffed_transport.go diff --git a/lib/go/thrift/binary_protocol.go b/lib/go/thrift/binary_protocol.go index 5880f65c..6fb86247 100644 --- a/lib/go/thrift/binary_protocol.go +++ b/lib/go/thrift/binary_protocol.go @@ -24,15 +24,15 @@ import ( "fmt" "io" "math" - "strings" ) type TBinaryProtocol struct { - trans TTransport + trans TTransport strictRead bool strictWrite bool readLength int checkReadLength bool + buffer [8]byte } type TBinaryProtocolFactory struct { @@ -180,33 +180,22 @@ func (p *TBinaryProtocol) WriteByte(value byte) error { } func (p *TBinaryProtocol) WriteI16(value int16) error { - h := byte(0xff & (value >> 8)) - l := byte(0xff & value) - v := []byte{h, l} + v := p.buffer[0:2] + binary.BigEndian.PutUint16(v, uint16(value)) _, e := p.trans.Write(v) return NewTProtocolException(e) } func (p *TBinaryProtocol) WriteI32(value int32) error { - a := byte(0xff & (value >> 24)) - b := byte(0xff & (value >> 16)) - c := byte(0xff & (value >> 8)) - d := byte(0xff & value) - v := []byte{a, b, c, d} + v := p.buffer[0:4] + binary.BigEndian.PutUint32(v, uint32(value)) _, e := p.trans.Write(v) return NewTProtocolException(e) } func (p *TBinaryProtocol) WriteI64(value int64) error { - a := byte(0xff & (value >> 56)) - b := byte(0xff & (value >> 48)) - c := byte(0xff & (value >> 40)) - d := byte(0xff & (value >> 32)) - e := byte(0xff & (value >> 24)) - f := byte(0xff & (value >> 16)) - g := byte(0xff & (value >> 8)) - h := byte(0xff & value) - v := []byte{a, b, c, d, e, f, g, h} + v := p.buffer[:] + binary.BigEndian.PutUint64(v, uint64(value)) _, err := p.trans.Write(v) return NewTProtocolException(err) } @@ -216,7 +205,7 @@ func (p *TBinaryProtocol) WriteDouble(value float64) error { } func (p *TBinaryProtocol) WriteString(value string) error { - return p.WriteBinaryFromReader(strings.NewReader(value), len(value)) + return p.WriteBinary([]byte(value)) } func (p *TBinaryProtocol) WriteBinary(value []byte) error { @@ -228,15 +217,6 @@ func (p *TBinaryProtocol) WriteBinary(value []byte) error { return NewTProtocolException(err) } -func (p *TBinaryProtocol) WriteBinaryFromReader(reader io.Reader, size int) error { - e := p.WriteI32(int32(size)) - if e != nil { - return e - } - _, err := io.CopyN(p.trans, reader, int64(size)) - return NewTProtocolException(err) -} - /** * Reading methods */ @@ -385,34 +365,34 @@ func (p *TBinaryProtocol) ReadBool() (bool, error) { } func (p *TBinaryProtocol) ReadByte() (value byte, err error) { - buf := []byte{0} + buf := p.buffer[0:1] err = p.readAll(buf) return buf[0], err } func (p *TBinaryProtocol) ReadI16() (value int16, err error) { - buf := []byte{0, 0} + buf := p.buffer[0:2] err = p.readAll(buf) value = int16(binary.BigEndian.Uint16(buf)) return value, err } func (p *TBinaryProtocol) ReadI32() (value int32, err error) { - buf := []byte{0, 0, 0, 0} + buf := p.buffer[0:4] err = p.readAll(buf) value = int32(binary.BigEndian.Uint32(buf)) return value, err } func (p *TBinaryProtocol) ReadI64() (value int64, err error) { - buf := []byte{0, 0, 0, 0, 0, 0, 0, 0} + buf := p.buffer[0:8] err = p.readAll(buf) value = int64(binary.BigEndian.Uint64(buf)) return value, err } func (p *TBinaryProtocol) ReadDouble() (value float64, err error) { - buf := []byte{0, 0, 0, 0, 0, 0, 0, 0} + buf := p.buffer[0:8] err = p.readAll(buf) value = math.Float64frombits(binary.BigEndian.Uint64(buf)) return value, err diff --git a/lib/go/thrift/buffed_transport.go b/lib/go/thrift/buffed_transport.go new file mode 100644 index 00000000..1ba30537 --- /dev/null +++ b/lib/go/thrift/buffed_transport.go @@ -0,0 +1,87 @@ +package thrift + +type TBufferedTransportFactory struct { + size int +} + +type TBuffer struct { + buffer []byte + pos, limit int +} + +type TBufferedTransport struct { + tp TTransport + rbuf *TBuffer + wbuf *TBuffer +} + +func (p *TBufferedTransportFactory) GetTransport(trans TTransport) TTransport { + return NewTBufferedTransport(trans, p.size) +} + +func NewTBufferedTransportFactory(bufferSize int) *TBufferedTransportFactory { + return &TBufferedTransportFactory{size: bufferSize} +} + +func NewTBufferedTransport(trans TTransport, bufferSize int) *TBufferedTransport { + rb := &TBuffer{buffer: make([]byte, bufferSize)} + wb := &TBuffer{buffer: make([]byte, bufferSize), limit: bufferSize} + return &TBufferedTransport{tp: trans, rbuf: rb, wbuf: wb} +} + +func (p *TBufferedTransport) IsOpen() bool { + return p.tp.IsOpen() +} + +func (p *TBufferedTransport) Open() (err error) { + return p.tp.Open() +} + +func (p *TBufferedTransport) Close() (err error) { + return p.tp.Close() +} + +func (p *TBufferedTransport) Read(buf []byte) (n int, err error) { + rbuf := p.rbuf + if rbuf.pos == rbuf.limit { // no more data to read from buffer + rbuf.pos = 0 + // read data, fill buffer + rbuf.limit, err = p.tp.Read(rbuf.buffer) + if err != nil { + return 0, err + } + } + n = copy(buf, rbuf.buffer[rbuf.pos:rbuf.limit]) + rbuf.pos += n + return n, nil +} + +func (p *TBufferedTransport) Write(buf []byte) (n int, err error) { + wbuf := p.wbuf + size := len(buf) + if wbuf.pos+size > wbuf.limit { // buffer is full, flush buffer + p.Flush() + } + n = copy(wbuf.buffer[wbuf.pos:], buf) + wbuf.pos += n + return n, nil +} + +func (p *TBufferedTransport) Flush() error { + start := 0 + wbuf := p.wbuf + for start < wbuf.pos { + n, err := p.tp.Write(wbuf.buffer[start:wbuf.pos]) + if err != nil { + return err + } + start += n + } + + wbuf.pos = 0 + return p.tp.Flush() +} + +func (p *TBufferedTransport) Peek() bool { + return p.rbuf.pos < p.rbuf.limit || p.tp.Peek() +} diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go index 17be8d8d..b5cb0e12 100644 --- a/lib/go/thrift/simple_server.go +++ b/lib/go/thrift/simple_server.go @@ -120,12 +120,14 @@ func (p *TSimpleServer) Serve() error { for !p.stopped { client, err := p.serverTransport.Accept() if err != nil { - return err + log.Println("Accept err: ", err) } if client != nil { - if err := p.processRequest(client); err != nil { - log.Println("error processing request:", err) - } + go func() { + if err := p.processRequest(client); err != nil { + log.Println("error processing request:", err) + } + }() } } return nil -- 2.17.1