Class: OnStomp::Client

Inherits:
Object
  • Object
show all
Includes:
OnStomp::Components::Scopes, Interfaces::ClientConfigurable, Interfaces::ClientEvents, Interfaces::FrameMethods, Interfaces::ReceiptManager, Interfaces::SubscriptionManager
Defined in:
lib/onstomp/client.rb,
lib/onstomp/failover/new_with_failover.rb

Overview

This class encapsulates a client connection to a message broker through the Stomp protocol.

Instance Attribute Summary (collapse)

Methods you ought not use directly. (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Methods included from OnStomp::Components::Scopes

#transaction, #with_headers, #with_receipt

Methods included from Interfaces::SubscriptionManager

#subscriptions

Methods included from Interfaces::ClientEvents

#:begin, #abort, #ack, #after_receiving, #after_transmitting, #before_abort, #before_ack, #before_begin, #before_broker_beat, #before_client_beat, #before_commit, #before_disconnect, #before_error, #before_message, #before_nack, #before_receipt, #before_receiving, #before_send, #before_subscribe, #before_transmitting, #before_unsubscribe, #broker_beat, #client_beat, #commit, #error, #message, #nack, #on_abort, #on_ack, #on_begin, #on_broker_beat, #on_client_beat, #on_commit, #on_connect, #on_disconnect, #on_error, #on_message, #on_nack, #on_receipt, #on_send, #on_subscribe, #on_unsubscribe, #pending_connection_events, #receipt, #receiving, #send, #subscribe, #transmitting, #trigger_after_receiving, #trigger_after_transmitting, #trigger_before_receiving, #trigger_before_transmitting, #trigger_frame_event, #unsubscribe

Methods included from Interfaces::EventManager

#bind_event, #event_callbacks, included, #trigger_event

Methods included from Interfaces::FrameMethods

#abort, #ack, #beat, #begin, #commit, #nack, #send, #subscribe, #unsubscribe

Methods included from Interfaces::ClientConfigurable

included

Constructor Details

- (Client) initialize(uri, options = {})

Creates a new client for the specified uri and optional hash of options.

Parameters:

  • uri (String, URI)
  • options ({Symbol => Object}) (defaults to: {})


65
66
67
68
69
70
71
72
73
74
# File 'lib/onstomp/client.rb', line 65

def initialize uri, options={}
  @uri = uri.is_a?(::URI) ? uri : ::URI.parse(uri)
  @ssl = options.delete(:ssl)
  configure_configurable options
  configure_subscription_management
  configure_receipt_management
  on_disconnect do |f, con|
    close unless f[:receipt]
  end
end

Instance Attribute Details

- (OnStomp::Connections::Base) connection (readonly)

Connection object specific to the established STOMP protocol version



21
22
23
# File 'lib/onstomp/client.rb', line 21

def connection
  @connection
end

- (Array<Fixnum>) heartbeats

The client-side heartbeat settings to allow for this connection

Returns:

  • (Array<Fixnum>)


29
# File 'lib/onstomp/client.rb', line 29

attr_configurable_client_beats :heartbeats

- (String) host

The host header value to send to the broker when connecting. This allows the client to inform the server which host it wishes to connect with when multiple brokers may share an IP address through virtual hosting.

Returns:

  • (String)


35
# File 'lib/onstomp/client.rb', line 35

attr_configurable_str :host, :default => 'localhost', :uri_attr => :host

- (String) login

The login header value to send to the broker when connecting.

Returns:

  • (String)


39
# File 'lib/onstomp/client.rb', line 39

attr_configurable_str :login, :default => '', :uri_attr => :user

- (String) passcode

The passcode header value to send to the broker when connecting.

Returns:

  • (String)


43
# File 'lib/onstomp/client.rb', line 43

attr_configurable_str :passcode, :default => '', :uri_attr => :password

- (Fixnum) read_timeout

The number of seconds to wait before a connection that is read-blocked during the connect phase is considered dead. Defaults to 120 seconds.

Returns:

  • (Fixnum)


59
# File 'lib/onstomp/client.rb', line 59

attr_configurable_int :read_timeout, :default => 120

- (Symbol => Object) ssl (readonly)

SSL options for the connection

Returns:

  • (Symbol => Object)


18
19
20
# File 'lib/onstomp/client.rb', line 18

def ssl
  @ssl
end

- (String) uri (readonly)

The URI reference to the STOMP broker

Returns:

  • (String)


15
16
17
# File 'lib/onstomp/client.rb', line 15

def uri
  @uri
end

- (Array<String>) versions

The protocol versions to allow for this connection

Returns:

  • (Array<String>)


25
# File 'lib/onstomp/client.rb', line 25

attr_configurable_protocols :versions

- (Fixnum) write_timeout

The number of seconds to wait before a write-blocked connection is considered dead. Defaults to 120 seconds.

Returns:

  • (Fixnum)


53
# File 'lib/onstomp/client.rb', line 53

attr_configurable_int :write_timeout, :default => 120

Class Method Details

+ (OnStomp::Client, OnStomp::Failover::Client) new_with_failover(uri, options = {}) Also known as: new

Creates an alias chain for new so that if a failover: URI or an array of URIs are passed to the constructor, a failover client is built instead.



9
10
11
12
13
14
15
# File 'lib/onstomp/failover/new_with_failover.rb', line 9

def new_with_failover(uri, options={})
  if uri.is_a?(Array) || uri.to_s =~ /^failover:/i
    OnStomp::Failover::Client.new(uri, options)
  else
    new_without_failover(uri, options)
  end
end

Instance Method Details

- (self) close!

Note:

Use of this method may result in frames never being sent to the broker. This method should only be used if #disconnect is not an option and the connection needs to be terminated immediately.

Forces the connection between broker and client closed.

Returns:

  • (self)


123
124
125
126
127
# File 'lib/onstomp/client.rb', line 123

def close!
  close
  processor_inst.stop
  self
end

- (self) connect(headers = {}) Also known as: open

Connects to the STOMP broker referenced by #uri. Includes optional headers in the CONNECT frame, if specified.

Parameters:

  • headers ({#to_sym => #to_s}) (defaults to: {})

Returns:

  • (self)


81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/onstomp/client.rb', line 81

def connect headers={}
  # FIXME: This is a quick fix to force the Threaded IO processor to
  # complete its work before we establish a connection.
  processor_inst.stop
  @connection = OnStomp::Connections.connect self, headers,
    { :accept-version' => @versions.join(','), :host => @host,
      :heart-beat' => @heartbeats.join(','), :login => @login,
      :passcode => @passcode }, pending_connection_events,
    read_timeout, write_timeout
  processor_inst.start
  self
end

- (true, false) connected?

Returns true if a connection to the broker exists and itself is connected.

Returns:

  • (true, false)


113
114
115
# File 'lib/onstomp/client.rb', line 113

def connected?
  connection && connection.connected?
end

- (OnStomp::Components::Frame) disconnect_with_flush(headers = {}) Also known as: disconnect

Sends a DISCONNECT frame to the broker and blocks until the connection has been closed. This method ensures that all frames not yet sent to the broker will get processed barring any IO exceptions.

Parameters:

  • headers ({#to_sym => #to_s}) (defaults to: {})

Returns:



101
102
103
104
105
106
# File 'lib/onstomp/client.rb', line 101

def disconnect_with_flush headers={}
  processor_inst.prepare_to_close
  disconnect_without_flush(headers).tap do
    processor_inst.join
  end
end

- (Object) dispatch_received(frame)

Called by #connection when a frame has been read from the socket connection to the STOMP broker.



145
146
147
148
# File 'lib/onstomp/client.rb', line 145

def dispatch_received frame
  trigger_before_receiving frame
  trigger_after_receiving frame
end

- (Object) dispatch_transmitted(frame)

Called by #connection when a frame has been written to the socket connection to the STOMP broker.



152
153
154
# File 'lib/onstomp/client.rb', line 152

def dispatch_transmitted frame
  trigger_after_transmitting frame
end

- (Class) processor

The class to use when instantiating a new IO processor for the connection. Defaults to OnStomp::Components::ThreadedProcessor

Returns:

  • (Class)


48
# File 'lib/onstomp/client.rb', line 48

attr_configurable_processor :processor

- (OnStomp::Components::Frame) transmit(frame, cbs = {})

Ultimately sends a frame to the STOMP broker. This method should not be invoked directly. Use the frame methods provided by the Interfaces:FrameMethod interface.



135
136
137
138
139
140
141
# File 'lib/onstomp/client.rb', line 135

def transmit frame, cbs={}
  frame.tap do
    register_callbacks frame, cbs
    trigger_before_transmitting frame
    connection && connection.write_frame_nonblock(frame)
  end
end