Class: OnStomp::Failover::Buffers::Written
- Inherits:
- Base
- Object
- Base
- OnStomp::Failover::Buffers::Written
- Defined in:
- lib/onstomp/failover/buffers/written.rb
Overview
A buffer that ensures frames are at least written to a client's connection and replays the ones that were not when the failover client reconnects.
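The idea in the overview can be illustrated with a minimal sketch. It is not the OnStomp implementation: `SketchConnection`, `SketchWrittenBuffer`, and their methods are hypothetical names invented for this example, and frames are plain strings. It only shows the general pattern of holding a frame until it is known to be written and replaying whatever is left after a reconnect.

```ruby
# Hypothetical stand-in for a connection: records whatever is written to it.
class SketchConnection
  attr_reader :written

  def initialize
    @written = []
  end

  def write(frame)
    @written << frame
  end
end

# Hypothetical, simplified buffer; not the OnStomp class.
class SketchWrittenBuffer
  def initialize
    @buffer = []
    @mutex = Mutex.new
  end

  # Called before a frame is handed to the connection.
  def before_write(frame)
    @mutex.synchronize { @buffer << frame }
  end

  # Called once the frame has been fully written to the socket.
  def on_written(frame)
    @mutex.synchronize { @buffer.delete(frame) }
  end

  # Called after reconnecting: re-send every frame still pending.
  def replay(connection)
    pending.each { |frame| connection.write(frame) }
  end

  # Snapshot of the frames that have not been confirmed as written.
  def pending
    @mutex.synchronize { @buffer.dup }
  end
end

buffer = SketchWrittenBuffer.new
buffer.before_write('SEND a')
buffer.before_write('SEND b')
buffer.on_written('SEND a')   # 'SEND a' reached the socket

reconnected = SketchConnection.new
buffer.replay(reconnected)    # only 'SEND b' is written again
```

In this sketch a frame that was never confirmed stays in the buffer after the replay, so it would be replayed again if the new connection also failed before the write completed.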
Instance Method Summary
- (Object) debuffer_non_transactional_frame(f, *_)
Removes a frame that is not part of a transaction from the buffer after it has been written to the broker socket, so that it will not be replayed when the failover client reconnects.
- (Written) initialize(failover)
constructor
A new instance of Written.
Methods inherited from Base
Constructor Details
- (Written) initialize(failover)
A new instance of Written.
# File 'lib/onstomp/failover/buffers/written.rb', line 8
def initialize failover
  super
  # Buffer these frames before they are written so they can be replayed.
  [:send, :commit, :abort, :subscribe].each do |ev|
    failover.__send__(:"before_#{ev}") do |f, *_|
      add_to_buffer f
    end
  end
  # We only want to scrub the transactions if ABORT or COMMIT was
  # at least written fully to the socket.
  [:commit, :abort].each do |ev|
    failover.__send__(:"on_#{ev}") do |f, *_|
      remove_from_transactions f
    end
  end
  failover.before_begin { |f, *_| add_to_transactions f }
  # We can scrub the subscription before UNSUBSCRIBE is fully written
  # because if we replay before UNSUBSCRIBE was sent, we still don't
  # want to be subscribed when we reconnect.
  failover.before_unsubscribe { |f, *_| remove_subscribe_from_buffer f }
  # Once a SEND frame has been written, drop it unless it is transactional.
  failover.on_send &method(:debuffer_non_transactional_frame)
  # After reconnecting, replay the buffered frames on the new connection.
  failover.on_failover_connected { |f, c, *_| replay_buffer c }
end
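The constructor registers its handlers by building method names at runtime with `__send__` and an interpolated symbol such as `:"before_#{ev}"`. The following sketch shows that Ruby idiom in isolation. `SketchEvents`, its `EVENTS` list, and `trigger` are hypothetical and only stand in for the failover client's event methods; they are not part of OnStomp.

```ruby
# Hypothetical event source; stands in for the failover client.
class SketchEvents
  EVENTS = [:before_send, :before_commit, :on_commit].freeze

  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Define one registration method per event,
  # e.g. before_send { |frame| ... }
  EVENTS.each do |name|
    define_method(name) do |&block|
      @callbacks[name] << block
    end
  end

  # Invoke every callback registered for the named event.
  def trigger(name, *args)
    @callbacks[name].each { |callback| callback.call(*args) }
  end
end

events = SketchEvents.new
seen = []

[:send, :commit].each do |ev|
  # :"before_#{ev}" builds the symbol :before_send, then :before_commit.
  events.__send__(:"before_#{ev}") { |frame, *_| seen << [ev, frame] }
end

events.trigger(:before_send, 'SEND a')
events.trigger(:before_commit, 'COMMIT t1')
events.trigger(:on_commit, 'COMMIT t1') # no handler registered, so ignored
```

Note that the quoted form `:"..."` is required for interpolation; a bare `:before_#{ev}` would not parse as an interpolated symbol.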
Instance Method Details
- (Object) debuffer_non_transactional_frame(f, *_)
Removes a frame that is not part of a transaction from the buffer after it has been written to the broker socket, so that it will not be replayed when the failover client reconnects.
# File 'lib/onstomp/failover/buffers/written.rb', line 34
def debuffer_non_transactional_frame f, *_
  unless @txs.key?(f[:transaction])
    @buffer_mutex.synchronize { @buffer.delete f }
  end
end
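To see the effect of the transaction check, here is a small self-contained sketch. It assumes frames can be modelled as plain hashes with an optional `:transaction` key. `SketchTxBuffer`, `begin_transaction`, `add`, and `buffered` are hypothetical helpers invented for the example; only the body of `debuffer_non_transactional_frame` mirrors the method shown above.

```ruby
# Hypothetical, simplified buffer; frames are plain hashes here.
class SketchTxBuffer
  def initialize
    @buffer = []
    @txs = {}
    @buffer_mutex = Mutex.new
  end

  # Record that a transaction is open (stands in for handling BEGIN).
  def begin_transaction(id)
    @txs[id] = true
  end

  # Put a frame in the buffer before it is written.
  def add(frame)
    @buffer_mutex.synchronize { @buffer << frame }
  end

  # Same shape as the method above: only frames outside any tracked
  # transaction are dropped once they have been written.
  def debuffer_non_transactional_frame(frame, *_)
    unless @txs.key?(frame[:transaction])
      @buffer_mutex.synchronize { @buffer.delete(frame) }
    end
  end

  # Snapshot of the frames still held for replay.
  def buffered
    @buffer_mutex.synchronize { @buffer.dup }
  end
end

buffer = SketchTxBuffer.new
buffer.begin_transaction('t1')

plain = { command: 'SEND', body: 'hello' }
in_tx = { command: 'SEND', body: 'world', transaction: 't1' }
[plain, in_tx].each { |frame| buffer.add(frame) }

# Pretend both frames were just written to the socket.
buffer.debuffer_non_transactional_frame(plain)
buffer.debuffer_non_transactional_frame(in_tx)

remaining = buffer.buffered # only the transactional frame is still held
```

The transactional frame stays buffered because, per the comments in the constructor, a transaction is only scrubbed once its COMMIT or ABORT has been fully written.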