From: Roger Meier Date: Tue, 5 Jun 2012 20:14:14 +0000 (+0000) Subject: THRIFT-1195 Allow users to act on client connects/disconnects X-Git-Tag: 0.9.1~347 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=cefdca6f49ebfdc31e2a6f87f446a29a0891bf0a;p=common%2Fthrift.git THRIFT-1195 Allow users to act on client connects/disconnects HIVE-3067 Shutdown HiveMetaStore on client disconnect or timeout HIVE-3057 metastore.HiveMetaStore$HMSHandler should set the thread local raw store to null in shutdown() Patch: Dragan Okiljevic git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1346566 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java index 7fd75bf2..e5e26b2f 100644 --- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -550,4 +550,13 @@ public abstract class AbstractNonblockingServer extends TServer { } } } // FrameBuffer + + public void setServerEventHandler(TServerEventHandler eventHandler) { + throw new UnsupportedOperationException("Not supported yet."); + } + + public TServerEventHandler getEventHandler() { + throw new UnsupportedOperationException("Not supported yet."); + } + } diff --git a/lib/java/src/org/apache/thrift/server/ServerContext.java b/lib/java/src/org/apache/thrift/server/ServerContext.java new file mode 100644 index 00000000..9b0b99ee --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/ServerContext.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Interface for storing server's connection context + */ + +package org.apache.thrift.server; + +public interface ServerContext {} diff --git a/lib/java/src/org/apache/thrift/server/TServer.java b/lib/java/src/org/apache/thrift/server/TServer.java index 0af66d39..a85a4297 100644 --- a/lib/java/src/org/apache/thrift/server/TServer.java +++ b/lib/java/src/org/apache/thrift/server/TServer.java @@ -125,6 +125,8 @@ public abstract class TServer { private boolean isServing; + protected TServerEventHandler eventHandler_; + protected TServer(AbstractServerArgs args) { processorFactory_ = args.processorFactory; serverTransport_ = args.serverTransport; @@ -152,4 +154,12 @@ public abstract class TServer { protected void setServing(boolean serving) { isServing = serving; } + + public void setServerEventHandler(TServerEventHandler eventHandler) { + eventHandler_ = eventHandler; + } + + public TServerEventHandler getEventHandler() { + return eventHandler_; + } } diff --git a/lib/java/src/org/apache/thrift/server/TServerEventHandler.java b/lib/java/src/org/apache/thrift/server/TServerEventHandler.java new file mode 100644 index 00000000..f069b9bf --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/TServerEventHandler.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.server; + +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +/** + * Interface that can handle events from the server core. To + * use this you should subclass it and implement the methods that you care + * about. Your subclass can also store local data that you may care about, + * such as additional "arguments" to these methods (stored in the object + * instance's state). + */ +public interface TServerEventHandler { + + /** + * Called before the server begins. + */ + void preServe(); + + /** + * Called when a new client has connected and is about to being processing. + */ + ServerContext createContext(TProtocol input, + TProtocol output); + + /** + * Called when a client has finished request-handling to delete server + * context. + */ + void deleteContext(ServerContext serverContext, + TProtocol input, + TProtocol output); + + /** + * Called when a client is about to call the processor. + */ + void processContext(ServerContext serverContext, + TTransport inputTransport, TTransport outputTransport); + +} \ No newline at end of file diff --git a/lib/java/src/org/apache/thrift/server/TSimpleServer.java b/lib/java/src/org/apache/thrift/server/TSimpleServer.java index ef1b10a5..6e928019 100644 --- a/lib/java/src/org/apache/thrift/server/TSimpleServer.java +++ b/lib/java/src/org/apache/thrift/server/TSimpleServer.java @@ -50,6 +50,11 @@ public class TSimpleServer extends TServer { return; } + // Run the preServe event + if (eventHandler_ != null) { + eventHandler_.preServe(); + } + setServing(true); while (!stopped_) { @@ -59,6 +64,7 @@ public class TSimpleServer extends TServer { TTransport outputTransport = null; TProtocol inputProtocol = null; TProtocol outputProtocol = null; + ServerContext connectionContext = null; try { client = serverTransport_.accept(); if (client != null) { @@ -67,7 +73,17 @@ public class TSimpleServer extends TServer { outputTransport = outputTransportFactory_.getTransport(client); inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); - while (processor.process(inputProtocol, outputProtocol)) {} + if (eventHandler_ != null) { + connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol); + } + while (true) { + if (eventHandler_ != null) { + eventHandler_.processContext(connectionContext, inputTransport, outputTransport); + } + if(!processor.process(inputProtocol, outputProtocol)) { + break; + } + } } } catch (TTransportException ttx) { // Client died, just move on @@ -81,6 +97,10 @@ public class TSimpleServer extends TServer { } } + if (eventHandler_ != null) { + eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol); + } + if (inputTransport != null) { inputTransport.close(); } diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java index 0b037d80..9a68c763 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java @@ -108,6 +108,11 @@ public class TThreadPoolServer extends TServer { return; } + // Run the preServe event + if (eventHandler_ != null) { + eventHandler_.preServe(); + } + stopped_ = false; setServing(true); while (!stopped_) { @@ -175,15 +180,33 @@ public class TThreadPoolServer extends TServer { TTransport outputTransport = null; TProtocol inputProtocol = null; TProtocol outputProtocol = null; + + TServerEventHandler eventHandler = null; + ServerContext connectionContext = null; + try { processor = processorFactory_.getProcessor(client_); inputTransport = inputTransportFactory_.getTransport(client_); outputTransport = outputTransportFactory_.getTransport(client_); inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + + eventHandler = getEventHandler(); + if (eventHandler != null) { + connectionContext = eventHandler.createContext(inputProtocol, outputProtocol); + } // we check stopped_ first to make sure we're not supposed to be shutting // down. this is necessary for graceful shutdown. - while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {} + while (true) { + + if (eventHandler != null) { + eventHandler.processContext(connectionContext, inputTransport, outputTransport); + } + + if(stopped_ || !processor.process(inputProtocol, outputProtocol)) { + break; + } + } } catch (TTransportException ttx) { // Assume the client died and continue silently } catch (TException tx) { @@ -192,6 +215,10 @@ public class TThreadPoolServer extends TServer { LOGGER.error("Error occurred during processing of message.", x); } + if (eventHandler != null) { + eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); + } + if (inputTransport != null) { inputTransport.close(); } diff --git a/lib/java/test/org/apache/thrift/test/TestServer.java b/lib/java/test/org/apache/thrift/test/TestServer.java index e65756d3..90778824 100644 --- a/lib/java/test/org/apache/thrift/test/TestServer.java +++ b/lib/java/test/org/apache/thrift/test/TestServer.java @@ -26,13 +26,17 @@ import java.util.Map; import java.util.Set; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TServer.Args; import org.apache.thrift.server.TSimpleServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.server.ServerTestBase.TestHandler; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TTransport; import thrift.test.Insanity; import thrift.test.Numberz; @@ -44,6 +48,51 @@ import thrift.test.Xtruct2; public class TestServer { + static class TestServerContext implements ServerContext { + + int connectionId; + + public TestServerContext(int connectionId) { + this.connectionId = connectionId; + } + + public int getConnectionId() { + return connectionId; + } + + public void setConnectionId(int connectionId) { + this.connectionId = connectionId; + } + + } + + static class TestServerEventHandler implements TServerEventHandler { + + private int nextConnectionId = 1; + + public void preServe() { + System.out.println("TServerEventHandler.preServe - called only once before server starts accepting connections"); + } + + public ServerContext createContext(TProtocol input, TProtocol output) { + //we can create some connection level data which is stored while connection is alive & served + TestServerContext ctx = new TestServerContext(nextConnectionId++); + System.out.println("TServerEventHandler.createContext - connection #"+ctx.getConnectionId()+" established"); + return ctx; + } + + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { + TestServerContext ctx = (TestServerContext)serverContext; + System.out.println("TServerEventHandler.deleteContext - connection #"+ctx.getConnectionId()+" terminated"); + } + + public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) { + TestServerContext ctx = (TestServerContext)serverContext; + System.out.println("TServerEventHandler.processContext - connection #"+ctx.getConnectionId()+" is ready to process next request"); + } + + } + public static void main(String [] args) { try { int port = 9090; @@ -74,6 +123,9 @@ public class TestServer { // ThreadPool Server serverEngine = new TThreadPoolServer(new TThreadPoolServer.Args(tServerSocket).processor(testProcessor).protocolFactory(tProtocolFactory)); + //Set server event handler + serverEngine.setServerEventHandler(new TestServerEventHandler()); + // Run it System.out.println("Starting the server on port " + port + "..."); serverEngine.serve();