THRIFT-2083 Improve the go lib: buffered Transport, save memory allocation, handle...
authorJens Geyer <jensg@apache.org>
Thu, 25 Jul 2013 23:01:11 +0000 (01:01 +0200)
committerJens Geyer <jensg@apache.org>
Thu, 25 Jul 2013 23:01:11 +0000 (01:01 +0200)
Patch: Feng Shen

lib/go/thrift/binary_protocol.go
lib/go/thrift/buffed_transport.go [new file with mode: 0644]
lib/go/thrift/simple_server.go

index 5880f65..6fb8624 100644 (file)
@@ -24,15 +24,15 @@ import (
        "fmt"
        "io"
        "math"
-       "strings"
 )
 
 type TBinaryProtocol struct {
-       trans            TTransport
+       trans           TTransport
        strictRead      bool
        strictWrite     bool
        readLength      int
        checkReadLength bool
+       buffer          [8]byte
 }
 
 type TBinaryProtocolFactory struct {
@@ -180,33 +180,22 @@ func (p *TBinaryProtocol) WriteByte(value byte) error {
 }
 
 func (p *TBinaryProtocol) WriteI16(value int16) error {
-       h := byte(0xff & (value >> 8))
-       l := byte(0xff & value)
-       v := []byte{h, l}
+       v := p.buffer[0:2]
+       binary.BigEndian.PutUint16(v, uint16(value))
        _, e := p.trans.Write(v)
        return NewTProtocolException(e)
 }
 
 func (p *TBinaryProtocol) WriteI32(value int32) error {
-       a := byte(0xff & (value >> 24))
-       b := byte(0xff & (value >> 16))
-       c := byte(0xff & (value >> 8))
-       d := byte(0xff & value)
-       v := []byte{a, b, c, d}
+       v := p.buffer[0:4]
+       binary.BigEndian.PutUint32(v, uint32(value))
        _, e := p.trans.Write(v)
        return NewTProtocolException(e)
 }
 
 func (p *TBinaryProtocol) WriteI64(value int64) error {
-       a := byte(0xff & (value >> 56))
-       b := byte(0xff & (value >> 48))
-       c := byte(0xff & (value >> 40))
-       d := byte(0xff & (value >> 32))
-       e := byte(0xff & (value >> 24))
-       f := byte(0xff & (value >> 16))
-       g := byte(0xff & (value >> 8))
-       h := byte(0xff & value)
-       v := []byte{a, b, c, d, e, f, g, h}
+       v := p.buffer[:]
+       binary.BigEndian.PutUint64(v, uint64(value))
        _, err := p.trans.Write(v)
        return NewTProtocolException(err)
 }
@@ -216,7 +205,7 @@ func (p *TBinaryProtocol) WriteDouble(value float64) error {
 }
 
 func (p *TBinaryProtocol) WriteString(value string) error {
-       return p.WriteBinaryFromReader(strings.NewReader(value), len(value))
+       return p.WriteBinary([]byte(value))
 }
 
 func (p *TBinaryProtocol) WriteBinary(value []byte) error {
@@ -228,15 +217,6 @@ func (p *TBinaryProtocol) WriteBinary(value []byte) error {
        return NewTProtocolException(err)
 }
 
-func (p *TBinaryProtocol) WriteBinaryFromReader(reader io.Reader, size int) error {
-       e := p.WriteI32(int32(size))
-       if e != nil {
-               return e
-       }
-       _, err := io.CopyN(p.trans, reader, int64(size))
-       return NewTProtocolException(err)
-}
-
 /**
  * Reading methods
  */
@@ -385,34 +365,34 @@ func (p *TBinaryProtocol) ReadBool() (bool, error) {
 }
 
 func (p *TBinaryProtocol) ReadByte() (value byte, err error) {
-       buf := []byte{0}
+       buf := p.buffer[0:1]
        err = p.readAll(buf)
        return buf[0], err
 }
 
 func (p *TBinaryProtocol) ReadI16() (value int16, err error) {
-       buf := []byte{0, 0}
+       buf := p.buffer[0:2]
        err = p.readAll(buf)
        value = int16(binary.BigEndian.Uint16(buf))
        return value, err
 }
 
 func (p *TBinaryProtocol) ReadI32() (value int32, err error) {
-       buf := []byte{0, 0, 0, 0}
+       buf := p.buffer[0:4]
        err = p.readAll(buf)
        value = int32(binary.BigEndian.Uint32(buf))
        return value, err
 }
 
 func (p *TBinaryProtocol) ReadI64() (value int64, err error) {
-       buf := []byte{0, 0, 0, 0, 0, 0, 0, 0}
+       buf := p.buffer[0:8]
        err = p.readAll(buf)
        value = int64(binary.BigEndian.Uint64(buf))
        return value, err
 }
 
 func (p *TBinaryProtocol) ReadDouble() (value float64, err error) {
-       buf := []byte{0, 0, 0, 0, 0, 0, 0, 0}
+       buf := p.buffer[0:8]
        err = p.readAll(buf)
        value = math.Float64frombits(binary.BigEndian.Uint64(buf))
        return value, err
diff --git a/lib/go/thrift/buffed_transport.go b/lib/go/thrift/buffed_transport.go
new file mode 100644 (file)
index 0000000..1ba3053
--- /dev/null
@@ -0,0 +1,87 @@
+package thrift
+
+type TBufferedTransportFactory struct {
+       size int
+}
+
+type TBuffer struct {
+       buffer     []byte
+       pos, limit int
+}
+
+type TBufferedTransport struct {
+       tp   TTransport
+       rbuf *TBuffer
+       wbuf *TBuffer
+}
+
+func (p *TBufferedTransportFactory) GetTransport(trans TTransport) TTransport {
+       return NewTBufferedTransport(trans, p.size)
+}
+
+func NewTBufferedTransportFactory(bufferSize int) *TBufferedTransportFactory {
+       return &TBufferedTransportFactory{size: bufferSize}
+}
+
+func NewTBufferedTransport(trans TTransport, bufferSize int) *TBufferedTransport {
+       rb := &TBuffer{buffer: make([]byte, bufferSize)}
+       wb := &TBuffer{buffer: make([]byte, bufferSize), limit: bufferSize}
+       return &TBufferedTransport{tp: trans, rbuf: rb, wbuf: wb}
+}
+
+func (p *TBufferedTransport) IsOpen() bool {
+       return p.tp.IsOpen()
+}
+
+func (p *TBufferedTransport) Open() (err error) {
+       return p.tp.Open()
+}
+
+func (p *TBufferedTransport) Close() (err error) {
+       return p.tp.Close()
+}
+
+func (p *TBufferedTransport) Read(buf []byte) (n int, err error) {
+       rbuf := p.rbuf
+       if rbuf.pos == rbuf.limit { // no more data to read from buffer
+               rbuf.pos = 0
+               // read data, fill buffer
+               rbuf.limit, err = p.tp.Read(rbuf.buffer)
+               if err != nil {
+                       return 0, err
+               }
+       }
+       n = copy(buf, rbuf.buffer[rbuf.pos:rbuf.limit])
+       rbuf.pos += n
+       return n, nil
+}
+
+func (p *TBufferedTransport) Write(buf []byte) (n int, err error) {
+       wbuf := p.wbuf
+       size := len(buf)
+       if wbuf.pos+size > wbuf.limit { // buffer is full, flush buffer
+               p.Flush()
+       }
+       n = copy(wbuf.buffer[wbuf.pos:], buf)
+       wbuf.pos += n
+       return n, nil
+}
+
+func (p *TBufferedTransport) Flush() error {
+       start := 0
+       wbuf := p.wbuf
+       for start < wbuf.pos {
+               n, err := p.tp.Write(wbuf.buffer[start:wbuf.pos])
+               if err != nil {
+                       return err
+               }
+               start += n
+       }
+
+       wbuf.pos = 0
+       return p.tp.Flush()
+}
+
+func (p *TBufferedTransport) Peek() bool {
+       return p.rbuf.pos < p.rbuf.limit || p.tp.Peek()
+}
index 17be8d8..b5cb0e1 100644 (file)
@@ -120,12 +120,14 @@ func (p *TSimpleServer) Serve() error {
        for !p.stopped {
                client, err := p.serverTransport.Accept()
                if err != nil {
-                       return err
+                       log.Println("Accept err: ", err)
                }
                if client != nil {
-                       if err := p.processRequest(client); err != nil {
-                               log.Println("error processing request:", err)
-                       }
+                       go func() {
+                               if err := p.processRequest(client); err != nil {
+                                       log.Println("error processing request:", err)
+                               }
+                       }()
                }
        }
        return nil