Class Bunny::Subscription09
In: lib/bunny/subscription09.rb
Parent: Qrack::Subscription

DESCRIPTION:

Asks the server to start a "consumer", which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them with an unsubscribe. Every time a message reaches the queue it is passed to the block (blk) for processing. If an error occurs, Bunny::ProtocolError is raised.

OPTIONS:

  • :consumer_tag => 'tag' - Specifies the identifier for the consumer. The consumer tag is local to a connection, so two clients can use the same consumer tags. If this option is not specified, a server-generated name is used.
  • :ack => false (default) or true - If set to false, the server does not expect an acknowledgement message from the client. If set to true, the server expects an acknowledgement message from the client and will re-queue the message if it does not receive one within a time specified by the server.
  • :exclusive => true or false (default) - Request exclusive consumer access, meaning only this consumer can access the queue.
  • :nowait => true or false (default) - Ignored by Bunny, always false.
  • :timeout => number of seconds - The subscribe loop will continue to wait for messages until terminated (Ctrl-C or kill command) or this timeout interval is reached.
  • :message_max => max number of messages to process - When the required number of messages has been processed, the subscribe loop exits.
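The defaults listed above can be summarised in a small sketch. The helper normalize_subscribe_options is hypothetical and is not part of Bunny; it only illustrates which values apply when an option is omitted, assuming the defaults are exactly as documented here.

```ruby
# Hypothetical helper (not part of Bunny): applies the documented defaults
# to a hash of subscribe options.
def normalize_subscribe_options(opts = {})
  {
    :consumer_tag => opts[:consumer_tag],           # nil means the server generates a name
    :ack          => opts.fetch(:ack, false),       # default: no acknowledgement expected
    :exclusive    => opts.fetch(:exclusive, false), # default: non-exclusive consumer
    :nowait       => false,                         # ignored by Bunny, always false
    :timeout      => opts[:timeout],                # nil means wait until terminated
    :message_max  => opts[:message_max]             # nil means no message limit
  }
end

p normalize_subscribe_options({ :ack => true, :nowait => true })
```

Note that :nowait stays false even when the caller passes true, matching the option description above.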

OPERATION:

Passes a hash of message information to the block, if one has been supplied. The hash contains :header, :payload and :delivery_details. The structure of the data is as follows -

:header has instance variables -

  @klass
  @size
  @weight
  @properties is a hash containing -
    :content_type
    :delivery_mode
    :priority

:payload contains the message contents

:delivery_details is a hash containing -

  :consumer_tag
  :delivery_tag
  :redelivered
  :exchange
  :routing_key
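As a rough sketch of how a block might read this structure, the following builds a sample message hash by hand and formats it. SampleHeader and describe_message are hypothetical stand-ins; the real header object is created by the library and may also expose accessor methods, which this sketch does not assume.

```ruby
# Hypothetical stand-in for the header object; only @properties is modelled.
class SampleHeader
  def initialize(properties)
    @properties = properties
  end
end

# Hypothetical helper: formats one message hash as passed to the subscribe block.
def describe_message(msg)
  # The header stores its data in instance variables, so read @properties directly.
  props   = msg[:header].instance_variable_get(:@properties) || {}
  details = msg[:delivery_details]
  "#{details[:routing_key]} (#{props[:content_type]}): #{msg[:payload]}"
end

sample_msg = {
  :header  => SampleHeader.new({ :content_type => 'text/plain',
                                 :delivery_mode => 1,
                                 :priority => 0 }),
  :payload => 'hello',
  :delivery_details => { :consumer_tag => 'tag',
                         :delivery_tag => 1,
                         :redelivered  => false,
                         :exchange     => '',
                         :routing_key  => 'greetings' }
}

puts describe_message(sample_msg)  # prints "greetings (text/plain): hello"
```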

If the :timeout option is specified, Qrack::ClientTimeout is raised if the method times out waiting to receive the next message from the queue.
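One way to handle the timeout is to rescue it around the subscribe call, as in the sketch below. To keep it runnable without a broker, it defines a stand-in Qrack::ClientTimeout when the library is not loaded and uses a hypothetical FakeQueue that yields a few payloads and then raises; drain is also a hypothetical helper name.

```ruby
# Stand-in so the sketch runs without the gem; the real class comes from Qrack.
module Qrack
  unless const_defined?(:ClientTimeout)
    class ClientTimeout < StandardError; end
  end
end

# Hypothetical queue double: yields each payload, then simulates a timeout.
class FakeQueue
  def initialize(payloads)
    @payloads = payloads
  end

  def subscribe(opts = {})
    @payloads.each { |payload| yield({ :payload => payload }) }
    raise Qrack::ClientTimeout, 'timed out waiting for the next message'
  end
end

# Collects payloads until the subscribe call times out.
def drain(queue, seconds)
  received = []
  begin
    queue.subscribe(:timeout => seconds) { |msg| received << msg[:payload] }
  rescue Qrack::ClientTimeout
    # No further message arrived within the interval; treat as end of input.
  end
  received
end

p drain(FakeQueue.new(['a', 'b']), 5)
```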

EXAMPLES:

my_queue.subscribe(:timeout => 5) {|msg| puts msg[:payload]}

my_queue.subscribe(:message_max => 10, :ack => true) {|msg| puts msg[:payload]}

Methods

Public Instance methods

    # File lib/bunny/subscription09.rb, line 64
    def setup_consumer
      # Send a Basic.Consume frame for this queue; entries in @opts
      # override the values given here.
      client.send_frame(
        Qrack::Protocol09::Basic::Consume.new({ :deprecated_ticket => 0,
                                                :queue => queue.name,
                                                :consumer_tag => consumer_tag,
                                                :no_ack => !ack,
                                                :exclusive => exclusive,
                                                :nowait => false }.merge(@opts))
      )

      # Read the server's reply and check that it is Basic.ConsumeOk.
      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Basic::ConsumeOk,
        "Error subscribing to queue #{queue.name}")

      # Keep the consumer tag returned by the server.
      @consumer_tag = method.consumer_tag
    end
