标签:min dep 性能 异步通信 ring 状态 服务器端 pytho activemq
这样的一种通信方式,就是所谓的“异步”通信方式,对于系统A来说,只要把消息发给MQ,然后系统B就会异步处去进行处理了,系统A不能“同步”的等待系统B处理完。这样的好处是什么呢?解耦
4,什么是ActiveMQ
1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4 商业服务器上5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA6. 支持通过JDBC和journal提供高速的消息持久化7. 从设计上保证了高性能的集群,客户端-服务器,点对点8. 支持Ajax9. 支持与Axis(Apache Extensible Interaction System 即阿帕奇可扩展交互系统。Axis本质上就是一个SOAP引擎,提供创建服务器端、客户端和网关SOAP操作的基本框架)的整合10. 可以很容易得调用内嵌JMS provider,进行测试11.支持集群1,下载
2,安装
1,配置jdk环境变量【不会的回看Linux】2,上传mq的压缩包到Linux3,解压到usr/local/ActiveMQmkdir /usr/local/ActiveMQ
tar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /usr/local/ActiveMQ/
5,配置用户名和密码[默认为admin/admin]vim conf/users.properties
admin = admin
4,启动和停止重启./bin/activemq start
2./bin/activemq stop
3./bin/activemq restart
5,访问
5,端口说明
ActiveMQ是使用61616端口提供的JMS服务使用8161提供管理控制台的服务
1,JMS消息发送模式
在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息。生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收。
发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息.在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。
订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。2,JMS应用程序接口
1,ConnectionFactory 接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,
代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
2,Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
3,Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。
和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题
4,MessageConsumer 接口(消息消费者)由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。5,MessageProducer 接口(消息生产者)由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。6,Message 接口(消息)是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:消息头(必须):包含用于识别和为消息寻找路由的操作设置。一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。消息接口非常灵活,并提供了许多方式来定制消息的内容。7,Session 接口(会话)表示一个单线程得上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续得。就是说消息按照发送的顺序一个一个接收的。会话得好处是它支持事务,如果用户支持了事务支持,会话得上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息产生者来发送消息,创建消息消费者来接收消息。1,创建项目加入maven依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.activemq</groupId> <artifactId>activemq</artifactId> <version>1.0-SNAPSHOT</version> <!--activemq需要的jar包 不是使用最新版本的。有BUG --> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.5</version> </dependency> <!--下面是log4等通用配置 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> </dependency> </dependencies> </project>
2,生产者
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。第二步:使用ConnectionFactory对象创建一个Connection对象。第三步:开启连接,调用Connection对象的start方法。第四步:使用Connection对象创建一个Session对象。第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。第六步:使用Session对象创建一个Producer对象。第七步:创建一个Message对象,创建一个TextMessage对象。第八步:使用Producer对象发送消息。第九步:关闭资源。package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Administrator on 2019-10-26.
*/
public class ActiveMq {
private static final String QUEUE = "queue";
private static final String URL = "tcp://47.110.76.75:61616";
public static void main(String[] args) throws Exception{
//第一步:创建ActiveMQConnectionFactory对象,需要指定服务端IP以及端口号.//brokerURL服务器得IP以端口号。
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//第二步:使用connectionFactory创建一个connection对象
Connection connection = connectionFactory.createConnection();
//第三步:开启连接,调用connection对象start得方法
connection.start();
//第四步:使用Connection对象创建一个session对象
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session对象创建Destination对象(topic、queue),此处创建一个Queue对象。//参数:队列得名称。
Queue queue = session.createQueue(QUEUE);
//第六步:使用session对象创建一个Producer对象
MessageProducer producer = session.createProducer(queue);
//第七步:创建一个Message = new ActiveMQTextMessage();
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test");
//第八步:
producer.send(textMessage);
//第九步
producer.close();
session.close();
connection.close();
System.out.println("生产者向MQ发送消息成功!");
}
}生产完成之后可以查看有消息生成
3,消费者
消费者有两种消费方法:
1、同步消费。通过调用消费者receive方法从目的地中显示提取消息,receive方法可以一直阻塞到消息到达。
2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取得动作。
实现MessageListener接口,在MessageListener()方法中实现消息得处理逻辑。
4,同步消费者【一般不推荐】
第一步:创建一个连接工厂
第二步:创建一个连接
第三步:打开连接
第四步:创建会话
第五步:创建目的地
第六步:创建消费者
第七步:接收消息
第八步:关闭资源
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Administrator on 2019-10-26.
*/
public class ActiveMq {
private static final String QUEUE = "queue";
private static final String URL = "tcp://47.110.76.75:61616";
public static void main(String[] args) throws Exception{
//第一步:创建ActiveMQConnectionFactory对象,需要指定服务端IP以及端口号.//brokerURL服务器得IP以端口号。
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//第二步:使用connectionFactory创建一个connection对象
Connection connection = connectionFactory.createConnection();
//第三步:开启连接,调用connection对象start得方法
connection.start();
//第四步:使用Connection对象创建一个session对象
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session对象创建Destination对象(topic、queue),此处创建一个Queue对象。//参数:队列得名称。
Queue queue = session.createQueue(QUEUE);
//第六步:使用session对象创建一个Producer对象
MessageProducer producer = session.createProducer(queue);
//第七步:创建一个Message = new ActiveMQTextMessage();
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test");
//第八步:
producer.send(textMessage);
//第九步
producer.close();
session.close();
connection.close();
System.out.println("生产者向MQ发送消息成功!");
}
}
4.1,receive方法说明
receive() 一直阻塞
receive(1000) 10秒类没接收消息就放弃
5,异步消费者【推荐】
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源。
标签:min dep 性能 异步通信 ring 状态 服务器端 pytho activemq
原文地址:https://www.cnblogs.com/jacksonxiao/p/11745675.html