标签:ini 需要 The rand 语法 三种方式 功能 cto 管理
??独有消费者:Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。例如:你可能不希望在插入订单操作结束之前执行更新这个订单的操作。
ActiveMQ从4.x版本开始支持Exclusive Consumer。Broker会从多个Consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其他的consumer。可以通过destination options来创建一个Exclusive Consumer,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
还可以给consumer设置优先级,以便针对网络情况进行优化,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");
??在activemq4.0以后,你可以选择broker同步或异步的把消息分发给消费者。可以设置dispatchAsync属性,默认是true,通常情况下这是最佳的。
你也可以通过如下几种方式修改:
ActiveMQConnectionFactory.setDispatchAsync(false);
ActiveMQConnetion.setDispatchAsync(false);
在Consumer上设置
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
consumer = session.createConsumer(queue);
??JMS JMSPriority定义了十个消息优先级值,0是最低优先级,9是最高优先级,另外,客户端应当将0-4看作普通优先级,5-9看作加急优先级。
自定义Consumer Priority优先级。配置如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
??Consumer的Priority的划分为0~127个级别,127是最高的级别,0是最低的也是ActiveMQ默认的。这种配置可以让Broker根据consumer的优先级来发送消息到较高的优先级的Consumer上,如果某个较高的Consumer的消息转载慢,则Broker会把消息发送到仅次于它优先级的Consumer上。
??消息持久化,保证了消费者离线之后,再次进入系统,不会错过消息,但是这也会消耗很多的资源,从5.6开始,可以对持久化进行如下管理:
Removing inactive subscribers:我们还希望可以删除那些不活动的订阅者,如下:
<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">
offlineDurableSubscriberTaskSchedule: 多长时间检查一次,缺省300000,单位毫秒。
message.setStringProperty("JMSXGroupID","GroupA");
关闭一个Message Groups,只需要在message对象上设置属性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
message.setIntProperty("JMSXGroupSeq",-1);
??JMS Selectors 用在获取消息的时候,可以基于消息属性和Xpath语法对消息进行过滤。JMS Selectors有SQL92语义定义。以下是个Selectors的例子:
consumer = session.createConsumer(destination, "JMSType='car' AND weight > 2500");
表达式中的属性不会自动进行类型转换,例如:
myMessage.setStringProperty("NumberOfOrders","2");
那么此时“NumberOfOrders > 1” 的结果就是会false
Message Groups虽然可以保证具有相同的message group的消息会被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理,在某些情况下,Message Groups可以和JMS Selector一起工作。
??例如:设想三个consumers分别是A,B,C,你可以在producer中为消息设置三个message groups分别为“A","B","C"。然后令Consumer A使用JMSXGroupID=‘A‘作为selector,c和b也同理,这样就保证了message group A的消息只会被A处理,需要注意的是,这种做法有以下缺点:
(1) producer必须直到当前正在运行的consumers,也就是说producer和consumer被耦合到一起。
(2) 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61617)?randowize=false");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);
cf.setRedeliveryPolicy(policy);
??当消息试图被传递的次数超多配置中的maximumRedeliveries属性的值时,那么,broker会认定该消息是一个死消息,并会把该消息发送到死队列中。默认activeMQ中死队列被声明为”ActiveMQ.DLQ",所有不能消费的消息都被传递到该死队列中。你可以在activemq.xml中配置individualDeadLetterStrategy属性,示例如下:
<policyEntry queue=">">
<dealLetterStrategy>
<individualDeadLetterStategy queuePrefix="DLQ."
useQueueForQueueMessage="true"/>
</dealLetterStrategy>
</policyEntry>
? 自动删除过期消息:有时需要直接删除过期的消息而不需要发送到死队列中,可以使用属性processExpired=false来设置,示例如下:
<policyEntry queue=">">
<dealLetterStrategy>
<sharedDeadLetterStategy processExpired="false"/>
</dealLetterStrategy>
</policyEntry>
??存放非持久消息到死队列中:默认情况下,ActiveMQ不会把非持久的死消息发送到死队列中。如果你想非持久的消息 发送到死队列中,需要设置属性processNonPersistent="true",示例如下:
<policyEntry queue=">">
<dealLetterStrategy>
<sharedDeadLetterStategy processNonPersistent="true"/>
</dealLetterStrategy>
</policyEntry>
??RedeliveryPolicy per Destination:在5.7之后,你可以为每一个Destination配置一个Redelivery Policy,示例如:
ActiveMQConnection connection ... // Create a connection
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);
RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);
// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);
??Prefetch机制:ActiveMQ通过Prefetch机制来提供性能,方式是在客户端得内存里可能缓存一定数量得消息。缓存消息得数量由prefetch limit来控制。当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,知道consumer像broker发送消息的确认,确认后的消息将会从缓存中去掉。
可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQPrefetchPolicy对象来配置prefetch policy。也可以通过connection options或destination options来配置。例如:
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
或
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue)
等方式配置
prefetch size缺省的值如下:
慢Consumer处理
慢消费者会在非持久的topics上导致问题,一旦消息积压起来,会导致broker把大量消息保存到内存中,broker也会因此而变慢,目前,ActiveMQ使用Pending Message Limit Strategy来解决这个问题。除了prefetch buffer之外,你还要配置缓存消息的上限,超过这个上限之后,新消息到来时会丢弃旧的消息。
通过在配置文件的destination map中配置pendingMessageLimitStrategy,可以为不同的topic message配置不同的策略。
Pending Message Limit Strategy(等待消息限制策略),目前有以下两种“
<constantPendingMessageLimitStrategy limit="50"/>
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>
表示要抛弃属性名称为Stock的消息。标签:ini 需要 The rand 语法 三种方式 功能 cto 管理
原文地址:https://www.cnblogs.com/yangk1996/p/11667705.html