Class: OnStomp::Components::ThreadedProcessor

Inherits:
Object
Defined in:
lib/onstomp/components/threaded_processor.rb

Overview

An IO processor that does its work on its own thread.

Constructor Details

- (ThreadedProcessor) initialize(client)

Creates a new processor for the given client.

Parameters:

  • client

    the client whose connection this processor services

# File 'lib/onstomp/components/threaded_processor.rb', line 7

def initialize client
  @client = client
  @run_thread = nil
  @closing = false
end

Instance Method Details

- (self) join

Makes the calling thread yield (via Thread.pass) until the processor is no longer running, then joins the IO thread if one exists.

Returns:

  • (self)


# File 'lib/onstomp/components/threaded_processor.rb', line 55

def join
  Thread.pass while running?
  @run_thread && @run_thread.join
  self
end

- (Object) prepare_to_close

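Prepares the connection for closing by flushing its write buffer. If the processor is running, its IO thread is paused for the duration of the flush and woken up afterwards.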



# File 'lib/onstomp/components/threaded_processor.rb', line 42

def prepare_to_close
  if running?
    @closing = true
    Thread.pass until @run_thread.stop?
    @client.connection.flush_write_buffer
    @closing = false
    @run_thread.wakeup
  end
end
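
The pause-and-resume handshake used by prepare_to_close can be tried in isolation. The following is a minimal sketch, not OnStomp code: PausableWorker, while_paused, ticks and shutdown are hypothetical names, and a counter stands in for the IO work. It assumes the worker never sleeps on its own, so Thread#stop? only becomes true once the worker has parked itself with Thread.stop.

```ruby
# Hypothetical illustration; none of these names come from OnStomp.
class PausableWorker
  attr_reader :ticks

  def initialize
    @closing = false
    @ticks = 0
    @thread = Thread.new do
      loop do
        @ticks += 1               # stand-in for one unit of IO work
        Thread.stop if @closing   # park here when a pause was requested
        Thread.pass
      end
    end
  end

  # Runs the block while the worker thread is parked, then resumes it.
  def while_paused
    @closing = true
    Thread.pass until @thread.stop?   # wait for the worker to park
    result = yield
    @closing = false
    @thread.wakeup                    # let the worker continue its loop
    result
  end

  def shutdown
    @thread.kill
    @thread.join
  end
end

worker = PausableWorker.new
seen = worker.while_paused do
  first = worker.ticks
  Thread.pass                         # the parked worker cannot advance here
  [first, worker.ticks]
end
worker.shutdown
puts seen[0] == seen[1]               # prints true
```

In the real processor the IO thread may also be sleeping inside its IO call, in which case stop? can report true before Thread.stop is reached; the sketch sidesteps that by keeping the worker busy.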

- (true, false) running?

Returns true if the processor's IO thread has been created and is alive; otherwise returns a falsy value (nil if no thread has been created, false if the thread has finished).

Returns:

  • (true, false)


# File 'lib/onstomp/components/threaded_processor.rb', line 16

def running?
  @run_thread && @run_thread.alive?
end
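
Because the method body is a plain `&&` expression, its result depends on whether a thread exists yet. The sketch below reproduces the same expression as a hypothetical top-level helper that takes the thread as an argument (the real method reads @run_thread instead) and checks it before, during and after a thread's lifetime.

```ruby
require "thread"

# Hypothetical helper mirroring the expression in running?
def running?(thread)
  thread && thread.alive?
end

gate = Queue.new
worker = Thread.new { gate.pop }   # stays alive until something is pushed

p running?(nil)      # nil: no thread has been created
p running?(worker)   # true: thread exists and is alive
gate << :go
worker.join
p running?(worker)   # false: thread has finished
```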

- (self) start

Starts the processor by creating a new thread that continually invokes OnStomp::Connections::Base#io_process while the client is connected.

Returns:

  • (self)


# File 'lib/onstomp/components/threaded_processor.rb', line 24

def start
  @run_thread = Thread.new do
    begin
      while @client.connected?
        @client.connection.io_process
        Thread.stop if @closing
      end
    rescue OnStomp::StopReceiver
    rescue Exception
      # FIXME: This is pretty hacky, too. The problem is one of race
      # conditions and how we access the connection.
      raise if @run_thread == Thread.current
    end
  end
  self
end
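
To see the shape of the loop without a broker, here is a minimal sketch that drives the same "process while connected" loop against stand-in objects. FakeConnection, FakeClient and run_io_loop are hypothetical and not part of OnStomp; the fake connection simply reports itself as disconnected after a fixed number of io_process calls.

```ruby
# Hypothetical stand-ins; not part of OnStomp.
class FakeConnection
  attr_reader :calls

  def initialize(limit)
    @limit = limit
    @calls = 0
  end

  def io_process
    @calls += 1            # stand-in for one round of reading and writing
  end

  def connected?
    @calls < @limit        # "disconnects" after limit rounds
  end
end

class FakeClient
  attr_reader :connection

  def initialize(connection)
    @connection = connection
  end

  def connected?
    @connection.connected?
  end
end

# Same loop shape as start, minus the pause and error handling.
def run_io_loop(client)
  Thread.new do
    client.connection.io_process while client.connected?
  end
end

client = FakeClient.new(FakeConnection.new(3))
run_io_loop(client).join
puts client.connection.calls   # prints 3
```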

- (self) stop

Forcefully stops the processor by raising OnStomp::StopReceiver in its IO thread, then joins that thread from the calling thread.

Returns:

  • (self)

Raises:

  • (IOError, SystemCallError)

    if either was raised in the IO thread and the client is still connected after the thread is joined.



# File 'lib/onstomp/components/threaded_processor.rb', line 67

def stop
  if @run_thread
    begin
      @run_thread.raise OnStomp::StopReceiver if @run_thread.alive?
      @run_thread.join
    rescue OnStomp::StopReceiver
    rescue IOError, SystemCallError
      raise if @client.connected?
    end
    @run_thread = nil
  end
  self
end
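
The stop technique, interrupting a looping thread with Thread#raise and then joining it, can be sketched on its own. StopSignal, start_worker and stop_worker are hypothetical names (StopSignal plays the role of OnStomp::StopReceiver). As in the method above, the caller also rescues the signal around join, since the exception could in principle be delivered before the worker has entered its begin block.

```ruby
# Hypothetical stand-in for OnStomp::StopReceiver.
class StopSignal < StandardError; end

def start_worker
  Thread.new do
    begin
      loop { Thread.pass }   # stand-in for the IO loop
    rescue StopSignal
      # swallowed: this is the normal way for the worker to end
    end
  end
end

# Interrupts the worker, waits for it, and reports whether it has ended.
def stop_worker(thread)
  begin
    thread.raise StopSignal if thread.alive?
    thread.join
  rescue StopSignal
    # the signal escaped the worker; join re-raised it here
  end
  !thread.alive?
end

puts stop_worker(start_worker)   # prints true
```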