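I have recently been looking into OFBiz JMS and tried it out with ActiveMQ. OFBiz uses JNDI to connect to an external JMS provider for sending and receiving messages. Here is a summary.
OFBiz version: 12.04
ActiveMQ version: 5.5
Add the following jars under base/lib:
activemq-all-5.5.0.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar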
Configure the JMS service in the service engine configuration file (serviceengine.xml):

    <jms-service name="serviceMessenger" send-mode="all">
        <server jndi-server-name="default"
                jndi-name="topicConnectionFactory"
                topic-queue="OFBTopic"
                type="topic"
                listen="true"/>
    </jms-service>

Configure the JNDI settings (jndi.properties):

    java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
    java.naming.provider.url=tcp://127.0.0.1:61616
    topic.OFBTopic=OFBTopic
    connectionFactoryNames=connectionFactory, queueConnectionFactory, topicConnectionFactory
    queue.OFBQueue=OFBQueue
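To make the link between the two files easier to see, here is a minimal sketch in plain Java (no ActiveMQ jars needed) that reads the same properties and lists the names that should become available through JNDI. It assumes the convention described on the ActiveMQ JNDI support page: connectionFactoryNames is a comma-separated list of factory names, and topic.<jndiName> / queue.<jndiName> map a JNDI name to a physical destination. The class JndiNamesSketch and its helper are made up for illustration; they are not part of OFBiz or ActiveMQ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Illustration only: lists the JNDI names the jndi.properties entries are expected to expose. */
class JndiNamesSketch {

    // Same entries as the jndi.properties used in this post.
    static final String JNDI_PROPERTIES = String.join("\n",
            "java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory",
            "java.naming.provider.url=tcp://127.0.0.1:61616",
            "topic.OFBTopic=OFBTopic",
            "connectionFactoryNames=connectionFactory, queueConnectionFactory, topicConnectionFactory",
            "queue.OFBQueue=OFBQueue");

    /** Returns one "jndiName -> target" line per connection factory, queue and topic. */
    static List<String> describeBindings(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> bindings = new ArrayList<>();
        // Assumed convention: comma-separated factory names.
        for (String name : props.getProperty("connectionFactoryNames", "").split(",")) {
            if (!name.trim().isEmpty()) {
                bindings.add(name.trim() + " -> connection factory");
            }
        }
        // Property names come back as an unordered set, so sort them for stable output.
        List<String> keys = new ArrayList<>(props.stringPropertyNames());
        Collections.sort(keys);
        for (String key : keys) {
            if (key.startsWith("topic.")) {
                bindings.add(key.substring("topic.".length()) + " -> topic " + props.getProperty(key));
            } else if (key.startsWith("queue.")) {
                bindings.add(key.substring("queue.".length()) + " -> queue " + props.getProperty(key));
            }
        }
        return bindings;
    }

    public static void main(String[] args) {
        for (String binding : describeBindings(JNDI_PROPERTIES)) {
            System.out.println(binding);
        }
    }
}
```

The jndi-name and topic-queue attributes of the <server> element should match names from this list (here topicConnectionFactory and OFBTopic); otherwise the JNDI lookups done when the listener starts would be expected to fail.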
Start ActiveMQ first, then start OFBiz.
Once OFBiz has started, you should see:

    2015-01-15 11:47:56,970 (org.ofbiz.service.jms.JmsListenerFactory@1b1ce9a) [ JmsListenerFactory.java:89 :INFO ] JMS Listener Factory Thread Finished; All listeners connected.

If the log reports that a listener failed to connect, check the error it gives and fix the configuration.
Then take a look at the ActiveMQ console.
Sending and receiving messages
We use the test example that ships with OFBiz:
services_test.xml
    <service name="testJMSTopic" engine="jms" location="serviceMessenger" invoke="testScv">
        <description>Test JMS Topic service</description>
        <attribute name="message" type="String" mode="IN"/>
    </service>

Running this service produces the following log output (the message text 测试TOPIC消息 means "test TOPIC message"):

    2015-01-15 12:09:10,754 (ActiveMQ Session Task-1) [ ModelService.java:469:INFO ] Set default value [999.9999] for parameter [defaultValue]
    2015-01-15 12:09:10,755 (ActiveMQ Session Task-1) [ ServiceEcaRule.java:137:INFO ] For Service ECA [testScv] on [invoke] got false for condition: [message][equals][auto][true][String]
    ---- SVC-CONTEXT: message => 测试TOPIC消息
    ---- SVC-CONTEXT: locale => zh_CN
    ---- SVC-CONTEXT: timeZone => sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]
    ---- SVC-CONTEXT: userLogin => [GenericEntity:UserLogin][createdStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][createdTxStamp,2014-07-17 20:36:06.0(java.sql.Timestamp)][currentPassword,{SHA}47ca69ebb4bdc9ae0adec130880165d2cc05db1a(java.lang.String)][enabled,Y(java.lang.String)][hasLoggedOut,N(java.lang.String)][lastTimeZone,Asia/Macao(java.lang.String)][lastUpdatedStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][lastUpdatedTxStamp,2014-11-26 16:12:57.0(java.sql.Timestamp)][partyId,admin(java.lang.String)][successiveFailedLogins,0(java.lang.Long)][userLoginId,admin(java.lang.String)]
    ---- SVC-CONTEXT: defaultValue => 999.9999
    -----SERVICE TEST----- : 测试TOPIC消息
    ----- SVC: entity-default -----

Now look at the ActiveMQ console:
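One message enqueued, one message dequeued.
Here is why.
Running the service published one message to the topic, which is why Messages Enqueued shows 1. Messages Dequeued also shows 1 because OFBiz is both the sender and a client of the topic: we set listen="true", so OFBiz subscribes to the same topic and consumes the message it just sent.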
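The counting can be pictured with a toy in-memory topic. This is only a simplified model written for this post, not ActiveMQ code: the class TopicCounterSketch and its two counters are invented names, and it assumes the broker counts one enqueue per published message and one dequeue per delivery to a subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a topic with enqueue/dequeue counters (illustration only, not ActiveMQ). */
class TopicCounterSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();
    int enqueued; // messages published to the topic
    int dequeued; // deliveries made to subscribers

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        enqueued++; // one message arrived at the topic
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
            dequeued++; // one delivery to one subscriber
        }
    }

    public static void main(String[] args) {
        TopicCounterSketch topic = new TopicCounterSketch();
        List<String> received = new ArrayList<>();

        // Stands in for the OFBiz listener created because listen="true".
        topic.subscribe(received::add);

        // Stands in for running the testJMSTopic service once.
        topic.publish("test TOPIC message");

        System.out.println("enqueued=" + topic.enqueued + ", dequeued=" + topic.dequeued);
        System.out.println("received=" + received);
    }
}
```

In this model, publishing without any subscriber leaves the dequeue counter at zero, which corresponds to what one would expect on the console with listen="false" and no other consumers.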
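A look at the JMS service engine makes this clear, so I will not explain much further.
So how does this part work?
First, we set listen to true in the service engine configuration file.
Second, after the service engine is loaded, JmsListenerFactory.java reads the service engine configuration file and loads the JMS listeners.
Finally, here is the relevant code: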
JmsTopicListener.java
    public synchronized void load() throws GenericServiceException {
        try {
            InitialContext jndi = JNDIContextFactory.getInitialContext(jndiServer);
            TopicConnectionFactory factory = (TopicConnectionFactory) jndi.lookup(jndiName);
            if (factory != null) {
                con = factory.createTopicConnection(userName, password);
                con.setExceptionListener(this);
                session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                topic = (Topic) jndi.lookup(topicName);
                if (topic != null) {
                    TopicSubscriber subscriber = session.createSubscriber(topic);
                    subscriber.setMessageListener(this);
                    con.start();
                    this.setConnected(true);
                    if (Debug.infoOn()) Debug.logInfo("Listening to topic [" + topicName + "] on [" + jndiServer + "]...", module);
                } else {
                    throw new GenericServiceException("Topic lookup failed.");
                }
            } else {
                throw new GenericServiceException("Factory (broker) lookup failed.");
            }
        } catch (NamingException ne) {
            throw new GenericServiceException("JNDI lookup problems; listener not running.", ne);
        } catch (JMSException je) {
            throw new GenericServiceException("JMS internal error; listener not running.", je);
        } catch (GeneralException ge) {
            throw new GenericServiceException("Problems with InitialContext; listener not running.", ge);
        }
    }

At this point I had another question: once the listener is in place, what does OFBiz do with a message after it receives one?
Again, a piece of code makes it clear:
AbstractJmsListener.java
    /**
     * Receives the MapMessage and processes the service.
     * @see javax.jms.MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        MapMessage mapMessage = null;
        if (Debug.verboseOn()) Debug.logVerbose("JMS Message Received --> " + message, module);
        if (message instanceof MapMessage) {
            mapMessage = (MapMessage) message;
        } else {
            Debug.logError("Received message is not a MapMessage!", module);
            return;
        }
        runService(mapMessage);
    }

    /**
     * Runs the service defined in the MapMessage
     * @param message
     * @return Map
     */
    protected Map<String, Object> runService(MapMessage message) {
        Map<String, ? extends Object> context = null;
        String serviceName = null;
        String xmlContext = null;
        try {
            serviceName = message.getString("serviceName");
            xmlContext = message.getString("serviceContext");
            if (serviceName == null || xmlContext == null) {
                Debug.logError("Message received is not an OFB service message. Ignored!", module);
                return null;
            }
            Object o = XmlSerializer.deserialize(xmlContext, dispatcher.getDelegator());
            if (Debug.verboseOn()) Debug.logVerbose("De-Serialized Context --> " + o, module);
            if (ObjectType.instanceOf(o, "java.util.Map"))
                context = UtilGenerics.checkMap(o);
        } catch (JMSException je) {
            Debug.logError(je, "Problems reading message.", module);
        } catch (Exception e) {
            Debug.logError(e, "Problems deserializing the service context.", module);
        }
        try {
            ModelService model = dispatcher.getDispatchContext().getModelService(serviceName);
            if (!model.export) {
                Debug.logWarning("Attempt to invoke a non-exported service: " + serviceName, module);
                return null;
            }
        } catch (GenericServiceException e) {
            Debug.logError(e, "Unable to get ModelService for service : " + serviceName, module);
        }
        if (Debug.verboseOn()) Debug.logVerbose("Running service: " + serviceName, module);
        Map<String, Object> result = null;
        if (context != null) {
            try {
                result = dispatcher.runSync(serviceName, context);
            } catch (GenericServiceException gse) {
                Debug.logError(gse, "Problems with service invocation.", module);
            }
        }
        return result;
    }

Queue and topic messaging can be configured at the same time. The configuration follows the same pattern as above; see the JNDI support page on the ActiveMQ site for details.
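Coming back to the onMessage/runService code above, its flow can be condensed into a small JMS-free sketch: read serviceName and serviceContext from the message, ignore anything that is not a service message, refuse services that are not exported, then run the service with the context. Everything here is a simplified assumption for illustration: JmsDispatchSketch and ServiceDef are invented names, the message is a plain Map instead of a javax.jms.MapMessage, and the context is already a Map rather than XML that has to be deserialized first.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Simplified, JMS-free model of the onMessage/runService flow (illustration only). */
class JmsDispatchSketch {

    /** Hypothetical stand-in for a service model: an export flag plus the code to run. */
    static final class ServiceDef {
        final boolean export;
        final Function<Map<String, Object>, Map<String, Object>> impl;

        ServiceDef(boolean export, Function<Map<String, Object>, Map<String, Object>> impl) {
            this.export = export;
            this.impl = impl;
        }
    }

    private final Map<String, ServiceDef> services = new HashMap<>();

    void register(String name, ServiceDef def) {
        services.put(name, def);
    }

    /** Returns the service result, or null when the message is ignored. */
    Map<String, Object> dispatch(Map<String, Object> message) {
        Object serviceName = message.get("serviceName");
        Object context = message.get("serviceContext");
        if (!(serviceName instanceof String) || !(context instanceof Map)) {
            return null; // not a service message: ignored
        }
        ServiceDef def = services.get(serviceName);
        if (def == null || !def.export) {
            return null; // unknown or non-exported service: refused in this sketch
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> ctx = (Map<String, Object>) context;
        return def.impl.apply(ctx);
    }

    public static void main(String[] args) {
        JmsDispatchSketch dispatcher = new JmsDispatchSketch();
        // An exported service that echoes the incoming message parameter.
        dispatcher.register("testScv", new ServiceDef(true,
                ctx -> Collections.<String, Object>singletonMap("resp", ctx.get("message"))));

        Map<String, Object> message = new HashMap<>();
        message.put("serviceName", "testScv");
        message.put("serviceContext",
                Collections.<String, Object>singletonMap("message", "test TOPIC message"));

        System.out.println(dispatcher.dispatch(message));
    }
}
```

One practical consequence visible in the original code is the export check: a service that is not marked as exported is not run by the listener, even if the message names it correctly.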
References
OFBiz JMS: https://cwiki.apache.org/confluence/display/OFBIZ/Distributed+Entity+Cache+Clear+(DCC)+Mechanism
ActiveMQ JNDI: http://activemq.apache.org/jndi-support.html
Original post: http://blog.csdn.net/fclwd/article/details/42738851