码迷,mamicode.com
首页 > 其他好文 > 详细

ActiveMQ使用

时间:2018-07-28 23:27:29      阅读:240      评论:0      收藏:0      [点我收藏+]

标签:生成   scribe   enc   消息队列   ++   通信   win   程序   数据   

ActiveMQ介绍

Active是一种消息中间件,有两种模式,一种点对点模式 发布者将发布的消息发送给服务器,等待用户监听并接受数据;第二种订阅模式 发布者将消息发布给消息服务器,让服务器将所有的数据直接转发给再监听的用户,进行一对多通信(类似微信公众号)。

点对点模式:

  发布者发布8条信息,这时有3个用户在监听服务器消息,则3个用户共同消费这8条消息。服务器中的每条消息只能被一个用户消费,这种模式服务器会存储发布者发布的数据,当未被用户接收的数据则会留在服务器中,等待下个监听服务器的用户接收数据。

技术分享图片

 

订阅模式(持久订阅模式/非持久订阅模式):

  发布者发布消息给消息服务器,消息服务器则将消息直接转发给监听的用户,这要求发布者发布消息的同时用户也在监听消息,若没有用户监听, 则不保留数据,认为数据已发送完成。也就是发布者发布时,用户没在监听消息,则不会在收到该数据。即使用户以后再监听也接收不到

  技术分享图片

  持久订阅模式:订阅者会注册一个clientId,当订阅者离线时,ActiveMQ会为这个 ID 保存所有拥有这个ID的主题的消息,当订阅者连接时,则会通过自己的clientId得到所有自己处于离线时所要接收主题消息

  非持久订阅模式:只有当订阅者处于连接状态才会接收到发布者发布出来的消息,并且发送完成后ActiveMQ则将消息丢弃。

在程序中使用ActiveMQ

  从官网中下载activeMQ,下载地址:http://activemq.apache.org/download.html

  解压后,打开目录下的bin,根据自己的系统选择win32或win64安装Active服务,并开启activeMQ

  开启后浏览器访问该地址:http://127.0.0.1:8161/,选择 Manage ActiveMQ broker,输入账号密码,默认都是admin

技术分享图片

 

 

  创建一个maven项目,在pom.xml文件中引入jar包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>

 点对点模式

创建一个消费者和一个生产者的类

技术分享图片

Consumer.java 消费者通过消息监听器监听服务器上的信息

package cn.lcf.activeMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Hello world!
 *
 */
public class Consumer {
    //设置连接地址
    private static final String url = "tcp://127.0.0.1:61616";
    //设置消息队列名称
    private static final String queueName = "queue-text";
    
    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2、创建连接对象
        Connection createConnection = connectionFactory.createConnection();
        // 3、启动连接
        createConnection.start();
        // 4、创建会话     createSession第一个参数表示是否支持事务,第二个参数是客户端接收确认模式,Session.AUTO_ACKNOWLEDGE是自动确认,Session.CLIENT_ACKNOWLEDGE 客户端通过调用消息的 acknowledge 方法签收消息。
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
        // 5、创建消息目标 
         Queue createQueue = createSession.createQueue(queueName); 
         // 6 、创建消费者 
         MessageConsumer createConsumer = createSession.createConsumer(createQueue); 
         // 7、设置消费者监听 
         createConsumer.setMessageListener(new MessageListener() { 
             @Override 
             public void onMessage(Message message) { 
                 TextMessage textMessage = (TextMessage) message; 
                 try { 
                     System.out.println("接收的消息为" + textMessage.getText()); 
                 } catch (JMSException e) {
                     // TODO Auto-generated catch block e.printStackTrace();
                 } 
            } 
         });
    }
}

  producer.java  生产者跟消费者差不多,这是第6步开始,变成创建生产者,并发送消息,发送完成之后需要关闭连接。

package cn.lcf.activeMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Hello world!
 *
 */
public class Producer 
{
    private static final String url = "tcp://127.0.0.1:61616";
    
    private static final String queueName = "queue-text";
    
    public static void main( String[] args ) throws JMSException
    {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2、创建连接对象
        Connection createConnection = connectionFactory.createConnection();
        // 3、启动连接
        createConnection.start();
        // 4、创建会话
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建消息目标
        Queue createQueue = createSession.createQueue(queueName);
        // 6、创建生产者
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i=0;i<100;++i) {
            // 7、创建消息
            TextMessage textMessage = createSession.createTextMessage("666  " + i);
            // 8、发布消息
            createProducer.send(textMessage);
            System.out.println("发送的消息为:" + "666  " + i);
        }
        
        // 9、关闭连接
        createConnection.close();
        
    }
}

运行下生产者的类 Producer.java,将消息存到ActiveMQ服务器上

技术分享图片

查看ActiveMQ中的队列信息

技术分享图片

拥有100条信息,未出列,这时候运行一个消费者(Consumer.java)去消费这100条信息

技术分享图片

查看ActiveMQ上的信息,100条信息全被这个1个消费者接收

 技术分享图片

清空,然后同时运行三个消费者(Consumer.java执行三次后可在console切换不同类的控制台),在运行一个生产者

技术分享图片

技术分享图片技术分享图片技术分享图片

技术分享图片

