Class: OnStomp::Connections::Base
- Inherits: Object
- Includes: Interfaces::ConnectionEvents
- Defined in: lib/onstomp/connections/base.rb
Overview
Common behavior for all connections.
Defined Under Namespace
Modules: BlockingRead, BlockingWrite, NonblockingRead, NonblockingWrite
Constant Summary
- MAX_BYTES_PER_WRITE = 1024 * 8
The approximate maximum number of bytes to write per call to #io_process_write.
- MAX_BYTES_PER_READ = 1024 * 4
The maximum number of bytes to read per call to #io_process_read.
Instance Attribute Summary
- (Object) client (readonly)
Returns the value of attribute client.
- (Object) last_received_at (readonly)
Returns the value of attribute last_received_at.
- (Object) last_transmitted_at (readonly)
Returns the value of attribute last_transmitted_at.
- (Object) read_timeout
Returns the value of attribute read_timeout.
- (Object) socket (readonly)
Returns the value of attribute socket.
- (Object) version (readonly)
Returns the value of attribute version.
- (Object) write_timeout
Returns the value of attribute write_timeout.
Instance Method Summary
- (Object) close(blocking = false)
Closes the #socket.
- (Object) configure(connected, con_cbs)
Performs any necessary configuration of the connection from the CONNECTED frame sent by the broker and a Hash of pending callbacks.
- (Object) connect(client, *headers)
Exchanges the CONNECT/CONNECTED frame handshake with the broker and returns the version detected along with the received CONNECTED frame.
- (true, false) connected?
Returns true if the socket has not been closed, false otherwise.
- (Fixnum?) duration_since_received
Number of milliseconds since data was last received from the broker, or nil if no data has been received when the method is called.
- (Fixnum?) duration_since_transmitted
Number of milliseconds since data was last transmitted to the broker, or nil if no data has been transmitted when the method is called.
- (Object) flush_write_buffer
Flushes the write buffer by invoking #io_process_write until the buffer is empty.
- (Base) initialize(socket, client) (constructor)
- (Object) io_process(&cb)
Makes a single call to #io_process_write and a single call to #io_process_read.
- (Object) io_process_read(connecting = false)
Reads serialized frame data from the socket if we're connected and the socket is ready for reading.
- (Object) io_process_write
Writes serialized frame data to the socket if the write buffer is not empty and the socket is ready for writing.
- (Object) method_missing(meth, *args, &block)
Checks if the missing method ends with '_frame', and if so raises an UnsupportedCommandError exception.
- (Object) push_write_buffer(data, frame)
Adds a data and frame pair to the end of the write buffer.
- (Object) shift_write_buffer
Removes the first data and frame pair from the write buffer.
- (Object) unshift_write_buffer(data, frame)
Adds the remains of a data and frame pair to the head of the write buffer.
- (Object) write_frame_nonblock(frame)
Serializes the given frame and adds the data to the connection's internal write buffer.
Methods included from Interfaces::ConnectionEvents
#blocked, #closed, #died, #established, #install_bindings_from_client, #on_blocked, #on_closed, #on_died, #on_established, #on_terminated, #terminated, #trigger_connection_event
Methods included from Interfaces::EventManager
#bind_event, #event_callbacks, included, #trigger_event
Constructor Details
- (Base) initialize(socket, client)
    # File 'lib/onstomp/connections/base.rb', line 22
    def initialize socket, client
      @socket = socket
      @write_mutex = Mutex.new
      @closing = false
      @write_buffer = []
      @read_buffer = []
      @client = client
      @connection_up = false
      self.read_timeout = 120
      self.write_timeout = nil
      setup_non_blocking_methods
    end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
- (Object) method_missing(meth, *args, &block)
Checks if the missing method ends with '_frame', and if so raises an UnsupportedCommandError exception.
    # File 'lib/onstomp/connections/base.rb', line 121
    def method_missing meth, *args, &block
      if meth.to_s =~ /^(.*)_frame$/
        raise OnStomp::UnsupportedCommandError, $1.upcase
      else
        super
      end
    end
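The effect of this hook can be seen in a small standalone sketch. The classes below are hypothetical stand-ins written for illustration, not the OnStomp classes themselves; only the regular expression and the upcased command name follow the listing above.

```ruby
# Hypothetical stand-in for OnStomp::UnsupportedCommandError.
class SketchUnsupportedCommandError < StandardError; end

# Hypothetical connection that implements only one frame builder.
class SketchConnection
  def connect_frame(*headers)
    [:CONNECT, headers]
  end

  # Any other call ending in `_frame` is reported as an unsupported command;
  # everything else falls through to the default NoMethodError.
  def method_missing(meth, *args, &block)
    if meth.to_s =~ /^(.*)_frame$/
      raise SketchUnsupportedCommandError, $1.upcase
    else
      super
    end
  end
end

con = SketchConnection.new

unsupported = begin
  con.nack_frame('message-1')
rescue SketchUnsupportedCommandError => e
  e.message
end
# unsupported is now "NACK"
```

A subclass that supports a command would presumably define the matching `*_frame` method directly, so the call never reaches `method_missing`.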
Instance Attribute Details
- (Object) client (readonly)
Returns the value of attribute client.
    # File 'lib/onstomp/connections/base.rb', line 6
    def client
      @client
    end
- (Object) last_received_at (readonly)
Returns the value of attribute last_received_at.
    # File 'lib/onstomp/connections/base.rb', line 7
    def last_received_at
      @last_received_at
    end
- (Object) last_transmitted_at (readonly)
Returns the value of attribute last_transmitted_at.
    # File 'lib/onstomp/connections/base.rb', line 7
    def last_transmitted_at
      @last_transmitted_at
    end
- (Object) read_timeout
Returns the value of attribute read_timeout.
    # File 'lib/onstomp/connections/base.rb', line 8
    def read_timeout
      @read_timeout
    end
- (Object) socket (readonly)
Returns the value of attribute socket.
    # File 'lib/onstomp/connections/base.rb', line 6
    def socket
      @socket
    end
- (Object) version (readonly)
Returns the value of attribute version.
    # File 'lib/onstomp/connections/base.rb', line 6
    def version
      @version
    end
- (Object) write_timeout
Returns the value of attribute write_timeout.
    # File 'lib/onstomp/connections/base.rb', line 8
    def write_timeout
      @write_timeout
    end
Instance Method Details
- (Object) close(blocking = false)
Closes the #socket. If blocking is true, any buffered data is written and the socket is closed before this method returns; otherwise the socket will remain open until #io_process_write has finished writing all of its buffered data. Once this method has been invoked, #write_frame_nonblock will not enqueue any additional frames for writing.
    # File 'lib/onstomp/connections/base.rb', line 85
    def close blocking=false
      @write_mutex.synchronize { @closing = true }
      if blocking
        io_process_write until @write_buffer.empty?
        socket.close
      end
    end
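The difference between the two modes can be illustrated with a minimal sketch. `ClosingSketch`, `enqueue` and `process_write` are hypothetical names, and a `StringIO` stands in for the socket; the sketch only mimics the closing flag and deferred close described above, not the real I/O handling.

```ruby
require 'stringio'

# Hypothetical miniature of the closing behaviour; not the OnStomp class.
class ClosingSketch
  attr_reader :socket

  def initialize(socket)
    @socket = socket
    @write_buffer = []
    @closing = false
  end

  # Refuses new data once a close has been requested.
  def enqueue(data)
    @write_buffer << data unless @closing
  end

  # Writes one buffered chunk, then closes the socket if a close was
  # requested and nothing is left to write.
  def process_write
    data = @write_buffer.shift
    socket.write(data) if data
    socket.close if @write_buffer.empty? && @closing && !socket.closed?
  end

  def close(blocking = false)
    @closing = true
    return unless blocking
    process_write until @write_buffer.empty?
    socket.close unless socket.closed?
  end
end

io = StringIO.new
con = ClosingSketch.new(io)
con.enqueue("SEND\n\nhello\0")

con.close                              # non-blocking: only marks the connection as closing
open_after_close = !io.closed?         # the buffered frame has not been written yet
late = con.enqueue("SEND\n\nlate\0")   # ignored, returns nil
con.process_write                      # writes the pending frame, then closes
```

With `con.close(true)` the same sketch would write the pending frame and close the socket before returning.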
- (Object) configure(connected, con_cbs)
Performs any necessary configuration of the connection from the CONNECTED frame sent by the broker and a Hash of pending callbacks. This method is called after the protocol negotiation has taken place between client and broker, and the connection that receives it will be the connection used by the client for the duration of the session.
    # File 'lib/onstomp/connections/base.rb', line 68
    def configure connected, con_cbs
      @version = connected.header?(:version) ? connected[:version] : '1.0'
      install_bindings_from_client con_cbs
    end
- (Object) connect(client, *headers)
Exchanges the CONNECT/CONNECTED frame handshake with the broker and returns the version detected along with the received CONNECTED frame. The supplied list of headers will be merged into the CONNECT frame sent to the broker.
    # File 'lib/onstomp/connections/base.rb', line 98
    def connect client, *headers
      # I really don't care for this. A core part of the CONNECT/CONNECTED
      # exchange can only be accomplished through subclasses.
      write_frame_nonblock connect_frame(*headers)
      client_con = nil
      until client_con
        io_process_write { |f| client_con ||= f }
      end
      update_last_received
      broker_con = nil
      until broker_con
        io_process_read(true) { |f| broker_con ||= f }
      end
      raise OnStomp::ConnectFailedError if broker_con.command != 'CONNECTED'
      vers = broker_con.header?(:version) ? broker_con[:version] : '1.0'
      raise OnStomp::UnsupportedProtocolVersionError, vers unless client.versions.include?(vers)
      @connection_up = true
      [ vers, broker_con ]
    end
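The version check at the end of the handshake can be isolated as follows. This is a sketch under stated assumptions: `negotiate_version` and the error class are hypothetical, and a plain Hash stands in for the headers of the CONNECTED frame. The fallback to '1.0' when no version header is present mirrors the listing above.

```ruby
# Hypothetical stand-in for OnStomp::UnsupportedProtocolVersionError.
class SketchUnsupportedVersionError < StandardError; end

# connected_headers: Hash standing in for the CONNECTED frame's headers.
# client_versions:   protocol versions the client is willing to speak.
def negotiate_version(connected_headers, client_versions)
  # A broker that sends no version header is treated as speaking 1.0.
  vers = connected_headers.fetch(:version, '1.0')
  raise SketchUnsupportedVersionError, vers unless client_versions.include?(vers)
  vers
end

modern = negotiate_version({ version: '1.1' }, ['1.0', '1.1'])  # "1.1"
legacy = negotiate_version({}, ['1.0', '1.1'])                  # "1.0"

rejected = begin
  negotiate_version({ version: '1.1' }, ['1.0'])
rescue SketchUnsupportedVersionError => e
  e.message
end
# rejected is "1.1": the broker chose a version the client does not support
```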
- (true, false) connected?
Returns true if the socket has not been closed, false otherwise.
    # File 'lib/onstomp/connections/base.rb', line 75
    def connected?
      !socket.closed?
    end
- (Fixnum?) duration_since_received
Number of milliseconds since data was last received from the broker, or nil if no data has been received when the method is called.
    # File 'lib/onstomp/connections/base.rb', line 139
    def duration_since_received
      last_received_at && ((Time.now.to_f - last_received_at) * 1000)
    end
- (Fixnum?) duration_since_transmitted
Number of milliseconds since data was last transmitted to the broker, or nil if no data has been transmitted when the method is called.
    # File 'lib/onstomp/connections/base.rb', line 132
    def duration_since_transmitted
      last_transmitted_at && ((Time.now.to_f - last_transmitted_at) * 1000)
    end
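Both duration methods share the same arithmetic, which the following sketch reproduces with a hypothetical helper, `duration_since`. The `now` parameter is an assumption added so the example is deterministic; the listings above always use `Time.now.to_f`. Because the timestamps are floating-point seconds, the result in this sketch is a Float.

```ruby
# Hypothetical helper: milliseconds elapsed since `timestamp` (seconds as a
# Float), or nil when no timestamp has been recorded yet.
def duration_since(timestamp, now = Time.now.to_f)
  timestamp && ((now - timestamp) * 1000)
end

never   = duration_since(nil)          # nil: nothing received or transmitted yet
elapsed = duration_since(10.0, 10.25)  # 250.0 milliseconds
```

The `&&` short-circuit is what produces nil before any data has been exchanged, so callers can distinguish "no activity yet" from "activity zero milliseconds ago".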
- (Object) flush_write_buffer
Flushes the write buffer by invoking #io_process_write until the buffer is empty.
    # File 'lib/onstomp/connections/base.rb', line 145
    def flush_write_buffer
      io_process_write until @write_buffer.empty?
    end
- (Object) io_process(&cb)
Makes a single call to #io_process_write and a single call to #io_process_read.
    # File 'lib/onstomp/connections/base.rb', line 151
    def io_process &cb
      io_process_write &cb
      io_process_read &cb
      if @connection_up && !connected?
        triggered_close 'connection timed out', :died
      end
    end
- (Object) io_process_read(connecting = false)
Reads serialized frame data from the socket if we're connected and the socket is ready for reading. The received data will be pushed to the end of a read buffer, which is then sent to the connection's serializer for processing.
    # File 'lib/onstomp/connections/base.rb', line 236
    def io_process_read(connecting=false)
      if ready_for_read?
        begin
          if data = read_nonblock
            @read_buffer << data
            update_last_received
            serializer.bytes_to_frame(@read_buffer) do |frame|
              yield frame if block_given?
              client.dispatch_received frame
            end
          end
        rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
          # do not
        rescue EOFError
          triggered_close $!.message
          raise if connecting
        rescue Exception
          # TODO: Fix this potential race condition the right way.
          # This is the problematic area! If the user (or failover library)
          # try to reconnect the Client when the connection is closed, the
          # exception won't be raised until the IO Processing thread has
          # already been joined to the main thread. Thus, the connection gets
          # re-established, the "dying" thread re-enters here, and immediately
          # raises the exception that terminated it.
          triggered_close $!.message, :terminated
          raise
        end
      end
      if connecting && read_timeout_exceeded?
        triggered_close 'read blocked', :blocked
        raise OnStomp::ConnectionTimeoutError
      end
    end
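The serializer that turns the read buffer into frames is not shown on this page, so the following is only a rough sketch of the buffering idea. `extract_frames` is a hypothetical helper that splits on the NUL byte that terminates a STOMP frame; it deliberately ignores details a real serializer has to handle, such as bodies governed by a content-length header.

```ruby
# Hypothetical, simplified frame splitter. `read_buffer` is an Array of
# String chunks in arrival order; complete frames are returned and any
# trailing partial frame is left in the buffer for the next read.
def extract_frames(read_buffer)
  pending = read_buffer.join
  frames = []
  while (idx = pending.index("\0"))
    frames << pending[0...idx]
    pending = pending[(idx + 1)..-1]
  end
  read_buffer.replace(pending.empty? ? [] : [pending])
  frames
end

read_buffer = []

# First read: one complete frame plus the start of another.
read_buffer << "CONNECTED\nversion:1.1\n\n\0MESS"
first = extract_frames(read_buffer)

# Second read: the rest of the partial frame arrives.
read_buffer << "AGE\n\nhi\0"
second = extract_frames(read_buffer)
```

After the first call, `first` holds the CONNECTED frame text and the buffer retains `"MESS"`; after the second, `second` holds the completed MESSAGE frame and the buffer is empty.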
- (Object) io_process_write
Writes serialized frame data to the socket if the write buffer is not empty and the socket is ready for writing. Once a complete frame has been written, this method will call OnStomp::Client#dispatch_transmitted to notify the client that the frame has been sent to the broker. If a complete frame cannot be written without blocking, the unsent data is returned to the head of the write buffer to be processed first the next time this method is invoked.
    # File 'lib/onstomp/connections/base.rb', line 196
    def io_process_write
      if ready_for_write?
        to_shift = @write_buffer.length / 3
        written = 0
        while written < MAX_BYTES_PER_WRITE
          data, frame = shift_write_buffer
          break unless data && connected?
          begin
            w = write_nonblock data
          rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
            # writing will either block, or cannot otherwise be completed,
            # put data back and try again some other day
            unshift_write_buffer data, frame
            break
          rescue Exception
            triggered_close $!.message, :terminated
            raise
          end
          written += w
          update_last_write_activity
          update_last_transmitted
          if w < data.length
            unshift_write_buffer data[w..-1], frame
          else
            yield frame if block_given?
            client.dispatch_transmitted frame
          end
        end
      elsif write_timeout_exceeded?
        triggered_close 'write blocked', :blocked
      end
      if @write_buffer.empty? && @closing
        triggered_close 'client disconnected'
      end
    end
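The partial-write handling is easier to follow in isolation. In this sketch, `ThrottledSink`, `process_write` and the 8-byte cap are hypothetical: the sink accepts at most a fixed number of bytes per call, standing in for a socket that cannot take a whole frame at once, and the cap plays the role of MAX_BYTES_PER_WRITE at a much smaller scale. Readiness checks, timeouts and error handling are left out.

```ruby
# Hypothetical socket stand-in that accepts at most `limit` bytes per call.
class ThrottledSink
  attr_reader :received

  def initialize(limit)
    @limit = limit
    @received = String.new
  end

  # Returns the number of bytes actually accepted.
  def write_nonblock(data)
    chunk = data[0, @limit]
    @received << chunk
    chunk.length
  end
end

# One write pass: keep writing until roughly `max_bytes` have gone out.
# Unwritten remainders go back to the head of the buffer; frames are
# reported in `sent` only once all of their bytes have been written.
def process_write(sink, buffer, sent, max_bytes: 8)
  written = 0
  while written < max_bytes
    data, frame = buffer.shift
    break unless data
    w = sink.write_nonblock(data)
    written += w
    if w < data.length
      buffer.unshift [data[w..-1], frame]
    else
      sent << frame
    end
  end
end

sink   = ThrottledSink.new(5)
buffer = [["hello world", :frame_a], ["bye", :frame_b]]
sent   = []

process_write(sink, buffer, sent)
after_first_pass = sent.dup   # frame_a is still only partially written
process_write(sink, buffer, sent)
```

After the second pass both frames have been reported, in order, and the sink has received `"hello worldbye"`. Putting the remainder back at the head rather than the tail is what keeps the bytes of one frame contiguous on the wire.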
- (Object) push_write_buffer(data, frame)
Adds a data and frame pair to the end of the write buffer.
    # File 'lib/onstomp/connections/base.rb', line 170
    def push_write_buffer data, frame
      @write_mutex.synchronize {
        update_last_write_activity if @write_buffer.empty?
        @write_buffer << [data, frame] unless @closing
      }
    end
- (Object) shift_write_buffer
Removes the first data and frame pair from the write buffer.
    # File 'lib/onstomp/connections/base.rb', line 179
    def shift_write_buffer
      @write_mutex.synchronize { @write_buffer.shift }
    end
- (Object) unshift_write_buffer(data, frame)
Adds the remains of a data and frame pair to the head of the write buffer.
    # File 'lib/onstomp/connections/base.rb', line 185
    def unshift_write_buffer data, frame
      @write_mutex.synchronize { @write_buffer.unshift [data, frame] }
    end
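Taken together, the three buffer methods behave like a small mutex-guarded queue. The class below is a hypothetical sketch of that idea, assuming frames are enqueued from one thread while another drains them; the method names are shortened and the activity timestamps from the listings are omitted.

```ruby
# Hypothetical sketch of a mutex-guarded write buffer; not the OnStomp class.
class WriteBufferSketch
  def initialize
    @mutex = Mutex.new
    @buffer = []
    @closing = false
  end

  # Appends a pair unless the buffer has been marked as closing.
  def push(data, frame)
    @mutex.synchronize { @buffer << [data, frame] unless @closing }
  end

  # Removes and returns the oldest pair, or nil when empty.
  def shift
    @mutex.synchronize { @buffer.shift }
  end

  # Puts an unwritten remainder back in front of everything else.
  def unshift(data, frame)
    @mutex.synchronize { @buffer.unshift [data, frame] }
  end

  def closing!
    @mutex.synchronize { @closing = true }
  end

  def size
    @mutex.synchronize { @buffer.size }
  end
end

buf = WriteBufferSketch.new

# Several producer threads enqueue concurrently.
producers = 4.times.map do |i|
  Thread.new { 25.times { |j| buf.push("data-#{i}-#{j}", :frame) } }
end
producers.each(&:join)
total = buf.size                    # 100: no pushes were lost

data, frame = buf.shift
buf.unshift(data[2..-1], frame)     # pretend only 2 bytes were written
head = buf.shift                    # the remainder comes back first

buf.closing!
buf.push('ignored', :frame)         # dropped once closing
```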
- (Object) write_frame_nonblock(frame)
Serializes the given frame and adds the data to the connection's internal write buffer.
    # File 'lib/onstomp/connections/base.rb', line 162
    def write_frame_nonblock frame
      ser = serializer.frame_to_bytes frame
      push_write_buffer ser, frame
    end