标签:min() handler code spring executor mysq 写入 art rabbit
随着对消息队列的应用日益推广,在分布式系统中的使用可以极大的降低对各个组件间的耦合度,从而提高组件的处理效率。因为消息队列的存在,可以使我们对任务进行异步处理,这样可以减少请求响应时间和解耦。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。2019-05-31 17:42:36.798 WARN 30544 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpException: No method found for class java.util.LinkedHashMap
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:250)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:70)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
... 8 common frames omitted
至于说开发的源码,我是这么写的,我在class这里进行注解,这个时候我猜测,应该是注解的位置不对
@Component
@RabbitListener(queues = "xx.yy.zz")
public class Receiver {
@RabbitHandler
public void process(MSGSTO message) {
System.out.println("消费消息");
System.out.println(message.toString());
}
}
事实上,确实是位置不对,但更加专业的解答方式是,这个listener注解是方法级别上的,而不能用在class上面,我们不妨来看下RabbitListener的源码,从根本上理解这个方法的使用。
package org.springframework.amqp.rabbit.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.messaging.handler.annotation.MessageMapping;
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(RabbitListeners.class)
public @interface RabbitListener {
String id() default "";
String containerFactory() default "";
String[] queues() default {};
Queue[] queuesToDeclare() default {};
boolean exclusive() default false;
String priority() default "";
String admin() default "";
QueueBinding[] bindings() default {};
String group() default "";
String returnExceptions() default "";
String errorHandler() default "";
String concurrency() default "";
String autoStartup() default "";
}
由于业务需要,我们确实是需要对消息进行异步处理,而异步接收消息的最简单的方法是使用带注解的监听端点基础结构。简而言之,它允许将托管bean的方法公开为Rabbit listener的端点。br/>在这里,使用queues属性时,可以指定关联的容器可以监听多个队列。可以使用@Header注释来创建POJO方法可接收消息的队列名称。
这里我通过queues来指定监听的队列
@Component
public class Receiver {
@RabbitListener(queues = "xx.yy.zz")
@RabbitHandler
public void process(MSGSTO message) {
System.out.println("消费消息");
System.out.println(message.toString());
至于说配置方式,我是通过application.yml的形式进行接入配置的,例如
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirms: true
publisher-returns: true
virtual-host: dev
listener:
simple:
concurrency: 10
max-concurrency: 20
这些属性会被注入到RabbitProperties属性中,如
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
…
}
挺有趣的对吧:)
参考资料:
- 为什么使用消息队列,https://www.javazhiyin.com/22897.html
标签:min() handler code spring executor mysq 写入 art rabbit
原文地址:https://blog.51cto.com/yerikyu/2404468