标签:activemq
生产者
public class ProducerTest {
public static void main(String[] args) throws Exception
{
// Create a ConnectionFactory
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Create a Connection
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Queue queue = session.createQueue("yyc-test");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createObjectMessage("你好测试"));
// Clean up
producer.close();
session.close();
connection.close();
}
}
消费者
public class ConsumerTest {
public static void main(String[] args) throws Exception
{
// Create a ConnectionFactory
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Create a Connection
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
connection.setExceptionListener(new MyExceptionListener());
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Queue queue = session.createQueue("yyc-test");
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(queue);
// 方式一:同步操作
Message mesg = consumer.receive(1000);
if (mesg instanceof TextMessage) {
TextMessage textMessage = (TextMessage) mesg;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + mesg);
}
consumer.close();
session.close();
connection.close();
// 方式二:异步监听操作
//consumer.setMessageListener(new MyMessageListener());
}
public static class MyMessageListener implements MessageListener
{
@Override
public void onMessage(Message msg) {
System.out.println("Received: " + msg);
}
}
public static class MyExceptionListener implements ExceptionListener
{
@Override
public void onException(JMSException arg0) {
System.out.println("JMS Exception occured. Shutting down client.");
}
}
}
本文出自 “旅行者” 博客,请务必保留此出处http://881206524.blog.51cto.com/10315134/1928355
标签:activemq
原文地址:http://881206524.blog.51cto.com/10315134/1928355