Class: OnStomp::Connections::Base

Inherits:
Object
Includes:
Interfaces::ConnectionEvents
Defined in:
lib/onstomp/connections/base.rb

Overview

Common behavior for all connections.

Direct Known Subclasses

Stomp_1_0, Stomp_1_1

Defined Under Namespace

Modules: BlockingRead, BlockingWrite, NonblockingRead, NonblockingWrite

Constant Summary

MAX_BYTES_PER_WRITE = 1024 * 8

The approximate maximum number of bytes to write per call to #io_process_write.

MAX_BYTES_PER_READ = 1024 * 4

The maximum number of bytes to read per call to #io_process_read.

Instance Attribute Summary

Instance Method Summary

Methods included from Interfaces::ConnectionEvents

#blocked, #closed, #died, #established, #install_bindings_from_client, #on_blocked, #on_closed, #on_died, #on_established, #on_terminated, #terminated, #trigger_connection_event

Methods included from Interfaces::EventManager

#bind_event, #event_callbacks, included, #trigger_event

Constructor Details

- (Base) initialize(socket, client)

Creates a new connection using the given #socket object and client. The #socket object will generally be a TCPSocket or an OpenSSL::SSL::SSLSocket and must support the methods read_nonblock, write_nonblock, and close.

Parameters:



# File 'lib/onstomp/connections/base.rb', line 22

def initialize socket, client
  @socket = socket
  @write_mutex = Mutex.new
  @closing = false
  @write_buffer = []
  @read_buffer = []
  @client = client
  @connection_up = false
  self.read_timeout = 120
  self.write_timeout = nil
  setup_non_blocking_methods
end
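
The socket requirement above is duck-typed, so any object that answers to those three methods should be acceptable. A minimal sketch of such a check follows; `usable_socket?` and `REQUIRED_SOCKET_METHODS` are hypothetical names introduced for illustration and are not part of OnStomp.

```ruby
# Hypothetical helper (not part of OnStomp): reports whether an object
# offers the three methods the constructor's documentation asks for.
REQUIRED_SOCKET_METHODS = [:read_nonblock, :write_nonblock, :close].freeze

def usable_socket?(obj)
  REQUIRED_SOCKET_METHODS.all? { |m| obj.respond_to?(m) }
end

# Any IO object responds to all three methods; a String does not.
reader, writer = IO.pipe
usable_socket?(writer)
usable_socket?('not a socket')
reader.close
writer.close
```

The check only confirms that the methods exist. It says nothing about whether the object behaves like a connected stream.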

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

- (Object) method_missing(meth, *args, &block)

Checks if the missing method ends with '_frame', and if so raises an UnsupportedCommandError exception.



# File 'lib/onstomp/connections/base.rb', line 121

def method_missing meth, *args, &block
  if meth.to_s =~ /^(.*)_frame$/
    raise OnStomp::UnsupportedCommandError, $1.upcase
  else
    super
  end
end
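
The effect of this hook can be seen in isolation with a stand-in class. In the sketch below, `SketchConnection` and the local `UnsupportedCommandError` are assumptions made for the example; only the body of `method_missing` mirrors the listing above.

```ruby
# Stand-in for OnStomp::UnsupportedCommandError in this sketch.
class UnsupportedCommandError < StandardError; end

# Illustrative class, not an OnStomp connection.
class SketchConnection
  # A frame builder that this hypothetical connection does support.
  def send_frame(body)
    "SEND\n\n#{body}"
  end

  # Any other *_frame call is reported as an unsupported command; all
  # remaining unknown methods fall through to the default NoMethodError.
  def method_missing(meth, *args, &block)
    if meth.to_s =~ /^(.*)_frame$/
      raise UnsupportedCommandError, $1.upcase
    else
      super
    end
  end
end

conn = SketchConnection.new
begin
  conn.nack_frame('message-1')
rescue UnsupportedCommandError => e
  e.message # the upcased command name taken from the method name
end
```

A subclass that supports a command only has to define the matching `*_frame` method, and the fallback never fires for it.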

Instance Attribute Details

- (Object) client (readonly)

Returns the value of attribute client



# File 'lib/onstomp/connections/base.rb', line 6

def client
  @client
end

- (Object) last_received_at (readonly)

Returns the value of attribute last_received_at



# File 'lib/onstomp/connections/base.rb', line 7

def last_received_at
  @last_received_at
end

- (Object) last_transmitted_at (readonly)

Returns the value of attribute last_transmitted_at



# File 'lib/onstomp/connections/base.rb', line 7

def last_transmitted_at
  @last_transmitted_at
end

- (Object) read_timeout

Returns the value of attribute read_timeout



# File 'lib/onstomp/connections/base.rb', line 8

def read_timeout
  @read_timeout
end

- (Object) socket (readonly)

Returns the value of attribute socket



# File 'lib/onstomp/connections/base.rb', line 6

def socket
  @socket
end

- (Object) version (readonly)

Returns the value of attribute version



# File 'lib/onstomp/connections/base.rb', line 6

def version
  @version
end

- (Object) write_timeout

Returns the value of attribute write_timeout



# File 'lib/onstomp/connections/base.rb', line 8

def write_timeout
  @write_timeout
end

Instance Method Details

- (Object) close(blocking = false)

Closes the #socket. If blocking is true, any buffered data is written out and the socket is closed before this method returns; otherwise the socket remains open until #io_process_write has finished writing all of its buffered data. Once this method has been invoked, #write_frame_nonblock will not enqueue any additional frames for writing.

Parameters:

  • blocking (true, false) (defaults to: false)


# File 'lib/onstomp/connections/base.rb', line 85

def close blocking=false
  @write_mutex.synchronize { @closing = true }
  if blocking
    io_process_write until @write_buffer.empty?
    socket.close
  end
end
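
The two closing modes can be sketched with a toy model. `FakeSocket` and `ToyConnection` below are illustrative stand-ins, not OnStomp classes, and the model leaves out the mutex and the event callbacks that the real connection uses.

```ruby
# Fake socket that records what was written and whether it was closed.
class FakeSocket
  attr_reader :written

  def initialize
    @written = String.new
    @closed = false
  end

  def write(data)
    @written << data
    data.length
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Toy connection: one buffered chunk is written per process_write call.
class ToyConnection
  def initialize(socket)
    @socket = socket
    @buffer = []
    @closing = false
  end

  # New data is refused once close has been requested.
  def enqueue(data)
    @buffer << data unless @closing
  end

  def process_write
    chunk = @buffer.shift
    @socket.write(chunk) if chunk
    # A deferred close completes once the buffer has drained.
    @socket.close if @buffer.empty? && @closing && !@socket.closed?
  end

  def close(blocking = false)
    @closing = true
    if blocking
      process_write until @buffer.empty?
      @socket.close
    end
  end
end
```

With `close(true)` the caller gets back a closed socket. With `close` alone, the socket stays open until later `process_write` calls have emptied the buffer.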

- (Object) configure(connected, con_cbs)

Performs any necessary configuration of the connection from the CONNECTED frame sent by the broker and a Hash of pending callbacks. This method is called after the protocol negotiation has taken place between client and broker, and the connection that receives it will be the connection used by the client for the duration of the session.

Parameters:



# File 'lib/onstomp/connections/base.rb', line 68

def configure connected, con_cbs
  @version = connected.header?(:version) ? connected[:version] : '1.0'
  install_bindings_from_client con_cbs
end

- (Object) connect(client, *headers)

Exchanges the CONNECT/CONNECTED frame handshake with the broker and returns the version detected along with the received CONNECTED frame. The supplied list of headers will be merged into the CONNECT frame sent to the broker.

Parameters:

Raises:



# File 'lib/onstomp/connections/base.rb', line 98

def connect client, *headers
  # I really don't care for this. A core part of the CONNECT/CONNECTED
  # exchange can only be accomplished through subclasses.
  write_frame_nonblock connect_frame(*headers)
  client_con = nil
  until client_con
    io_process_write { |f| client_con ||= f }
  end
  update_last_received
  broker_con = nil
  until broker_con
    io_process_read(true) { |f| broker_con ||= f }
  end
  raise OnStomp::ConnectFailedError if broker_con.command != 'CONNECTED'
  vers = broker_con.header?(:version) ? broker_con[:version] : '1.0'
  raise OnStomp::UnsupportedProtocolVersionError, vers unless client.versions.include?(vers)
  @connection_up = true
  [ vers, broker_con ]
end
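
The version check at the end of the handshake can be isolated as a small function. In this sketch the CONNECTED frame's headers are modelled as a plain Hash; `negotiated_version` and the local error class are hypothetical names for illustration only.

```ruby
# Stand-in for OnStomp::UnsupportedProtocolVersionError in this sketch.
class UnsupportedVersionError < StandardError; end

# Hypothetical helper: a broker that sends no version header is treated
# as speaking STOMP 1.0, and the result must be a version the client
# has declared it supports.
def negotiated_version(headers, supported)
  vers = headers.key?(:version) ? headers[:version] : '1.0'
  raise UnsupportedVersionError, vers unless supported.include?(vers)
  vers
end

negotiated_version({}, ['1.0', '1.1'])                   # falls back to '1.0'
negotiated_version({ version: '1.1' }, ['1.0', '1.1'])   # uses the header
```

Defaulting to '1.0' matches the listing above, where a missing version header is read as a 1.0 broker.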

- (true, false) connected?

Returns true if the socket has not been closed, false otherwise.

Returns:

  • (true, false)


# File 'lib/onstomp/connections/base.rb', line 75

def connected?
  !socket.closed?
end

- (Fixnum?) duration_since_received

Number of milliseconds since data was last received from the broker, or nil if no data has been received when the method is called.

Returns:

  • (Fixnum, nil)


# File 'lib/onstomp/connections/base.rb', line 139

def duration_since_received
  last_received_at && ((Time.now.to_f - last_received_at) * 1000)
end

- (Fixnum?) duration_since_transmitted

Number of milliseconds since data was last transmitted to the broker, or nil if no data has been transmitted when the method is called.

Returns:

  • (Fixnum, nil)


# File 'lib/onstomp/connections/base.rb', line 132

def duration_since_transmitted
  last_transmitted_at && ((Time.now.to_f - last_transmitted_at) * 1000)
end
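
Both duration methods share the same arithmetic, which can be sketched on its own. `elapsed_ms` is a hypothetical helper. It assumes the timestamps are stored as Float seconds, which the use of Time.now.to_f in the listings suggests but does not confirm.

```ruby
# Hypothetical helper mirroring the arithmetic above: elapsed milliseconds
# between a stored timestamp (seconds) and `now`, or nil when no timestamp
# has been recorded yet.
def elapsed_ms(last_at, now = Time.now.to_f)
  last_at && ((now - last_at) * 1000)
end

elapsed_ms(nil)          # nil, because nothing has been recorded
elapsed_ms(10.0, 10.5)   # half a second, expressed in milliseconds
```

Under that assumption the result is a Float, so callers comparing it against an integer threshold need no conversion.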

- (Object) flush_write_buffer

Flushes the write buffer by invoking #io_process_write until the buffer is empty.



# File 'lib/onstomp/connections/base.rb', line 145

def flush_write_buffer
  io_process_write until @write_buffer.empty?
end

- (Object) io_process(&cb)

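Makes a single call to #io_process_write and a single call to #io_process_read. If the connection had been established and the socket is found closed afterwards, triggered_close is called with the :died event.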



# File 'lib/onstomp/connections/base.rb', line 151

def io_process &cb
  io_process_write &cb
  io_process_read &cb
  if @connection_up && !connected?
    triggered_close 'connection timed out', :died
  end
end

- (Object) io_process_read(connecting = false)

Reads serialized frame data from the socket if we're connected and the socket is ready for reading. The received data will be pushed to the end of a read buffer, which is then sent to the connection's serializer for processing.



# File 'lib/onstomp/connections/base.rb', line 236

def io_process_read(connecting=false)
  if ready_for_read?
    begin
      if data = read_nonblock
        @read_buffer << data
        update_last_received
        serializer.bytes_to_frame(@read_buffer) do |frame|
          yield frame if block_given?
          client.dispatch_received frame
        end
      end
    rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
      # nothing can be read right now; try again on the next call
    rescue EOFError
      triggered_close $!.message
      raise if connecting
    rescue Exception
      # TODO: Fix this potential race condition the right way.
      # This is the problematic area!  If the user (or failover library)
      # try to reconnect the Client when the connection is closed, the
      # exception won't be raised until the IO Processing thread has
      # already been joined to the main thread.  Thus, the connection gets
      # re-established, the "dying" thread re-enters here, and immediately
      # raises the exception that terminated it.
      triggered_close $!.message, :terminated
      raise
    end
  end
  if connecting && read_timeout_exceeded?
    triggered_close 'read blocked', :blocked
    raise OnStomp::ConnectionTimeoutError
  end
end
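
The core of the read path (wait until readable, read without blocking, treat "would block" and end-of-file as distinct outcomes) can be tried against a pipe. This is a sketch under assumptions: `read_available` is a hypothetical helper, and it leaves out the serializer, the event callbacks, and the read timeout handling.

```ruby
# Hypothetical helper: one non-blocking read attempt. Appends any data
# that was read to `buffer` and reports what happened as a Symbol.
def read_available(io, buffer, max_bytes = 1024 * 4)
  ready, = IO.select([io], nil, nil, 0.1)
  return :not_ready unless ready
  buffer << io.read_nonblock(max_bytes)
  :read
rescue IO::WaitReadable, Errno::EINTR
  :would_block   # nothing to read after all; try again later
rescue EOFError
  :eof           # the other end has closed the stream
end

reader, writer = IO.pipe
buffer = []
read_available(reader, buffer)   # nothing has been written yet
writer.write("CONNECTED\n\n\0")
read_available(reader, buffer)   # the frame bytes land in the buffer
writer.close
read_available(reader, buffer)   # end of file is reported
reader.close
```

Returning a Symbol keeps the sketch easy to inspect. The method documented above instead triggers connection events and re-raises where appropriate.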

- (Object) io_process_write

Writes serialized frame data to the socket if the write buffer is not empty and the socket is ready for writing. Once a complete frame has been written, this method calls OnStomp::Client#dispatch_transmitted to notify the client that the frame has been sent to the broker. If a complete frame cannot be written without blocking, the unsent data is returned to the head of the write buffer so that it is processed first the next time this method is invoked.



# File 'lib/onstomp/connections/base.rb', line 196

def io_process_write
  if ready_for_write?
    to_shift = @write_buffer.length / 3
    written = 0
    while written < MAX_BYTES_PER_WRITE
      data, frame = shift_write_buffer
      break unless data && connected?
      begin
        w = write_nonblock data
      rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
        # writing will either block, or cannot otherwise be completed,
        # put data back and try again some other day
        unshift_write_buffer data, frame
        break
      rescue Exception
        triggered_close $!.message, :terminated
        raise
      end
      written += w
      update_last_write_activity
      update_last_transmitted
      if w < data.length
        unshift_write_buffer data[w..-1], frame
      else
        yield frame if block_given?
        client.dispatch_transmitted frame
      end
    end
  elsif write_timeout_exceeded?
    triggered_close 'write blocked', :blocked
  end
  if @write_buffer.empty? && @closing
    triggered_close 'client disconnected'
  end
end
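
The partial-write handling is the subtle part of this method, so here is a reduced sketch of it. `ThrottledSocket` and `write_pass` are hypothetical names; the fake socket accepts only a few bytes per call to imitate a short write_nonblock, and the sketch omits the mutex, timeouts, and error handling.

```ruby
# Fake socket for this sketch: accepts at most `limit` characters per
# call, imitating a partial write_nonblock.
class ThrottledSocket
  attr_reader :received

  def initialize(limit)
    @limit = limit
    @received = String.new
  end

  def write_nonblock(data)
    chunk = data[0, @limit]
    @received << chunk
    chunk.length
  end
end

# Hypothetical helper: one write pass over a buffer of [data, frame]
# pairs. Returns the frames that were written completely in this pass.
def write_pass(socket, buffer, max_bytes)
  completed = []
  written = 0
  while written < max_bytes
    data, frame = buffer.shift
    break unless data
    w = socket.write_nonblock(data)
    written += w
    if w < data.length
      # Keep the unsent tail at the head so it goes out first next time.
      buffer.unshift [data[w..-1], frame]
    else
      completed << frame
    end
  end
  completed
end
```

A frame is reported as complete only once its last byte has been accepted, which is why the frame object travels with the leftover data when it is put back.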

- (Object) push_write_buffer(data, frame)

Adds a data and frame pair to the end of the write buffer.

Parameters:



# File 'lib/onstomp/connections/base.rb', line 170

def push_write_buffer data, frame
  @write_mutex.synchronize {
    update_last_write_activity if @write_buffer.empty?
    @write_buffer << [data, frame] unless @closing
  }
end

- (Object) shift_write_buffer

Removes the first data and frame pair from the write buffer.

# File 'lib/onstomp/connections/base.rb', line 179

def shift_write_buffer
  @write_mutex.synchronize { @write_buffer.shift }
end

- (Object) unshift_write_buffer(data, frame)

Adds the remains of a data and frame pair to the head of the write buffer.

Parameters:



# File 'lib/onstomp/connections/base.rb', line 185

def unshift_write_buffer data, frame
  @write_mutex.synchronize { @write_buffer.unshift [data, frame] }
end
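
The three buffer methods above all funnel through one mutex, so frames can be queued from other threads while the I/O loop drains the buffer. The pattern can be sketched as a small standalone class; `GuardedBuffer` is a hypothetical name, and it omits the write-activity timestamp that the real push also updates.

```ruby
# Illustrative mutex-guarded buffer of [data, frame] pairs.
class GuardedBuffer
  def initialize
    @mutex = Mutex.new
    @items = []
    @closing = false
  end

  # Appends a pair unless the buffer has been marked as closing.
  def push(data, frame)
    @mutex.synchronize { @items << [data, frame] unless @closing }
  end

  # Removes and returns the pair at the head, or nil when empty.
  def shift
    @mutex.synchronize { @items.shift }
  end

  # Puts a pair back at the head, for example after a partial write.
  def unshift(data, frame)
    @mutex.synchronize { @items.unshift [data, frame] }
  end

  def close!
    @mutex.synchronize { @closing = true }
  end

  def size
    @mutex.synchronize { @items.size }
  end
end
```

Checking the closing flag inside the same critical section as the append is what prevents a frame from slipping in after close has been requested.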

- (Object) write_frame_nonblock(frame)

Serializes the given frame and adds the data to the connection's internal write buffer.

Parameters:



# File 'lib/onstomp/connections/base.rb', line 162

def write_frame_nonblock frame
  ser = serializer.frame_to_bytes frame
  push_write_buffer ser, frame
end
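
To make the serialize-then-enqueue step concrete, the sketch below builds the wire form of a STOMP frame (command line, header lines, a blank line, the body, and a terminating NUL byte) and pairs it with the frame it came from. `frame_to_bytes_sketch` is a hypothetical, heavily simplified stand-in for the connection's serializer; it ignores header escaping and content-length.

```ruby
# Simplified serializer for illustration only. A real STOMP serializer
# must also handle header escaping and content-length.
def frame_to_bytes_sketch(command, headers = {}, body = '')
  header_lines = headers.map { |k, v| "#{k}:#{v}\n" }.join
  "#{command}\n#{header_lines}\n#{body}\0"
end

# A frame is modelled here as a plain Hash; the write buffer holds
# [serialized bytes, frame] pairs, as in the listings above.
frame = { command: 'SEND', headers: { destination: '/queue/a' }, body: 'hi' }
bytes = frame_to_bytes_sketch(frame[:command], frame[:headers], frame[:body])
write_buffer = []
write_buffer << [bytes, frame]
```

Keeping the frame next to its bytes lets the write loop report which frame was sent once the last byte goes out.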