From b6722bf8c4dfc35bc818251f8004d1fa5e04d437 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Wed, 27 Oct 2010 23:26:52 +0000 Subject: [PATCH] THRIFT-377. java: TFileTransport port in Java This patch adds TFileTransport to the java library. This transport is not a general-purpose file transport; instead, it is more of a way to execute one-way RPC via an offline file process. Patch: Joydeep Sen Sarma git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1028136 13f79535-47bb-0310-9956-ffa450edef68 --- .../thrift/transport/TFileProcessor.java | 130 ++++ .../thrift/transport/TFileTransport.java | 628 ++++++++++++++++++ .../thrift/transport/TSeekableFile.java | 33 + .../thrift/transport/TStandardFile.java | 60 ++ 4 files changed, 851 insertions(+) create mode 100644 lib/java/src/org/apache/thrift/transport/TFileProcessor.java create mode 100644 lib/java/src/org/apache/thrift/transport/TFileTransport.java create mode 100644 lib/java/src/org/apache/thrift/transport/TSeekableFile.java create mode 100644 lib/java/src/org/apache/thrift/transport/TStandardFile.java diff --git a/lib/java/src/org/apache/thrift/transport/TFileProcessor.java b/lib/java/src/org/apache/thrift/transport/TFileProcessor.java new file mode 100644 index 00000000..7a58eea4 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TFileProcessor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import org.apache.thrift.TProcessor; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; + + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.RandomAccessFile; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; + +/** + * FileProcessor: helps in processing files generated by TFileTransport. + * Port of original cpp implementation + * + * @author Joydeep Sen Sarma + */ +public class TFileProcessor { + + private TProcessor processor_; + private TProtocolFactory inputProtocolFactory_; + private TProtocolFactory outputProtocolFactory_; + private TFileTransport inputTransport_; + private TTransport outputTransport_; + + public TFileProcessor(TProcessor processor, TProtocolFactory protocolFactory, + TFileTransport inputTransport, + TTransport outputTransport) { + processor_ = processor; + inputProtocolFactory_ = outputProtocolFactory_ = protocolFactory; + inputTransport_ = inputTransport; + outputTransport_ = outputTransport; + } + + public TFileProcessor(TProcessor processor, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + TFileTransport inputTransport, + TTransport outputTransport) { + processor_ = processor; + inputProtocolFactory_ = inputProtocolFactory; + outputProtocolFactory_ = outputProtocolFactory; + inputTransport_ = inputTransport; + outputTransport_ = outputTransport; + } + + private void processUntil(int lastChunk) throws TException { + TProtocol ip = inputProtocolFactory_.getProtocol(inputTransport_); + TProtocol op = outputProtocolFactory_.getProtocol(outputTransport_); + int curChunk = inputTransport_.getCurChunk(); + + try { + while (lastChunk >= curChunk) { + processor_.process(ip, op); + int newChunk = inputTransport_.getCurChunk(); + curChunk = newChunk; + } + } catch (TTransportException e) { + // if we are processing the last chunk - we could have just hit EOF + // on EOF - trap the error and stop processing. + if(e.getType() != TTransportException.END_OF_FILE) + throw e; + else { + return; + } + } + } + + /** + * Process from start to last chunk both inclusive where chunks begin from 0 + + * @param startChunkNum first chunk to be processed + * @param lastChunkNum last chunk to be processed + */ + public void processChunk(int startChunkNum, int endChunkNum) throws TException { + int numChunks = inputTransport_.getNumChunks(); + if(endChunkNum < 0) + endChunkNum += numChunks; + + if(startChunkNum < 0) + startChunkNum += numChunks; + + if(endChunkNum < startChunkNum) + throw new TException("endChunkNum " + endChunkNum + " is less than " + startChunkNum); + + inputTransport_.seekToChunk(startChunkNum); + processUntil(endChunkNum); + } + + /** + * Process a single chunk + * + * @param chunkNum chunk to be processed + */ + public void processChunk(int chunkNum) throws TException { + processChunk(chunkNum, chunkNum); + } + + /** + * Process a current chunk + */ + public void processChunk() throws TException { + processChunk(inputTransport_.getCurChunk()); + } +} diff --git a/lib/java/src/org/apache/thrift/transport/TFileTransport.java b/lib/java/src/org/apache/thrift/transport/TFileTransport.java new file mode 100644 index 00000000..630fb99b --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TFileTransport.java @@ -0,0 +1,628 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.Random; + +/** + * FileTransport implementation of the TTransport interface. + * Currently this is a straightforward port of the cpp implementation + * + * It may make better sense to provide a basic stream access on top of the framed file format + * The FileTransport can then be a user of this framed file format with some additional logic + * for chunking. + * + * @author Joydeep Sen Sarma + */ +public class TFileTransport extends TTransport { + + public static class truncableBufferedInputStream extends BufferedInputStream { + public void trunc() { + pos = count = 0; + } + public truncableBufferedInputStream(InputStream in) { + super(in); + } + public truncableBufferedInputStream(InputStream in, int size) { + super(in, size); + } + } + + + public static class Event { + private byte[] buf_; + private int nread_; + private int navailable_; + + /** + * Initialize an event. Initially, it has no valid contents + * + * @param buf byte array buffer to store event + */ + public Event(byte[] buf) { + buf_ = buf; + nread_ = navailable_ = 0; + } + + public byte[] getBuf() { return buf_;} + public int getSize() { return buf_.length; } + + + public void setAvailable(int sz) { nread_ = 0; navailable_=sz;} + public int getRemaining() { return (navailable_ - nread_); } + + public int emit(byte[] buf, int offset, int ndesired) { + if((ndesired == 0) || (ndesired > getRemaining())) + ndesired = getRemaining(); + + if(ndesired <= 0) + return (ndesired); + + System.arraycopy(buf_, nread_, buf, offset, ndesired); + nread_ += ndesired; + + return(ndesired); + } + }; + + public static class chunkState { + /** + * Chunk Size. Must be same across all implementations + */ + public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; + + private int chunk_size_ = DEFAULT_CHUNK_SIZE; + private long offset_ = 0; + + public chunkState() {} + public chunkState(int chunk_size) { chunk_size_ = chunk_size; } + + public void skip(int size) {offset_ += size; } + public void seek(long offset) {offset_ = offset;} + + public int getChunkSize() { return chunk_size_;} + public int getChunkNum() { return ((int)(offset_/chunk_size_));} + public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));} + public long getOffset() { return (offset_);} + } + + public static enum tailPolicy { + + NOWAIT(0, 0), + WAIT_FOREVER(500, -1); + + /** + * Time in milliseconds to sleep before next read + * If 0, no sleep + */ + public final int timeout_; + + /** + * Number of retries before giving up + * if 0, no retries + * if -1, retry forever + */ + public final int retries_; + + /** + * ctor for policy + * + * @param timeout sleep time for this particular policy + * @param retries number of retries + */ + + tailPolicy(int timeout, int retries) { + timeout_ = timeout; + retries_ = retries; + } + } + + /** + * Current tailing policy + */ + tailPolicy currentPolicy_ = tailPolicy.NOWAIT; + + + /** + * Underlying file being read + */ + protected TSeekableFile inputFile_ = null; + + /** + * Underlying outputStream + */ + protected OutputStream outputStream_ = null; + + + /** + * Event currently read in + */ + Event currentEvent_ = null; + + /** + * InputStream currently being used for reading + */ + InputStream inputStream_ = null; + + /** + * current Chunk state + */ + chunkState cs = null; + + /** + * Read timeout + */ + private int readTimeout_ = 0; + + /** + * is read only? + */ + private boolean readOnly_ = false; + + /** + * Get File Tailing Policy + * + * @return current read policy + */ + public tailPolicy getTailPolicy() { + return (currentPolicy_); + } + + /** + * Set file Tailing Policy + * + * @param policy New policy to set + * @return Old policy + */ + public tailPolicy setTailPolicy(tailPolicy policy) { + tailPolicy old = currentPolicy_; + currentPolicy_ = policy; + return (old); + } + + + /** + * Initialize read input stream + * + * @return input stream to read from file + */ + private InputStream createInputStream() throws TTransportException { + InputStream is; + try { + if(inputStream_ != null) { + ((truncableBufferedInputStream)inputStream_).trunc(); + is = inputStream_; + } else { + is = new truncableBufferedInputStream(inputFile_.getInputStream()); + } + } catch (IOException iox) { + System.err.println("createInputStream: "+iox.getMessage()); + throw new TTransportException(iox.getMessage(), iox); + } + return(is); + } + + /** + * Read (potentially tailing) an input stream + * + * @param is InputStream to read from + * @param buf Buffer to read into + * @param off Offset in buffer to read into + * @param len Number of bytes to read + * @param tp policy to use if we hit EOF + * + * @return number of bytes read + */ + private int tailRead(InputStream is, byte[] buf, + int off, int len, tailPolicy tp) throws TTransportException { + int orig_len = len; + try { + int retries = 0; + while(len > 0) { + int cnt = is.read(buf, off, len); + if(cnt > 0) { + off += cnt; + len -= cnt; + retries = 0; + cs.skip(cnt); // remember that we read so many bytes + } else if (cnt == -1) { + // EOF + retries++; + + if((tp.retries_ != -1) && tp.retries_ < retries) + return (orig_len - len); + + if(tp.timeout_ > 0) { + try {Thread.sleep(tp.timeout_);} catch(InterruptedException e) {} + } + } else { + // either non-zero or -1 is what the contract says! + throw new + TTransportException("Unexpected return from InputStream.read = " + + cnt); + } + } + } catch (IOException iox) { + throw new TTransportException(iox.getMessage(), iox); + } + + return(orig_len - len); + } + + /** + * Event is corrupted. Do recovery + * + * @return true if recovery could be performed and we can read more data + * false is returned only when nothing more can be read + */ + private boolean performRecovery() throws TTransportException { + int numChunks = getNumChunks(); + int curChunk = cs.getChunkNum(); + + if(curChunk >= (numChunks-1)) { + return false; + } + seekToChunk(curChunk+1); + return true; + } + + /** + * Read event from underlying file + * + * @return true if event could be read, false otherwise (on EOF) + */ + private boolean readEvent() throws TTransportException { + byte[] ebytes = new byte[4]; + int esize; + int nread; + int nrequested; + + retry: + do { + // corner case. read to end of chunk + nrequested = cs.getRemaining(); + if(nrequested < 4) { + nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_); + if(nread != nrequested) { + return(false); + } + } + + // assuming serialized on little endian machine + nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_); + if(nread != 4) { + return(false); + } + + esize=0; + for(int i=3; i>=0; i--) { + int val = (0x000000ff & (int)ebytes[i]); + esize |= (val << (i*8)); + } + + // check if event is corrupted and do recovery as required + if(esize > cs.getRemaining()) { + throw new TTransportException("FileTransport error: bad event size"); + /* + if(performRecovery()) { + esize=0; + } else { + return false; + } + */ + } + } while (esize == 0); + + // reset existing event or get a larger one + if(currentEvent_.getSize() < esize) + currentEvent_ = new Event(new byte [esize]); + + // populate the event + byte[] buf = currentEvent_.getBuf(); + nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_); + if(nread != esize) { + return(false); + } + currentEvent_.setAvailable(esize); + return(true); + } + + /** + * open if both input/output open unless readonly + * + * @return true + */ + public boolean isOpen() { + return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null))); + } + + + /** + * Diverging from the cpp model and sticking to the TSocket model + * Files are not opened in ctor - but in explicit open call + */ + public void open() throws TTransportException { + if (isOpen()) + throw new TTransportException(TTransportException.ALREADY_OPEN); + + try { + inputStream_ = createInputStream(); + cs = new chunkState(); + currentEvent_ = new Event(new byte [256]); + + if(!readOnly_) + outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream(), 8192); + } catch (IOException iox) { + throw new TTransportException(TTransportException.NOT_OPEN, iox); + } + } + + /** + * Closes the transport. + */ + public void close() { + if (inputFile_ != null) { + try { + inputFile_.close(); + } catch (IOException iox) { + System.err.println("WARNING: Error closing input file: " + + iox.getMessage()); + } + inputFile_ = null; + } + if (outputStream_ != null) { + try { + outputStream_.close(); + } catch (IOException iox) { + System.err.println("WARNING: Error closing output stream: " + + iox.getMessage()); + } + outputStream_ = null; + } + } + + + /** + * File Transport ctor + * + * @param path File path to read and write from + * @param readOnly Whether this is a read-only transport + */ + public TFileTransport(final String path, boolean readOnly) throws IOException { + inputFile_ = new TStandardFile(path); + readOnly_ = readOnly; + } + + /** + * File Transport ctor + * + * @param inputFile_ open TSeekableFile to read/write from + * @param readOnly Whether this is a read-only transport + */ + public TFileTransport(TSeekableFile inputFile, boolean readOnly) { + inputFile_ = inputFile; + readOnly_ = readOnly; + } + + + /** + * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception + * where one is detected. + */ + public int readAll(byte[] buf, int off, int len) + throws TTransportException { + int got = 0; + int ret = 0; + while (got < len) { + ret = read(buf, off+got, len-got); + if (ret < 0) { + throw new TTransportException("Error in reading from file"); + } + if(ret == 0) { + throw new TTransportException(TTransportException.END_OF_FILE, + "End of File reached"); + } + got += ret; + } + return got; + } + + + /** + * Reads up to len bytes into buffer buf, starting att offset off. + * + * @param buf Array to read into + * @param off Index to start reading at + * @param len Maximum number of bytes to read + * @return The number of bytes actually read + * @throws TTransportException if there was an error reading data + */ + public int read(byte[] buf, int off, int len) throws TTransportException { + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, + "Must open before reading"); + + if(currentEvent_.getRemaining() == 0) { + if(!readEvent()) + return(0); + } + + int nread = currentEvent_.emit(buf, off, len); + return nread; + } + + public int getNumChunks() throws TTransportException { + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, + "Must open before getNumChunks"); + try { + long len = inputFile_.length(); + if(len == 0) + return 0; + else + return (((int)(len/cs.getChunkSize())) + 1); + + } catch (IOException iox) { + throw new TTransportException(iox.getMessage(), iox); + } + } + + public int getCurChunk() throws TTransportException { + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, + "Must open before getCurChunk"); + return (cs.getChunkNum()); + + } + + + public void seekToChunk(int chunk) throws TTransportException { + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, + "Must open before seeking"); + + int numChunks = getNumChunks(); + + // file is empty, seeking to chunk is pointless + if (numChunks == 0) { + return; + } + + // negative indicates reverse seek (from the end) + if (chunk < 0) { + chunk += numChunks; + } + + // too large a value for reverse seek, just seek to beginnin + if (chunk < 0) { + chunk = 0; + } + + long eofOffset=0; + boolean seekToEnd = (chunk >= numChunks); + if(seekToEnd) { + chunk = chunk - 1; + try { eofOffset = inputFile_.length(); } + catch (IOException iox) {throw new TTransportException(iox.getMessage(), + iox);} + } + + if(chunk*cs.getChunkSize() != cs.getOffset()) { + try { inputFile_.seek((long)chunk*cs.getChunkSize()); } + catch (IOException iox) { + System.err.println("createInputStream: "+iox.getMessage()); + throw new TTransportException("Seek to chunk " + + chunk + " " +iox.getMessage(), iox); + } + + cs.seek((long)chunk*cs.getChunkSize()); + currentEvent_.setAvailable(0); + inputStream_ = createInputStream(); + } + + if(seekToEnd) { + // waiting forever here - otherwise we can hit EOF and end up + // having consumed partial data from the data stream. + tailPolicy old = setTailPolicy(tailPolicy.WAIT_FOREVER); + while(cs.getOffset() < eofOffset) { readEvent(); } + currentEvent_.setAvailable(0); + setTailPolicy(old); + } + } + + public void seekToEnd() throws TTransportException { + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, + "Must open before seeking"); + seekToChunk(getNumChunks()); + } + + + /** + * Writes up to len bytes from the buffer. + * + * @param buf The output data buffer + * @param off The offset to start writing from + * @param len The number of bytes to write + * @throws TTransportException if there was an error writing data + */ + public void write(byte[] buf, int off, int len) throws TTransportException { + throw new TTransportException("Not Supported"); + } + + /** + * Flush any pending data out of a transport buffer. + * + * @throws TTransportException if there was an error writing out data. + */ + public void flush() throws TTransportException { + throw new TTransportException("Not Supported"); + } + + /** + * test program + * + */ + public static void main(String[] args) throws Exception { + + int num_chunks = 10; + + if((args.length < 1) || args[0].equals("--help") + || args[0].equals("-h") || args[0].equals("-?")) { + printUsage(); + } + + if(args.length > 1) { + try { + num_chunks = Integer.parseInt(args[1]); + } catch (Exception e) { + System.err.println("Cannot parse " + args[1]); + printUsage(); + } + } + + TFileTransport t = new TFileTransport(args[0], true); + t.open(); + System.out.println("NumChunks="+t.getNumChunks()); + + Random r = new Random(); + for(int j=0; j [num_chunks]"); + System.err.println(" (Opens and reads num_chunks chunks from file randomly)"); + System.exit(1); + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TSeekableFile.java b/lib/java/src/org/apache/thrift/transport/TSeekableFile.java new file mode 100644 index 00000000..e02d36f6 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TSeekableFile.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; + +public interface TSeekableFile { + + public InputStream getInputStream() throws IOException; + public OutputStream getOutputStream() throws IOException; + public void close() throws IOException; + public long length() throws IOException; + public void seek(long pos) throws IOException; +} diff --git a/lib/java/src/org/apache/thrift/transport/TStandardFile.java b/lib/java/src/org/apache/thrift/transport/TStandardFile.java new file mode 100644 index 00000000..7a33af8e --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TStandardFile.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.FileInputStream; +import java.io.FileOutputStream; + +public class TStandardFile implements TSeekableFile { + + protected String path_ = null; + protected RandomAccessFile inputFile_ = null; + + public TStandardFile(String path) throws IOException { + path_ = path; + inputFile_ = new RandomAccessFile(path_, "r"); + } + + public InputStream getInputStream() throws IOException { + return new FileInputStream(inputFile_.getFD()); + } + + public OutputStream getOutputStream() throws IOException { + return new FileOutputStream(path_); + } + + public void close() throws IOException { + if(inputFile_ != null) { + inputFile_.close(); + } + } + + public long length() throws IOException { + return inputFile_.length(); + } + + public void seek(long pos) throws IOException { + inputFile_.seek(pos); + } +} -- 2.17.1