码迷,mamicode.com
首页 > 其他好文 > 详细

JMS中间件--ActiveMQ

时间:2014-08-28 11:25:19      阅读:161      评论:0      收藏:0      [点我收藏+]

标签:des   style   blog   http   os   java   使用   io   ar   

java系统之间的消息通讯使用最多的是基于RMI的RPC和基于JMS的RPC,这两种的消息传输方式虽然都能够起到通讯的作用,但是在笔者看来,二者之间的差别还是非常大的。首先RMI是同步传输,而JMS是异步传输,另外二者的使用场景也是大不相同。在系统集成平台这个项目中让我能够有机会更加深入的认识这两种消息通讯机制。基础系统与考试系统之间的数据传输我们采用的是将ejb发布成webservice,然后再通过esb将客户端和webservice进行连接,这种方式在本质上是ejb之间的相互调用,属于RMI方式的消息通讯,最明显的特点是当我们执行远程调用之后不得不等到webservice将数据全部传输完毕之后才能进行相关操作(当然根据业务,也必须选择这种方式,因为我们接下来的所有操作都是基于返回数据的),而JMS则是通过第三方的broker提供消息服务,这个broker相当于一个邮局,客户端发送消息只管将消息内容送到broker并告知broker投送地址,剩下的事情就有broker进行了,客户端完全不必等待,可以进行其他任何操作,而broker所担负的主要职责就是确保消息投送,我们的消息接收这则只管监听broker就可以了。经过对比相信大家已经对这两种方式有了初步的认识,接下来我们重点讲解一下基于JMS的消息中间件---ActiveMQ。

1.首先我们要搭建ActiveMQ服务器,这个服务器就相当于我们前面提到broker,搭建的过程很简单,可上官网查询,这里不再多说。

2.创建queue队列,然后创建发送端测试代码,代码如下:

 public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            //destination = session.createQueue("FirstQueue");
            //destination = session.createTopic("SecondTopic");
            destination = session.createQueue("myqueue");
            // 得到消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化,此处学习,实际根据项目决定
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息,此处写死,项目就是参数,或者方法获取
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
       
            TextMessage message = session
                    .createTextMessage("hello changshou");
            // 发送消息到目的地方
            System.out.println("发送消息:" +message.getText());
            producer.send(message);
    }


执行该方法,发送端输出结果如下:

bubuko.com,布布扣

3.创建接收端测试代码,如下:

package com.xuwei.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            //destination = session.createQueue("FirstQueue");
            //destination = session.createTopic("SecondTopic");
            destination = session.createQueue("myqueue");
            consumer = session.createConsumer(destination);
            while (true) {
                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

接收端输出结果如下:

bubuko.com,布布扣

结果显示执行发送消息和接收消息是没有问题的,但是在这里我们应该很明确消息队列和消息主题的区别,当我们创建了一个消息队列,将消息发送到消息队列中,这时针对这个消息的接收者很可能是多个,当这个消息被其中的任何一个接收这接收的到的时候这个消息就被消费掉了,其他人将不会再接到此消息,而当我们创建一个主题的时候,同样是将这个消息发送到这个主题中,所有的接收者都会接收到这个消息,并根据消息做出自己的相应处理,原理上很类似于qq的单人间消息传输和讨论组,这两个区别的验证代码也十分的简单,只要发送者创建一个队列,然后建立两个接收者,分别执行发送者的发送方法和接收者的接收方法,如果只有一个接收者能够接收到消息则验证成功,主题的验证也是相同的道理,发送者创建一个主题,然后再创建两个接收这者,仍然执行各自的方法,如果此时两个接收者都能接收到消息则说明主题的验证也成功,有兴趣的同学可以亲自实验一下。

JMS中间件--ActiveMQ

标签:des   style   blog   http   os   java   使用   io   ar   

原文地址:http://blog.csdn.net/a1314517love/article/details/38894039

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!