标签:nal lse mqc 状态 system producer tran tor set
1.创建一个抽象类定义发送消息和接受消息的抽象方法
package cn.base.jms; import javax.jms.*; /** * @author gu.fei * @version 2017-03-24 9:20 */ public abstract class Queuehandler { //默认队列名称queue private String queue = "queue"; //连接工厂 private Connection connection; private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; private boolean transacted = false; public Queuehandler() { } /** * 发送消息 * @return */ public abstract Object sendMessage(MessageProducer producer,Session session); /** * 接收消息 * @return */ public abstract Object reciveMessage(Message message); /** * 执行发送 */ public void doSend() { Session session = null; try { session = connection.createSession(transacted,acknowledgeMode); Destination destination = session.createQueue(queue); MessageProducer producer = session.createProducer(destination); sendMessage(producer,session); } catch (JMSException e) { e.printStackTrace(); } finally { if(null != session) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 执行发送 */ public void doRecive() { Session session = null; try { session = connection.createSession(transacted,acknowledgeMode); Destination destination = session.createQueue(queue); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { reciveMessage(message); } }); //保持进程启动状态 while (true) {} } catch (JMSException e) { e.printStackTrace(); } finally { if(null != session) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } } private void init() { } public String getQueue() { return queue; } public void setQueue(String queue) { this.queue = queue; } public Connection getConnection() { return connection; } public void setConnection(Connection connection) { this.connection = connection; } public int getAcknowledgeMode() { return acknowledgeMode; } public void setAcknowledgeMode(int acknowledgeMode) { this.acknowledgeMode = acknowledgeMode; } public boolean isTransacted() { return transacted; } public void setTransacted(boolean transacted) { this.transacted = transacted; } }
2.定义一个发送类集成上面抽象方法
package cn.base.jms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author gu.fei * @version 2017-03-24 9:50 */ public class OneProducer extends Queuehandler { static ConnectionFactory connectionFactory = null; static Connection connection = null; static { connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); } catch (JMSException e) { e.printStackTrace(); } } public OneProducer() { } @Override public Object sendMessage(MessageProducer producer, Session session) { try { for (int i = 0; i <10 ; i++) { Message message = session.createTextMessage("hello,world!" + i); producer.send(message); } } catch (JMSException e) { e.printStackTrace(); } return null; } @Override public Object reciveMessage(Message message) { return null; } public static void main(String[] args) { OneProducer oneProducer = new OneProducer(); oneProducer.setConnection(connection); oneProducer.doSend(); try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
3.定义两个消费者
package cn.base.jms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author gu.fei * @version 2017-03-24 9:50 */ public class OneCustomer extends Queuehandler { static ConnectionFactory connectionFactory = null; static Connection connection = null; static { connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); } catch (JMSException e) { e.printStackTrace(); } } public OneCustomer() { } @Override public Object sendMessage(MessageProducer producer, Session session) { return null; } @Override public Object reciveMessage(Message message) { TextMessage text = (TextMessage)message; try { Thread.sleep(1000); System.out.println("One接受消息:" + text.getText()); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } public static void main(String[] args) { OneCustomer customer = new OneCustomer(); customer.setConnection(connection); customer.doRecive(); try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
package cn.base.jms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author gu.fei * @version 2017-03-24 9:50 */ public class TwoCustomer extends Queuehandler { static ConnectionFactory connectionFactory = null; static Connection connection = null; static { connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); } catch (JMSException e) { e.printStackTrace(); } } public TwoCustomer() { } @Override public Object sendMessage(MessageProducer producer, Session session) { return null; } @Override public Object reciveMessage(Message message) { TextMessage text = (TextMessage)message; try { Thread.sleep(1000); System.out.println("Two接受消息:" + text.getText()); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } public static void main(String[] args) { TwoCustomer customer = new TwoCustomer(); customer.setConnection(connection); customer.doRecive(); try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
先启动两个消费者,然后启动生产者
结果如下:
one:
One接受消息:hello,world!1
One接受消息:hello,world!3
One接受消息:hello,world!5
One接受消息:hello,world!7
One接受消息:hello,world!9
two:
Two接受消息:hello,world!0
Two接受消息:hello,world!2
Two接受消息:hello,world!4
Two接受消息:hello,world!6
Two接受消息:hello,world!8
标签:nal lse mqc 状态 system producer tran tor set
原文地址:https://www.cnblogs.com/shouhutian/p/9097240.html