From 0c5c234b58802c36013b0d69b237ec3f098ebfd5 Mon Sep 17 00:00:00 2001 From: T Jake Luciani Date: Thu, 12 Nov 2009 03:01:33 +0000 Subject: [PATCH] THRIFT-619: Perl server and example git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@835206 13f79535-47bb-0310-9956-ffa450edef68 --- lib/perl/Makefile.am | 1 + lib/perl/lib/Thrift/BufferedTransport.pm | 27 ++ lib/perl/lib/Thrift/Server.pm | 313 +++++++++++++++++++++++ lib/perl/lib/Thrift/Socket.pm | 67 ++++- lib/perl/lib/Thrift/Transport.pm | 48 ++++ tutorial/perl/PerlServer.pl | 124 +++++++++ 6 files changed, 576 insertions(+), 4 deletions(-) create mode 100644 lib/perl/lib/Thrift/Server.pm create mode 100644 tutorial/perl/PerlServer.pl diff --git a/lib/perl/Makefile.am b/lib/perl/Makefile.am index 163d0158..eb195603 100644 --- a/lib/perl/Makefile.am +++ b/lib/perl/Makefile.am @@ -50,5 +50,6 @@ EXTRA_DIST = \ lib/Thrift/HttpClient.pm \ lib/Thrift/MemoryBuffer.pm \ lib/Thrift/Protocol.pm \ + lib/Thrift/Server.pm \ lib/Thrift/Socket.pm \ lib/Thrift/Transport.pm diff --git a/lib/perl/lib/Thrift/BufferedTransport.pm b/lib/perl/lib/Thrift/BufferedTransport.pm index bef564d6..3868ca2d 100644 --- a/lib/perl/lib/Thrift/BufferedTransport.pm +++ b/lib/perl/lib/Thrift/BufferedTransport.pm @@ -106,4 +106,31 @@ sub flush } +# +# BufferedTransport factory creates buffered transport objects from transports +# +package Thrift::BufferedTransportFactory; + +sub new { + my $classname = shift; + my $self = {}; + + return bless($self,$classname); +} + +# +# Build a buffered transport from the base transport +# +# @return Thrift::BufferedTransport transport +# +sub getTransport +{ + my $self = shift; + my $trans = shift; + + my $buffered = Thrift::BufferedTransport->new($trans); + return $buffered; +} + + 1; diff --git a/lib/perl/lib/Thrift/Server.pm b/lib/perl/lib/Thrift/Server.pm new file mode 100644 index 00000000..960fbd12 --- /dev/null +++ b/lib/perl/lib/Thrift/Server.pm @@ -0,0 +1,313 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 5.6.0; +use strict; +use warnings; + +use Thrift; +use Thrift::BufferedTransport; +use Thrift::BinaryProtocol; + +# +# Server base class module +# +package Thrift::Server; + +# 3 possible constructors: +# 1. (processor, serverTransport) +# 2. (processor, serverTransport, transportFactory, protocolFactory) +# 3. (processor, serverTransport, +# inputTransportFactory, outputTransportFactory, +# inputProtocolFactory, outputProtocolFactory) +sub new +{ + my $classname = shift; + my @args = @_; + + my $self; + + if (scalar @args == 2) + { + $self = _init($args[0], $args[1], + Thrift::BufferedTransportFactory->new(), + Thrift::BufferedTransportFactory->new(), + Thrift::BinaryProtocolFactory->new(), + Thrift::BinaryProtocolFactory->new()); + } + elsif (scalar @args == 4) + { + $self = _init($args[0], $args[1], $args[2], $args[2], $args[3], $args[3]); + } + elsif (scalar @args == 6) + { + $self = _init($args[0], $args[1], $args[2], $args[3], $args[4], $args[5]); + } + else + { + die "Thrift::Server expects exactly 2, 4, or 6 args"; + } + + return bless($self,$classname); +} + +sub _init +{ + my $processor = shift; + my $serverTransport = shift; + my $inputTransportFactory = shift; + my $outputTransportFactory = shift; + my $inputProtocolFactory = shift; + my $outputProtocolFactory = shift; + + my $self = { + processor => $processor, + serverTransport => $serverTransport, + inputTransportFactory => $inputTransportFactory, + outputTransportFactory => $outputTransportFactory, + inputProtocolFactory => $inputProtocolFactory, + outputProtocolFactory => $outputProtocolFactory, + }; +} + +sub serve +{ + die "abstract"; +} + +sub _clientBegin +{ + my $self = shift; + my $iprot = shift; + my $oprot = shift; + + if (exists $self->{serverEventHandler} and + defined $self->{serverEventHandler}) + { + $self->{serverEventHandler}->clientBegin($iprot, $oprot); + } +} + +sub _handleException +{ + my $self = shift; + my $e = shift; + + if ($e =~ m/TException/ and exists $e->{message}) { + my $message = $e->{message}; + my $code = $e->{code}; + my $out = $code . ':' . $message; + + $message =~ m/TTransportException/ and die $out; + if ($message =~ m/TSocket/) { + # suppress TSocket messages + } else { + warn $out; + } + } else { + warn $e; + } +} + + +# +# SimpleServer from the Server base class that handles one connection at a time +# +package Thrift::SimpleServer; +use base qw( Thrift::Server ); + +sub new +{ + my $classname = shift; + my @args = @_; + + my $self = $classname->SUPER::new(@args); + return bless($self,$classname); +} + +sub serve +{ + my $self = shift; + + $self->{serverTransport}->listen(); + while (1) + { + my $client = $self->{serverTransport}->accept(); + my $itrans = $self->{inputTransportFactory}->getTransport($client); + my $otrans = $self->{outputTransportFactory}->getTransport($client); + my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans); + my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans); + eval { + $self->_clientBegin($iprot, $oprot); + while (1) + { + $self->{processor}->process($iprot, $oprot); + } + }; if($@) { + $self->_handleException($@); + } + + $itrans->close(); + $otrans->close(); + } +} + + +# +# ForkingServer that forks a new process for each request +# +package Thrift::ForkingServer; +use base qw( Thrift::Server ); + +use POSIX ":sys_wait_h"; + +sub new +{ + my $classname = shift; + my @args = @_; + + my $self = $classname->SUPER::new(@args); + return bless($self,$classname); +} + + +sub serve +{ + my $self = shift; + + $self->{serverTransport}->listen(); + while (1) + { + my $client = $self->{serverTransport}->accept(); + $self->_client($client); + } +} + +sub _client +{ + my $self = shift; + my $client = shift; + + eval { + my $itrans = $self->{inputTransportFactory}->getTransport($client); + my $otrans = $self->{outputTransportFactory}->getTransport($client); + + my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans); + my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans); + + $self->_clientBegin($iprot, $oprot); + + my $pid = fork(); + + if ($pid) #parent + { + $self->_parent($pid, $itrans, $otrans); + } else { + $self->_child($itrans, $otrans, $iprot, $oprot); + } + }; if($@) { + $self->_handleException($@); + } +} + +sub _parent +{ + my $self = shift; + my $pid = shift; + my $itrans = shift; + my $otrans = shift; + + # add before collect, otherwise you race w/ waitpid + $self->{children}->{$pid} = 1; + $self->_collectChildren(); + + # Parent must close socket or the connection may not get closed promptly + $self->tryClose($itrans); + $self->tryClose($otrans); +} + +sub _child +{ + my $self = shift; + my $itrans = shift; + my $otrans = shift; + my $iprot = shift; + my $oprot = shift; + + my $ecode = 0; + eval { + while (1) + { + $self->{processor}->process($iprot, $oprot); + } + }; if($@) { + $ecode = 1; + $self->_handleException($@); + } + + $self->tryClose($itrans); + $self->tryClose($otrans); + + exit($ecode); +} + +sub tryClose +{ + my $self = shift; + my $file = shift; + + eval { + if (defined $file) + { + $file->close(); + } + }; if($@) { + if ($@ =~ m/TException/ and exists $@->{message}) { + my $message = $@->{message}; + my $code = $@->{code}; + my $out = $code . ':' . $message; + + warn $out; + } else { + warn $@; + } + } +} + +sub _collectChildren +{ + my $self = shift; + + while (scalar keys %{$self->{children}}) + { + my $pid = waitpid(-1, WNOHANG); + + if ($pid>0) + { + delete $self->{children}->{$pid}; + } + else + { + last; + } + } +} + + +1; diff --git a/lib/perl/lib/Thrift/Socket.pm b/lib/perl/lib/Thrift/Socket.pm index 4d2aac74..7ebea356 100644 --- a/lib/perl/lib/Thrift/Socket.pm +++ b/lib/perl/lib/Thrift/Socket.pm @@ -33,9 +33,9 @@ use base('Thrift::Transport'); sub new { - my $classname = shift; - my $host = shift || "localhost"; - my $port = shift || 9090; + my $classname = shift; + my $host = shift || "localhost"; + my $port = shift || 9090; my $debugHandler = shift; my $self = { @@ -132,7 +132,7 @@ sub close my $self = shift; if( defined $self->{handle} ){ - close( ($self->{handle}->handles())[0] ); + CORE::close( ($self->{handle}->handles())[0] ); } } @@ -268,4 +268,63 @@ sub flush my $ret = ($self->{handle}->handles())[0]->flush; } + +# +# Build a ServerSocket from the ServerTransport base class +# +package Thrift::ServerSocket; + +use base qw( Thrift::Socket Thrift::ServerTransport ); + +use constant LISTEN_QUEUE_SIZE => 128; + +sub new +{ + my $classname = shift; + my $port = shift; + + my $self = $classname->SUPER::new(undef, $port, undef); + return bless($self,$classname); +} + +sub listen +{ + my $self = shift; + + # Listen to a new socket + my $sock = IO::Socket::INET->new(LocalAddr => undef, # any addr + LocalPort => $self->{port}, + Proto => 'tcp', + Listen => LISTEN_QUEUE_SIZE, + ReuseAddr => 1) + || do { + my $error = 'TServerSocket: Could not bind to ' . + $self->{host} . ':' . $self->{port} . ' (' . $! . ')'; + + if ($self->{debug}) { + $self->{debugHandler}->($error); + } + + die new Thrift::TException($error); + }; + + $self->{handle} = $sock; +} + +sub accept +{ + my $self = shift; + + if ( exists $self->{handle} and defined $self->{handle} ) + { + my $client = $self->{handle}->accept(); + my $result = new Thrift::Socket; + $result->{handle} = new IO::Select($client); + return $result; + } + + return 0; +} + + 1; diff --git a/lib/perl/lib/Thrift/Transport.pm b/lib/perl/lib/Thrift/Transport.pm index e22592be..5ec6feee 100644 --- a/lib/perl/lib/Thrift/Transport.pm +++ b/lib/perl/lib/Thrift/Transport.pm @@ -125,5 +125,53 @@ sub write # sub flush {} + +# +# TransportFactory creates transport objects from transports +# +package Thrift::TransportFactory; + +sub new { + my $classname = shift; + my $self = {}; + + return bless($self,$classname); +} + +# +# Build a transport from the base transport +# +# @return Thrift::Transport transport +# +sub getTransport +{ + my $self = shift; + my $trans = shift; + + return $trans; +} + + +# +# ServerTransport base class module +# +package Thrift::ServerTransport; + +sub listen +{ + die "abstract"; +} + +sub accept +{ + die "abstract"; +} + +sub close +{ + die "abstract"; +} + + 1; diff --git a/tutorial/perl/PerlServer.pl b/tutorial/perl/PerlServer.pl new file mode 100644 index 00000000..a40ec69b --- /dev/null +++ b/tutorial/perl/PerlServer.pl @@ -0,0 +1,124 @@ +#!/usr/bin/perl + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +use strict; +use lib '../gen-perl'; +use Thrift::Socket; +use Thrift::Server; +use tutorial::Calculator; + +package CalculatorHandler; +use base qw(tutorial::CalculatorIf); + +sub new { + my $classname = shift; + my $self = {}; + + return bless($self,$classname); +} + + +sub ping +{ + print "ping()\n"; +} + +sub add +{ + my($self, $n1, $n2) = @_; + printf("add(%d,%d)\n", $n1, $n2); + return $n1 + $n2; +} + +sub calculate +{ + my($self, $logid, $work) = @_; + my $op = $work->{op}; + my $num1 = $work->{num1}; + my $num2 = $work->{num2}; + printf("calculate(%d, %d %d %d)\n", $logid, $num1, $num2, $op); + + my $val; + + if ($op == tutorial::Operation::ADD) { + $val = $num1 + $num2; + } elsif ($op == tutorial::Operation::SUBTRACT) { + $val = $num1 - $num2; + } elsif ($op == tutorial::Operation::MULTIPLY) { + $val = $num1 * $num2; + } elsif ($op == tutorial::Operation::DIVIDE) { + if ($num2 == 0) + { + my $x = new tutorial::InvalidOperation; + $x->what($op); + $x->why('Cannot divide by 0'); + die $x; + } + $val = $num1 / $num2; + } else { + my $x = new tutorial::InvalidOperation; + $x->what($op); + $x->why('Invalid operation'); + die $x; + } + + my $log = new shared::SharedStruct; + $log->key($logid); + $log->value(int($val)); + $self->{log}->{$logid} = $log; + + return $val; +} + +sub getStruct +{ + my($self, $key) = @_; + printf("getStruct(%d)\n", $key); + return $self->{log}->{$key}; +} + +sub zip +{ + my($self) = @_; + print "zip()\n"; +} + + + +eval { + my $handler = new CalculatorHandler; + my $processor = new tutorial::CalculatorProcessor($handler); + my $serversocket = new Thrift::ServerSocket(9090); + my $forkingserver = new Thrift::ForkingServer($processor, $serversocket); + print "Starting the server...\n"; + $forkingserver->serve(); + print "done.\n"; +}; if ($@) { + if ($@ =~ m/TException/ and exists $@->{message}) { + my $message = $@->{message}; + my $code = $@->{code}; + my $out = $code . ':' . $message; + die $out; + } else { + die $@; + } +} + -- 2.17.1