码迷,mamicode.com
首页 > 其他好文 > 详细

JMS-activeMq发布订阅模式(非持久订阅)

时间:2017-11-13 14:13:07      阅读:235      评论:0      收藏:0      [点我收藏+]

标签:ber   ring   receiver   persist   except   tcp   订阅模式   send   system   

 

技术分享

Publisher的代码:

  1. import javax.jms.Connection;  
  2. import javax.jms.ConnectionFactory;  
  3. import javax.jms.DeliveryMode;  
  4. import javax.jms.Destination;  
  5. import javax.jms.JMSException;  
  6. import javax.jms.MapMessage;  
  7. import javax.jms.MessageProducer;  
  8. import javax.jms.Session;  
  9. import javax.jms.TextMessage;  
  10.   
  11. import org.apache.activemq.ActiveMQConnectionFactory;  
  12.   
  13. public class Publisher {  
  14.   
  15.     // 单例模式  
  16.   
  17.     // 1、连接工厂  
  18.     private ConnectionFactory connectionFactory;  
  19.     // 2、连接对象  
  20.     private Connection connection;  
  21.     // 3、Session对象  
  22.     private Session session;  
  23.     // 4、生产者  
  24.     private MessageProducer messageProducer;  
  25.   
  26.     public Publisher() {  
  27.   
  28.         try {  
  29.             this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",  
  30.                     "123", "tcp://localhost:61616");  
  31.             this.connection = connectionFactory.createConnection();  
  32.             this.connection.start();  
  33.             // 不使用事务  
  34.             // 设置客户端签收模式  
  35.             this.session = this.connection.createSession(false,  
  36.                     Session.AUTO_ACKNOWLEDGE);  
  37.             this.messageProducer = this.session.createProducer(null);  
  38.         } catch (JMSException e) {  
  39.             throw new RuntimeException(e);  
  40.         }  
  41.   
  42.     }  
  43.   
  44.     public Session getSession() {  
  45.         return this.session;  
  46.     }  
  47.   
  48.     public void send1(/* String QueueName, Message message */) {  
  49.         try {  
  50.   
  51.             Destination destination = this.session.createTopic("topic1");  
  52.             MapMessage msg1 = this.session.createMapMessage();  
  53.             msg1.setString("name", "张三");  
  54.             msg1.setInt("age", 22);  
  55.   
  56.             MapMessage msg2 = this.session.createMapMessage();  
  57.             msg2.setString("name", "李四");  
  58.             msg2.setInt("age", 25);  
  59.   
  60.             MapMessage msg3 = this.session.createMapMessage();  
  61.             msg3.setString("name", "张三");  
  62.             msg3.setInt("age", 30);  
  63.   
  64.             // 发送消息到topic1  
  65.             this.messageProducer.send(destination, msg1,  
  66.                     DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
  67.             this.messageProducer.send(destination, msg2,  
  68.                     DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
  69.             this.messageProducer.send(destination, msg3,  
  70.                     DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
  71.   
  72.         } catch (JMSException e) {  
  73.             throw new RuntimeException(e);  
  74.         }  
  75.     }  
  76.   
  77.     public void send2() {  
  78.         try {  
  79.             Destination destination = this.session.createTopic("topic1");  
  80.             TextMessage message = this.session.createTextMessage("我是一个字符串");  
  81.             // 发送消息  
  82.             this.messageProducer.send(destination, message,  
  83.                     DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
  84.         } catch (JMSException e) {  
  85.             throw new RuntimeException(e);  
  86.         }  
  87.   
  88.     }  
  89.   
  90.     public static void main(String[] args) {  
  91.         Publisher producer = new Publisher();  
  92.         producer.send1();  
  93.   
  94.     }  
  95.   
  96. }  

Subscribe的代码:

  1. import javax.jms.Connection;  
  2. import javax.jms.ConnectionFactory;  
  3. import javax.jms.Destination;  
  4. import javax.jms.JMSException;  
  5. import javax.jms.MapMessage;  
  6. import javax.jms.Message;  
  7. import javax.jms.MessageConsumer;  
  8. import javax.jms.MessageListener;  
  9. import javax.jms.Session;  
  10. import javax.jms.TextMessage;  
  11.   
  12. import org.apache.activemq.ActiveMQConnectionFactory;  
  13.   
  14. public class Subscriber {  
  15.   
  16.     // 单例模式  
  17.   
  18.     // 1、连接工厂  
  19.     private ConnectionFactory connectionFactory;  
  20.     // 2、连接对象  
  21.     private Connection connection;  
  22.     // 3、Session对象  
  23.     private Session session;  
  24.     // 4、生产者  
  25.     private MessageConsumer messageConsumer;  
  26.     // 5、目的地址  
  27.     private Destination destination;  
  28.   
  29.     public Subscriber() {  
  30.   
  31.         try {  
  32.             this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",  
  33.                     "123", "tcp://localhost:61616");  
  34.             this.connection = connectionFactory.createConnection();  
  35.             this.connection.start();  
  36.             // 不使用事务  
  37.             // 设置客户端签收模式  
  38.             this.session = this.connection.createSession(false,  
  39.                     Session.AUTO_ACKNOWLEDGE);  
  40.             this.destination = this.session.createTopic("topic1");  
  41.             this.messageConsumer = this.session.createConsumer(destination);  
  42.         } catch (JMSException e) {  
  43.             throw new RuntimeException(e);  
  44.         }  
  45.   
  46.     }  
  47.   
  48.     public Session getSession() {  
  49.         return this.session;  
  50.     }  
  51.   
  52.     // 用于监听消息队列的消息  
  53.     class MyLister implements MessageListener {  
  54.   
  55.         @Override  
  56.         public void onMessage(Message message) {  
  57.             try {  
  58.                 if (message instanceof TextMessage) {  
  59.   
  60.                 }  
  61.                 if (message instanceof MapMessage) {  
  62.                     MapMessage ret = (MapMessage) message;  
  63.                     System.out.println(ret.toString());  
  64.                     System.out.println(ret.getString("name"));  
  65.                     System.out.println(ret.getInt("age"));  
  66.                     // 因为设置的是客户端的签收模式,所以要手动的去确认消息的消费  
  67.                     message.acknowledge();  
  68.                 }  
  69.             } catch (JMSException e) {  
  70.                 throw new RuntimeException(e);  
  71.             }  
  72.         }  
  73.   
  74.     }  
  75.   
  76.     // 用于异步监听消息  
  77.     public void receiver() {  
  78.         try {  
  79.             this.messageConsumer.setMessageListener(new MyLister());  
  80.         } catch (JMSException e) {  
  81.             throw new RuntimeException(e);  
  82.         }  
  83.     }  
  84.   
  85.     public static void main(String[] args) {  
  86.         Subscriber conmuser = new Subscriber();  
  87.         conmuser.receiver();  
  88.   
  89.     }  
  90.   
  91. }  

 技术分享

技术分享

技术分享

 

 先启动消费者(先订阅后消费),再启动发布者

技术分享

 

JMS-activeMq发布订阅模式(非持久订阅)

标签:ber   ring   receiver   persist   except   tcp   订阅模式   send   system   

原文地址:http://www.cnblogs.com/austinspark-jessylu/p/7825257.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!