blob: 54dad04d1de6f17d80ff42bed4b17ee490e8fdf9 [file] [log] [blame]
Kevin Clarke0fddde2008-06-18 01:16:02 +00001require 'thrift/server'
2
3# thrift/server already imports fastthread/thread
4
5module Thrift
6 # this class expects to always use a FramedTransport for reading messages
7 #--
8 # this isn't very pretty, but we're working around the fact that FramedTransport
9 # and the processors are all written in a synchronous manner.
10 # So lets read data off the wire ourselves, check if we have a full frame, and
11 # only then hand it to the transport to parse
12 #
13 # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
14 class NonblockingServer < ThreadPoolServer
15 def serve
16 @server_thread = Thread.current
17 @serverTransport.listen
18
19 begin
20 connections = {}
21 running_connections = {}
22 # the swapping_connections stuff is to ensure the thread doesn't
23 # put the connection back into the regular list, then have the server
24 # thread process it again, then have the first thread remove it from
25 # the running_connections list
26 swapping_connections = {}
27 thread_group = ThreadGroup.new
28 loop do
29 break if @shutdown
30 rd, = select([@serverTransport.handle, *connections.keys])
31 next if rd.nil?
32 rd.each do |socket|
33 if socket == @serverTransport.handle
34 client = @serverTransport.accept
35 buffer = ''
36 outtrans = @transportFactory.get_transport(client)
37 outprot = @protocolFactory.get_protocol(outtrans)
38 connections[client.handle] = [client, buffer, outtrans, outprot]
39 else
40 client, buffer, outtrans, outprot = connections[socket]
41 if socket.eof?
42 client.close
43 connections.delete(socket)
44 else
45 buffer << client.read(4096, true)
46 if has_full_frame?(buffer)
47 running_connections[socket] = connections.delete(socket)
48 @thread_q.push :token
49 t = Thread.new(Thread.current) do |master|
50 begin
51 membuf = MemoryBuffer.new(buffer)
52 intrans = @transportFactory.get_transport(membuf)
53 inprot = @protocolFactory.get_protocol(intrans)
54 @processor.process(inprot, outprot)
55 if @shutdown
56 client.close
57 running_connections.delete(socket)
58 else
59 swapping_connections[socket] = running_connections.delete(socket)
60 master.wakeup
61 end
62 rescue => e
63 outtrans.close
64 @exception_q.push e
65 ensure
66 running_connections.delete(socket)
67 connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket
68 intrans.close
69 @thread_q.pop
70 end
71 end
72 thread_group.add t
73 end
74 end
75 end
76 end
77 end
78 if @shutdown
79 @serverTransport.close
80 connections.merge! running_connections
81 connections.merge! swapping_connections
82 connections.values.each do |client, buffer, outtrans, outprot|
83 # can't close completely or we'll break active messages
84 # but lets at least stop accepting input
85 client.handle.close_read
86 end
87 start = Time.now.to_f
88 until thread_group.list.empty?
89 if @shutdown_timeout
90 now = Time.now.to_f
91 cur_timeout = @shutdown_timeout - (now - start)
92 break if cur_timeout <= 0
93 thread_group.list.first.join(cur_timeout)
94 else
95 thread_group.list.first.join
96 end
97 end
98 thread_group.list.each { |t| t.kill } if @shutdown_kill
99 # now kill connections completely if they still exists
100 connections.values.each do |client, buffer, outtrans, outprot|
101 client.close
102 end
103 end
104 ensure
105 @serverTransport.close
106 end
107 end
108
109 # Stop accepting new messages and wait for active messages to finish
110 # If the given timeout passes without the active messages finishing,
111 # control will exit from #serve and leave the remaining threads active.
112 # If you pass true for kill, the remaining threads will be reaped instead.
113 # A false timeout means wait indefinitely
114 def shutdown(timeout = nil, kill = false)
115 @shutdown_timeout = timeout
116 @shutdown_kill = kill
117 @shutdown = true
118 @server_thread.wakeup
119 end
120
121 private
122
123 def has_full_frame?(buf)
124 return no unless buf.length >= 4
125 size = buf.unpack('N').first
126 size + 4 <= buf.length
127 end
128 end
129end