Class | Qrack::Subscription |
In: |
lib/qrack/subscription.rb
|
Parent: | Object |
Subscription ancestor class
ack | [RW] | |
client | [R] | |
consumer_tag | [RW] | |
delivery_tag | [RW] | |
exclusive | [RW] | |
message_count | [R] | |
message_max | [RW] | |
queue | [R] | |
timeout | [RW] |
# File lib/qrack/subscription.rb, line 8
#
# Creates a subscription bound to +queue+ and registers it on that queue
# (via <tt>queue.subscription=</tt>).
#
# client:: object stored as the +client+ attribute
# queue::  object stored as the +queue+ attribute; must respond to +subscription=+
# opts::   option hash. Recognised keys:
#          :timeout::      stored as +timeout+ (nil when absent or falsy)
#          :message_max::  maximum number of messages to process (nil when absent or falsy)
#          :consumer_tag:: consumer tag; if not passed in, the server will generate one
#          :ack::          truthy if messages must be acknowledged (nil otherwise)
#          :exclusive::    truthy if this consumer wants exclusive use of the queue
#                          (defaults to false)
#          :nowait::       always discarded, see below
#
# The hash given by the caller is never modified: the options are copied
# before :nowait is stripped, so frozen hashes and hashes reused by the
# caller are both safe to pass.
def initialize(client, queue, opts = {})
  # Work on a private copy so that removing :nowait below does not leak
  # back into the caller's hash (and does not raise on a frozen hash).
  opts = opts.dup

  @client = client
  @queue = queue

  # Get timeout value
  @timeout = opts[:timeout] || nil

  # Get maximum amount of messages to process
  @message_max = opts[:message_max] || nil

  # If a consumer tag is not passed in the server will generate one
  @consumer_tag = opts[:consumer_tag] || nil

  # Ignore the :nowait option if passed, otherwise program will hang waiting for a
  # response from the server causing an error.
  opts.delete(:nowait)

  # Do we want to have to provide an acknowledgement?
  @ack = opts[:ack] || nil

  # Does this consumer want exclusive use of the queue?
  @exclusive = opts[:exclusive] || false

  # Initialize message counter
  @message_count = 0

  # Give queue reference to this subscription
  @queue.subscription = self

  # Store options (the copy, without :nowait)
  @opts = opts
end
# File lib/qrack/subscription.rb, line 42
#
# Runs the subscription loop: sets up the consumer, then repeatedly reads a
# delivery from the client, reassembles its payload and hands it to the
# given block.
#
# The block, when supplied, is called once per message with a hash holding
# :header, :payload and :delivery_details.
#
# The loop ends when
# * +client.next_method+ raises Qrack::ClientTimeout (the queue is
#   unsubscribed first), or
# * +message_max+ is set and that many messages have been processed.
#
# Nothing is consumed at all when +message_max+ is 0. Returns nil.
def start(&blk)
  # Do not process any messages if zero message_max
  return if message_max == 0

  # Notify server about new consumer
  setup_consumer

  # Start subscription loop
  loop do
    begin
      delivery = client.next_method(:timeout => timeout)
    rescue Qrack::ClientTimeout
      queue.unsubscribe
      break
    end

    # Increment message counter
    @message_count += 1

    # Remember the delivery tag so the queue can acknowledge this message
    queue.delivery_tag = delivery.delivery_tag if @ack

    header = client.next_payload

    # If maximum frame size is smaller than message payload body then message
    # will have a message header and several message bodies
    msg = ''
    msg += client.next_payload until msg.length >= header.size

    # If block present, pass the message info to the block for processing
    unless blk.nil?
      message = {:header => header, :payload => msg, :delivery_details => delivery.arguments}
      blk.call(message)
    end

    # Has the message_max condition been met by this message?
    limit_reached = !message_max.nil? && message_count == message_max

    # Stop consuming messages before the final message is acknowledged
    queue.unsubscribe if limit_reached

    # Have to do the ack here because the ack triggers the release of messages from the server
    # if you are using Client#qos prefetch and you will get extra messages sent through before
    # the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is
    # deferred.
    queue.ack if @ack

    # Quit the loop once the final message has been handled
    break if limit_reached
  end
end