标签:ext apach 模式 txt api new host broker 产生
ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。
ActiveMQ官网下载地址:http://activemq.apache.org/download.html
ActiveMQ 提供了Windows 和Linux、Unix 等几个版本。具体安装方法请自行查找资料进行安装,博主这边就不多叙述。
安装成功启动ActiveMQ服务后,在浏览器输入http://localhost:8161,用户名密码默认都是 admin。下面为登陆成功后的页面:
Queues是队列方式消息,从菜单栏中点击Queues可以进入到Queues页面,页面主要内容包括:
Topics是主题方式消息,从菜单栏中点击Topics可以进入到Topics页面,页面主要内容包括:
Subscribers 是查看订阅者的页面,可以查看订阅者的信息等。只在Topics消息类型中这个页面才会有数据。
Connections页面可以查看到所有的连接数。
点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/**
* 创建一条文本消息
*/
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
/**
* 通过消息生产者发出消息
*/
messageProducer.send(message);
}
}
}
运行结果图:
我们可以看的,当运行JmsProducer程序时,在ActiveMQ控制台,可以看到生产者往queue.test的队列中发送了10条消息,因为这时还没有消费者,所以这边的Number Of Pending Messages显示的是10,
Number Of Consumers显示的是0,Messages Enqueued显示的也是10。
public class JmsConsumer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
while (true) {
/**
* 接收数据的时间(等待) 100 ms
*/
TextMessage textMessage = (TextMessage) messageConsumer.receive(1000 * 100);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
运行结果图:
我们可以看到,但运行JmsConsumer程序时,在运行程序的控制台中我们可以看到消费者消费了刚刚生产者生产的消息。在ActiveMQ控制台,可以看到所以这边的Number Of Pending Messages显示的是0,Number Of Consumers显示的是1,Messages Enqueued显示的是10,Messages Dequeued显示的也是10,即消息被消费。
在前面的消费者例子中,我们这边使用while (true) 死循环来不停接受消息。这样很浪费cpu资源,实际生产中不会这么做。下面,我们采用注册一个监听器的方法,当监听到有消息入队列后,才去接收消息。
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
当生产者一生产消息到队列中时,我们的消费者就马上进行消费,注意程序中我们没有将会话和连接关闭,因为监听器是异步的,如果关闭后就无法接收到消息。
订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为topic.test的主题
*/
destination = session.createTopic(TOPIC_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/**
* 创建一条文本消息
*/
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
/**
* 通过消息生产者发出消息
*/
messageProducer.send(message);
}
}
}
运行结果图:
public class JmsConsumer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为topic.test的主题
*/
destination = session.createTopic(TOPIC_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
我们可以发现,Topic消息模式的代码跟Queue消息模式的代码基本是一样的,除了在创建消息目的地的时候,一个是queue一个是topic;还有一点区别就是,Topic消息模式,订阅者需要先订阅,才能接收到发布者发布的消息。
在通过Connection创建Session的时候,需要设置2个参数,一个是否支持事务,另一个是签收的模式。
签收就是消费者接受到消息后,需要告诉消息服务器,我收到消息了。当消息服务器收到回执后,本条消息将失效。因此签收将对PTP模式产生很大影响。如果消费者收到消息后,并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!
签收方式有三种:
我们上面演示的全都是字符串的消息类型,但ActiveMQ支持的还有ObjectMessage,StreamMessage,MapMessage,BytesMessage等消息类型。下面我们来看看其他消息类型是如何编写的,以下都是以队列的消息模式进行。
public class User implements Serializable {
private static final long serialVersionUID = 2504467948968634865L;
private String userName;
private String password;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "User{" +
"userName=‘" + userName + ‘\‘‘ +
", password=‘" + password + ‘\‘‘ +
‘}‘;
}
}
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 设置所有对所有序列化包都信任
*/
connectionFactory.setTrustAllPackages(true);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为object.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 创建一条Object消息
*/
ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) session.createObjectMessage();
for (int i = 0; i < 10; i++) {
User user = new User();
user.setUserName("hyn" + i);
user.setPassword("qwe" + i);
System.out.println("发送消息:Activemq 发送消息" + user.toString());
/**
* 对象需要序列化
*/
objectMessage.setObject(user);
/**
* 通过消息生产者发出消息
*/
messageProducer.send(objectMessage);
}
}
}
运行结果图:
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 设置所有对所有序列化包都信任
*/
connectionFactory.setTrustAllPackages(true);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为object.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
User user = (User) ((ActiveMQObjectMessage) message).getObject();
System.out.println("收到的消息:" + user.toString());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
从代码中我们可以看的,ObjectMessage跟TextMessage代码差不多,只不过有两个地方需要注意:
首先我们项目的资源目录下新建两个文件,producer.txt 和 consumer.txt,在producer.txt输入如下内容,consumer.txt为空。
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为bytes.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 创建一条Byte消息
*/
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(getFileByte(System.getProperty("user.dir")+"/src/main/resources/producer.txt"));
messageProducer.send(bytesMessage);
}
/**
* 读取文件
*
* @param fileUrl
* @return
*/
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
运行结果图:
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为bytes.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
BytesMessage bytesMessage = (BytesMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = bytesMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
从结果可以看出,consumer.txt的内容结果跟product.txt内容是一致的,即消息接收成功。当然,发送文件的话我们也可以使用StreamMessage,下面我们来看看StreamMessage的使用。
同样需要在项目中新建producer.txt 和 consumer.txt两个文件;
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 创建一条streamMessage消息
*/
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeBytes(getFileByte(System.getProperty("user.dir") + "/src/main/resources/producer.txt"));
messageProducer.send(streamMessage);
}
/**
* 读取文件
*
* @param fileUrl
* @return
*/
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
运行结果图:
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
StreamMessage streamMessage = (StreamMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = streamMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为map.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 创建一条mapMessage消息
*/
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","hyn");
mapMessage.setInt("age",27);
messageProducer.send(mapMessage);
}
}
运行结果图:
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("name:" + mapMessage.getString("name"));
System.out.println("age:" + mapMessage.getInt("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
消息发送成功后,接收端接收到了消息。然后进行处理,但是可能由于某种原因,高并发也好,IO阻塞也好,反正这条消息在接收端处理失败了。而点对点的特性是一条消息,只会被一个接收端给接收,只要接收端A接收成功了,接收端B,就不可能接收到这条消息,如果是一些普通的消息还好,但是如果是一些很重要的消息,比如说用户的支付订单,用户的退款,这些与金钱相关的,是必须保证成功的,那么这个时候要怎么处理呢?
我们可以在创建session的时候使用 CLIENT_ACKNOWLEDGE 模式。创建session的时候是需要指定事务以及消息的处理模式的。我们之前是这样创建session:
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AUTO_ACKNOWLEDGE的消息处理模式是当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端接收到同样的消息。
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
而当我们使用CLIENT_ACKNOWLEDGE的消息处理模式时,如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确定了消息。那么要怎么确认消息呢?具体代码如下:
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
//确认接收,并成功处理了消息
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
之前的代码里面,实现了一个监听器,监听消息的传递,这样只要每有一个消息,都会即时的传递到程序中。但是,这样的处理,在高并发的时候,因为它是被动接收,并没有考虑到程序的处理能力,可能会压跨系统,那要怎么办呢?
答案就是把被动变为主动,当程序有着处理消息的能力时,主动去接收一条消息进行处理
if(当程序有能力处理){//当程序有能力处理时接收
Message receive = consumer.receive();
//这个可以设置超时时间,超过则不等待消息
recieve.receive(10000);
//其实receive是一个阻塞式方法,一定会拿到值的
if(null != receive){
String text = ((TextMessage)receive).getText();
receive.acknowledge();
System.out.println(text);
}else{
//没有值
}
}
ActiveMQ是支持多个接收端的,如果当程序无法处理这么多数据的时候,可以考虑多个线程,或者增加服务器来处理。
这样的场景也是有的,一条消息的有效时间,当发送一条消息的时候,可能希望这条消息在指定的时间被处理,如果超过了指定的时间,那么这条消息就失效了,就不需要进行处理了,那么我们可以使用ActiveMQ的设置有效期来实现。具体设置如下:
producer.setTimeToLive(long l);
过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。这个队列是ActiveMQ自动创建的。如果需要查看这些未被处理的消息,可以进入这个队列中查看:
//指定一个目的地,也就是一个队列的位置
destination = session.createQueue("ActiveMQ.DLQ");
这样就可以进入队列中,然后实现接口,或者通过receive()方法,就可以拿到未被处理的消息,从而保证正确的处理。
整理文章主要为了自己日后复习用,文章中可能会引用到别的博主的文章,如涉及到博主的版权问题,请博主联系我。
ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。
ActiveMQ官网下载地址:http://activemq.apache.org/download.html
ActiveMQ 提供了Windows 和Linux、Unix 等几个版本。具体安装方法请自行查找资料进行安装,博主这边就不多叙述。
安装成功启动ActiveMQ服务后,在浏览器输入http://localhost:8161,用户名密码默认都是 admin。下面为登陆成功后的页面:
Queues是队列方式消息,从菜单栏中点击Queues可以进入到Queues页面,页面主要内容包括:
Topics是主题方式消息,从菜单栏中点击Topics可以进入到Topics页面,页面主要内容包括:
Subscribers 是查看订阅者的页面,可以查看订阅者的信息等。只在Topics消息类型中这个页面才会有数据。
Connections页面可以查看到所有的连接数。
点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/**
* 创建一条文本消息
*/
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
/**
* 通过消息生产者发出消息
*/
messageProducer.send(message);
}
}
}
运行结果图:
我们可以看的,当运行JmsProducer程序时,在ActiveMQ控制台,可以看到生产者往queue.test的队列中发送了10条消息,因为这时还没有消费者,所以这边的Number Of Pending Messages显示的是10,
Number Of Consumers显示的是0,Messages Enqueued显示的也是10。
public class JmsConsumer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
while (true) {
/**
* 接收数据的时间(等待) 100 ms
*/
TextMessage textMessage = (TextMessage) messageConsumer.receive(1000 * 100);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
运行结果图:
我们可以看到,但运行JmsConsumer程序时,在运行程序的控制台中我们可以看到消费者消费了刚刚生产者生产的消息。在ActiveMQ控制台,可以看到所以这边的Number Of Pending Messages显示的是0,Number Of Consumers显示的是1,Messages Enqueued显示的是10,Messages Dequeued显示的也是10,即消息被消费。
在前面的消费者例子中,我们这边使用while (true) 死循环来不停接受消息。这样很浪费cpu资源,实际生产中不会这么做。下面,我们采用注册一个监听器的方法,当监听到有消息入队列后,才去接收消息。
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic。这里我们创建一个名为queue.test的消息队列。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
当生产者一生产消息到队列中时,我们的消费者就马上进行消费,注意程序中我们没有将会话和连接关闭,因为监听器是异步的,如果关闭后就无法接收到消息。
订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为topic.test的主题
*/
destination = session.createTopic(TOPIC_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/**
* 创建一条文本消息
*/
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
System.out.println("发送消息:Activemq 发送消息" + i);
/**
* 通过消息生产者发出消息
*/
messageProducer.send(message);
}
}
}
运行结果图:
public class JmsConsumer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为topic.test的主题
*/
destination = session.createTopic(TOPIC_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
我们可以发现,Topic消息模式的代码跟Queue消息模式的代码基本是一样的,除了在创建消息目的地的时候,一个是queue一个是topic;还有一点区别就是,Topic消息模式,订阅者需要先订阅,才能接收到发布者发布的消息。
在通过Connection创建Session的时候,需要设置2个参数,一个是否支持事务,另一个是签收的模式。
签收就是消费者接受到消息后,需要告诉消息服务器,我收到消息了。当消息服务器收到回执后,本条消息将失效。因此签收将对PTP模式产生很大影响。如果消费者收到消息后,并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!
签收方式有三种:
我们上面演示的全都是字符串的消息类型,但ActiveMQ支持的还有ObjectMessage,StreamMessage,MapMessage,BytesMessage等消息类型。下面我们来看看其他消息类型是如何编写的,以下都是以队列的消息模式进行。
public class User implements Serializable {
private static final long serialVersionUID = 2504467948968634865L;
private String userName;
private String password;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "User{" +
"userName=‘" + userName + ‘\‘‘ +
", password=‘" + password + ‘\‘‘ +
‘}‘;
}
}
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 设置所有对所有序列化包都信任
*/
connectionFactory.setTrustAllPackages(true);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为object.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 创建一条Object消息
*/
ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) session.createObjectMessage();
for (int i = 0; i < 10; i++) {
User user = new User();
user.setUserName("hyn" + i);
user.setPassword("qwe" + i);
System.out.println("发送消息:Activemq 发送消息" + user.toString());
/**
* 对象需要序列化
*/
objectMessage.setObject(user);
/**
* 通过消息生产者发出消息
*/
messageProducer.send(objectMessage);
}
}
}
运行结果图:
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 设置所有对所有序列化包都信任
*/
connectionFactory.setTrustAllPackages(true);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为object.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
User user = (User) ((ActiveMQObjectMessage) message).getObject();
System.out.println("收到的消息:" + user.toString());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
从代码中我们可以看的,ObjectMessage跟TextMessage代码差不多,只不过有两个地方需要注意:
首先我们项目的资源目录下新建两个文件,producer.txt 和 consumer.txt,在producer.txt输入如下内容,consumer.txt为空。
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为bytes.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 创建一条Byte消息
*/
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(getFileByte(System.getProperty("user.dir")+"/src/main/resources/producer.txt"));
messageProducer.send(bytesMessage);
}
/**
* 读取文件
*
* @param fileUrl
* @return
*/
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
运行结果图:
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为bytes.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
BytesMessage bytesMessage = (BytesMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = bytesMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
从结果可以看出,consumer.txt的内容结果跟product.txt内容是一致的,即消息接收成功。当然,发送文件的话我们也可以使用StreamMessage,下面我们来看看StreamMessage的使用。
同样需要在项目中新建producer.txt 和 consumer.txt两个文件;
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 创建一条streamMessage消息
*/
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeBytes(getFileByte(System.getProperty("user.dir") + "/src/main/resources/producer.txt"));
messageProducer.send(streamMessage);
}
/**
* 读取文件
*
* @param fileUrl
* @return
*/
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
运行结果图:
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
StreamMessage streamMessage = (StreamMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = streamMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
public class JmsProducer {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生产者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为map.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消息生产者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:发送消息,这个步骤包括创建消息,然后发送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 创建一条mapMessage消息
*/
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","hyn");
mapMessage.setInt("age",27);
messageProducer.send(mapMessage);
}
}
运行结果图:
public class JmsConsumerMessageListener {
/**
* 默认连接用户名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认用户密码
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/**
* 第一步:创建连接工厂
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 连接
*/
Connection connection = null;
/**
* 会话
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消费者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:创建连接
*/
connection = connectionFactory.createConnection();
/**
* 启动连接
*/
connection.start();
/**
* 第三步:创建会话
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:创建消息目的地,这里我们创建一个名为stream.test的消息队列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:创建消费者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:创建监听器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("name:" + mapMessage.getString("name"));
System.out.println("age:" + mapMessage.getInt("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
运行结果图:
消息发送成功后,接收端接收到了消息。然后进行处理,但是可能由于某种原因,高并发也好,IO阻塞也好,反正这条消息在接收端处理失败了。而点对点的特性是一条消息,只会被一个接收端给接收,只要接收端A接收成功了,接收端B,就不可能接收到这条消息,如果是一些普通的消息还好,但是如果是一些很重要的消息,比如说用户的支付订单,用户的退款,这些与金钱相关的,是必须保证成功的,那么这个时候要怎么处理呢?
我们可以在创建session的时候使用 CLIENT_ACKNOWLEDGE 模式。创建session的时候是需要指定事务以及消息的处理模式的。我们之前是这样创建session:
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AUTO_ACKNOWLEDGE的消息处理模式是当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端接收到同样的消息。
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
而当我们使用CLIENT_ACKNOWLEDGE的消息处理模式时,如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确定了消息。那么要怎么确认消息呢?具体代码如下:
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
//确认接收,并成功处理了消息
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
之前的代码里面,实现了一个监听器,监听消息的传递,这样只要每有一个消息,都会即时的传递到程序中。但是,这样的处理,在高并发的时候,因为它是被动接收,并没有考虑到程序的处理能力,可能会压跨系统,那要怎么办呢?
答案就是把被动变为主动,当程序有着处理消息的能力时,主动去接收一条消息进行处理
if(当程序有能力处理){//当程序有能力处理时接收
Message receive = consumer.receive();
//这个可以设置超时时间,超过则不等待消息
recieve.receive(10000);
//其实receive是一个阻塞式方法,一定会拿到值的
if(null != receive){
String text = ((TextMessage)receive).getText();
receive.acknowledge();
System.out.println(text);
}else{
//没有值
}
}
ActiveMQ是支持多个接收端的,如果当程序无法处理这么多数据的时候,可以考虑多个线程,或者增加服务器来处理。
这样的场景也是有的,一条消息的有效时间,当发送一条消息的时候,可能希望这条消息在指定的时间被处理,如果超过了指定的时间,那么这条消息就失效了,就不需要进行处理了,那么我们可以使用ActiveMQ的设置有效期来实现。具体设置如下:
producer.setTimeToLive(long l);
过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。这个队列是ActiveMQ自动创建的。如果需要查看这些未被处理的消息,可以进入这个队列中查看:
//指定一个目的地,也就是一个队列的位置
destination = session.createQueue("ActiveMQ.DLQ");
这样就可以进入队列中,然后实现接口,或者通过receive()方法,就可以拿到未被处理的消息,从而保证正确的处理。
整理文章主要为了自己日后复习用,文章中可能会引用到别的博主的文章,如涉及到博主的版权问题,请博主联系我。
标签:ext apach 模式 txt api new host broker 产生
原文地址:https://www.cnblogs.com/jpfss/p/10445091.html