Class: OnStomp::Failover::Buffers::Written

Inherits:
Base
  • Object
show all
Defined in:
lib/onstomp/failover/buffers/written.rb

Overview

A buffer that ensures frames are at least written to a client's connection and replays the ones that were not when the failover client reconnects.

Instance Method Summary (collapse)

Methods inherited from Base

#buffered

Constructor Details

- (Written) initialize(failover)

A new instance of Written



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/onstomp/failover/buffers/written.rb', line 8

def initialize failover
  super
  [:send, :commit, :abort, :subscribe].each do |ev|
    failover.__send__(:before_#{ev}") do |f, *_|
      add_to_buffer f
    end
  end
  # We only want to scrub the transactions if ABORT or COMMIT was
  # at least written fully to the socket.
  [:commit, :abort].each do |ev|
    failover.__send__(:on_#{ev}") do |f,*_|
      remove_from_transactions f
    end
  end
  failover.before_begin { |f, *_| add_to_transactions f }
  # We can scrub the subscription before UNSUBSCRIBE is fully written
  # because if we replay before UNSUBSCRIBE was sent, we still don't
  # want to be subscribed when we reconnect.
  failover.before_unsubscribe { |f, *_| remove_subscribe_from_buffer f }
  failover.on_send &method(:debuffer_non_transactional_frame)
  failover.on_failover_connected { |f,c,*_| replay_buffer c }
end

Instance Method Details

- (Object) debuffer_non_transactional_frame(f, *_)

Removes a frame that is not part of a transaction from the buffer after it has been written the broker socket so that it will not be replayed when the failover client re-connects



34
35
36
37
38
# File 'lib/onstomp/failover/buffers/written.rb', line 34

def debuffer_non_transactional_frame f, *_
  unless @txs.key?(f[:transaction])
    @buffer_mutex.synchronize { @buffer.delete f }
  end
end