Class Bunny::Exchange09
In: lib/bunny/exchange09.rb
Parent: Object

DESCRIPTION:

Exchanges are the routing and distribution hub of AMQP. All messages that Bunny sends to an AMQP broker/server have to pass through an exchange in order to be routed to a destination queue. The AMQP specification defines the types of exchange that you can create.

At the time of writing, four types of exchange are defined:

  • :direct
  • :fanout
  • :topic
  • :headers

AMQP-compliant brokers/servers are required to provide default exchanges for the direct and fanout exchange types. The names of all default exchanges are prefixed with ‘amq.’, for example:

  • amq.direct
  • amq.fanout
  • amq.topic
  • amq.match or amq.headers
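
The relationship between default exchange names and exchange types can be sketched as follows. This is a minimal illustration modelled on the constructor logic shown further down this page; `exchange_type_for` is a hypothetical helper name and is not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): derive the exchange type from
# a name, treating names that start with 'amq.' as default exchanges.
def exchange_type_for(name, opts = {})
  if name =~ /\Aamq\./
    type = name.sub(/\Aamq\./, '')
    # 'amq.match' is the alternative name for the headers exchange
    type = 'headers' if type == 'match'
    type.to_sym
  else
    # any other name uses the :type option, falling back to :direct
    opts[:type] || :direct
  end
end

exchange_type_for('amq.fanout')               # => :fanout
exchange_type_for('amq.match')                # => :headers
exchange_type_for('orders', :type => :topic)  # => :topic
exchange_type_for('orders')                   # => :direct
```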

If you want more information about exchanges, please consult the documentation for your target broker/server or visit the AMQP website to find the version of the specification that applies to your target broker/server.

Methods

delete   new   publish  

Attributes

client  [R] 
key  [R] 
name  [R] 
opts  [R] 
type  [R] 

Public Class methods

new(client, name, opts = {})

    # File lib/bunny/exchange09.rb, line 36
    def initialize(client, name, opts = {})
      # check connection to server
      raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected

      @client, @name, @opts = client, name, opts

      # set up the exchange type catering for default names
      if name.match(/^amq\./)
        new_type = name.sub(/amq\./, '')
        # handle 'amq.match' default
        new_type = 'headers' if new_type == 'match'
        @type = new_type.to_sym
      else
        @type = opts[:type] || :direct
      end

      @key = opts[:key]
      @client.exchanges[@name] ||= self

      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      unless name == "amq.#{type}" or name == ''
        client.send_frame(
          Qrack::Protocol09::Exchange::Declare.new(
            { :exchange => name, :type => type, :nowait => false,
              :deprecated_ticket => 0, :deprecated_auto_delete => false,
              :deprecated_internal => false }.merge(opts)
          )
        )

        method = client.next_method

        client.check_response(method, Qrack::Protocol09::Exchange::DeclareOk,
                              "Error declaring exchange #{name}: type = #{type}")
      end
    end
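
The way the constructor assembles the declare arguments can be illustrated without a broker connection. The sketch below is an assumption-laden restatement of the listing above: `declare_arguments` and `needs_declare?` are hypothetical helper names, and unlike the original the sketch copies the options hash rather than modifying the caller's.

```ruby
# Hypothetical helpers for illustration only; neither is part of Bunny.

# Build the argument hash that would be passed to Exchange::Declare.
def declare_arguments(name, type, opts = {})
  opts = opts.dup        # the sketch avoids mutating the caller's hash
  opts.delete(:nowait)   # :nowait is dropped, so the default of false wins
  { :exchange => name, :type => type, :nowait => false,
    :deprecated_ticket => 0, :deprecated_auto_delete => false,
    :deprecated_internal => false }.merge(opts)
end

# Exchanges named 'amq.<type>' and the nameless default exchange are
# not declared again.
def needs_declare?(name, type)
  !(name == "amq.#{type}" || name == '')
end

args = declare_arguments('logs', :fanout, :durable => true, :nowait => true)
args[:nowait]                            # => false
args[:durable]                           # => true
needs_declare?('amq.fanout', :fanout)    # => false
needs_declare?('logs', :fanout)          # => true
```

Because the caller's options are merged last, any option other than :nowait overrides the defaults in the hash.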

Public Instance methods

delete(opts = {})

DESCRIPTION:

Requests that the exchange be deleted from the broker/server. If the deletion succeeds, the reference to the exchange is removed from the client's exchanges. If an error occurs, Bunny::ProtocolError is raised.

OPTIONS:

  • :if_unused => true or false (default) - If set to true, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.
  • :nowait => true or false (default) - Ignored by Bunny, always false.

RETURNS:

:delete_ok if successful


    # File lib/bunny/exchange09.rb, line 94
    def delete(opts = {})
      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      client.send_frame(
        Qrack::Protocol09::Exchange::Delete.new({ :exchange => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts))
      )

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Exchange::DeleteOk,
                            "Error deleting exchange #{name}")

      client.exchanges.delete(name)

      # return confirmation
      :delete_ok
    end
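
The sequence of steps in delete can be walked through without a broker. In the sketch below, `FakeClient` and `delete_exchange` are hypothetical stand-ins invented for illustration: the fake client only records what it is asked to send, and the response check performed by the real method is reduced to a comment.

```ruby
# Hypothetical stand-in for the client: it records frames and holds the
# exchange registry, but performs no network traffic.
class FakeClient
  attr_reader :exchanges, :sent

  def initialize
    @exchanges = {}
    @sent = []
  end

  def send_frame(frame)
    @sent << frame
  end
end

# Mirrors the steps of the delete listing above (illustration only).
def delete_exchange(client, name, opts = {})
  opts = opts.dup
  opts.delete(:nowait)  # :nowait is ignored and always sent as false
  client.send_frame({ :exchange => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts))
  # the real method checks for a DeleteOk response here and raises otherwise
  client.exchanges.delete(name)
  :delete_ok
end

client = FakeClient.new
client.exchanges['logs'] = :placeholder
result = delete_exchange(client, 'logs', :if_unused => true, :nowait => true)

result                        # => :delete_ok
client.exchanges.key?('logs') # => false
client.sent.last[:if_unused]  # => true
client.sent.last[:nowait]     # => false
```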

publish(data, opts = {})

DESCRIPTION:

Publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.

OPTIONS:

  • :key => ‘routing_key’ - Specifies the routing key for the message. The routing key is used for routing messages depending on the exchange configuration.
  • :mandatory => true or false (default) - Tells the server how to react if the message cannot be routed to a queue. If set to true, the server will return an unroutable message with a Return method. If set to false, the server silently drops the message.
  • :immediate => true or false (default) - Tells the server how to react if the message cannot be routed to a queue consumer immediately. If set to true, the server will return an undeliverable message with a Return method. If set to false, the server will queue the message, but with no guarantee that it will ever be consumed.
  • :persistent => true or false (default) - Tells the server whether to persist the message. If set to true, the message will be persisted to disk and not lost if the server restarts. If set to false, the message will not be persisted across a server restart. Setting to true incurs a performance penalty, as there is an extra cost associated with disk access.

RETURNS:

nil


    # File lib/bunny/exchange09.rb, line 144
    def publish(data, opts = {})
      opts = opts.dup
      out = []

      # Set up options
      routing_key = opts.delete(:key) || key
      mandatory = opts.delete(:mandatory)
      immediate = opts.delete(:immediate)
      delivery_mode = opts.delete(:persistent) ? 2 : 1

      out << Qrack::Protocol09::Basic::Publish.new(
        { :exchange => name,
          :routing_key => routing_key,
          :mandatory => mandatory,
          :immediate => immediate,
          :deprecated_ticket => 0 }
      )
      data = data.to_s
      out << Qrack::Protocol09::Header.new(
        Qrack::Protocol09::Basic,
        data.length, {
          :content_type  => 'application/octet-stream',
          :delivery_mode => delivery_mode,
          :priority      => 0
        }.merge(opts)
      )
      out << Qrack::Transport09::Body.new(data)

      client.send_frame(*out)
    end
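
The listing above splits the publish options into two groups: those that travel with the Publish method and those that become message header properties. A minimal sketch of that split follows; `split_publish_options` is a hypothetical helper name used only for illustration and is not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): separate publish options into
# method-level arguments and message header properties.
def split_publish_options(opts = {}, default_key = nil)
  opts = opts.dup  # leave the caller's hash untouched
  method_args = {
    :routing_key => opts.delete(:key) || default_key,
    :mandatory   => opts.delete(:mandatory),
    :immediate   => opts.delete(:immediate)
  }
  # :persistent maps to AMQP delivery mode 2, otherwise 1
  delivery_mode = opts.delete(:persistent) ? 2 : 1
  # whatever is left in opts overrides the default header properties
  properties = {
    :content_type  => 'application/octet-stream',
    :delivery_mode => delivery_mode,
    :priority      => 0
  }.merge(opts)
  [method_args, properties]
end

method_args, properties = split_publish_options(
  { :key => 'orders.new', :persistent => true, :content_type => 'application/json' }
)

method_args[:routing_key]   # => "orders.new"
properties[:delivery_mode]  # => 2
properties[:content_type]   # => "application/json"
```

Options that are not recognised by name, such as :content_type here, pass straight through to the header properties.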
