标签:消费者 网络 factory red null 消费 ide info col
生产者
必须在生产完数据之后手动提交session
package com.wn.ddd;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producter {
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
// JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
// Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Queue queue = session.createQueue("my-queue");
// MessageProducer:创建消息生产者
MessageProducer producer = session.createProducer(queue);
// 设置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 发送消息
for (int i = 1; i <= 5; i++) {
sendMsg(session, producer, i);
}
System.out.println("发送成功!");
session.commit();
session.close();
connection.close();
}
/**
* 在指定的会话上,通过指定的消息生产者发出一条消息
*
* @param session
* 消息会话
* @param producer
* 消息生产者
*/
public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
// 创建一条文本消息
TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
// 通过消息生产者发出消息
producer.send(message);
}
}
消费者
消费完数据之后必须手动提交session
package com.wn.ddd;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsReceiver {
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
// JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程 true:表单开启事务 AUTO_ACKNOWLEDGE:代表自动签收
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
Queue queue = session.createQueue("my-queue");
// 消费者,消息接收者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
//receive():获取消息
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.println("收到消息:" + message.getText());
session.commit();
} else {
break;
}
}
//回收资源
session.close();
connection.close();
}
}
1.自动签收
![]()
2.手动签收
生产者
package com.wn.ddd;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producter {
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
// JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
// Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收
/* Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Queue queue = session.createQueue("my-queue");
// MessageProducer:创建消息生产者
MessageProducer producer = session.createProducer(queue);
// 设置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 发送消息
for (int i = 1; i <= 5; i++) {
sendMsg(session, producer, i);
}
System.out.println("发送成功!");
session.close();
connection.close();
}
/**
* 在指定的会话上,通过指定的消息生产者发出一条消息
*
* @param session
* 消息会话
* @param producer
* 消息生产者
*/
public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
// 创建一条文本消息
TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
// 通过消息生产者发出消息
producer.send(message);
message.acknowledge(); //手动提交
}
}
消费者
package com.wn.ddd;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import sun.plugin2.os.windows.SECURITY_ATTRIBUTES;
import javax.jms.*;
public class JmsReceiver {
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
// JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程 true:表单开启事务 AUTO_ACKNOWLEDGE:代表自动签收
/*Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
Queue queue = session.createQueue("my-queue");
// 消费者,消息接收者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
//receive():获取消息
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.println("收到消息:" + message.getText());
message.acknowledge(); //手动提交
} else {
break;
}
}
//回收资源
session.close();
connection.close();
}
}
标签:消费者 网络 factory red null 消费 ide info col
原文地址:https://www.cnblogs.com/lowerma/p/12317044.html