标签:typename 字符串 inf eth val cout ble cto pass
结合实地代码的MQ应用:
1、配置

spring-context.xml中的配置:

spring-jms.xml 配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">
<description>spring-mq CHO3.0三网</description>
<!-- JMS连接工厂_X网 -->
<bean id="jmsConnectionFactoryByCHP30" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.brokerURL}" />
<property name="closeTimeout" value="${jms.closeTimeout}" />
<property name="userName" value="${jms.userName}" />
<property name="password" value="${jms.password}" />
<property name="optimizedAckScheduledAckInterval" value="${jms.optimizedAckScheduledAckInterval}" />
</bean>
</property>
</bean>
<!-- JMS连接工厂_CRM系统 -->
<bean id="jmsConnectionFactoryByMCRM" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.brokerURL.MCRM}" />
<property name="closeTimeout" value="${jms.closeTimeout.MCRM}" />
<property name="userName" value="${jms.userName.MCRM}" />
<property name="password" value="${jms.password.MCRM}" />
<property name="optimizedAckScheduledAckInterval" value="${jms.optimizedAckScheduledAckInterval.MCRM}" />
</bean>
</property>
</bean>
</beans>
spring-jms-triple.xml 配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">
<description>spring-mq CHO3.0三网通</description>
<!-- ==================== A010 (发送) : 组织机构信息同步 ==================== -->
<!-- 消息定义 :CPO3.0三网通 -->
<bean id="OrgInfo" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.DJR.Queue.A010.OrgInfo,XH.JX.Queue.A010.OrgInfo,XH.CX.Queue.A010.OrgInfo"/>
</bean>
<!-- 消息模板 :CPO3.0三网通 -->
<bean id="jmsTemplate_OrgInfoSync" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="defaultDestination" ref="OrgInfo" />
<property name="pubSubDomain" value="true" />
<property name="receiveTimeout" value="${jms.receiveTimeout}" />
<property name="explicitQosEnabled" value="true"/>
<!-- 发送模式(2:持久) -->
<property name="deliveryMode" value="2"/>
</bean>
<!-- ==================== A011 (发送) : 理财经理信息同步 ==================== -->
<!-- 消息定义 :CPO3.0三网通(大金X/X信) -->
<bean id="LeadManagerInfo" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.DJR.Queue.A011.LeadManagerInfo,XH.JX.Queue.A011.LeadManagerInfo"/>
</bean>
<!-- 消息定义 :CPO3.0三网通(创新) -->
<bean id="LeadManagerInfoByCX" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.CX.Queue.A011.LeadManagerInfo"/>
</bean>
<!-- 消息模板 :CPO3.0三网通 -->
<bean id="jmsTemplate_LeadManagerInfoSync" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="defaultDestination" ref="LeadManagerInfo" />
<property name="pubSubDomain" value="true" />
<property name="receiveTimeout" value="${jms.receiveTimeout}" />
<property name="explicitQosEnabled" value="true"/>
<property name="deliveryMode" value="2"/>
</bean>
<!-- 消息模板 :CPO3.0三网通(创新) -->
<bean id="jmsTemplate_LeadManagerInfoSyncByCX" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="defaultDestination" ref="LeadManagerInfoByCX" />
<property name="pubSubDomain" value="true" />
<property name="receiveTimeout" value="${jms.receiveTimeout}" />
<property name="explicitQosEnabled" value="true"/>
<property name="deliveryMode" value="2"/>
</bean>
<!-- ==================== A012 (接收) : 新增客户信息 ==================== -->
<!-- 消息定义 :CPO3.0三网通 -->
<bean id="NewCustomer" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.CPO.Queue.A012.NewCustomer"/>
</bean>
<!-- 消息监听模板 :CPO3.0三网通 -->
<bean id="listenerContainer_NewCustomer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
<property name="destination" ref="NewCustomer" />
<property name="messageListener" ref="triple_NewCustomerListenerService" />
</bean>
<!-- ==================== A013 (接收) : 客户首单 ==================== -->
<!-- 消息定义 :CPO3.0三网通 -->
<bean id="FirstOrder" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.CPO.Queue.A013.FirstOrder"/>
</bean>
<!-- 消息监听模板 :CPO3.0三网通 -->
<bean id="listenerContainer_FirstOrder"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
<property name="destination" ref="FirstOrder" />
<property name="messageListener" ref="triple_FirstOrderListenerService" />
</bean>
<!-- ==================== A014 (发送) : 客户理财经理变更 ==================== -->
<!-- 消息定义 :CPO3.0三网通 -->
<bean id="ChangeCustomerEmp" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.DJR.Queue.A014.ChangeCustomerEmp,XH.JX.Queue.A014.ChangeCustomerEmp,XH.CX.Queue.A014.ChangeCustomerEmp,XH.ZCJ.Queue.A014.ChangeCustomerEmp"/>
</bean>
<!-- 消息定义 :MCRM系统 -->
<bean id="ChangeCustomerEmpByMCRM" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="XH.Topic.A014.ChangeCustomerEmp"/>
</bean>
<!-- 消息模板 :CPO3.0三网通 -->
<bean id="jmsTemplate_ChangeCustomerEmp" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="defaultDestination" ref="ChangeCustomerEmp" />
<property name="pubSubDomain" value="true" />
<property name="receiveTimeout" value="${jms.receiveTimeout}" />
<property name="explicitQosEnabled" value="true"/>
<property name="deliveryMode" value="2"/>
</bean>
<!-- 消息模板 :MCRM系统 -->
<bean id="jmsTemplate_ChangeCustomerEmpByMCRM" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactoryByMCRM" />
<property name="defaultDestination" ref="ChangeCustomerEmpByMCRM" />
<property name="pubSubDomain" value="true" />
<property name="receiveTimeout" value="${jms.receiveTimeout.MCRM}" />
<property name="explicitQosEnabled" value="true"/>
<property name="deliveryMode" value="2"/>
</bean>
<!-- ==================== A015 (接收) : 客户手机号变更 ==================== -->
<!-- 消息定义 :CPO3.0三网通 -->
<bean id="ChangePhone" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.CPO.Queue.A015.ChangePhone"/>
</bean>
<!-- 消息模板 :CPO3.0三网通 -->
<bean id="listenerContainer_ChangePhone"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
<property name="destination" ref="ChangePhone" />
<property name="messageListener" ref="triple_ChangePhoneListenerService" />
</bean>
<!-- ==================== A016 (发送) : 用户角色信息同步 ==================== -->
<!-- 消息定义 :CPO3.0三网通 -->
<bean id="RoleInfoSync" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.CX.Queue.A016.RoleInfo"/>
</bean>
<!-- 消息模板 :CPO3.0三网通 -->
<bean id="jmsTemplate_RoleInfoSync" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="defaultDestination" ref="RoleInfoSync" />
<property name="pubSubDomain" value="true" />
<property name="receiveTimeout" value="${jms.receiveTimeout}" />
<property name="explicitQosEnabled" value="true"/>
<property name="deliveryMode" value="2"/>
</bean>
<!-- ==================== A030 (CPO3.0三网通-发送 /监听) : 消息回执 ==================== -->
<!-- 定义消息(接收) : 消息回馈 -->
<bean id="CallBack" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.CPO.Queue.A030.CallBack"/>
</bean>
<!-- 定义消息(发送) : 消息回馈(大金X) -->
<bean id="DJR_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.DJR.Queue.A030.CallBack"/>
</bean>
<!-- 定义消息(发送) : 消息回馈(资产家) -->
<bean id="ZCJ_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.ZCJ.Queue.A030.CallBack"/>
</bean>
<!-- 定义消息(发送) : 消息回馈(X信) -->
<bean id="JX_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.JX.Queue.A030.CallBack"/>
</bean>
<!-- 定义消息(发送) : 消息回馈(创新) -->
<bean id="CX_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="XH.CX.Queue.A030.CallBack"/>
</bean>
<!-- 消息监听容器(接收) : 回执接收 -->
<bean id="listenerContainer_CallBack"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
<property name="receiveTimeout" value="${jms.receiveTimeout}"/>
<property name="destination" ref="CallBack" />
<property name="messageListener" ref="triple_CallBackListenerService" />
</bean>
</beans>
上面的bean id="DJR_CallBack" 被注入到代码中:
/** 消息目的地:大金X. */
@Autowired
@Qualifier("DJR_CallBack")
private Destination destinationByDJR;
下面代码的例子是接收MQ消息的:
package com.creditharmony.adapter.service.thirdparty;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.creditharmony.adapter.constant.ClientPoint;
import com.creditharmony.adapter.constant.ClientType;
import com.creditharmony.adapter.constant.Constant;
import com.creditharmony.adapter.constant.MsgKey;
import com.creditharmony.adapter.core.client.ClientServicePoxy;
import com.creditharmony.adapter.core.service.IParamRecord;
import com.creditharmony.adapter.exception.AdapterException;
import com.creditharmony.adapter.model.thirdparty.TripleChangePhoneInfoJsonModel;
import com.creditharmony.adapter.service.triple.bean.TripleChangePhoneInBean;
import com.creditharmony.adapter.service.triple.bean.TripleChangePhoneOutBean;
import com.creditharmony.adapter.utils.AdapterUtils;
import com.creditharmony.adapter.utils.Messages;
import com.creditharmony.adapter.utils.TripleUtils;
import com.creditharmony.common.util.PropertyUtil;
/**
* 客户手机号变更 接口.<br>
* ---消息接收---<br>
* 各系统中手机号变更后将手机变更信息(旧手机号、新手机号、理财经理)推送到CPO3.0<br>
* 接收方式 : JMS Queue 消息接收方<br>
* 发送通道:<br>
* 大金X -> CPO3.0<br>
* X信 -> CPO3.0<br>
* CPO -> 创新<br>
*
*/
@Service
public class Triple_ChangePhoneListenerService implements SessionAwareMessageListener<TextMessage> {
/** log日志 */
private static final Logger logger = LoggerFactory.getLogger(Triple_ChangePhoneListenerService.class);
/** 业务处理名. */
private static String procName = PropertyUtil.getStrValue(MsgKey.LOG_PROP_FILE, MsgKey.TRIPLE_CHANGE_PHONE_TITLE, "三网合一(客户手机号变更)");
/** JMS Header : FromSys. */
private static final String FROM_SYS = "XH_CPO";
/** 参数记录处理. */
@Autowired
protected IParamRecord paramRecord;
/** 消息目的地:大金X. */
@Autowired
@Qualifier("DJR_CallBack")
private Destination destinationByDJR;
/** 消息目的地:X信. */
@Autowired
@Qualifier("JX_CallBack")
private Destination destinationByJX;
/** 消息目的地:创新. */
@Autowired
@Qualifier("CX_CallBack")
private Destination destinationByCX;
/** 消息目的地:资本家. */
@Autowired
@Qualifier("ZCJ_CallBack")
private Destination destinationByZCJ;
/**
* 客户手机号变更 监听接口.
* @param msgParam 监听消息
*/
public void onMessage(TextMessage mapMessage, Session session) {
logger.info(Messages.get(MsgKey.INFO_START, new String[] { procName }));
// 唯一编号
String serialNum = AdapterUtils.getSerialNum();
try {
// 发送方ID
String fromSys = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_FROM_SYS);
// 发送消息ID
String msgTypeID = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_MSG_TYPE_ID);
// 发送消息名
String msgTypeName = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_MSG_TYPE_NAME);
// 消息主体
String jsonMsg = mapMessage.getText();
logger.info(
Messages.get(
MsgKey.GENERALSERVICE_INPARAM,
new String[] { procName, jsonMsg }));
paramRecord.doInParamRecord(
serialNum,
null,
jsonMsg,
"triple_ChangePhoneService",
fromSys, msgTypeID, msgTypeName);
TripleChangePhoneInfoJsonModel jsonModel = JSON.parseObject(jsonMsg, TripleChangePhoneInfoJsonModel.class);
ClientServicePoxy service = new ClientServicePoxy(ClientType.Triple_ChangePhone, ClientPoint.CF);
TripleChangePhoneInBean param = new TripleChangePhoneInBean();
param.setSerialNum(serialNum);
param.setOldPhone(jsonModel.getOldPhone());
param.setNewPhone(jsonModel.getNewPhone());
param.setEmpCode(jsonModel.getEmpCode());
param.setOsType(jsonModel.getOsType());
param.setCardId(jsonModel.getCardId());
// 调用业务参数日志输出
logger.info(
Messages.get(
MsgKey.INFO_POST_MESSAGE_DEC,
new String[] { procName, param.getParam() }));
paramRecord.doSendContentRecord(
serialNum,
param.getParam(),
Messages.get(MsgKey.MEMO_PSOT_MESSAGE_BODY));
// 业务调用
TripleChangePhoneOutBean outParam = (TripleChangePhoneOutBean) service.callService(param);
// 业务参数回执日志输出
logger.info(Messages.get(MsgKey.INFO_RET_MESSAGE, new String[] { procName, outParam.getParam() }));
paramRecord.doReceiveContentRecord(serialNum, outParam.getParam(), null);
/*
* 消息回馈处理
* 转为Json串
* 该监听部分收到消息后,判定发送方,将消息序列号反馈给发送发
*/
String postJson = TripleUtils.getReturnMessage(outParam, jsonModel.getUniqueNum());
MessageProducer producer = null;
// 根据消息发送源,设定回执方
if ("XH_DJR".equals(fromSys)) {
producer = session.createProducer(destinationByDJR);
} else if ("XH_ZCJ".equals(fromSys)) {
producer = session.createProducer(destinationByZCJ);
} else if ("XH_JX".equals(fromSys)) {
producer = session.createProducer(destinationByJX);
} else {
producer = session.createProducer(destinationByCX);
}
TextMessage message = session.createTextMessage();
message.setStringProperty("FromSys", FROM_SYS);
message.setStringProperty("MsgTypeID", msgTypeID);
message.setStringProperty("MsgTypeName", msgTypeName);
message.setJMSTimestamp(System.currentTimeMillis());
message.setText(postJson);
// 参数日志记录: 返回参数
paramRecord.doOutParamRecord(serialNum, postJson);
logger.info(Messages.get(MsgKey.GENERALSERVICE_OUTPARAM, new String[] { postJson }));
producer.send(message);
} catch (JMSException e) {
// 异常错误记录
AdapterException businessException = new AdapterException(e);
paramRecord.doExceptionRecord(serialNum, businessException);
}
logger.info(Messages.get(MsgKey.INFO_END, new String[] { procName }));
}
}
重点:onMessage 方法;接收的三个参数 fromid,etc。
下面是发送消息的方法:
/**
* 消息发送(Queue).
* @param jsonParam json字符串
*/
public void sendMqMessage(final String jsonParam) {
// 取得消息
Destination destination = jmsTemplate_OrgInfoSync.getDefaultDestination();
// 发送消息
jmsTemplate_OrgInfoSync.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("FromSys", FROM_SYS);
message.setStringProperty("MsgTypeID", MSG_TYPE_ID);
message.setStringProperty("MsgTypeName", MSG_TYPE_NAME);
message.setText(jsonParam);
return message;
}
});
}
@Service
public class Triple_OrgInfoSyncService extends BaseService implements IBaseService {
/** log日志 */
private static final Logger logger = LoggerFactory.getLogger(Triple_OrgInfoSyncService.class);
/** JMS Header : FromSys. */
private static final String FROM_SYS = "XH_CPO";
/** JMS Header : MsgTypeID. */
private static final String MSG_TYPE_ID = "A010";
/** JMS Header : MsgTypeName. */
private static final String MSG_TYPE_NAME = "OrgInfo";
/** 业务处理名. */
private static String procName = PropertyUtil.getStrValue(MsgKey.LOG_PROP_FILE, MsgKey.TRIPLE_ORG_INFO_SYNC_TITLE, "三网合一(组织机构信息同步)");
/** 消息模板(CPO3.0三网). */
@Autowired
private JmsTemplate jmsTemplate_OrgInfoSync;
/**
* 组织机构信息同步.
* @param paramInfo 组织机构信息
* @return 同步状态
*/
public BaseOutInfo doExec(BaseInfo paramInfo) {
logger.info(Messages.get(MsgKey.INFO_START, new String[] { procName }));
TripleOrgInfoSyncOutInfo outParam = new TripleOrgInfoSyncOutInfo();
TripleOrgInfoSyncInfo param = null;
param = (TripleOrgInfoSyncInfo) paramInfo;
try {
TripleOrgInfoSyncInfoJsonModel jsonModel = new TripleOrgInfoSyncInfoJsonModel();
BeanUtils.copyProperties(param, jsonModel);
jsonModel.setOperation(param.getOperation().getName());
jsonModel.setStatus(param.getStatus().getName());
jsonModel.setLastModifyTime(DateUtils.formatDate(param.getLastModifyTime(), TripleOrgInfoSyncInfo.DATE_PATTERN));
// 将传入参数Bean转为Json串
String postJson = JSON.toJSONString(jsonModel);
logger.info(
Messages.get(
MsgKey.INFO_POST_MESSAGE_BODY,
new String[] { procName, postJson }));
paramRecord.doSendContentRecord(
paramInfo.getSerialNum(),
postJson,
Messages.get(MsgKey.MEMO_PSOT_MESSAGE_BODY));
try {
// 发送
this.sendMqMessage(postJson);
} catch (Exception e) {
throw new AdapterException(ErrorType.ERROR_EXT_NET, e);
}
} catch (Exception e) {
logger.error(
Messages.get(
MsgKey.ERROR_SYSTEM, new String[] { paramInfo.getSerialNum(), procName }));
// 将新产生的例外封装
if (e instanceof AdapterException) {
throw (AdapterException) e;
} else {
throw new AdapterException(e);
}
}
outParam.setRetCode(ReturnConstant.SUCCESS);
logger.info(Messages.get(MsgKey.INFO_END, new String[] { procName }));
return outParam;
}
/**
* 消息发送(Queue).
* @param jsonParam json字符串
*/
public void sendMqMessage(final String jsonParam) {
// 取得消息
Destination destination = jmsTemplate_OrgInfoSync.getDefaultDestination();
// 发送消息
jmsTemplate_OrgInfoSync.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("FromSys", FROM_SYS);
message.setStringProperty("MsgTypeID", MSG_TYPE_ID);
message.setStringProperty("MsgTypeName", MSG_TYPE_NAME);
message.setText(jsonParam);
return message;
}
});
}
}
比较简单。
标签:typename 字符串 inf eth val cout ble cto pass
原文地址:http://www.cnblogs.com/hoge/p/6623256.html