可以看到这100条消息被这三个消费者平分了。

点对点模式主要用于消除程序高并发高峰对数据库造成的巨大压力,可以通过使用消息队列,让消费者进程从消息队列中获取数据,然后异步将数据写入数据库,由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。

 

订阅模式(非持久订阅)

  将消费者(Consumer.java)修改成以下内容,写法相同,只是session不再是创建队列消费者,而是创建主题消费者

package cn.lcf.TestActiveMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消费者
 *
 */
public class Consumer {
    private static final String URL = "tcp://127.0.0.1:61616";
    //订阅模式名称
    private static final String topicName = "topic-name";
    
    public static void main(String[] args) throws JMSException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        //创建连接
        Connection createConnection = connectionFactory.createConnection();//打开连接
        createConnection.start();
        //创建会话
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建发布/订阅模式消息
        Topic createTopic = createSession.createTopic(topicName);//        非持久订阅
        //创建消费者
        MessageConsumer createConsumer = createSession.createConsumer(createTopic);
        //设置消费者监听
        createConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收的消息为:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

将生产者(Producer.java)修改成以下内容

package cn.lcf.TestActiveMQ;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 生产者
 *
 */
public class Producer {

    private static final String URL = "tcp://127.0.0.1:61616";
    //发布/订阅模式名称
    private static final String topicName = "topic-name";
    
    public static void main(String[] args) throws JMSException {
        
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        Connection createConnection = connectionFactory.createConnection();
        
        createConnection.start();
        
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //发布/订阅模式
        Topic createTopic = createSession.createTopic(topicName);
        
        MessageProducer createProducer = createSession.createProducer(createTopic);
        
        for (int i = 0; i < 100; i++) {
            TextMessage textMessage = createSession.createTextMessage("666 " + i);
            createProducer.send(textMessage);
            System.out.println("发送的消息为:" + textMessage.getText());
        }
        createConnection.close();
    }
}

之后先运行消费者,在运行生产者,消费者才能接受到信息,否则生产者发布信息时若没有在监听的消费者则会将信息丢弃,这样消费者是接收不到信息的。

同时运行多个消费者,在运行生产者,消费者将获取生产者发布的所有消息

技术分享图片

 

技术分享图片技术分享图片技术分享图片

 

订阅模式(持久订阅)

持久订阅模式的客户端需要创建一个链接id,以保证服务器确认该客户端是否已消费信息,创建完订阅模式,之后不再是创建一个消费者,而是创建一个带有id的用户,这个用户id是唯一的,若有两个相同的id连接,则会报错

public static void main(String[] args) throws JMSException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        //创建连接
        Connection createConnection = connectionFactory.createConnection();
        //创建客户端ID
        createConnection.setClientID("333");
        //打开连接
        createConnection.start();
        //创建会话
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        //创建发布/订阅模式消息
        Topic createTopic = createSession.createTopic(topicName);
        
        //创建持久订阅 即未在发布者发布时监听消息,在之后也能接收消息
        TopicSubscriber subscriber = createSession.createDurableSubscriber(createTopic, "333");
        
        subscriber.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接受消息:" + textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    }

生产者需要将消息模式设为持久订阅模式

public static void main(String[] args) throws JMSException {
        
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        Connection createConnection = connectionFactory.createConnection();
        
        createConnection.start();
        
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //发布/订阅模式
        Topic createTopic = createSession.createTopic(topicName);
        
        MessageProducer createProducer = createSession.createProducer(createTopic);
        //设置为持久订阅模式
        createProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        for (int i = 0; i < 100; i++) {
            TextMessage textMessage = createSession.createTextMessage("666 " + i);
            createProducer.send(textMessage);
            System.out.println("发送的消息为:" + textMessage.getText());
        }
        createConnection.close();
    }

运行用户(Consumer.java),创建连接id,之后将用户连接关闭,启动生产者(Producer.java)发布消息,最后在重新连接用户获取信息。当用户离线状态时,发布者发布的消息会将信息存在activeMQ服务器上,等待用户监听时将消息发送给用户。

生成的用户会在subscribers中显示

技术分享图片

运行发布者(Producer.java),并且用户处于离线状态,则会显示消息等待出列。

 技术分享图片

最后再次连接上用户(Consumer.java),则用户的能立即获取消息。

技术分享图片

技术分享图片

持久传输和非持久传输最大的区别是:采用持久传输时,传输的消息会保存到磁盘中(messages are persisted to disk/database),即“存储转发”方式。先把消息存储到磁盘中,然后再将消息“转发”给订阅者。

采用非持久传输时,发送的消息不会存储到磁盘中。

采用持久传输时,当Borker宕机 恢复后,消息还在。采用非持久传输,Borker宕机重启后,消息丢失。比如,当生产者将消息投递给Broker后,Broker将该消息存储到磁盘中,在Broker将消息发送给Subscriber之前,Broker宕机了,如果采用持久传输,Broker重启后,从磁盘中读出消息再传递给Subscriber;如果采用非持久传输,这条消息就丢失了。

 

ActiveMQ使用

标签:生成   scribe   enc   消息队列   ++   通信   win   程序   数据   

原文地址:https://www.cnblogs.com/lzylcf/p/9381446.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!