public void onMessage(Message message) {//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessageTextMessage textMessage = (TextMessage)message;System. out.println( "接收到一个纯文本消息" );try {System. out.println( "消息内容是:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}
xmlns:tx= "http://www.springframework.org/schema/tx" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"xmlns:context= "http://www.springframework.org/schema/context" xmlns:jms= "http://www.springframework.org/schema/jms"xsi:schemaLocation= "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsdhttp://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsdhttp://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd ">
<bean id ="connectionFactory" class= "org.springframework.jms.connection.CachingConnectionFactory" ><property name ="targetConnectionFactory"><bean class= "org.apache.activemq.ActiveMQConnectionFactory" ><property name ="brokerURL"><value >tcp://localhost:61616 </value ></property ></bean ></property ><property name ="sessionCacheSize" value= "1" /></bean >
<!-- Spring jmsTemplate queue --><bean id ="jmsTemplate" class= "org.springframework.jms.core.JmsTemplate" ><property name ="connectionFactory" ref= "connectionFactory"></property ><property name ="defaultDestinationName" value= "subject"></property ><property name ="deliveryPersistent" value= "true"></property ><property name ="pubSubDomain" value="false"></ property> <!-- false p2p,true topic --><property name ="sessionAcknowledgeMode" value= "1"></property ><property name ="explicitQosEnabled" value= "true"></property ><property name ="timeToLive" value="604800000"></ property></bean ><!-- 配置Queue,其中value为Queue名称->start --><bean id = "testQueue" class = "org.apache.activemq.command.ActiveMQQueue" ><constructor-arg index = "0" value ="${pur.test.add}" /></bean ><bean id = "sessionAwareQueue" class = "org.apache.activemq.command.ActiveMQQueue" ><constructor-arg index = "0" value= "queue.liupeng.sessionaware" /></bean ><!-- 配置Queue,其中value为Queue名称->end --><!-- 注入AMQ的实现类属性(JmsTemplate和Destination) --><bean id = "amqQueueSender" class = "com.tuniu.scc.purchase.plan.manage.core.amq.AMQQueueSender" ><property name = "jmsTemplate" ref="jmsTemplate" ></property ><property name = "testQueue" ref="testQueue" ></property ><property name = "sessionAwareQueue" ref= "sessionAwareQueue"></property ></bean ><!-- 消息发送必用的发送类 --><bean id = "multiThreadAMQSender" class ="com.tuniu.scc.purchase.plan.manage.core.amq.MultiThreadAMQSender"init-method= "init"><property name = "jmsTemplate" ref="jmsTemplate" ></property ><property name = "multiThreadAMQExecutor" ref= "multiThreadAMQExecutor" ></property ></bean ><!-- 消息监听器->start --><bean id = "consumerMessageListener" class= "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerMessageListener" /><!-- 消息监听容器 --><bean id = "jmsContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" ><property name = "connectionFactory" ref= "connectionFactory" /><property name = "destination" ref= "testQueue" /> <!-- 消费者队列名称,修改 --><property name = "messageListener" ref= "consumerMessageListener" /></bean ><bean id = "consumerSessionAwareMessageListener" class ="com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerSessionAwareMessageListener" ><property name ="testQueue" ref="testQueue"/> <!-- 接收消息后返回给testQueue队列 --></bean >< bean id= "sessionAwareListenerContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" ><property name ="connectionFactory" ref= "connectionFactory" /><property name ="destination" ref="sessionAwareQueue" /><property name ="messageListener" ref= "consumerSessionAwareMessageListener" /></bean ><!-- 消息监听器->end --></beans>
private AMQQueueSender amqQueueSender;
private static final Logger LOG = LoggerFactory.getLogger(AMQController. class);
@UvConfig(method = "testQueue", description = "测试AMQ")@RequestMapping(value = "/testQueue", method = RequestMethod. POST)@TSPServiceInfo(name = "PUR.NM.AMQController.testQueue" , description = "测试AMQ")public void testQueue(HttpServletRequest request, HttpServletResponse response) {try {long beginTime = System. currentTimeMillis();LOG.info( "发送开始");//amqQueueSender.sendMessage("test", StaticProperty.TEST_QUEUE);amqQueueSender.sendMessage( "test", StaticProperty.TEST_SESSIONAWARE_QUEUE );LOG.info( "发送结束,耗时:" +(System.currentTimeMillis()-beginTime)+ "ms");} catch (InterruptedException e) {LOG.error( "测试失败", e);}}
private Destination testQueue; //返回消息目的队列@Overridepublic void onMessage(TextMessage message, Session session) throws JMSException {System. out.println( "收到一条消息" );System. out.println( "消息内容是:" +message.getText());MessageProducer producer = session.createProducer( testQueue);Message txtMessage = session.createTextMessage("consumerSessionAwareMessageListener..." );producer.send(txtMessage);}public Destination getTestQueue() {return testQueue;}public void setTestQueue(Destination sessionAwareQueue) {this.testQueue = sessionAwareQueue;}}
收到一条消息消息内容是:test接收到一个纯文本消息消息内容是:consumerSessionAwareMessageListener...
public void handleMessage(String message) {System. out.println( "ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" + message);}public void receiveMessage(String message) {System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);}}
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/liupeng_family/article/details/46774441