Class | Bunny::Exchange09 |
In: | lib/bunny/exchange09.rb |
Parent: | Object |
Exchanges are the routing and distribution hub of AMQP. All messages that Bunny sends to an AMQP broker/server have to pass through an exchange in order to be routed to a destination queue. The AMQP specification defines the types of exchange that you can create.
At the time of writing there are four (4) types of exchange defined: direct, fanout, topic and headers.
AMQP-compliant brokers/servers are required to provide default exchanges for the direct and fanout exchange types. All default exchanges are prefixed with ‘amq.’, for example amq.direct and amq.fanout.
If you want more information about exchanges, please consult the documentation for your target broker/server or visit the AMQP website to find the version of the specification that applies to your target broker/server.
client | [R] | |
key | [R] | |
name | [R] | |
opts | [R] | |
type | [R] |
    # File lib/bunny/exchange09.rb, line 36
    def initialize(client, name, opts = {})
      # check connection to server
      raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected

      @client, @name, @opts = client, name, opts

      # set up the exchange type catering for default names
      if name.match(/^amq\./)
        new_type = name.sub(/amq\./, '')
        # handle 'amq.match' default
        new_type = 'headers' if new_type == 'match'
        @type = new_type.to_sym
      else
        @type = opts[:type] || :direct
      end

      @key = opts[:key]
      @client.exchanges[@name] ||= self

      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      unless name == "amq.#{type}" or name == ''
        client.send_frame(
          Qrack::Protocol09::Exchange::Declare.new(
            { :exchange => name, :type => type, :nowait => false,
              :deprecated_ticket => 0, :deprecated_auto_delete => false, :deprecated_internal => false }.merge(opts)
          )
        )

        method = client.next_method

        client.check_response(method, Qrack::Protocol09::Exchange::DeclareOk,
          "Error declaring exchange #{name}: type = #{type}")

      end
    end
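The way the constructor picks an exchange type can be easier to follow in isolation. The following is a minimal sketch that copies only the type-selection branch into a standalone method; `infer_exchange_type` is a hypothetical name used for illustration and is not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): mirrors the type-selection
# branch of Exchange09#initialize shown above.
def infer_exchange_type(name, opts = {})
  if name.match(/^amq\./)
    # Default exchanges carry their type in the name, e.g. 'amq.fanout'
    new_type = name.sub(/amq\./, '')
    # 'amq.match' is the default headers exchange
    new_type = 'headers' if new_type == 'match'
    new_type.to_sym
  else
    # Any other name uses the :type option, falling back to :direct
    opts[:type] || :direct
  end
end

p infer_exchange_type('amq.fanout')             # => :fanout
p infer_exchange_type('amq.match')              # => :headers
p infer_exchange_type('logs', :type => :topic)  # => :topic
p infer_exchange_type('logs')                   # => :direct
```

Note that for a name starting with 'amq.' the :type option is not consulted at all, so passing a conflicting :type alongside a default exchange name has no effect on the resulting type.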
Requests that the exchange be deleted from the broker/server. Removes the reference from the client's exchanges if successful. Raises Bunny::ProtocolError if an error occurs.
Returns :delete_ok if successful.
    # File lib/bunny/exchange09.rb, line 94
    def delete(opts = {})
      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      client.send_frame(
        Qrack::Protocol09::Exchange::Delete.new({ :exchange => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts))
      )

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Exchange::DeleteOk,
        "Error deleting exchange #{name}")

      client.exchanges.delete(name)

      # return confirmation
      :delete_ok
    end
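The interaction between the defaults, the caller's options and the forced :nowait value can be sketched without a broker. The helper below is hypothetical (`delete_frame_args` is not a Bunny method) and only reproduces the hash handling from the listing above. The :if_unused key is used purely as an illustrative caller option and is an assumption about what a caller might pass.

```ruby
# Hypothetical helper (not part of Bunny): reproduces how Exchange09#delete
# combines its defaults with the caller's options.
def delete_frame_args(name, opts = {})
  # Work on a copy; the original method deletes :nowait from the caller's hash
  opts = opts.dup
  # Drop :nowait so the broker always replies and next_method does not hang
  opts.delete(:nowait)
  # Remaining caller options override the defaults
  { :exchange => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts)
end

args = delete_frame_args('logs', :if_unused => true, :nowait => true)
p args[:nowait]     # => false
p args[:if_unused]  # => true
```

Because :nowait is removed before the merge, a caller cannot switch the request to a no-wait delete; every other key passed in takes precedence over the defaults.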
Publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
Returns nil.
    # File lib/bunny/exchange09.rb, line 144
    def publish(data, opts = {})
      opts = opts.dup
      out = []

      # Set up options
      routing_key = opts.delete(:key) || key
      mandatory = opts.delete(:mandatory)
      immediate = opts.delete(:immediate)
      delivery_mode = opts.delete(:persistent) ? 2 : 1

      out << Qrack::Protocol09::Basic::Publish.new(
        { :exchange => name,
          :routing_key => routing_key,
          :mandatory => mandatory,
          :immediate => immediate,
          :deprecated_ticket => 0 }
      )
      data = data.to_s
      out << Qrack::Protocol09::Header.new(
        Qrack::Protocol09::Basic,
        data.length, {
          :content_type => 'application/octet-stream',
          :delivery_mode => delivery_mode,
          :priority => 0
        }.merge(opts)
      )
      out << Qrack::Transport09::Body.new(data)

      client.send_frame(*out)
    end
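The options passed to publish end up in two places: some feed the publish frame, and whatever is left over is merged into the message header properties. A minimal sketch of that split follows, assuming a hypothetical helper named `split_publish_options` that is not part of Bunny and only mirrors the option handling in the listing above.

```ruby
# Hypothetical helper (not part of Bunny): mirrors how Exchange09#publish
# separates publish-frame arguments from message header properties.
def split_publish_options(opts, default_key = nil)
  opts = opts.dup  # publish also works on a copy of the caller's hash

  # These keys are consumed by the publish frame
  publish_args = {
    :routing_key => opts.delete(:key) || default_key,
    :mandatory   => opts.delete(:mandatory),
    :immediate   => opts.delete(:immediate)
  }

  # :persistent is translated into the AMQP delivery mode (2 = persistent)
  delivery_mode = opts.delete(:persistent) ? 2 : 1

  # Everything still in opts overrides the default header properties
  properties = {
    :content_type  => 'application/octet-stream',
    :delivery_mode => delivery_mode,
    :priority      => 0
  }.merge(opts)

  [publish_args, properties]
end

publish_args, properties = split_publish_options(
  { :key => 'logs.error', :persistent => true, :content_type => 'text/plain' },
  'fallback.key'
)
p publish_args[:routing_key]  # => "logs.error"
p properties[:delivery_mode]  # => 2
p properties[:content_type]   # => "text/plain"
```

When no :key option is given, the routing key falls back to the key the exchange was created with, which the sketch models with the `default_key` argument.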