Class Qrack::Subscription
In: lib/qrack/subscription.rb
Parent: Object
Timeout::Error ClientTimeout ConnectionTimeout StandardError InvalidTypeError\n[lib/qrack/qrack08.rb\nlib/qrack/qrack09.rb] BufferOverflowError\n[lib/qrack/qrack08.rb\nlib/qrack/qrack09.rb] Channel Client Subscription Queue Protocol09 Transport09 Protocol Transport lib/qrack/qrack08.rb lib/qrack/channel.rb lib/qrack/client.rb lib/qrack/subscription.rb lib/qrack/queue.rb Transport Transport09 Protocol Protocol09 Qrack dot/m_23_0.png

Subscription ancestor class

Methods

new   start  

Attributes

ack  [RW] 
client  [R] 
consumer_tag  [RW] 
delivery_tag  [RW] 
exclusive  [RW] 
message_count  [R] 
message_max  [RW] 
queue  [R] 
timeout  [RW] 

Public Class methods

[Source]

    # File lib/qrack/subscription.rb, line 8
 # Creates a subscription bound to +client+ and +queue+ and registers it on
 # the queue through <tt>queue.subscription=</tt>.
 #
 # client:: stored as the +client+ attribute.
 # queue::  stored as the +queue+ attribute; must respond to +subscription=+.
 # opts::   options hash. Keys read here:
 #          <tt>:timeout</tt>::      stored as +timeout+ (nil when absent).
 #          <tt>:message_max</tt>::  stored as +message_max+ (nil when absent).
 #          <tt>:consumer_tag</tt>:: stored as +consumer_tag+; if not passed in
 #                                   the server will generate one.
 #          <tt>:ack</tt>::          stored as +ack+ (nil when absent or false).
 #          <tt>:exclusive</tt>::    stored as +exclusive+ (false when absent).
 #          <tt>:nowait</tt>::       ignored, see below.
 #
 # The hash supplied by the caller is never modified.
 def initialize(client, queue, opts = {})
   @client = client
   @queue = queue

   # Ignore the :nowait option if passed, otherwise program will hang waiting
   # for a response from the server causing an error. The option is dropped
   # from a filtered copy so the caller's hash is left exactly as it was.
   @opts = opts.reject { |key, _value| key == :nowait }

   # Timeout value, maximum amount of messages to process and consumer tag.
   # "|| nil" turns an explicit false into nil, as callers have always seen.
   @timeout = @opts[:timeout] || nil
   @message_max = @opts[:message_max] || nil
   @consumer_tag = @opts[:consumer_tag] || nil

   # Do we want to have to provide an acknowledgement?
   @ack = @opts[:ack] || nil

   # Does this consumer want exclusive use of the queue?
   @exclusive = @opts[:exclusive] || false

   # Initialize message counter
   @message_count = 0

   # Give queue reference to this subscription
   @queue.subscription = self
 end

Public Instance methods

[Source]

    # File lib/qrack/subscription.rb, line 42
# Runs the subscription loop: announces the consumer, then repeatedly reads
# one delivery from +client+, reassembles its body and hands it to +blk+.
#
# blk:: optional block. Called once per message with a hash holding
#       <tt>:header</tt> (the object returned by the first +next_payload+),
#       <tt>:payload</tt> (the reassembled body string) and
#       <tt>:delivery_details</tt> (<tt>method.arguments</tt>).
#
# The loop ends when <tt>client.next_method</tt> raises Qrack::ClientTimeout
# or when +message_count+ reaches a non-nil +message_max+; in both cases the
# queue is unsubscribed first. Returns nil.
#
# +setup_consumer+ is not defined in this listing; presumably the concrete
# subscription class provides it -- confirm against the subclasses.
def start(&blk)
  # Do not process any messages if zero message_max
  return if message_max == 0

  # Notify server about new consumer
  setup_consumer

  # Start subscription loop
  loop do
    begin
      method = client.next_method(:timeout => timeout)
    rescue Qrack::ClientTimeout
      queue.unsubscribe
      break
    end

    # Increment message counter
    @message_count += 1

    # Get delivery tag to use for acknowledge
    queue.delivery_tag = method.delivery_tag if @ack

    header = client.next_payload

    # If maximum frame size is smaller than message payload body then message
    # will have a message header and several message bodies. The header size
    # is compared against the byte count, not the character count (assumes
    # header.size is the body size in bytes -- confirm), so a
    # multibyte-encoded body does not cause an extra read. The buffer is an
    # unfrozen string appended to in place.
    msg = ''.dup
    msg << client.next_payload while msg.bytesize < header.size

    # If block present, pass the message info to the block for processing
    blk.call({ :header => header, :payload => msg, :delivery_details => method.arguments }) if blk

    # Exit loop if message_max condition met
    if !message_max.nil? && message_count == message_max
      # Stop consuming messages
      queue.unsubscribe
      # Acknowledge receipt of the final message
      queue.ack if @ack
      # Quit the loop
      break
    end

    # Have to do the ack here because the ack triggers the release of
    # messages from the server if you are using Client#qos prefetch and you
    # will get extra messages sent through before the unsubscribe takes
    # effect to stop messages being sent to this consumer unless the ack is
    # deferred.
    queue.ack if @ack
  end
end

[Validate]