标签:
1 <dependency> 2 <groupId>org.springframework</groupId> 3 <artifactId>spring-jms</artifactId> 4 <version>4.2.0.RELEASE</version> 5 </dependency> 6 <dependency> 7 <groupId>org.apache.activemq</groupId> 8 <artifactId>activemq-spring</artifactId> 9 <version>5.12.1</version> 10 </dependency>
1 <!-- ======================================= 消费者配置 ============================================--> 2 <!-- 持久化主题订阅者(保证订阅者重启后能获得丢失的消息)ActiveMQ 连接工厂 --> 3 <bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <property name="brokerURL" value="${jms.broker.url}"/> 5 <!-- Durable订阅者必须设置ClientId --> 6 <!--<property name="clientID" value="${jms.client.id.prefix}"/>--> 7 <property name="redeliveryPolicy"> 8 <bean name="defaultRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 9 <property name="maximumRedeliveries" value="10"/> 10 <property name="redeliveryDelay" value="60000"/> 11 </bean> 12 </property> 13 </bean> 14 15 <!-- 将Connection、Session和MessageProducer池化,这样可以大大的减少资源消耗 --> 16 <bean id="consumerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 17 <property name="connectionFactory" ref="consumerConnectionFactory"/> 18 <property name="maxConnections" value="50"/> 19 </bean> 20 21 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 22 <bean id="consumerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 23 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 24 <property name="targetConnectionFactory" ref="consumerPooledConnectionFactory"/> 25 </bean> 26 27 <jms:listener-container connection-factory="consumerCachingConnectionFactory" response-destination-type="queue" acknowledge="client" > 28 <jms:listener destination="${jms.queue.charge.name}" ref="chargeMessageListener" concurrency="50-100" /> 29 <jms:listener destination="${jms.queue.charge.dlq}" ref="chargeMsgDLQListener" concurrency="5"/> 31 </jms:listener-container> 32 33 <!-- 消息监听器 --> 34 <bean id="chargeMessageListener" class="com.virtual.api.jms.ChargeMessageListener"/> 35 <bean id="chargeMsgDLQListener" class="com.virtual.api.jms.ChargeMsgDLQListener"/>
运行一段时间(30 - 60分钟)后系统抛出异常
016-03-15 20:03:10.060 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-39] DEBUG org.springframework.jms.connection.CachingConnectionFactory:486 - Closing cached Session: PooledSession { ActiveMQSession {id=ID:api1-20027-1458039790166-1:1:59,started=false} java.lang.Object@6edd3c62 } 2016-03-15 20:03:10.060 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-33] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer:860 - Setup of JMS message listener invoker failed - already recovered by other invoker javax.jms.IllegalStateException: The Consumer is closed at org.apache.activemq.ActiveMQMessageConsumer.checkClosed(ActiveMQMessageConsumer.java:870) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:633) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.jms.pool.PooledMessageConsumer.receive(PooledMessageConsumer.java:67) ~[activemq-jms-pool-5.12.1.jar:5.12.1] at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:82) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:420) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:300) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:253) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1150) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 2016-03-15 20:03:10.060 [org.springframework.jms.listener.DefaultMessageListenerContainer#2-30] DEBUG org.springframework.jms.connection.CachingConnectionFactory:486 - Closing cached Session: PooledSession { ActiveMQSession {id=ID:api1-20027-1458039790166-1:1:14,started=false} java.lang.Object@3efefdf8 } 2016-03-15 20:03:10.060 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-42] DEBUG org.springframework.jms.connection.CachingConnectionFactory:486 - Closing cached Session: PooledSession { ActiveMQSession {id=ID:api1-20027-1458039790166-1:1:61,started=false} java.lang.Object@2b3d84dc } 2016-03-15 20:03:10.060 [org.springframework.jms.listener.DefaultMessageListenerContainer#2-14] DEBUG org.springframework.jms.connection.CachingConnectionFactory:486 - Closing cached Session: PooledSession { ActiveMQSession {id=ID:api1-20027-1458039790166-1:1:38,started=false} java.lang.Object@299b7cf6 } 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-6] DEBUG org.springframework.jms.connection.CachingConnectionFactory:486 - Closing cached Session: PooledSession { ActiveMQSession {id=ID:api1-20027-1458039790166-1:1:60,started=false} java.lang.Object@611a7f2a } 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#2-12] DEBUG org.springframework.jms.connection.CachingConnectionFactory:486 - Closing cached Session: PooledSession { ActiveMQSession {id=ID:api1-20027-1458039790166-1:1:42,started=false} java.lang.Object@3873d06a } 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-44] DEBUG org.springframework.jms.connection.CachingConnectionFactory:486 - Closing cached Session: PooledSession { ActiveMQSession {id=ID:api1-20027-1458039790166-1:1:62,started=false} java.lang.Object@58de2fee } 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-50] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer:860 - Setup of JMS message listener invoker failed - already recovered by other invoker javax.jms.IllegalStateException: The Consumer is closed at org.apache.activemq.ActiveMQMessageConsumer.checkClosed(ActiveMQMessageConsumer.java:870) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:633) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.jms.pool.PooledMessageConsumer.receive(PooledMessageConsumer.java:67) ~[activemq-jms-pool-5.12.1.jar:5.12.1] at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:82) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:420) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:300) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:253) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1150) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer:860 - Setup of JMS message listener invoker failed - already recovered by other invoker javax.jms.IllegalStateException: The Session is closed at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:775) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.ActiveMQSession.getTransacted(ActiveMQSession.java:540) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.jms.pool.PooledSession.getTransacted(PooledSession.java:260) ~[activemq-jms-pool-5.12.1.jar:5.12.1] at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source) ~[na:na] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_79] at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_79] at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:386) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at com.sun.proxy.$Proxy68.getTransacted(Unknown Source) ~[na:na] at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:757) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:348) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:253) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1150) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-13] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer:860 - Setup of JMS message listener invoker failed - already recovered by other invoker javax.jms.IllegalStateException: The Session is closed at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:775) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.ActiveMQSession.getTransacted(ActiveMQSession.java:540) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.jms.pool.PooledSession.getTransacted(PooledSession.java:260) ~[activemq-jms-pool-5.12.1.jar:5.12.1] at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source) ~[na:na] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_79] at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_79] at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:386) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at com.sun.proxy.$Proxy68.getTransacted(Unknown Source) ~[na:na] at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:757) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:348) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:253) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1150) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-19] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer:860 - Setup of JMS message listener invoker failed - already recovered by other invoker javax.jms.IllegalStateException: The Consumer is closed at org.apache.activemq.ActiveMQMessageConsumer.checkClosed(ActiveMQMessageConsumer.java:870) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:633) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.jms.pool.PooledMessageConsumer.receive(PooledMessageConsumer.java:67) ~[activemq-jms-pool-5.12.1.jar:5.12.1] at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:82) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:420) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:300) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:253) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1150) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-25] DEBUG org.springframework.jms.listener.DefaultMessageListenerContainer:860 - Setup of JMS message listener invoker failed - already recovered by other invoker javax.jms.IllegalStateException: The Consumer is closed at org.apache.activemq.ActiveMQMessageConsumer.checkClosed(ActiveMQMessageConsumer.java:870) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:633) ~[activemq-client-5.12.1.jar:5.12.1] at org.apache.activemq.jms.pool.PooledMessageConsumer.receive(PooledMessageConsumer.java:67) ~[activemq-jms-pool-5.12.1.jar:5.12.1] at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:82) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:420) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:300) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:253) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1150) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039) ~[spring-jms-4.2.0.RELEASE.jar:4.2.0.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 2016-03-15 20:03:10.061 [org.springframework.jms.listener.DefaultMessageListenerContainer#2-13] DEBUG org.springframework.jms.connection.CachingConnectionFactory:486 - Closing cached Session: PooledSession { ActiveMQSession {id=ID:api1-20027-1458039790166-1:1:41,started=false} java.lang.Object@16535394 }
1 <!-- 将Connection、Session和MessageProducer池化,这样可以大大的减少资源消耗 --> 2 <bean id="consumerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 3 <property name="connectionFactory" ref="consumerConnectionFactory"/> 4 <property name="maxConnections" value="50"/> 5 <property name="idleTimeout" value="0"/> 6 </bean>
1 <!-- a pooling based JMS provider --> 2 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 3 <property name="connectionFactory"> 4 <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 5 <property name="brokerURL"> 6 <value>tcp://localhost:61616</value> 7 </property> 8 </bean> 9 </property> 10 </bean> 11 12 <!-- Spring JMS Template --> 13 <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 14 <property name="connectionFactory"> 15 <ref local="jmsFactory"/> 16 </property> 17 </bean>
1 <!-- ======================================= 生产者配置 ============================================--> 2 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> 3 <bean id="producerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <property name="brokerURL" value="${jms.broker.url}"/> 5 <property name="useAsyncSend" value="true"/> 6 <!-- clinetIDPrefix属性, 符合此前缀规则的consumer将成为该TOPIC的持久订阅者, 才可以获得此消息 --> 7 <!--<property name="clientIDPrefix" value="${jms.client.id.prefix}"/>--> 8 </bean> 9 10 <!-- 将Connection、Session和MessageProducer池化,这样可以大大的减少资源消耗 --> 11 <bean id="producerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 12 <property name="connectionFactory" ref="producerConnectionFactory"/> 13 <property name="maxConnections" value="30"/> 14 </bean> 15 16 <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> 17 <bean id="defaultJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 18 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 19 <property name="connectionFactory" ref="producerPooledConnectionFactory"/> 20 <!-- 使 deliveryMode, priority, timeToLive设置生效--> 21 <property name="explicitQosEnabled" value="true"/> 22 <!-- 设置NON_PERSISTENT模式, 默认为PERSISTENT --> 23 <property name="deliveryPersistent" value="true"/> 24 <!-- 设置优先级0-9, 默认为4 --> 25 <property name="priority" value="4"/> 26 </bean>
1 <!-- ======================================= 生产者配置 ============================================--> 2 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> 3 <bean id="producerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <property name="brokerURL" value="${jms.broker.url}"/> 5 <property name="useAsyncSend" value="true"/> 6 <!-- clinetIDPrefix属性, 符合此前缀规则的consumer将成为该TOPIC的持久订阅者, 才可以获得此消息 --> 7 <!--<property name="clientIDPrefix" value="${jms.client.id.prefix}"/>--> 8 </bean> 9 10 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 11 <bean id="producerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 12 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 13 <property name="targetConnectionFactory" ref="producerConnectionFactory"/> 14 <property name="reconnectOnException" value="true"/> 15 <property name="cacheConsumers" value="false"/> 16 <property name="cacheProducers" value="false"/> 17 <property name="sessionCacheSize" value="50"/> 18 </bean> 19 20 <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> 21 <bean id="defaultJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 22 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 23 <property name="connectionFactory" ref="producerCachingConnectionFactory"/> 24 <!-- 使 deliveryMode, priority, timeToLive设置生效--> 25 <property name="explicitQosEnabled" value="true"/> 26 <!-- 设置NON_PERSISTENT模式, 默认为PERSISTENT --> 27 <property name="deliveryPersistent" value="true"/> 28 <!-- 设置优先级0-9, 默认为4 --> 29 <property name="priority" value="4"/> 30 </bean>
1 <!-- ======================================= 生产者配置 ============================================--> 2 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> 3 <bean id="producerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <property name="brokerURL" value="${jms.broker.url}"/> 5 <property name="useAsyncSend" value="true"/> 6 <!-- clinetIDPrefix属性, 符合此前缀规则的consumer将成为该TOPIC的持久订阅者, 才可以获得此消息 --> 7 <!--<property name="clientIDPrefix" value="${jms.client.id.prefix}"/>--> 8 </bean> 9 10 <!-- 将Connection、Session和MessageProducer池化,这样可以大大的减少资源消耗 --> 11 <bean id="producerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 12 <property name="connectionFactory" ref="producerConnectionFactory"/> 13 <property name="maxConnections" value="30"/> 14 </bean> 15 16 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 17 <bean id="producerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 18 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 19 <property name="targetConnectionFactory" ref="producerConnectionFactory"/> 20 <property name="reconnectOnException" value="true"/> 21 <property name="cacheConsumers" value="false"/> 22 <property name="cacheProducers" value="false"/> 23 <property name="sessionCacheSize" value="50"/> 24 </bean> 25 26 <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> 27 <bean id="defaultJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 28 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 29 <property name="connectionFactory" ref="producerCachingConnectionFactory"/> 30 <!-- 使 deliveryMode, priority, timeToLive设置生效--> 31 <property name="explicitQosEnabled" value="true"/> 32 <!-- 设置NON_PERSISTENT模式, 默认为PERSISTENT --> 33 <property name="deliveryPersistent" value="true"/> 34 <!-- 设置优先级0-9, 默认为4 --> 35 <property name="priority" value="4"/> 36 </bean>
2016-03-16 14:16:17.036 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-13] WARN org.springframework.jms.listener.DefaultMessageListenerContainer:871 - Setup of JMS message listener invoker failed for destination ‘q.virtual.charge‘ - trying to recover. Cause: The Consumer is closed 2016-03-16 14:16:17.051 [org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] WARN org.springframework.jms.listener.DefaultMessageListenerContainer:871 - Setup of JMS message listener invoker failed for destination ‘DLQ.q.virtual.charge‘ - trying to recover. Cause: The Consumer is closed 2016-03-16 14:16:17.055 [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] WARN org.springframework.jms.listener.DefaultMessageListenerContainer:871 - Setup of JMS message listener invoker failed for destination ‘q.virtual.retry.charge‘ - trying to recover. Cause: The Consumer is closed 2016-03-16 14:16:17.056 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport:1068 - Successfully connected to tcp://xxxx:61616 2016-03-16 14:16:17.056 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport:1068 - Successfully connected to tcp://xxx:61616 2016-03-16 14:16:17.065 [org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer:921 - Successfully refreshed JMS Connection 2016-03-16 14:16:17.065 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-13] INFO org.springframework.jms.listener.DefaultMessageListenerContainer:921 - Successfully refreshed JMS Connection 2016-03-16 14:16:17.079 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport:1068 - Successfully connected to tcp://172.17.4.101:61616 2016-03-16 14:16:17.086 [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer:921 - Successfully refreshed JMS Connection
最终的配置文件
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" 4 xmlns:jms="http://www.springframework.org/schema/jms" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 6 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd 7 http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> 8 9 <!-- ======================================= 生产者配置 ============================================--> 10 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> 11 <bean id="producerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 12 <property name="brokerURL" value="${jms.broker.url}"/> 13 <property name="useAsyncSend" value="true"/> 14 <!-- clinetIDPrefix属性, 符合此前缀规则的consumer将成为该TOPIC的持久订阅者, 才可以获得此消息 --> 15 <!--<property name="clientIDPrefix" value="${jms.client.id.prefix}"/>--> 16 </bean> 17 18 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 19 <bean id="producerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 20 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 21 <property name="targetConnectionFactory" ref="producerConnectionFactory"/> 22 <property name="reconnectOnException" value="true"/> 23 <property name="cacheConsumers" value="false"/> 24 <property name="cacheProducers" value="false"/> 25 <property name="sessionCacheSize" value="50"/> 26 </bean> 27 28 <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> 29 <bean id="defaultJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 30 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 31 <property name="connectionFactory" ref="producerCachingConnectionFactory"/> 32 <!-- 使 deliveryMode, priority, timeToLive设置生效--> 33 <property name="explicitQosEnabled" value="true"/> 34 <!-- 设置NON_PERSISTENT模式, 默认为PERSISTENT --> 35 <property name="deliveryPersistent" value="true"/> 36 <!-- 设置优先级0-9, 默认为4 --> 37 <property name="priority" value="4"/> 38 </bean> 39 40 <!-- ======================================= 消费者配置 ============================================--> 41 <!-- 持久化主题订阅者(保证订阅者重启后能获得丢失的消息)ActiveMQ 连接工厂 --> 42 <bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 43 <property name="brokerURL" value="${jms.broker.url}"/> 44 <!-- Durable订阅者必须设置ClientId --> 45 <!--<property name="clientID" value="${jms.client.id.prefix}"/>--> 46 <property name="redeliveryPolicy"> 47 <bean name="defaultRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 48 <property name="maximumRedeliveries" value="10"/> 49 <property name="redeliveryDelay" value="60000"/> 50 </bean> 51 </property> 52 </bean> 53 54 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 55 <bean id="consumerCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 56 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 57 <property name="targetConnectionFactory" ref="consumerConnectionFactory"/> 58 <property name="reconnectOnException" value="true"/> 59 <property name="cacheConsumers" value="false"/> 60 <property name="cacheProducers" value="false"/> 61 <property name="sessionCacheSize" value="50"/> 62 </bean> 63 64 <jms:listener-container connection-factory="consumerCachingConnectionFactory" response-destination-type="queue" acknowledge="client" > 65 <jms:listener destination="${jms.queue.charge.name}" ref="chargeMessageListener" concurrency="50-100" /> 66 <jms:listener destination="${jms.queue.charge.dlq}" ref="chargeMsgDLQListener" concurrency="5" 67 selector="isPutBack IS NULL OR isPutBack=false"/> 68 <jms:listener destination="${jms.queue.retry.charge.name}" ref="retryChargeMessageListener" concurrency="50-100" /> 69 </jms:listener-container> 70 71 <!-- 消息监听器 --> 72 <bean id="chargeMessageListener" class="com.virtual.api.jms.ChargeMessageListener"/> 73 <bean id="chargeMsgDLQListener" class="com.virtual.api.jms.ChargeMsgDLQListener"/> 74 <bean id="retryChargeMessageListener" class="com.virtual.api.jms.RetryChargeMessageListener"/> 75 76 77 </beans>
ActiveMQ抛出异常javax.jms.IllegalStateException: The Session is closed
标签:
原文地址:http://www.cnblogs.com/coderyi/p/5284428.html