标签:通过 maven UI queue cto 提前 发送 net r.java
声明 转载请注明出处! Reprint please indicate the source!
http://www.hiknowledge.top/?p=86&preview=true
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
引用自百度百科
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
引用自百度百科
Maven依赖
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.4</version>
</dependency>要运行起来demo得先,启动broker。我是在虚拟机上测试的。ip:192.168.235.100
进入到 执行:
activemq start访问页面 http://192.168.235.100:8161/
broker%E7%AE%A1%E7%90%86%E9%A1%B5%E9%9D%A2.png)
默认用户名/密码:admin/admin
JMSProducer.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: JMS ActiveMQ Demo测试 消息生产者<br>
 * 运行前,需要打开本地的activemq。
 * 如果需要更改broker地址,要提前运行相应的broker。
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 11:06<br>
 */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
//    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)
    private static final int SENDNUM = 10; // 发送的消息数量
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工程,生产Connection
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生产者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 创建连接
        try {
            connection = connectionFactory.createConnection();
            connection.start(); // 启动连接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事务,自动确认
            destination = session.createQueue("FirstQueue"); // 创建消息队列
            messageProducer = session.createProducer(destination); // 创建消息发送者
            sendMessage(session, messageProducer); // 发送消息
            session.commit(); // 提交事务
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection!=null)
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
        }
    }
    /**
     * 发送消息
     * @param session 会话
     * @param messageProducer 消息生产者
     */
    private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for (int i=0; i<JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送的消息 "+i);
            System.out.println("发送消息: ActiveMQ 发送的消息 "+i);
            messageProducer.send(message);
        }
    }
}运行一下JMSProudcer,生产10条消息。
JMSConsumer.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import javax.jms.*;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: 消息消费者1-点对点模式<br>
 *     实现方式1 循环检测<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 13:44<br>
 */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    //    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)
    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 连接工程,生产Connection
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消费者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消费消息不需要事务,自动确认
            destination = session.createQueue("FirstQueue"); // 创建消息队列
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);// 设置延时为100s
                if (textMessage!=null) { // 接收到消息
                    System.out.println("接收的消息:"+textMessage.getText());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}运行一下JMSConsumer,消费10条消息。
%E6%B6%88%E8%B4%B9-%E7%94%9F%E4%BA%A7.png)
这种方式消费消息,通过循环检查,显然是不高明的。
下面,通过设置监听的方式,实现消息消费。
再次运行一下JMSProudcer,生产10条消息。
%E5%8F%88%E7%94%9F%E4%BA%A7%E4%BA%8610%E6%9D%A1%E6%B6%88%E6%81%AF.png)
首先实现一下监听器
Listenr.java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: 消息监听者<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 14:30<br>
 */
public class Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}JMSConsumer2.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: 消息消费者2-点对点模式<br>
 *     实现方式2 设置监听<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 13:44<br>
 */
public class JMSConsumer2 {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    //    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)
    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 连接工程,生产Connection
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消费者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消费消息不需要事务,自动确认
            destination = session.createQueue("FirstQueue"); // 创建消息队列
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            messageConsumer.setMessageListener(new Listener());// 注册消息监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}运行一下JMSConsumer2,新产生的消息被消费了。
%E6%96%B0%E4%BA%A7%E7%94%9F%E7%9A%84%E6%B6%88%E6%81%AF%E8%A2%AB%E6%B6%88%E8%B4%B9%E4%BA%86.png)
注意:发布/订阅要先运行订阅,再运行发布才能收到消息。
JMSConsumer.java
package com.jahentao.activemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: 消息消费者-发布订阅模式 消息订阅者<br>
 *     实现方式 设置监听<br>
 *     消息订阅者1<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 13:44<br>
 */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    //    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)
    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 连接工程,生产Connection
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消费者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消费消息不需要事务,自动确认
//            destination = session.createQueue("FirstQueue"); // 创建消息队列
            destination = session.createTopic("FirstTopic"); // 创建消息订阅者
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            messageConsumer.setMessageListener(new Listener());// 注册消息监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}Listener.java
package com.jahentao.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: 订阅者1消息监听器<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 14:46:52<br>
 */
public class Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("订阅者一 收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}JMSConsumer2.java
package com.jahentao.activemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: 消息消费者-发布订阅模式 消息订阅者<br>
 *     实现方式 设置监听<br>
 *     消息订阅者2<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 13:44<br>
 */
public class JMSConsumer2 {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
    //    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)
    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 连接工程,生产Connection
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消费者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消费消息不需要事务,自动确认
//            destination = session.createQueue("FirstQueue"); // 创建消息队列
            destination = session.createTopic("FirstTopic"); // 创建消息订阅者
            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            messageConsumer.setMessageListener(new Listener2());// 注册消息监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}Listener2.java
package com.jahentao.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: 订阅者2消息监听器<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 14:46:52<br>
 */
public class Listener2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("订阅者二 收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}首先,分别运行JMSConsumer、JMSConsumer2进行订阅。
2%E4%B8%AA%E8%AE%A2%E9%98%85%E8%80%85.png)
JMSProducer.java
package com.jahentao.activemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Created with IntelliJ IDEA.<br>
 * Description: JMS ActiveMQ Demo测试 发布订阅模式 消息发布者<br>
 * 运行前,需要打开本地的activemq。
 * 如果需要更改broker地址,要提前运行相应的broker。
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 14:42:59<br>
 */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
//    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认连接地址 为 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定连接地址 (my VM)
    private static final int SENDNUM = 10; // 发送的消息数量
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工程,生产Connection
        Connection connection = null; // 连接
        Session session; // 会话 接受或者发送消息的线程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生产者
        // 实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 创建连接
        try {
            connection = connectionFactory.createConnection();
            connection.start(); // 启动连接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事务,自动确认
//            destination = session.createQueue("FirstQueue"); // 创建消息队列
            destination = session.createTopic("FirstTopic"); // 创建主题
            messageProducer = session.createProducer(destination); // 创建消息发送者
            sendMessage(session, messageProducer); // 发送消息
            session.commit(); // 提交事务
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection!=null)
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
        }
    }
    /**
     * 发送消息
     * @param session 会话
     * @param messageProducer 消息生产者
     */
    private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for (int i = 0; i< JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送的消息 "+i);
            System.out.println("发送消息: ActiveMQ 发送的消息 "+i);
            messageProducer.send(message);
        }
    }
}然后运行JMSProducer。
%E5%8F%91%E5%B8%83%E6%B6%88%E6%81%AF%E8%AE%A2%E9%98%85%E8%80%85%E6%94%B6%E5%88%B0%E6%B6%88%E6%81%AF.png)
java1234上发布的教程"一头扎进ActiveMQ"
这里学习的源码,托管在码云上
标签:通过 maven UI queue cto 提前 发送 net r.java
原文地址:http://www.cnblogs.com/jahentao/p/activemq_study_1.html