标签:
官方网址:http://www.rabbitmq.com/tutorials/tutorial-four-python.html
前面的例子中,我们构建了一个简单的日志系统。我们可以广播日志消息给所有的接收者。
在这个例子中,我们准备增加一个新特性。我们将能仅仅订阅消息的一部分。 例如:我们直接仅仅把 critical error 类型的消息写入日志文件(保存到磁盘空间),然而还能够打印所有的日志消息到控制台。
Bindings(绑定)
在前面的例子中,我们已经创建bindings. 如下:
amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"logs">>, queue = Queue}),
binding 把 exchange和queue联系了起来。这个可以简单的理解为:这个queue对这个exchage感兴趣。
Bindings 可以带上额外的routing_key 参数。 为了取消对 basic_publish 参数的迷惑,我们将会称它为 binding key. 以下是我们如何通过一个 key 创建一个绑定。
[amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"direct_logs">>, routing_key = list_to_binary(Severity), queue = Queue}) || Severity <- Argv],
binding key 的意义依赖于exchange 的类型。前面我们使用的 fanout 类型的exchange 会忽略 binding key的值。
Direct exchange
前面的日志系统例子会广播所有的消息给所有消费者。我们想要扩展它使其允许过滤信息。
使用 fanout 类型的 exchange, 将不会有太多的灵活性,仅仅能做广播。
使用 direct类型的 exchange , direct exchange 的算法很简单,消息的routing key 和队列的 binding key 相同的消息会被投递给这个队列。看下图:
这个设置,我们可以看到有两个队列绑定到了一个 direct 类型的exchange。 Q1的 binding key 是orange,
Q2的binding key 是 black 和 green.
在这个系统中,如果消息带的routing key 是orange, 则此消息会被投递给Q1, 如果消息的routing key是black 或者是green, 则这些消息会被投递给Q2,其他的消息则会被丢弃。
Multiple bindings
相同的binding 可以绑定多个 queues . 在上图中,Q1和Q2的binding key都是 black ,这种情况下,direct 类型的exchange会和 fanout类型一样,广播消息给所有匹配的queues.routing key是black 的消息会被发送到Q1和Q2。
Emitting logs
我们的日志系统将会使用上面的模型, 我们将发送消息到 direct 类型的exchange来代替 fanout类型。 我们将提供日志的严重性作为 routing key。 接收消息的应用程序将能选择它想要接收的严重性的消息。让我们首先专注于发送日志。
首先,我们需要创建一个 echange:
amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"direct_logs">>, type = <<"direct">>}),
另外,我们准备发送消息:
amqp_channel:cast(Channel, #‘basic.publish‘{ exchange = <<"direct_logs">>, routing_key = Severity}, #amqp_msg{payload = Message}),
一个简单的事情:我们将评估严重性为: ‘info‘,‘warning‘,‘error‘中的其中一种。
Subscribing(订购)
接收消息的程序和前面的例子差不多,一个不同: 我们将会为我们感兴趣的每个严重性创建一个新的binding。
[amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"direct_logs">>, routing_key = list_to_binary(Severity), queue = Queue}) || Severity <- Argv],
Putting it all together
emit_log_direct.erl
-module(emit_log_direct). -compile([export_all]). -include_lib("amqp_client/include/amqp_client.hrl"). main(Argv) -> {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection), amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"direct_logs">>, type = <<"direct">>}), {Severity, Message} = case Argv of [] -> {<<"info">>, <<"Hello World!">>}; [S] -> {list_to_binary(S), <<"Hello World!">>}; [S | Msg] -> {list_to_binary(S), list_to_binary(string:join(Msg, " "))} end, amqp_channel:cast(Channel, #‘basic.publish‘{ exchange = <<"direct_logs">>, routing_key = Severity}, #amqp_msg{payload = Message}), io:format(" [x] Sent ~p:~p~n", [Severity, Message]), ok = amqp_channel:close(Channel), ok = amqp_connection:close(Connection), ok.
receive_logs_direct.erl
-module(receive_logs_direct). -compile([export_all]). -include_lib("amqp_client/include/amqp_client.hrl"). main(Argv) -> {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection), amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"direct_logs">>, type = <<"direct">>}), #‘queue.declare_ok‘{queue = Queue} = amqp_channel:call(Channel, #‘queue.declare‘{exclusive = true}), [amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"direct_logs">>, routing_key = list_to_binary(Severity), queue = Queue}) || Severity <- Argv], io:format(" [*] Waiting for logs. To exit press CTRL+C~n"), amqp_channel:subscribe(Channel, #‘basic.consume‘{queue = Queue, no_ack = true}, self()), receive #‘basic.consume_ok‘{} -> ok end, loop(Channel). loop(Channel) -> receive {#‘basic.deliver‘{routing_key = RoutingKey}, #amqp_msg{payload = Body}} -> io:format(" [x] ~p:~p~n", [RoutingKey, Body]), loop(Channel) end.
标签:
原文地址:http://my.oschina.net/lvhuizhenblog/blog/486448