码迷,mamicode.com
首页 > Web开发 > 详细

【EJB四】——JMS消息服务之P2P和Pub/Sub

时间:2016-05-12 22:53:00      阅读:194      评论:0      收藏:0      [点我收藏+]

标签:

JMS:java消息服务,JMS客户端可以通过JMS服务进行异步消息传输。可以支持两种模型:P2P和Pub/Sub

P2P

点对点消息传输模式,这种模式主要包括发送者,消息队列和接收者

技术分享

特点:
1、每个消息只有一个消费者,一旦被接收(消费),此消息就不存在于消息队列中了。
2、发送者和接收者在时间上没有依赖性(当消息发送后,无论接收者有没有在接收,都不会影响消息进入消息队列)
3、接收者在成功接收消息之后,需要向队列应答成功。

Pub/Sub

发布/订阅的模式。主要包括:发布者,主题,订阅者三部分。

技术分享

特点:
1、每个消息可以有多个消费者。(这是和P2P最大的不同)只要消费者订阅了某个主题,那这个主题的发布者所发布的消息,就会被订阅者全部接收。
2、发布者和订阅者必须有时间上的依赖(这种依赖是指,订阅者必须在发布之前先进行订阅,才能接收到发布的消息)
3、为了缓和这种严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅,这样,即使订阅者没有运行,也能接收到发布者的消息

实现思路

二者除了模式不同外,程序中的实现是非常类似的。都是通过一个连接工厂connectionFactory来获取connection来,然后再通过创建
Session来创建发送者(发布者),通过context来查找并获取队列或者主题,通过发送者(发布者)来发送或发布消息。
在这里需要知道的是:某个队列或者主题是由应用服务器来提供的,不由任何一个客户端来创建。我们在客户端只需要通过JNDI来进行查找就好了。
简单的了解了这两种消息服务的区别后,通过一个小例子来加深一下对他们的理解:

P2P code

创建两个消费者MyQueueMDBBean和MyQueueMDBBean1:
//配置的queue/MyQueue为我们要发送消息的消息队列
@MessageDriven(
    activationConfig={      @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),      @ActivationConfigProperty(propertyName="destination",propertyValue="queue/MyQueue") 
    }   
)
//只要实现了MessageListener接口,接收到消息会自动触发onMessage这个方法
//这里相当于一个消费者
public class MyQueueMDBBean implements MessageListener {

    public void onMessage(Message arg) {
        TextMessage textMessage=(TextMessage)arg;
        try {
            System.out.println("MyQueueMDBBean已接收到queue消息了,消息为:"+textMessage.getText());
        } catch (JMSException e) {

            e.printStackTrace();
        }

    }

}
由于JBoss不能自己创建Queue对象,必须在server/default/deploy目录下找到一个xxx-service.xml文件中加入:
注意JNDIName的大小写和程序中要保持一致。
<mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.org.destination:server=Queue,name=myqueue" >
    <attribute name="JNDIName" >queue/MyQueue</attribute>
    <depends optional-attribute-name = "DestinationManager" >
    jboss.mq:service=DestinationManager
    </depends>
</mbean>

发送者代码:

public static void main(String[] args) throws NamingException, JMSException {
        InitialContext context=new InitialContext();
        //获取队列工厂
        QueueConnectionFactory factory=(QueueConnectionFactory) context.lookup("ConnectionFactory"); 
        //获取connection
        QueueConnection connection=factory.createQueueConnection();
        //创建QueueSession
        QueueSession session=connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        //获取的destination对象
        Queue queue=(Queue)context.lookup("queue/MyQueue");
        //创建消息
        TextMessage textMessage=session.createTextMessage("hello,world!");
        //创建发送者
        QueueSender sender=session.createSender(queue);
        //发送消息
        sender.send(textMessage);
        //关闭会话
        session.close();
        System.out.println("消息已成功发送!");

    }

Pub/Sub Code

创建消费者TopicMDBBean,TopicMDBBean1和TopicMDBBean2
//这里相当于一个消费者
@MessageDriven(
    activationConfig={
        @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),
        @ActivationConfigProperty(propertyName="destination",propertyValue="topic/myTopic") 
    }   
)
public class TopicMDBBean implements MessageListener {

    public void onMessage(Message arg) {
        TextMessage textMessage=(TextMessage)arg;
        try {
            System.out.println("TopicMDBBean已接收到topic消息了,消息为:"+textMessage.getText());
        } catch (JMSException e) {

            e.printStackTrace();
        }

    }

}
客户端代码和P2P类似,把Queue改成Topic就可以了,这里也必须配置"topic/myTopic"主题。
public static void main(String[] args) throws NamingException, JMSException {
        InitialContext context=new InitialContext();
        //获取队列工厂
        TopicConnectionFactory factory=(TopicConnectionFactory) context.lookup("ConnectionFactory"); 
        //获取connection
        TopicConnection connection=factory.createTopicConnection();
        //创建QueueSession
        TopicSession session=connection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        //获取的destination对象
        Topic topic=(Topic)context.lookup("topic/myTopic");
        //创建消息
        TextMessage textMessage=session.createTextMessage("hello,world!");
        //创建发送者
        TopicPublisher sender=session.createPublisher(topic);
        //发送消息
        sender.send(textMessage);
        //关闭会话
        session.close();
        System.out.println("消息已成功发送!");

    }

}

执行结果

P2P:

第一次执行:

技术分享

第二次执行:

技术分享

每次发消息,只有一个消费者能够接收到消息。

Pub/Sub:

第一次执行:

技术分享

每发一次消息,只要进行监听了此主题的订阅者都可以收到这个消息!

总结

我认为通常在使用JDM的P2P发送消息时,至少应该是有一个消费者的,否则采用这个方式就没有任何意义。
所以如果我们想让我们发出去的指令只被执行一次,且每条消息都可以被成功处理可以选择P2P模式,如果想让自己发出去的消息可以被多个人接收,
或者一个或多个人处理,可以选择Pub/Sub模式

【EJB四】——JMS消息服务之P2P和Pub/Sub

标签:

原文地址:http://blog.csdn.net/wangyy130/article/details/51340108

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!