标签:消费者 网络 factory red null 消费 ide info col
生产者
必须在生产完数据之后手动提交session
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Producter { public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); //启动连接 connection.start(); // Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. // 获取session注意参数值my-queue是Query的名字 Queue queue = session.createQueue("my-queue"); // MessageProducer:创建消息生产者 MessageProducer producer = session.createProducer(queue); // 设置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 发送消息 for (int i = 1; i <= 5; i++) { sendMsg(session, producer, i); } System.out.println("发送成功!"); session.commit(); session.close(); connection.close(); } /** * 在指定的会话上,通过指定的消息生产者发出一条消息 * * @param session * 消息会话 * @param producer * 消息生产者 */ public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException { // 创建一条文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i); // 通过消息生产者发出消息 producer.send(message); } }
消费者
消费完数据之后必须手动提交session
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsReceiver { public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 true:表单开启事务 AUTO_ACKNOWLEDGE:代表自动签收 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 Queue queue = session.createQueue("my-queue"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(queue); while (true) { //receive():获取消息 TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("收到消息:" + message.getText()); session.commit(); } else { break; } } //回收资源 session.close(); connection.close(); } }
1.自动签收
2.手动签收
生产者
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Producter { public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); //启动连接 connection.start(); // Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收 /* Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/ Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. // 获取session注意参数值my-queue是Query的名字 Queue queue = session.createQueue("my-queue"); // MessageProducer:创建消息生产者 MessageProducer producer = session.createProducer(queue); // 设置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 发送消息 for (int i = 1; i <= 5; i++) { sendMsg(session, producer, i); } System.out.println("发送成功!"); session.close(); connection.close(); } /** * 在指定的会话上,通过指定的消息生产者发出一条消息 * * @param session * 消息会话 * @param producer * 消息生产者 */ public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException { // 创建一条文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i); // 通过消息生产者发出消息 producer.send(message);
message.acknowledge(); //手动提交
}
}
消费者
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import sun.plugin2.os.windows.SECURITY_ATTRIBUTES; import javax.jms.*; public class JmsReceiver { public static void main(String[] args) throws JMSException { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 true:表单开启事务 AUTO_ACKNOWLEDGE:代表自动签收 /*Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/ Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 Queue queue = session.createQueue("my-queue"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(queue); while (true) { //receive():获取消息 TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("收到消息:" + message.getText()); message.acknowledge(); //手动提交 } else { break; } } //回收资源 session.close(); connection.close(); } }
标签:消费者 网络 factory red null 消费 ide info col
原文地址:https://www.cnblogs.com/lowerma/p/12317044.html