码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ erlang "Routing"

时间:2015-08-01 15:52:50      阅读:296      评论:0      收藏:0      [点我收藏+]

标签:

官方网址:http://www.rabbitmq.com/tutorials/tutorial-four-python.html

前面的例子中,我们构建了一个简单的日志系统。我们可以广播日志消息给所有的接收者。

在这个例子中,我们准备增加一个新特性。我们将能仅仅订阅消息的一部分。 例如:我们直接仅仅把 critical error 类型的消息写入日志文件(保存到磁盘空间),然而还能够打印所有的日志消息到控制台。


Bindings(绑定)

在前面的例子中,我们已经创建bindings. 如下:

amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"logs">>,
                                             queue = Queue}),

binding 把 exchange和queue联系了起来。这个可以简单的理解为:这个queue对这个exchage感兴趣。

Bindings 可以带上额外的routing_key 参数。 为了取消对 basic_publish 参数的迷惑,我们将会称它为 binding key. 以下是我们如何通过一个 key 创建一个绑定。

[amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"direct_logs">>,
                                              routing_key = list_to_binary(Severity),
                                              queue = Queue})
     || Severity <- Argv],

binding key 的意义依赖于exchange 的类型。前面我们使用的 fanout 类型的exchange 会忽略 binding key的值。


Direct exchange

    前面的日志系统例子会广播所有的消息给所有消费者。我们想要扩展它使其允许过滤信息。

    使用 fanout 类型的 exchange,  将不会有太多的灵活性,仅仅能做广播。

    使用 direct类型的 exchange , direct exchange 的算法很简单,消息的routing key 和队列的 binding key 相同的消息会被投递给这个队列。看下图:

技术分享

    这个设置,我们可以看到有两个队列绑定到了一个 direct 类型的exchange。 Q1的 binding key 是orange,

Q2的binding key 是 black 和 green.

    在这个系统中,如果消息带的routing key 是orange, 则此消息会被投递给Q1, 如果消息的routing key是black 或者是green, 则这些消息会被投递给Q2,其他的消息则会被丢弃。


Multiple bindings

技术分享

相同的binding 可以绑定多个 queues . 在上图中,Q1和Q2的binding key都是 black ,这种情况下,direct 类型的exchange会和 fanout类型一样,广播消息给所有匹配的queues.routing key是black 的消息会被发送到Q1和Q2。


Emitting logs

    我们的日志系统将会使用上面的模型, 我们将发送消息到 direct 类型的exchange来代替 fanout类型。 我们将提供日志的严重性作为 routing key。 接收消息的应用程序将能选择它想要接收的严重性的消息。让我们首先专注于发送日志。

    首先,我们需要创建一个 echange:

amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"direct_logs">>,
                                                   type = <<"direct">>}),

  另外,我们准备发送消息:

amqp_channel:cast(Channel,
                      #‘basic.publish‘{
                        exchange = <<"direct_logs">>,
                        routing_key = Severity},
                      #amqp_msg{payload = Message}),

一个简单的事情:我们将评估严重性为: ‘info‘,‘warning‘,‘error‘中的其中一种。


Subscribing(订购)

    接收消息的程序和前面的例子差不多,一个不同: 我们将会为我们感兴趣的每个严重性创建一个新的binding。

[amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"direct_logs">>,
                                              routing_key = list_to_binary(Severity),
                                              queue = Queue})
     || Severity <- Argv],

Putting it all together

    

技术分享

emit_log_direct.erl

-module(emit_log_direct).
-compile([export_all]).
-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"direct_logs">>,
                                                   type = <<"direct">>}),

    {Severity, Message} = case Argv of
                              [] ->
                                  {<<"info">>, <<"Hello World!">>};
                              [S] ->
                                  {list_to_binary(S), <<"Hello World!">>};
                              [S | Msg] ->
                                  {list_to_binary(S), list_to_binary(string:join(Msg, " "))}
                          end,
    amqp_channel:cast(Channel,
                      #‘basic.publish‘{
                        exchange = <<"direct_logs">>,
                        routing_key = Severity},
                      #amqp_msg{payload = Message}),
    io:format(" [x] Sent ~p:~p~n", [Severity, Message]),
    ok = amqp_channel:close(Channel),
    ok = amqp_connection:close(Connection),
    ok.

receive_logs_direct.erl

-module(receive_logs_direct).
-compile([export_all]).

-include_lib("amqp_client/include/amqp_client.hrl").

main(Argv) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #‘exchange.declare‘{exchange = <<"direct_logs">>,
                                                   type = <<"direct">>}),

    #‘queue.declare_ok‘{queue = Queue} =
        amqp_channel:call(Channel, #‘queue.declare‘{exclusive = true}),

    [amqp_channel:call(Channel, #‘queue.bind‘{exchange = <<"direct_logs">>,
                                              routing_key = list_to_binary(Severity),
                                              queue = Queue})
     || Severity <- Argv],

    io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),

    amqp_channel:subscribe(Channel, #‘basic.consume‘{queue = Queue,
                                                     no_ack = true}, self()),
    receive
        #‘basic.consume_ok‘{} -> ok
    end,
    loop(Channel).

loop(Channel) ->
    receive
        {#‘basic.deliver‘{routing_key = RoutingKey}, #amqp_msg{payload = Body}} ->
            io:format(" [x] ~p:~p~n", [RoutingKey, Body]),
            loop(Channel)
    end.


RabbitMQ erlang "Routing"

标签:

原文地址:http://my.oschina.net/lvhuizhenblog/blog/486448

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!