码迷,mamicode.com
首页 > 其他好文 > 详细

ActiveMQ发布订阅模式

时间:2017-01-17 15:20:28      阅读:197      评论:0      收藏:0      [点我收藏+]

标签:available   需要   void   normal   code   系统   host   cer   com   

ActiveMQ的另一种模式就SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。还是上节的例子,当一个订单产生后,后台N个系统需要联动,但有一个前提是都需要收到订单信息,那么我们就需要将一个生产者的消息发布到N个消费者。

生产者:

 

技术分享
            try
            {
                //Create the Connection Factory  
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    //Create the Session  
                    using (ISession session = connection.CreateSession())
                    {
                        //Create the Producer for the topic/queue  
                        IMessageProducer prod = session.CreateProducer(
                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));

                        //Send Messages  
                        int i = 0;

                        while (!Console.KeyAvailable)
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = i.ToString();
                            Console.WriteLine("Sending: " + i.ToString());
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

                            System.Threading.Thread.Sleep(5000);
                            i++;
                        }
                    }
                }

                Console.ReadLine();
            }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}", e.Message);
                Console.ReadLine();
            }
技术分享

假设生产者每5秒发送一次消息:

技术分享

消费者:

技术分享
        static void Main(string[] args)
        {
            try  
            {  
                //Create the Connection factory  
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");  
                  
                //Create the connection  
                using (IConnection connection = factory.CreateConnection())  
                {  
                    connection.ClientId = "testing listener1";  
                    connection.Start();  
  
                    //Create the Session  
                    using (ISession session = connection.CreateSession())  
                    {  
                        //Create the Consumer  
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener1", null, false);  
                          
                        consumer.Listener += new MessageListener(consumer_Listener);  
  
                        Console.ReadLine();  
                    }  
                    connection.Stop();  
                    connection.Close();  
                }  
            }  
            catch (System.Exception e)  
            {  
                Console.WriteLine(e.Message);  
            }  
        }  
  
        static void consumer_Listener(IMessage message)  
        {  
            try  
            {  
                ITextMessage msg = (ITextMessage)message;  
                Console.WriteLine("Receive: " + msg.Text);  
           }  
            catch (System.Exception e)  
            {  
                Console.WriteLine(e.Message);  
            }  
        }
技术分享

 

启动一个消费者:

技术分享

我们发现他是从15开始的,而不是像上节一样从头开始,再启动另一个消费者:

技术分享

我们发现就是从启动时开始接受消息的,之前的消息就丢失了。

整体状态如下:

技术分享

我们观察管理界面:

技术分享

产生了一个testing的Topics,而订阅方有2个都订阅的是testing:

技术分享

这样只需要在需要获取消息的地方订阅即可及时获得。

源代码下载

ActiveMQ发布订阅模式

标签:available   需要   void   normal   code   系统   host   cer   com   

原文地址:http://www.cnblogs.com/amylis_chen/p/6292997.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!