Class: OnStomp::Failover::Buffers::Receipts
- Inherits:
-
Base
- Object
- Base
- OnStomp::Failover::Buffers::Receipts
- Defined in:
- lib/onstomp/failover/buffers/receipts.rb
Overview
TODO:
Quite a lot of this code is shared between Written and Receipts, we'll want to factor the common stuff out.
A buffer that ensures frames are RECEIPTed against a client's connection and replays the ones that were not when the failover client reconnects.
Instance Method Summary (collapse)
-
- (Object) debuffer_frame(r)
Removes frames that neither transactional nor SUBSCRIBEs from the buffer by looking the buffered frames up by their
receipt
header. -
- (Receipts) initialize(failover)
constructor
A new instance of Receipts.
Methods inherited from Base
Constructor Details
- (Receipts) initialize(failover)
A new instance of Receipts
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/onstomp/failover/buffers/receipts.rb', line 10 def initialize failover super [:send, :commit, :abort, :subscribe].each do |ev| failover.__send__(:before_#{ev}") do |f, *_| add_to_buffer f, {:receipt => OnStomp.next_serial} end end failover.before_begin do |f, *_| add_to_transactions f, {:receipt => OnStomp.next_serial} end # We can scrub the subscription before UNSUBSCRIBE is fully written # because if we replay before UNSUBSCRIBE was sent, we still don't # want to be subscribed when we reconnect. failover.before_unsubscribe do |f, *_| remove_subscribe_from_buffer f end failover.on_receipt { |r, *_| debuffer_frame r } failover.on_failover_connected { |f,c,*_| replay_buffer c } end |
Instance Method Details
- (Object) debuffer_frame(r)
Removes frames that neither transactional nor SUBSCRIBEs from the buffer
by looking the buffered frames up by their receipt
header.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/onstomp/failover/buffers/receipts.rb', line 33 def debuffer_frame r orig = @buffer_mutex.synchronize do @buffer.detect { |f| f[:receipt] == r[:receipt-id'] } end if orig # COMMIT and ABORT debuffer the whole transaction sequence if ['COMMIT', 'ABORT'].include? orig.command remove_from_transactions orig # Otherwise, if this isn't part of a transaction, debuffer the # particular frame (if it's not a SUBSCRIBE) elsif orig.command != 'SUBSCRIBE' && !orig.header?(:transaction) @buffer_mutex.synchronize { @buffer.delete orig } end end end |