标签:set list 上下文 接收 try 处理 集群配置 int 学习笔记
消息中间件:关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统
JMS:Java消息服务,Java平台中关于面向消息中间件的API
AMQP:提供统一消息服务的应用层标准协议
常见消息中间件
ActiveMQ
RabbitMQ
Kafka
JMS规范
提供者:实现JMS规范的消息中间件服务器
客户端:发送或接受消息的应用程序
生产者/发布者:创建并发送消息的客户端
消费者/订阅者:接收并处理消息的客户端
消息:应用程序之间传递的数据内容
消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式
JMS消息模式
队列模型:
主题模型:
JMS编码接口:
使用ActiveMQ
队列模型
producer
//1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. 创建Connection
Connection connection = factory.createConnection();
//3. 启动Connection
connection.start();
//4. 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 创建Destination
Destination destination = session.createQueue(queueName);
//6. 创建MessageProducer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
//7. 创建消息
TextMessage message = session.createTextMessage("test" + i);
//8. 发布消息
producer.send(message);
System.out.println("发送消息: " + message.getText());
}
//9. 关闭连接
connection.close();
consumer
//1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. 创建Connection
Connection connection = factory.createConnection();
//3. 启动Connection
connection.start();
//4. 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 创建Destination
Destination destination = session.createQueue(queueName);
//6. 创建MessageConsumer
MessageConsumer consumer = session.createConsumer(destination);
//7. 创建消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//9. 关闭连接(消息监听异步执行,需程序全部运行结束才能关闭连接)
// connection.close();
主题模型
producer
//1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. 创建Connection
Connection connection = factory.createConnection();
//3. 启动Connection
connection.start();
//4. 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 创建Destination
Destination destination = session.createTopic(topicName);
//6. 创建MessageProducer
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
//7. 创建消息
TextMessage message = session.createTextMessage("test" + i);
//8. 发布消息
producer.send(message);
System.out.println("发送消息: " + message.getText());
}
//9. 关闭连接
connection.close();
consumer
//1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2. 创建Connection
Connection connection = factory.createConnection();
//3. 启动Connection
connection.start();
//4. 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 创建Destination
Destination destination = session.createTopic(topicName);
//6. 创建MessageConsumer
MessageConsumer consumer = session.createConsumer(destination);
//7. 创建消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//9. 关闭连接(消息监听异步执行,需程序全部运行结束才能关闭连接)
// connection.close();
spring jms
spring使用jms示例
common.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config />
<!-- ActiveMQ为我们提供的ConnectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!-- spring jms为我们提供连接池 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 一个队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue" />
</bean>
<!-- 一个主题目的地,发布订阅消息 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic"/>
</bean>
</beans>
producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="common.xml" />
<!-- 配置JmsTemplate,用于发送消息-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<bean class="com.qyluo.jms.spring.producer.ProducerServiceImpl" />
</beans>
cosumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 导入公共配置 -->
<import resource="common.xml" />
<!-- 配置消息监听器 -->
<bean id="consumerMessageListener" class="com.qyluo.jms.spring.consumer.ConsumerMessageListener" />
<!-- 配置消息监听容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination"/>
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
</beans>
ProducerServiceImpl
public class ProducerServiceImpl implements ProducerService {
@Autowired
JmsTemplate jmsTemplate;
@Resource(name = "topicDestination")
Destination destination;
@Override
public void sendMessage(final String message) {
//使用JmsTemplate发送消息
jmsTemplate.send(destination, new MessageCreator() {
//创建一个消息
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
});
System.out.println("发送消息: " + message);
}
}
AppProducer
public class AppProducer {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
ProducerService service = context.getBean(ProducerService.class);
for (int i = 0; i < 100; i++) {
service.sendMessage("text" + i);
}
context.close();
}
}
ConsumerMessageListener
public class ConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息: " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
AppConsumer
public class AppConsumer {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
}
}
集群方式
ActiveMQ失效转移(failover)
允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器
语法:failover:(uri1,...,uriN)?transportOptions
transportOptions参数说明
Broker Cluster集群配置
NetworkConnector(网络连接器):ActiveMQ服务器之间的网络通讯方式
分为静态连接器和动态连接器
静态连接器:
<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>
动态连接器:
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoverUri="multicast://default"/>
</transportConnectors>
Master/Slave集群配置
ActiveMQ Master Slave集群方案
两种集群方式对比
方式 | 高可用 | 负载均衡 |
--|----------|--------------|
Master/Slave | 是 | 否 |
Broker Cluster | 否 | 是 |
三台服务器的完美集群方案
Node A和Node B做消息同步,Node A和Node C做消息同步,Node B和Node C做Master / Slave对资源进行持久化
服务器 | 服务端口 | 管理端口 | 存储 | 网络连接器 | 用途 |
---|---|---|---|---|---|
Node-A | 61616 | 8161 | - | Node-B、Node-C | 消费者 |
Node-B | 61617 | 8162 | /share_file/kahadb | Node-A | 生产者,消费者 |
Node-C | 61618 | 8163 | /share_file/kahadb | Node-A | 生产者,消费者 |
实际业务场景特点
使用ActiveMQ的虚拟主题解决方案
使用JMS中XA系列接口保证强一致性
使用消息表的本地事务解决方案
使用内存日志的解决方案
基于消息机制的事件总线
事件驱动架构
RabbitMQ:使用交换器绑定到队列
Kafka使用group.id分组消费者
标签:set list 上下文 接收 try 处理 集群配置 int 学习笔记
原文地址:https://www.cnblogs.com/kioluo/p/8824804.html