前面文章《Apache ActiveMQ 负载均衡 》在最后有提到一个场景,就是当AMQ的节点数大于2个的时候(HA + LB),且配置了消息回流的情况下的一些问题。
HA + LB的基本结构如下图:
问题即发生在当生产者将消息投递到Master节点后(AMQ SERVER),消费者与A节点建立连接(Broker),根据AMQ的“预先消费”策略预先消费了一定数量的消息,即A节点消费了Master节点的一部分消息,A节点在将消息转发至消费者Consumer。
消费者在消费过程中,A节点意外宕机,消费者根据failover机制会自动连接至B或C节点,想继续消费剩余的消息(A节点未消费完成的消息)。
那么我们可以按如下的配置方式即可解决该场景下的问题:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- Allows us to use system properties as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.conf}/credentials.properties</value> </property> </bean> <!-- The <broker> element is used to configure the ActiveMQ broker. --> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker129" dataDirectory="${activemq.data}"> <!-- Destination specific policies using destination names or wildcards --> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">"> </policyEntry> <policyEntry queue=">" enableAudit="false"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" /> </deadLetterStrategy> <networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> </networkBridgeFilterFactory> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- The managementContext is used to configure how ActiveMQ is exposed in JMX. By default, ActiveMQ uses the MBean server that is started by the JVM. For more information, see: http://activemq.apache.org/jmx.html --> <managementContext> <managementContext createConnector="false"/> </managementContext> <!-- duplex Broker双工模式,即:Broker可以是消费者也可以是发布者。 该参数同“消息回流”(replayWhenNoConsumers)不同,注意区分。 --> <!-- networkTTL 即:信息和订阅在网络可以通过的Broker数量。该参数需要根据LB数量合理设置 --> <networkConnectors> <networkConnector duplex="true" networkTTL="3" uri="static:(tcp://192.168.137.200:61616)"/> </networkConnectors> <!-- Configure message persistence for the broker. The default persistence mechanism is the KahaDB store (identified by the kahaDB tag). For more information, see: http://activemq.apache.org/persistence.html --> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> <!-- The systemUsage controls the maximum amount of space the broker will use before disabling caching and/or slowing down producers. For more information, see: http://activemq.apache.org/producer-flow-control.html --> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage percentOfJvmHeap="70" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage> <!-- The transport connectors expose ActiveMQ over a given protocol to clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html --> <transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors> <!-- destroy the spring context on shutdown to stop jetty --> <shutdownHooks> <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> </shutdownHooks> </broker> <!-- Enable web consoles, REST and Ajax APIs and demos The web consoles requires by default login, you can disable this in the jetty.xml file Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details --> <import resource="jetty.xml"/> </beans>
原文地址:http://blog.csdn.net/jason5186/article/details/44812187