标签:客户端 .text 解压 字节 poi rabbitmq on() 另一个 topic
CORBA\DCOM\RMI等RPC中间件技术已经广泛应用于各个领域,但是面对规模和复杂度都越来越高的分布式系统,这些技术慢慢显现出局限性:
面向消息的中间件较好地解决了上面的问题(Message Oriented Middleware,MOM)。发送者再发送消息给消息服务器消息服务器将消息存放到若干队列中,在合适的时候再将消息转发给接受者。这种模式下,发送和结束是异步的,发送者无需等待;二者的生命周期未必相同;发送消息的时候接受者不一定运行,接收消息的时候发送者也不一定运行;一对多通信:对一个消息可以有多个接受者。
java消息服务(JMS)定义了java中访问消息中间件的接口。JMS只是接口,并没有实现。实现JMS的接口的消息中间件成文JMS Provider,已有的MOM系统包括Apache的ActiveMQ、以及阿里的RocketMQ、IBM的MQSeries、Microsoft的MSMQ和BEA的MessageQ、RabbitMQ等待,他们基本都遵循JMS规范。
JMS 实现JMS接口的消息中间件
Provider(MessageProvider):生产者
Consumer(MessageConsumer):消费者
PTP:Point to Point ,即点对点的消息模型;
pub/sub : publish/subscribe,发布/订阅的消息模型;
Queue:队列目标;
Topic:主题目标;
ConnectionFactory:连接工厂,JMS用它创建连接;
Connection:JMS客户端到JMS Provider的连接;
Destination:消息的目的地;
Session:会话,一个法师或者接收消息的线程;
Message 接口(消息):
是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:
Session 接口(会话):
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个地接收。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文保存一组消息,知道事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话运行用户创建消息生产者来发送消息,创建消费者来接收消息。
ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。
ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中仍然扮演着重要角色,可以说ActiveMQ在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断地提升版本,80%以上的业务我们用ActiveMq都能满足,当然后续如天猫淘宝这种大型的电商网站,尤其是双十一这种特殊事件,ActiveMQ需要进行很复杂的优化源码以及架构设计才能完成,我们只会会学习一个更强大的分布式消息中间件,RocketMQ,可以说ActiveMQ是和谐,是基础,所以也必须要掌握好。
去官网下载:http://activemq.apache.org,解压放到一个地方,我放在D:\Programe_Files下,然后直接启动bin下的wrapper.exe即可启动监控台,在浏览器中访问localhost:8186,用户名admin/admin 即可访问。
首先写一个简单的Hello World实例,感受一下ActiveMQ,需要实现接受者和发送者两部分代码。
发送:
1 package com; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.DeliveryMode; 6 import javax.jms.Destination; 7 import javax.jms.JMSException; 8 import javax.jms.MessageProducer; 9 import javax.jms.Session; 10 import javax.jms.TextMessage; 11 12 import org.apache.activemq.ActiveMQConnectionFactory; 13 14 public class Sender { 15 16 public static void main(String[]args) throws JMSException{ 17 ConnectionFactory factory = new ActiveMQConnectionFactory( 18 ActiveMQConnectionFactory.DEFAULT_USER, 19 ActiveMQConnectionFactory.DEFAULT_PASSWORD, 20 "tcp://localhost:61616" 21 ); 22 23 Connection conn = factory.createConnection(); 24 conn.start(); 25 26 Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 27 28 Destination dest = session.createQueue("queue1"); 29 30 MessageProducer messProducer = session.createProducer(dest); 31 32 messProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);; 33 34 TextMessage mess = session.createTextMessage(); 35 for(int i=0;i<5;i++){ 36 37 mess.setText("消息内容"+i); 38 39 messProducer.send(mess); 40 } 41 42 if(conn != null)conn.close(); 43 } 44 45 }
接收:
package com; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[]args) throws JMSException{ ConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616" ); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createQueue("queue1"); MessageConsumer messConsumer = session.createConsumer(dest); while(true){ TextMessage mess = (TextMessage) messConsumer.receive(); System.out.println(mess.getText()); } // if(conn != null)conn.close(); } }
在acitvemq.xml中的 broker 下加上如下代码:
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="sgm" password="sgm" groups="users,admins" /> </users> </simpleAuthenticationPlugin> </plugins>
重启ActiveMQ,再运行4.2中的 sender 发生异常:
可见在第24行尝试连接的时候就发生错误了。所以ActiveMQConnectionFactory.DEFAULT_USER要改成"sgm",密码也同样修改。receiver做同样修改。
在成功创建正确的ConnectionFactory后,下一步将是创建一个连接,他是JMS定义的一个接口。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户端只使用单一连接。根据JMS文档,Connection的目的是“利用JMS提供者封装开发的连接”。以及表示“客户端与服务例程之间的开放TCP/IP套接字”。该文档还指出Connection应该是进行客户端身份校验的地方。
当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个Connection可以建立一个或者多个Session。
当一个程序执行完成后,必须关闭Connection,否则ActiveMQ不能释放资源,关闭一个Connection同样也关闭了Session、MessageProducer和MessageConsumer。
Connection createConnection();
Connection createConnection(String userName,String password);
大多数开源框架不需要自己优化了,能使用好API其实就是最好的优化了。
一旦从ConnectionFactory中获得一个Connection,必须从Connection中创建一个或者多个Session,才能进一步执行其他操作。Session是一个发送或者接收消息的线程,可以使用Session创建MessageProducer,MessageConsumer和Message。
Session可以被事务化,也可以不用事务,通常,可以通过向Connection上的创建方法传一个布尔值进行设置。
Session createSession(boolean transacted,int acknowledgeMode);
其中 transacted为是否使用事务的标识,acknowledgeMode为签收方式。
结束事务有两种方式:提交或者回滚,当一个事务提交,消息被处理。事务中有一个步骤失败,事务就回滚,这个事务中的已经执行的动作将被撤销。在发送消息最后也必须要使用session.commit()方法提交事务。
签收模式有三种:
工作中一般使用手动签收的方式
---
---
46.ActiveMQ开篇(Hello World、安全认证、Connection、Session、MessageProducer、MessageConsumer)
标签:客户端 .text 解压 字节 poi rabbitmq on() 另一个 topic
原文地址:http://www.cnblogs.com/sigm/p/6637671.html