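Simply put, advisory messages record, track, and report the various operations that take place on an ActiveMQ broker.
With this feature you can learn in real time what is happening on the broker.
The mechanism is an important ActiveMQ extension to JMS and part of the manageability that ActiveMQ builds on top of JMS. It is also the infrastructure that lets multiple ActiveMQ brokers coordinate and interoperate.
Usage is simple and entirely JMS-style, for example: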
...
Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination);
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
....
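public void onMessage(Message msg) {
    // Advisory messages arrive as ActiveMQMessage; the payload is a command object.
    if (msg instanceof ActiveMQMessage) {
        ActiveMQMessage aMsg = (ActiveMQMessage) msg;
        DataStructure data = aMsg.getDataStructure();
        if (data instanceof ProducerInfo) {
            // A producer has started on the watched destination.
            ProducerInfo prod = (ProducerInfo) data;
        } else {
            // Other payloads are possible, e.g. RemoveInfo when a producer stops.
            log.info("Unhandled advisory payload: " + data);
        }
    }
}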
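From the data structure carried by the message you can obtain further details about the operation.
In addition, advisory messages first return everything that already exists, such as all current connections or queues, and then notify you in real time of newly created and closed connections, newly added or removed queues, and so on.
ActiveMQ.Advisory.Queue: the data structure is DestinationInfo. You first receive the list of all queues that currently exist on the broker.
Any later add or remove of a queue produces a notification, where operationType is 0 for add and 1 for remove.
For topics, the advisory topics themselves are filtered out automatically.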
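The add/remove notifications described above can be folded into a local set of queue names on the client side. The following is only a minimal sketch in plain Java: `QueueRegistry`, `apply`, and `snapshot` are hypothetical names, no ActiveMQ classes are used, and the queue name and operation type are assumed to have been read from the DestinationInfo payload beforehand.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of ActiveMQ): keeps a local view of the
// broker's queues by applying add/remove notifications in arrival order.
final class QueueRegistry {
    // Operation codes as described in the text: 0 = add, 1 = remove.
    static final int ADD = 0;
    static final int REMOVE = 1;

    private final Set<String> queues = new TreeSet<>();

    // Applies one notification. The caller is assumed to have extracted
    // the queue name and operation type from the advisory payload.
    void apply(String queueName, int operationType) {
        if (operationType == ADD) {
            queues.add(queueName);
        } else if (operationType == REMOVE) {
            queues.remove(queueName);
        } else {
            throw new IllegalArgumentException("Unknown operation type: " + operationType);
        }
    }

    // Returns a read-only copy of the queues currently known to exist.
    Set<String> snapshot() {
        return Collections.unmodifiableSet(new TreeSet<>(queues));
    }
}
```

Because the existing queues are delivered first as ordinary add notifications, the same `apply` call would handle both the initial list and later changes. The sketch is not thread-safe; it assumes all notifications are delivered on a single listener thread.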
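ActiveMQ.Advisory.Producer.Queue: the data structure is ProducerInfo, which contains the producer's parameters.
ActiveMQ.Advisory.Consumer.Queue: the data structure is ConsumerInfo, which contains the consumer's parameters.
When subscribing, use this name as a prefix and append the queue you want to watch. For a queue named kk.adt, write:
Topic topic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Queue.kk.adt");
To watch all queues, use the wildcard:
Topic topic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Queue.>");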
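The naming convention above is plain string concatenation, which the following sketch spells out. `AdvisoryNames` and its methods are hypothetical illustrations and not ActiveMQ API; the sketch only assumes the prefixes quoted in this article and the `>` wildcard.

```java
// Hypothetical helper (not part of ActiveMQ): builds advisory topic names
// by concatenation, following the prefix convention described in the text.
final class AdvisoryNames {
    static final String CONSUMER_QUEUE_PREFIX = "ActiveMQ.Advisory.Consumer.Queue.";
    static final String PRODUCER_QUEUE_PREFIX = "ActiveMQ.Advisory.Producer.Queue.";
    // ">" is the ActiveMQ destination wildcard used in the text to watch all queues.
    static final String ALL = ">";

    private AdvisoryNames() {
    }

    // Advisory topic name for consumers of a single queue.
    static String consumerAdvisoryForQueue(String queueName) {
        return CONSUMER_QUEUE_PREFIX + requireName(queueName);
    }

    // Advisory topic name for producers of a single queue.
    static String producerAdvisoryForQueue(String queueName) {
        return PRODUCER_QUEUE_PREFIX + requireName(queueName);
    }

    // Advisory topic name for consumers of every queue.
    static String consumerAdvisoryForAllQueues() {
        return CONSUMER_QUEUE_PREFIX + ALL;
    }

    private static String requireName(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Queue name must not be empty");
        }
        return name;
    }
}
```

The resulting string would then be passed to the constructor shown above, for example `new ActiveMQTopic(AdvisoryNames.consumerAdvisoryForQueue("kk.adt"))`.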
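When a producer or consumer disconnects, the data structure is RemoveInfo.
Listening on topics works the same way.
When a durable subscriber comes online, the data structure is a ConsumerInfo that carries the clientId and consumerId. When it goes offline, the RemoveInfo carries the same consumerId, so the two notifications can be matched.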
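Matching the online and offline notifications by consumerId, as described above, amounts to a map lookup. The sketch below uses plain strings in place of the real ConsumerInfo and RemoveInfo objects; `SubscriberTracker` and its method names are hypothetical, and the ids are assumed to have been converted to strings by the caller.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of ActiveMQ): pairs "online" and "offline"
// notifications of durable subscribers by their consumerId.
final class SubscriberTracker {
    // consumerId -> clientId for every subscriber currently online
    private final Map<String, String> online = new HashMap<>();

    // Call when a ConsumerInfo advisory arrives (subscriber came online).
    void onConsumerInfo(String consumerId, String clientId) {
        online.put(consumerId, clientId);
    }

    // Call when a RemoveInfo advisory arrives (subscriber went offline).
    // Returns the clientId that was registered for this consumerId,
    // or null if the consumerId was never seen.
    String onRemoveInfo(String consumerId) {
        return online.remove(consumerId);
    }

    // True if at least one online consumer belongs to the given clientId.
    boolean isOnline(String clientId) {
        return online.containsValue(clientId);
    }
}
```

Keeping the clientId as the map value is what lets the offline notification, which only carries the consumerId, be traced back to the client that disconnected.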
ActiveMQ ships utility classes that assemble the advisory topic name from the destination you want to watch, for example:
AdvisorySupport.getProducerAdvisoryTopic(new ActiveMQTopic("kk.adt"));
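Advisories for messages received or delivered by the broker are off by default.
The corresponding switches must be set on the destination policy. For message-related advisories, the data structure is the message itself.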

Advisory Topics | Description | properties | Data Structure |
---|---|---|---|
ActiveMQ.Advisory.Connection | Connection start & stop messages | | |
ActiveMQ.Advisory.Producer.Queue | Producer start & stop messages on a Queue | String='producerCount' - the number of producers | ProducerInfo |
ActiveMQ.Advisory.Producer.Topic | Producer start & stop messages on a Topic | String='producerCount' - the number of producers | ProducerInfo |
ActiveMQ.Advisory.Consumer.Queue | Consumer start & stop messages on a Queue | String='consumerCount' - the number of Consumers | ConsumerInfo |
ActiveMQ.Advisory.Consumer.Topic | Consumer start & stop messages on a Topic | String='consumerCount' - the number of Consumers | ConsumerInfo |

Advisory Topics | Description | properties | Data Structure | default | PolicyEntry property |
---|---|---|---|---|---|
ActiveMQ.Advisory.Queue | Queue create & destroy | null | null | true | none |
ActiveMQ.Advisory.Topic | Topic create & destroy | null | null | true | none |
ActiveMQ.Advisory.TempQueue | Temporary Queue create & destroy | null | null | true | none |
ActiveMQ.Advisory.TempTopic | Temporary Topic create & destroy | null | null | true | none |
ActiveMQ.Advisory.Expired.Queue | Expired messages on a Queue | String='orignalMessageId' - the expired id | Message | true | none |
ActiveMQ.Advisory.Expired.Topic | Expired messages on a Topic | String='orignalMessageId' - the expired id | Message | true | none |
ActiveMQ.Advisory.NoConsumer.Queue | No consumer is available to process messages being sent on a Queue | null | Message | false | sendAdvisoryIfNoConsumers |
ActiveMQ.Advisory.NoConsumer.Topic | No consumer is available to process messages being sent on a Topic | null | Message | false | sendAdvisoryIfNoConsumers |

Advisory Topics | Description | properties | Data Structure | default | PolicyEntry property |
---|---|---|---|---|---|
ActiveMQ.Advisory.SlowConsumer.Queue | Slow Queue Consumer | String='consumerId' - the consumer id | ConsumerInfo | false | advisoryForSlowConsumers |
ActiveMQ.Advisory.SlowConsumer.Topic | Slow Topic Consumer | String='consumerId' - the consumer id | ConsumerInfo | false | advisoryForSlowConsumers |
ActiveMQ.Advisory.FastProducer.Queue | Fast Queue producer | String='producerId' - the producer id | ProducerInfo | false | advisoryForFastProducers |
ActiveMQ.Advisory.FastProducer.Topic | Fast Topic producer | String='producerId' - the producer id | ProducerInfo | false | advisoryForFastProducers |
ActiveMQ.Advisory.MessageDiscarded.Queue | Message discarded | String='orignalMessageId' - the discarded id | Message | false | advisoryForDiscardingMessages |
ActiveMQ.Advisory.MessageDiscarded.Topic | Message discarded | String='orignalMessageId' - the discarded id | Message | false | advisoryForDiscardingMessages |
ActiveMQ.Advisory.MessageDelivered.Queue | Message delivered to the broker | String='orignalMessageId' - the delivered id | Message | false | advisoryForDelivery |
ActiveMQ.Advisory.MessageDelivered.Topic | Message delivered to the broker | String='orignalMessageId' - the delivered id | Message | false | advisoryForDelivery |
ActiveMQ.Advisory.MessageConsumed.Queue | Message consumed by a client | String='orignalMessageId' - the delivered id | Message | false | advisoryForConsumed |
ActiveMQ.Advisory.MessageConsumed.Topic | Message consumed by a client | String='orignalMessageId' - the delivered id | Message | false | advisoryForConsumed |
ActiveMQ.Advisory.FULL | A Usage resource is at its limit | String='usageName' - the name of Usage resource | null | false | advisoryWhenFull |
ActiveMQ.Advisory.MasterBroker | A broker is now the master in a master/slave configuration | null | null | true | none |

Advisory Topics | Description | properties | Data Structure | default | PolicyEntry property |
---|---|---|---|---|---|
ActiveMQ.Advisory.MessageDLQd.Queue | Message sent to DLQ | String='orignalMessageId' - the delivered id | Message | Always on | advisoryForConsumed |
ActiveMQ.Advisory.MessageDLQd.Topic | Message sent to DLQ | String='orignalMessageId' - the delivered id | Message | Always on | advisoryForConsumed |

Advisory Topics | Description | properties | Data Structure | default |
---|---|---|---|---|
ActiveMQ.Advisory.NetworkBridge | Network bridge being stopped or started | Boolean="started" - true if bridge is started, false if it is stopped; Boolean="createdByDuplex" - true if the bridge is created by remote network connector | BrokerInfo - provides data of the remote broker | Always on |

<destinationPolicy>
<policyMap><policyEntries>
<policyEntry topic=">" advisoryForConsumed="true" />
</policyEntries></policyMap>
</destinationPolicy>
In addition, the broker has a master switch for advisory messages. Once it is set to false, no advisories are available:
<broker advisorySupport="false">...
Reposted from: http://blog.csdn.net/kimmking/article/details/8443679
(Repost) ActiveMQ message features: Advisory Message
Original address: http://www.cnblogs.com/jiejiecool/p/3929235.html