Class: OnStomp::Failover::Buffers::Receipts

Inherits:
Base
  • Object
Defined in:
lib/onstomp/failover/buffers/receipts.rb

Overview

TODO:

Quite a lot of this code is shared between Written and Receipts; the common parts should be factored out.

A buffer that ensures frames are RECEIPTed against a client's connection and, when the failover client reconnects, replays the frames that were not.
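
The idea can be sketched in isolation. The following is a minimal illustration only, not the OnStomp implementation: `ToyFrame` and `TinyReceiptBuffer` are hypothetical names, and the real buffer additionally handles transactions, subscriptions, and locking.

```ruby
# Hypothetical stand-in for a STOMP frame: a command plus a header hash.
ToyFrame = Struct.new(:command, :headers)

# Toy receipt buffer (an assumption for illustration, not OnStomp code).
class TinyReceiptBuffer
  def initialize
    @buffer = []
    @serial = 0
  end

  # Tag an outgoing frame with a receipt id and remember it.
  def track(frame)
    @serial += 1
    frame.headers[:receipt] = @serial.to_s
    @buffer << frame
    frame
  end

  # A RECEIPT arrived: forget the frame it acknowledges.
  def acknowledge(receipt_id)
    @buffer.reject! { |f| f.headers[:receipt] == receipt_id }
  end

  # After reconnecting, hand every unacknowledged frame to the caller.
  def replay
    @buffer.each { |f| yield f }
  end

  # Receipt ids still awaiting acknowledgement.
  def pending
    @buffer.map { |f| f.headers[:receipt] }
  end
end

buf = TinyReceiptBuffer.new
buf.track ToyFrame.new('SEND', { destination: '/queue/a' })
buf.track ToyFrame.new('SEND', { destination: '/queue/b' })
buf.acknowledge '1'

# Simulate a reconnect: only the frame without a RECEIPT is replayed.
replayed = []
buf.replay { |f| replayed << f.headers[:destination] }
```

Here only the second frame is replayed, because the first was acknowledged before the simulated reconnect.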

Instance Method Summary

Methods inherited from Base

#buffered

Constructor Details

- (Receipts) initialize(failover)

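Creates a new instance of Receipts and registers its buffering, debuffering, and replay callbacks on the given failover client.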



# File 'lib/onstomp/failover/buffers/receipts.rb', line 10

def initialize failover
  super
  [:send, :commit, :abort, :subscribe].each do |ev|
    failover.__send__(:"before_#{ev}") do |f, *_|
      add_to_buffer f, {:receipt => OnStomp.next_serial}
    end
  end
  failover.before_begin do |f, *_|
    add_to_transactions f, {:receipt => OnStomp.next_serial}
  end
  # We can scrub the subscription before UNSUBSCRIBE is fully written
  # because if we replay before UNSUBSCRIBE was sent, we still don't
  # want to be subscribed when we reconnect.
  failover.before_unsubscribe do |f, *_|
    remove_subscribe_from_buffer f
  end
  failover.on_receipt { |r, *_| debuffer_frame r }
  failover.on_failover_connected { |f,c,*_| replay_buffer c }
end
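
The constructor registers its handlers by building each callback method name from an event symbol and invoking it with `__send__`. A minimal sketch of that pattern follows, assuming a hypothetical `FakeFailover` class that merely records the blocks it is given; the real failover client's event machinery is not shown here.

```ruby
# Hypothetical stand-in for the failover client; it only records handlers.
class FakeFailover
  attr_reader :handlers

  def initialize
    @handlers = Hash.new { |h, k| h[k] = [] }
  end

  # Define before_send, before_commit, before_abort and before_subscribe.
  [:send, :commit, :abort, :subscribe].each do |ev|
    define_method(:"before_#{ev}") do |&block|
      @handlers[ev] << block
    end
  end
end

failover = FakeFailover.new
seen = []

# Same shape as the constructor: interpolate the event name into a
# quoted symbol and dispatch to the matching registration method.
[:send, :commit, :abort, :subscribe].each do |ev|
  failover.__send__(:"before_#{ev}") { |frame, *_| seen << [ev, frame] }
end

# Fire only the SEND handlers to show that each block kept its own event.
failover.handlers[:send].each { |h| h.call('a SEND frame') }
```

The quoted-symbol form `:"before_#{ev}"` is what allows interpolation inside a symbol literal.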

Instance Method Details

- (Object) debuffer_frame(r)

Removes frames that are neither transactional nor SUBSCRIBEs from the buffer, looking up the buffered frame by its receipt header.



# File 'lib/onstomp/failover/buffers/receipts.rb', line 33

def debuffer_frame r
  orig = @buffer_mutex.synchronize do
    @buffer.detect { |f| f[:receipt] == r[:'receipt-id'] }
  end
  if orig
    # COMMIT and ABORT debuffer the whole transaction sequence
    if ['COMMIT', 'ABORT'].include? orig.command
      remove_from_transactions orig
    # Otherwise, if this isn't part of a transaction, debuffer the
    # particular frame (if it's not a SUBSCRIBE)
    elsif orig.command != 'SUBSCRIBE' && !orig.header?(:transaction)
      @buffer_mutex.synchronize { @buffer.delete orig }
    end
  end
end
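
The branching in `debuffer_frame` can be summarised as a three-way decision on the acknowledged frame. The sketch below is an illustration under assumptions: `SketchFrame` and `debuffer_action` are hypothetical names, and the function only reports what would happen instead of mutating a buffer.

```ruby
# Hypothetical stand-in for a buffered frame.
SketchFrame = Struct.new(:command, :headers)

# Report what a RECEIPT for the given frame would trigger.
def debuffer_action(frame)
  if ['COMMIT', 'ABORT'].include?(frame.command)
    # The transaction is finished, so its whole sequence can go.
    :remove_transaction
  elsif frame.command != 'SUBSCRIBE' && !frame.headers.key?(:transaction)
    # A plain, non-transactional frame: drop just this one.
    :remove_frame
  else
    # SUBSCRIBEs and frames inside an open transaction stay buffered.
    :keep
  end
end

results = {
  commit:      debuffer_action(SketchFrame.new('COMMIT', { transaction: 't1' })),
  plain_send:  debuffer_action(SketchFrame.new('SEND', {})),
  tx_send:     debuffer_action(SketchFrame.new('SEND', { transaction: 't1' })),
  subscribe:   debuffer_action(SketchFrame.new('SUBSCRIBE', {}))
}
```

SUBSCRIBE frames stay buffered even after their RECEIPT, presumably so the subscription can be re-established on reconnect, and transactional frames stay until the transaction's COMMIT or ABORT is acknowledged.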