标签:
// Queue: create a queue destination from the session.
// (This line and the topic line below are alternatives; use one or the other.)
Destination destination = session.createQueue(subject);
// Topic: create a topic destination from the session.
Destination destination = session.createTopic(subject);
/**
 * Queue producer example: sends 100 test messages to the "TOOL.DEFAULT" queue
 * on the broker at tcp://localhost:61616 using a transacted session, committing
 * after every message.
 *
 * @param args unused
 * @throws Exception if connecting, sending or committing fails
 */
public static void main(String[] args) throws Exception
{
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    String url = "tcp://localhost:61616";
    String subject = "TOOL.DEFAULT";
    // 1. Initialise the connection factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    // 2. Create the connection
    Connection connection = connectionFactory.createConnection();
    try
    {
        connection.start();
        // 3. Create a transacted session
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 4. Open the queue
        Destination destination = session.createQueue(subject);
        // 5. The MessageProducer is responsible for sending messages
        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage();
        for (int i = 0; i < 100; i++)
        {
            String tmp = "测试消息:" + i;
            // The payload travels as the string property "Test", not as the message body
            message.setStringProperty("Test", tmp);
            // 6. Send the message
            producer.send(message);
            System.out.println("发送消息: " + tmp);
            // The session is transacted: the send only takes effect once committed
            session.commit();
        }
        // 7. Close the session
        session.close();
    }
    finally
    {
        // Always release the connection, even when a JMS call above failed;
        // closing the connection also closes any session still open on it.
        connection.close();
    }
    System.out.println("OK!!");
}
/**
 * Queue consumer example: listens on the "TOOL.DEFAULT" queue of the broker at
 * tcp://localhost:61616 for five seconds, printing the "Test" string property
 * of every message received and committing the transacted session after each one.
 *
 * @param args unused
 * @throws Exception if connecting or closing fails, or the wait is interrupted
 */
public static void main(String[] args) throws Exception
{
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    String url = "tcp://localhost:61616";
    String subject = "TOOL.DEFAULT";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection = connectionFactory.createConnection();
    try
    {
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(subject);
        // The MessageConsumer is responsible for receiving messages
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener()
        {
            @Override
            public void onMessage(Message msg)
            {
                try
                {
                    String str = msg.getStringProperty("Test");
                    System.out.println("收到消息:" + str);
                    // Transacted session: commit to confirm consumption of this message
                    session.commit();
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                }
            }
        });
        // Start delivery only after the listener is registered, so the session
        // is fully set up before any message can arrive.
        connection.start();
        // Keep the process alive for 5 seconds so messages can be delivered
        Thread.sleep(1000 * 5);
        session.close();
    }
    finally
    {
        // Always release the connection, even on failure or interruption;
        // closing the connection also closes any session still open on it.
        connection.close();
    }
    System.out.println("OK!");
}
/**
 * Topic publisher example: publishes 10 test messages to the "MQ.TOPIC" topic
 * on the broker at tcp://localhost:61616 using a transacted session, committing
 * after every message.
 *
 * @param args unused
 * @throws Exception if connecting, sending or committing fails
 */
public static void main(String[] args) throws Exception
{
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    String url = "tcp://localhost:61616";
    String subject = "MQ.TOPIC";
    // 1. Initialise the connection factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    // 2. Create the connection
    Connection connection = connectionFactory.createConnection();
    try
    {
        connection.start();
        // 3. Create a transacted session
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 4. Create the topic to publish to; this is the only difference from the queue variant
        Destination destination = session.createTopic(subject);
        // 5. The MessageProducer is responsible for sending messages
        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage();
        for (int i = 0; i < 10; i++)
        {
            String tmp = "测试消息TOPIC:" + i;
            // The payload travels as the string property "Test", not as the message body
            message.setStringProperty("Test", tmp);
            // 6. Send the message
            producer.send(message);
            System.out.println("发送消息: " + tmp);
            // The session is transacted: the send only takes effect once committed
            session.commit();
        }
        // 7. Close the session
        session.close();
    }
    finally
    {
        // Always release the connection, even when a JMS call above failed;
        // closing the connection also closes any session still open on it.
        connection.close();
    }
    System.out.println("OK!");
}
/**
 * Topic subscriber example: subscribes to the "MQ.TOPIC" topic of the broker at
 * tcp://localhost:61616 for fifty seconds, printing the "Test" string property
 * of every message received and committing the transacted session after each one.
 *
 * @param args unused
 * @throws Exception if connecting or closing fails, or the wait is interrupted
 */
public static void main(String[] args) throws Exception
{
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    String url = "tcp://localhost:61616";
    String subject = "MQ.TOPIC";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection = connectionFactory.createConnection();
    try
    {
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(subject);
        // The MessageConsumer is responsible for receiving messages
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener()
        {
            @Override
            public void onMessage(Message msg)
            {
                try
                {
                    String str = msg.getStringProperty("Test");
                    System.out.println("订阅者---SecondSubscriber---收到消息:" + str);
                    // Transacted session: commit to confirm consumption of this message
                    session.commit();
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                }
            }
        });
        // Start delivery only after the listener is registered, so the session
        // is fully set up before any message can arrive.
        connection.start();
        // Keep the process alive for 50 seconds so published messages can be received
        Thread.sleep(1000 * 50);
        session.close();
    }
    finally
    {
        // Always release the connection, even on failure or interruption;
        // closing the connection also closes any session still open on it.
        connection.close();
    }
    System.out.println("OK!");
}
标签:
原文地址:http://www.cnblogs.com/LiZhiW/p/4957446.html