位置在安装目录的wrapper.log中,例如我的具体路径为:/Users/shen/Downloads/apache-activemq-5.15.4/data/wrapper.log中
WrapperSimpleApp: Unable to locate the class org.apache.activemq.console.Main: java.lang.UnsupportedClassVersionError: org/apache/activemq/console/Main : Unsupported major.minor version 52.0
-------queue(点对点)的配置:
生产者:
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"/>
<!-- 配置JMS连接工长 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 定义消息队列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>Jaycekon</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="demoQueueDestination" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false" />
</bean>
消费者(监听器):
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"/>
<!-- 配置JMS连接工长 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!-- 定义消息队列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>Jaycekon</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="demoQueueDestination"/>
<property name="receiveTimeout" value="10000"/>
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false"/>
</bean>
<!-- 配置消息 队列监听者(Queue) -->
<bean id="queueMessageListener" class="yacol.domain.mq.handler.YacolMessageQueueListener"/>
<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="demoQueueDestination"/>
<property name="messageListener" ref="queueMessageListener"/>
</bean>
--------topic(发布订阅)的配置:
生产者:
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"/>
<!-- 配置JMS连接工长 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!--这个是主题(topic)目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="goodsAddTopic" />
</bean>
<!-- topic(主题,发布/订阅),一对多 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="topicDestination"></property>
<property name="pubSubDomain" value="true"/>
<!-- 设置接收超时时间 60秒 -->
<property name="receiveTimeout" value="10000"/>
<!-- 消息不持久化 -->
<property name="explicitQosEnabled" value="true"></property>
</bean>
消费者(监听器):
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"/>
<!-- 配置JMS连接工长 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!--这个是主题(topic)目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="goodsAddTopic"/>
</bean>
<!-- topic(主题,发布/订阅),一对多 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="pubSubDomain" value="true"/>
<!-- 设置接收超时时间 60秒 -->
<property name="receiveTimeout" value="10000"/>
<!-- 消息不持久化 -->
<property name="explicitQosEnabled" value="true"></property>
</bean>
<!-- 消息监听类 -->
<bean id="goodsAddMessageListener" class="yacol.domain.mq.handler.YacolMessageTopicListener"/>
<!-- 消息监听器 -->
<bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="destination" ref="topicDestination"></property>
<property name="messageListener" ref="goodsAddMessageListener"></property>
</bean>
三、使用总结
消息发送的是一个对象(Emlpoyee)时需要将对象转换成Message对象,需要一个转换类去处理
public class MessageConvert {
public static Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException {
JSONObject jsonRoot = new JSONObject();
JSONObject jsonObj = new JSONObject();
jsonObj.put("topic", "MSG_PUSH");
jsonObj.put("body", obj);
jsonObj.put("isFromJava", true);
jsonRoot.put("value", jsonObj.toJSONString());
Message message = session.createMapMessage();
message.setObjectProperty("obj", jsonRoot.toJSONString());
return message;
}
public static Object fromMessage(Message message) throws JMSException, MessageConversionException {
JSONObject jsonRoot = null;
jsonRoot = (JSONObject) JSON.parse(message.getStringProperty("obj"));
JSONObject jsonObj = JSONObject.parseObject(jsonRoot.getString("value"));
EmployeeDomain employee = JSON.toJavaObject(jsonObj.getJSONObject("body"), EmployeeDomain.class);
return employee;
}
}
在实际的工作中会遇到一种问题:
重启应用的时候可能会导致消息的丢失,如某个应用在接受消息之后还没来得及处理就被重启,并且这条消息从队列中删除,那么这个消息就会被丢失。这个其实就是一个消息确认机制的问题,是在取到消息就立即确认还是应用处理完再确认的问题。可以看看下面的文章
消息确认机制: