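Download the source code from the official site and use mvn to generate the Eclipse project files.
2. Main classes involved when Oozie generates JMS messages
In the oozie-core project:
The Oozie Server is the message producer. The connection parameters and the message server are configured in oozie-default.xml/oozie-site.xml; Oozie uses these settings to connect to the server, create messages, and send them.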
    /**
     * This class will
     * <ul>
     * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
     * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
     * <li> Provide a way to create a subscriber and publisher </li>
     * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
     * </ul>
     */
    public class JMSAccessorService implements Service {

        /**
         * Map of JMS connection info to established JMS Connection
         */
        private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
                new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();

        /**
         * Map of JMS connection info to topic names to MessageReceiver
         */
        private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
                new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
ConcurrentHashMap is thread-safe; here it stores the connection information for the JMS Provider.
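To illustrate why a thread-safe map suits this kind of connection cache, here is a minimal sketch. It is not Oozie code: `FakeConnection` and `ConnectionCache` are hypothetical names, and a plain `String` stands in for `JMSConnectionInfo` as the key. It shows one common way to make sure each key gets exactly one connection object even when several threads ask at once.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for an established connection; not an Oozie class. */
final class FakeConnection {
    final String url;

    FakeConnection(String url) {
        this.url = url;
    }
}

/** Caches one connection per connection string and creates it at most once. */
final class ConnectionCache {
    private final ConcurrentHashMap<String, FakeConnection> connections = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    FakeConnection getOrCreate(String url) {
        // ConcurrentHashMap.computeIfAbsent applies the factory atomically per key,
        // so concurrent callers asking for the same key share one instance.
        return connections.computeIfAbsent(url, u -> {
            created.incrementAndGet();
            return new FakeConnection(u);
        });
    }

    /** Number of connections actually created so far. */
    int createdCount() {
        return created.get();
    }
}
```

Calling `getOrCreate` twice with the same string returns the same object, so the expensive connection setup runs only once per key.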
    synchronized (this) {
        if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
            try {
                jmsProducerConnContext = getConnectionContextImpl();
                jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
                jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,

    private ConnectionContext getConnectionContextImpl() {
        Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
        ConnectionContext connCtx = null;
        if (defaultClazz == DefaultConnectionContext.class) {
            connCtx = new DefaultConnectionContext();
        }
        else {
            connCtx = (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null);
        }
        return connCtx;
    }
This creates the connection context for the producer.
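The pattern in getConnectionContextImpl() (use the default implementation unless the configuration names another class) can be sketched with the JDK alone. Everything in this sketch is hypothetical: the `ContextSketch` interface, both implementations, and the `sketch.connection.context.impl` key are illustrative names, and `java.util.Properties` stands in for the Hadoop-style configuration object Oozie uses.

```java
import java.util.Properties;

/** Hypothetical minimal interface standing in for a connection context. */
interface ContextSketch {
    String describe();
}

final class DefaultContextSketch implements ContextSketch {
    public String describe() {
        return "default";
    }
}

final class CustomContextSketch implements ContextSketch {
    public String describe() {
        return "custom";
    }
}

final class ContextLoader {
    /** Hypothetical configuration key; not an Oozie property name. */
    static final String IMPL_KEY = "sketch.connection.context.impl";

    /** Returns the configured implementation, or the default when none is configured. */
    static ContextSketch load(Properties conf) {
        String className = conf.getProperty(IMPL_KEY);
        if (className == null || className.trim().isEmpty()) {
            return new DefaultContextSketch();
        }
        try {
            // Load the class by name and call its no-argument constructor.
            return Class.forName(className.trim())
                    .asSubclass(ContextSketch.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

The benefit of this design is that a different connection context can be plugged in through configuration without touching the service code.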
DefaultConnectionContext.java is the default connection context.
    public class DefaultConnectionContext implements ConnectionContext {

        protected Connection connection;
        protected String connectionFactoryName;
        private static XLog LOG = XLog.getLog(ConnectionContext.class);

        @Override
        public void createConnection(Properties props) throws NamingException, JMSException {
            Context jndiContext = new InitialContext(props);
            connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
            if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
                connectionFactoryName = "ConnectionFactory";
            }
            ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
            LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
            try {
                connection = connectionFactory.createConnection();
                connection.start();

        @Override
        public MessageProducer createProducer(Session session, String topicName) throws JMSException {
            Topic topic = session.createTopic(topicName);
            MessageProducer producer = session.createProducer(topic);
            return producer;
        }
createProducer() is called from sendMessage() in the org.apache.oozie.jms.JMSJobEventListener class.
The Oozie configuration related to JMSAccessorService is as follows:
    static {
        ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
        ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
    }

    public static enum TopicType {
        USER("${username}"), JOBID("${jobId}");

        private String value;

        TopicType(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }
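The allowed topic names are ${username} and ${jobId}, so a topic can be named after either the user or the job ID. The official Oozie documentation explains: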
The topic is obtained by concatenating topic prefix and the substituted value for topic pattern. The topic pattern can be a constant value like workflow or coordinator which the administrator has configured or ${username}.
The getJMSTopicName API can be used if the job id is already known and will give the exact topic name to which the notifications for that job are published.
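The rule quoted above (topic name = topic prefix + substituted topic pattern) can be sketched as follows. This is only an illustration, not the JMSTopicService implementation: `TopicNameSketch`, the `oozie.` prefix, and the sample user and job ID are all assumptions made for the example.

```java
/** Hypothetical helper showing "prefix + substituted pattern"; not an Oozie class. */
final class TopicNameSketch {

    /** Replaces the ${username} and ${jobId} placeholders, then prepends the prefix. */
    static String resolve(String prefix, String pattern, String user, String jobId) {
        // String.replace works on literal text, so "${username}" is not treated as a regex.
        String substituted = pattern
                .replace("${username}", user)
                .replace("${jobId}", jobId);
        return prefix + substituted;
    }

    public static void main(String[] args) {
        // Sample values are made up for the demonstration.
        System.out.println(resolve("oozie.", "${username}", "alice", "0000001-wf"));
        System.out.println(resolve("oozie.", "workflow", "alice", "0000001-wf"));
    }
}
```

A constant pattern such as `workflow` contains no placeholder, so it passes through unchanged and only the prefix is added.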
    private void parseTopicConfiguration() throws ServiceException {
        String topicName = conf.get(TOPIC_NAME, "default=" + TopicType.USER.value);
        if (topicName == null) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
        }
JMSJobEventListener.java is the class that actually sends the messages: for each job event it publishes the job's execution result.
    /**
     * Class to send JMS notifications related to job events.
     *
     */
    public class JMSJobEventListener extends JobEventListener {

        private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
        private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
        private JMSConnectionInfo connInfo;
        public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.connection.properties";
        public static final String JMS_SESSION_OPTS = "oozie.jms.producer.session.opts";
        public static final String JMS_DELIVERY_MODE = "oozie.jms.delivery.mode";
        public static final String JMS_EXPIRATION_DATE = "oozie.jms.expiration.date";

        public void init(Configuration conf) {
            LOG = XLog.getLog(getClass());
            String jmsProps = conf.get(JMS_CONNECTION_PROPERTIES);
            LOG.info("JMS producer connection properties [{0}]", jmsProps);
            connInfo = new JMSConnectionInfo(jmsProps);
            jmsSessionOpts = conf.getInt(JMS_SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
            jmsDeliveryMode = conf.getInt(JMS_DELIVERY_MODE, DeliveryMode.PERSISTENT);
            jmsExpirationDate = conf.getInt(JMS_EXPIRATION_DATE, 0);
        }
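init() hands the raw value of oozie.jms.producer.connection.properties to JMSConnectionInfo, which later supplies the JNDI properties. As a rough sketch of that step, assuming the value uses the `key#value;key#value` layout shown in the Oozie documentation, the string could be turned into a `java.util.Properties` object like this. `JndiPropertiesSketch` is a hypothetical name and this is not the JMSConnectionInfo source.

```java
import java.util.Properties;

/** Hypothetical parser for a "key#value;key#value" string; not the Oozie implementation. */
final class JndiPropertiesSketch {

    static Properties parse(String raw) {
        Properties props = new Properties();
        for (String pair : raw.split(";")) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            // Split on the first '#': the key is before it, the value after it.
            int sep = trimmed.indexOf('#');
            if (sep <= 0) {
                throw new IllegalArgumentException("Expected key#value but got: " + trimmed);
            }
            props.setProperty(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 1).trim());
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = parse("java.naming.provider.url#tcp://localhost:61616;"
                + "connectionFactoryNames#ConnectionFactory");
        System.out.println(p.getProperty("connectionFactoryNames"));
    }
}
```

The resulting object is the kind of input that `new InitialContext(props)` expects in DefaultConnectionContext.createConnection().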
1) EventHandlerService contains an inner class, EventWorker, which runs as a worker thread. When a job event occurs, the registered listeners are invoked.
    /**
     * Service class that handles the events system - creating events queue,
     * managing configured properties and managing and invoking various event
     * listeners via worker threads
     */
    public class EventHandlerService implements Service {

        public class EventWorker implements Runnable {

            public void run() {
                //.....other code
                while (iter.hasNext()) {
                    try {
                        if (msgType == MessageType.JOB) {
                            invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);

        private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
            switch (event.getAppType()) {
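The dispatch idea in these excerpts (walk the registered listeners and pick the callback from the event's application type) can be sketched without any Oozie dependency. All type names ending in `Sketch`, along with `RecordingListener`, are hypothetical. Swallowing a listener's exception so the remaining listeners still run is an assumption of this sketch, because the excerpt does not show the catch block.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical application types for the sketch. */
enum AppTypeSketch { WORKFLOW_JOB, COORDINATOR_ACTION }

final class JobEventSketch {
    final AppTypeSketch appType;
    final String jobId;

    JobEventSketch(AppTypeSketch appType, String jobId) {
        this.appType = appType;
        this.jobId = jobId;
    }
}

abstract class JobEventListenerSketch {
    abstract void onWorkflowJobEvent(JobEventSketch event);

    abstract void onCoordinatorActionEvent(JobEventSketch event);
}

/** Listener that only records what it was asked to handle. */
final class RecordingListener extends JobEventListenerSketch {
    final List<String> seen = new ArrayList<>();

    void onWorkflowJobEvent(JobEventSketch event) {
        seen.add("workflow:" + event.jobId);
    }

    void onCoordinatorActionEvent(JobEventSketch event) {
        seen.add("coordinator-action:" + event.jobId);
    }
}

final class EventDispatcherSketch {
    /** Invokes the callback matching the event's application type on every listener. */
    static void dispatch(List<? extends JobEventListenerSketch> listeners, JobEventSketch event) {
        for (JobEventListenerSketch listener : listeners) {
            try {
                switch (event.appType) {
                    case WORKFLOW_JOB:
                        listener.onWorkflowJobEvent(event);
                        break;
                    case COORDINATOR_ACTION:
                        listener.onCoordinatorActionEvent(event);
                        break;
                }
            } catch (RuntimeException e) {
                // Assumption: one failing listener should not block the others.
            }
        }
    }
}
```

A JMS listener such as JMSJobEventListener fits this shape: its onWorkflowJobEvent callback is where the notification message gets built and sent.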
    @Override
    public void onWorkflowJobEvent(WorkflowJobEvent event) {
        WorkflowJobMessage wfJobMessage = MessageFactory.createWorkflowJobMessage(event);
        serializeJMSMessage(wfJobMessage, getTopic(event));
    }

    private void serializeJMSMessage(JobMessage jobMessage, String topicName) {
        MessageSerializer serializer = MessageFactory.getMessageSerializer();
        String messageBody = serializer.getSerializedObject(jobMessage);
        sendMessage(jobMessage.getMessageProperties(), messageBody, topicName, serializer.getMessageFormat());
    }

    protected void sendMessage(Map<String, String> messageProperties, String messageBody, String topicName,
            String messageFormat) {
        jmsContext = jmsService.createProducerConnectionContext(connInfo);
        if (jmsContext != null) {
            try {
                Session session = jmsContext.createThreadLocalSession(jmsSessionOpts);
                TextMessage textMessage = session.createTextMessage(messageBody);
                for (Map.Entry<String, String> property : messageProperties.entrySet()) {
                    textMessage.setStringProperty(property.getKey(), property.getValue());
                }
                textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
                LOG.trace("Event related JMS text body [{0}]", textMessage.getText());
                LOG.trace("Event related JMS entire message [{0}]", textMessage.toString());
                MessageProducer producer = jmsContext.createProducer(session, topicName);
                producer.setDeliveryMode(jmsDeliveryMode);
                producer.setTimeToLive(jmsExpirationDate);
                producer.send(textMessage);
                producer.close();
            }
    /**
     * Constructor for a workflow job message
     * @param eventStatus event status
     * @param workflowJobId the workflow job id
     * @param coordinatorActionId the parent coordinator action id
     * @param startTime start time of workflow
     * @param endTime end time of workflow
     * @param status status of workflow
     * @param user the user
     * @param appName appName of workflow
     * @param errorCode errorCode of the failed wf actions
     * @param errorMessage errorMessage of the failed wf action
     */
    public WorkflowJobMessage(EventStatus eventStatus, String workflowJobId, String coordinatorActionId,
            Date startTime, Date endTime, WorkflowJob.Status status, String user, String appName,
            String errorCode, String errorMessage) {
        super(eventStatus, AppType.WORKFLOW_JOB, workflowJobId, coordinatorActionId, startTime, endTime,
                user, appName);
        this.status = status;
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
When the submitted job is a Workflow Job, a workflow message is generated.
It has one notable parameter: @param coordinatorActionId the parent coordinator action id (each action inside a Coordinator Job is a Workflow Job).
    /**
     *
     * Class holding constants used in JMS selectors
     */
    public final class JMSHeaderConstants {
        // JMS Application specific properties for selectors
        public static final String EVENT_STATUS = "eventStatus";
        public static final String SLA_STATUS = "slaStatus";
        public static final String APP_NAME = "appName";
        public static final String USER = "user";
        public static final String MESSAGE_TYPE = "msgType";
        public static final String APP_TYPE = "appType";
        public static final String JOBID = "jobId"; // add for my specific selectors
        // JMS Header property
        public static final String MESSAGE_FORMAT = "msgFormat";
    }
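The properties in the message header are mainly used for filtering: a JMS message selector can filter messages by these header fields. For filtering by job ID, see: "Oozie JMS notification implementation: filtering messages by job ID".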
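As a small illustration of how these header names end up in a selector, the sketch below builds a selector string from property/value pairs. `SelectorSketch` is a hypothetical helper, not part of Oozie or of the JMS API. It relies only on the standard selector syntax, in which string literals are enclosed in single quotes and an embedded quote is written as two quotes.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that builds a JMS message selector; not part of Oozie or JMS. */
final class SelectorSketch {

    /** Joins "name = 'value'" comparisons with AND, in the map's iteration order. */
    static String equalsAll(Map<String, String> criteria) {
        StringJoiner joiner = new StringJoiner(" AND ");
        for (Map.Entry<String, String> entry : criteria.entrySet()) {
            // Selector string literals use single quotes; escape an embedded quote by doubling it.
            String literal = entry.getValue().replace("'", "''");
            joiner.add(entry.getKey() + " = '" + literal + "'");
        }
        return joiner.toString();
    }
}
```

For example, the pairs appType=WORKFLOW_JOB and user=alice produce `appType = 'WORKFLOW_JOB' AND user = 'alice'`. A consumer would pass such a string as the second argument of the standard `Session.createConsumer(destination, messageSelector)` call so that the provider delivers only matching messages.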
    /**
     * Message deserializer to convert from JSON to java object
     */
    public class JSONMessageDeserializer extends MessageDeserializer {

        static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.

        static {
            mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        }
Message serialization uses the jackson-mapper-asl jar. Because a message has to travel from the producer to the message server, it must be serialized first.
    /**
     * Class to deserialize the jms message to java object
     */
    public abstract class MessageDeserializer {

        /**
         * Constructs the event message from JMS message
         *
         * @param message the JMS message
         * @return EventMessage
         * @throws JMSException
         */
        @SuppressWarnings("unchecked")
        public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
            TextMessage textMessage = (TextMessage) message;
            String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
            String msgType = textMessage.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE);

            if (MessageType.valueOf(msgType) == MessageType.JOB) {
                switch (AppType.valueOf(appTypeString)) {
                    case WORKFLOW_JOB:
                        WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
                        wfJobMsg.setProperties(textMessage);
                        eventMsg = (T) wfJobMsg;
Analysis of how Oozie generates JMS messages and sends them to the JMS Provider