标签:mes 属性 https 不同 temporary rod property ssi 优先
??ActiveMQ支持很多消息属性,具体可以参考 http://activemq.apache.org/activemq-message-properties.html
??常见得一些属性说明:
ActiveMQ里消息属性的值,不仅可以用基本类型,还可以用List或Map类型
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" advisoryForConsumed="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
开启之后启动ActiveMQ
查看控制台:
已经可以看到以ActiveMQ.Advisory为前缀的topic了,
监听ActiveMQ.Advisory.Producer.Queue.my-queue实现如下:
package com.wangx.activemq;
import com.wangx.activemq.util.MQUtil;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import javax.jms.*;
public class MessageReceiver {
/**
* topic名字
*/
private static final String QUEUENAME = "ActiveMQ.Advisory.Producer.Queue.my-queue";
public void receive() throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//获取Session
//创建队列
Topic queue = session.createTopic(QUEUENAME);
//创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
//监听生产者信息
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//将类型转换成ActiveMQMessage
ActiveMQMessage activeMQMessage = (ActiveMQMessage)message;
try {
//打印message
System.out.println(activeMQMessage.getMessage());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) throws JMSException {
MessageReceiver messageReceiver = new MessageReceiver();
messageReceiver.receive();
}
}
Advisory的使用方式:
Topic queue = session.createTopic(QUEUENAME);
Destination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
??2)由于这个topic默认不是持久化的,所有要先看起接收端,然后再发送消息。
??3) 接收消息的时候,接收到的消息类型是ActiveMQMessage,所以需要先转换成ActiveMQMessage,然后再通过getDataStructure方法来得到具体的信息对象。
代码如下:
//将类型转换成ActiveMQMessage
ActiveMQMessage activeMQMessage = (ActiveMQMessage)message;
try {
//打印message
System.out.println(activeMQMessage.getDataStructure());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
这样可以可以拿到相关信息,
??有时候我们不希望消息马上被broker投递出去,而是想要消息60s以后发送给消费者,或者是我们想要让消息每隔一段时间投递一次,一共投递指定的次数。。。类似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。
??我们只需要把几个描述消息定时调度的方式参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。当然需要再xml broker中配置schedulerSupport属性为true,
一共四个属性:
AMQ_SCHEDULED_DELAY: 延迟投递的时间
AMQ_SCHEDULED_PERIOD: 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT:重复投递次数
AMQ_SCHEDULED_CRON: Cron表达式
ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,可以使用这个类来辅助设置,使用例子如下:延迟60s
在broker上设置schedulerSupport="true",然后使用如下代码设置:
TextMessage textMessage = session.createTextMessage("message" + i);
long time = 30 * 1000;
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
??这样,当生产者发送消息之后,消费者不会马上收到消息,而是会等待30s之后才会开始接收消息
延迟10s,投递3次,间隔5秒的例子
TextMessage textMessage = session.createTextMessage("message" + i);
long delay = 10 * 1000;
long period= 5 * 1000;
int repeat = 3;
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
messageProducer.send(textMessage);
使用CRON表达式,每个小时发送一次
textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 * * * *");
??CRON表达式的优先级高于另外三个参数,如果在设置了延时时间,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔period,就是说设置的是叠加效果,例如每小时都会发生消息被投递10次,延迟0秒开始,每次间隔1秒。
??有时候需要JMS Producer内部进行message转换,从4.2版本起,ActiveMQ提供了一个Message Transform接口用于进行消息转换,可以在如下对象上调用:
ActiveMQConnectionFactory,ActiveMQConnection,ActiveMQSession,ActiveMQMessageConsumer,ActiveMQMessageProducer.
??在消息被发送之前发送到JMS producer的消息总线前进行转换,通过producerTransform方法,在消息到达总线后,但是在consumer接收消息之前进行转换,通过consumerTransform方法,当然MessageTransfoemer接口的实现需要你自己来提供。
标签:mes 属性 https 不同 temporary rod property ssi 优先
原文地址:https://www.cnblogs.com/yangk1996/p/11667702.html