blob: 64b2958fa786f88171f8bc9dd504c8a0825e7153 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package thrift
import (
"bufio"
"io"
)
// StreamTransport is a Transport made of an io.Reader and/or an io.Writer
type StreamTransport struct {
Reader io.Reader
Writer io.Writer
isReadWriter bool
}
type StreamTransportFactory struct {
Reader io.Reader
Writer io.Writer
isReadWriter bool
}
func (p *StreamTransportFactory) GetTransport(trans TTransport) TTransport {
if trans != nil {
t, ok := trans.(*StreamTransport)
if ok {
if t.isReadWriter {
return NewStreamTransportRW(t.Reader.(io.ReadWriter))
}
if t.Reader != nil && t.Writer != nil {
return NewStreamTransport(t.Reader, t.Writer)
}
if t.Reader != nil && t.Writer == nil {
return NewStreamTransportR(t.Reader)
}
if t.Reader == nil && t.Writer != nil {
return NewStreamTransportW(t.Writer)
}
return &StreamTransport{}
}
}
if p.isReadWriter {
return NewStreamTransportRW(p.Reader.(io.ReadWriter))
}
if p.Reader != nil && p.Writer != nil {
return NewStreamTransport(p.Reader, p.Writer)
}
if p.Reader != nil && p.Writer == nil {
return NewStreamTransportR(p.Reader)
}
if p.Reader == nil && p.Writer != nil {
return NewStreamTransportW(p.Writer)
}
return &StreamTransport{}
}
func NewStreamTransportFactory(reader io.Reader, writer io.Writer, isReadWriter bool) *StreamTransportFactory {
return &StreamTransportFactory{Reader: reader, Writer: writer, isReadWriter: isReadWriter}
}
func NewStreamTransport(r io.Reader, w io.Writer) *StreamTransport {
return &StreamTransport{Reader: bufio.NewReader(r), Writer: bufio.NewWriter(w)}
}
func NewStreamTransportR(r io.Reader) *StreamTransport {
return &StreamTransport{Reader: bufio.NewReader(r)}
}
func NewStreamTransportW(w io.Writer) *StreamTransport {
return &StreamTransport{Writer: bufio.NewWriter(w)}
}
func NewStreamTransportRW(rw io.ReadWriter) *StreamTransport {
bufrw := bufio.NewReadWriter(bufio.NewReader(rw), bufio.NewWriter(rw))
return &StreamTransport{Reader: bufrw, Writer: bufrw, isReadWriter: true}
}
// (The streams must already be open at construction time, so this should
// always return true.)
func (p *StreamTransport) IsOpen() bool {
return true
}
// (The streams must already be open. This method does nothing.)
func (p *StreamTransport) Open() error {
return nil
}
func (p *StreamTransport) Peek() bool {
return p.IsOpen()
}
// Closes both the input and output streams.
func (p *StreamTransport) Close() error {
closedReader := false
if p.Reader != nil {
c, ok := p.Reader.(io.Closer)
if ok {
e := c.Close()
closedReader = true
if e != nil {
return e
}
}
p.Reader = nil
}
if p.Writer != nil && (!closedReader || !p.isReadWriter) {
c, ok := p.Writer.(io.Closer)
if ok {
e := c.Close()
if e != nil {
return e
}
}
p.Writer = nil
}
return nil
}
// Reads from the underlying input stream if not null.
func (p *StreamTransport) Read(buf []byte) (int, error) {
if p.Reader == nil {
return 0, NewTTransportException(NOT_OPEN, "Cannot read from null inputStream")
}
n, err := p.Reader.Read(buf)
return n, NewTTransportExceptionFromError(err)
}
// Writes to the underlying output stream if not null.
func (p *StreamTransport) Write(buf []byte) (int, error) {
if p.Writer == nil {
return 0, NewTTransportException(NOT_OPEN, "Cannot write to null outputStream")
}
n, err := p.Writer.Write(buf)
return n, NewTTransportExceptionFromError(err)
}
// Flushes the underlying output stream if not null.
func (p *StreamTransport) Flush() error {
if p.Writer == nil {
return NewTTransportException(NOT_OPEN, "Cannot flush null outputStream")
}
f, ok := p.Writer.(Flusher)
if ok {
err := f.Flush()
if err != nil {
return NewTTransportExceptionFromError(err)
}
}
return nil
}