标签:




// Queue (队列):
//     Destination destination = session.createQueue(subject);
// Topic (主题):
//     Destination destination = session.createTopic(subject);
public static void main(String[] args) throws Exception{String user = ActiveMQConnection.DEFAULT_USER;String password = ActiveMQConnection.DEFAULT_PASSWORD;String url = "tcp://localhost:61616";String subject = "TOOL.DEFAULT";// 1. 初始化连接工厂ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user, password, url);// 2. 创建连接Connection connection = contectionFactory.createConnection();connection.start();// 3.创建会话Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 4. 打开队列Destination destination = session.createQueue(subject);// 5. MessageProducer负责发送消息MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage();for (int i = 0; i < 100; i++){String tmp = "测试消息:" + i;message.setStringProperty("Test", tmp);// 6. 发送消息producer.send(message);System.out.println("发送消息: " + tmp);// 只有commit之后,消息才会进入队列session.commit();}// 7. 关闭会话和连接session.close();connection.close();System.out.println("OK!!");}
public static void main(String[] args) throws Exception{String user = ActiveMQConnection.DEFAULT_USER;String password = ActiveMQConnection.DEFAULT_PASSWORD;String url = "tcp://localhost:61616";String subject = "TOOL.DEFAULT";ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);Connection connection = connectionFactory.createConnection();connection.start();final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(subject);// MessageConsumer负责接受消息MessageConsumer consumer = session.createConsumer(destination);consumer.setMessageListener(new MessageListener(){public void onMessage(Message msg){try{String str = msg.getStringProperty("Test");System.out.println("收到消息:" + str);session.commit();}catch (JMSException e){e.printStackTrace();}}});Thread.sleep(1000 * 5);session.close();connection.close();System.out.println("OK!");}
public static void main(String[] args) throws Exception{String user = ActiveMQConnection.DEFAULT_USER;String password = ActiveMQConnection.DEFAULT_PASSWORD;String url = "tcp://localhost:61616";String subject = "MQ.TOPIC";// 1. 初始化连接工厂ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user, password, url);// 2. 创建连接Connection connection = contectionFactory.createConnection();connection.start();// 3.创建会话Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 4. 创建要发布的主题,和Queue的区别就在此Destination destination = session.createTopic(subject);// 5. MessageProducer负责发送消息MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage();for (int i = 0; i < 10; i++){String tmp = "测试消息TOPIC:" + i;message.setStringProperty("Test", tmp);// 6. 发送消息producer.send(message);System.out.println("发送消息: " + tmp);// 只有commit之后,消息才会进入队列session.commit();}// 7. 关闭会话和连接session.close();connection.close();System.out.println("OK!");}
public static void main(String[] args) throws Exception{String user = ActiveMQConnection.DEFAULT_USER;String password = ActiveMQConnection.DEFAULT_PASSWORD;String url = "tcp://localhost:61616";String subject = "MQ.TOPIC";ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);Connection connection;connection = connectionFactory.createConnection();connection.start();final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(subject);// MessageConsumer负责接受消息MessageConsumer consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener(){public void onMessage(Message msg){try{String str = msg.getStringProperty("Test");System.out.println("订阅者---SecondSubscriber---收到消息:" + str);session.commit();}catch (JMSException e){e.printStackTrace();}}});Thread.sleep(1000 * 50);session.close();connection.close();System.out.println("OK!");}
标签:
原文地址:http://www.cnblogs.com/LiZhiW/p/4957446.html