blob: 52049cbc48c8ea4ea2383406bb533bfa0db4f784 [file] [log] [blame]
Christian Lavoieafc6d8f2011-02-20 02:39:19 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
22import (
23 "encoding/binary"
24 "bytes"
25 "os"
26)
27
28
29type TFramedTransport struct {
30 transport TTransport
31 writeBuffer *bytes.Buffer
32 readBuffer *bytes.Buffer
33}
34
35type tFramedTransportFactory struct {
36 factory TTransportFactory
37}
38
39func NewTFramedTransportFactory(factory TTransportFactory) TTransportFactory {
40 return &tFramedTransportFactory{factory: factory}
41}
42
43func (p *tFramedTransportFactory) GetTransport(base TTransport) TTransport {
44 return NewTFramedTransport(p.factory.GetTransport(base))
45}
46
47func NewTFramedTransport(transport TTransport) *TFramedTransport {
48 writeBuf := make([]byte, 0, 1024)
49 readBuf := make([]byte, 0, 1024)
50 return &TFramedTransport{transport: transport, writeBuffer: bytes.NewBuffer(writeBuf), readBuffer: bytes.NewBuffer(readBuf)}
51}
52
53func (p *TFramedTransport) Open() os.Error {
54 return p.transport.Open()
55}
56
57func (p *TFramedTransport) IsOpen() bool {
58 return p.transport.IsOpen()
59}
60
61func (p *TFramedTransport) Peek() bool {
62 return p.transport.Peek()
63}
64
65func (p *TFramedTransport) Close() os.Error {
66 return p.transport.Close()
67}
68
69func (p *TFramedTransport) Read(buf []byte) (int, os.Error) {
70 if p.readBuffer.Len() > 0 {
71 got, err := p.readBuffer.Read(buf)
72 if got > 0 {
73 return got, NewTTransportExceptionFromOsError(err)
74 }
75 }
76
77 // Read another frame of data
78 p.readFrame()
79
80 got, err := p.readBuffer.Read(buf)
81 return got, NewTTransportExceptionFromOsError(err)
82}
83
84func (p *TFramedTransport) ReadAll(buf []byte) (int, os.Error) {
85 return ReadAllTransport(p, buf)
86}
87
88func (p *TFramedTransport) Write(buf []byte) (int, os.Error) {
89 n, err := p.writeBuffer.Write(buf)
90 return n, NewTTransportExceptionFromOsError(err)
91}
92
93func (p *TFramedTransport) Flush() os.Error {
94 size := p.writeBuffer.Len()
95 buf := []byte{0, 0, 0, 0}
96 binary.BigEndian.PutUint32(buf, uint32(size))
97 _, err := p.transport.Write(buf)
98 if err != nil {
99 return NewTTransportExceptionFromOsError(err)
100 }
101 if size > 0 {
102 n, err := p.writeBuffer.WriteTo(p.transport)
103 if err != nil {
104 print("Error while flushing write buffer of size ", size, " to transport, only wrote ", n, " bytes: ", err.String(), "\n")
105 return NewTTransportExceptionFromOsError(err)
106 }
107 }
108 err = p.transport.Flush()
109 return NewTTransportExceptionFromOsError(err)
110}
111
112func (p *TFramedTransport) readFrame() (int, os.Error) {
113 buf := []byte{0, 0, 0, 0}
114 _, err := p.transport.ReadAll(buf)
115 if err != nil {
116 return 0, err
117 }
118 size := int(binary.BigEndian.Uint32(buf))
119 if size < 0 {
120 // TODO(pomack) log error
121 return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, "Read a negative frame size ("+string(size)+")")
122 }
123 if size == 0 {
124 return 0, nil
125 }
126 buf2 := make([]byte, size)
127 n, err := p.transport.ReadAll(buf2)
128 if err != nil {
129 return n, err
130 }
131 p.readBuffer = bytes.NewBuffer(buf2)
132 return size, nil
133}