Oozie 生成JMS消息并向 JMS Provider发送消息过程分析

简介:

一,涉及到的工程

从官网下载源码,mvn 编译成 Eclipse工程文件:

 

对于JMS消息这一块,主要涉及到两个工程:

oozie-core工程有问题的原因是还需要一些其他的依赖工程未导入:

 

二,Oozie 生成 JMS消息 主要涉及到的一些类

oozie-core 工程中的:

 

 

oozie-client工程中的:

 

三,相关代码:

对于Oozie Server而言,它是消息的生产者。在oozie-default.xml/oozie-site.xml里面配置好连接参数,消息服务器....Oozie就使用这些配置进行连接,产生消息,发送消息。

JMSAccessorService.java

复制代码
/**
 * This class will <ul>
 * <li> Create/Manage JMS connections using user configured JNDI properties. </li>
 * <li> Create/Manage session for specific connection/topic and reconnects on failures. </li>
 * <li> Provide a way to create a subscriber and publisher </li>
 * <li> Pure JMS compliant (implementation independent but primarily tested against Apache ActiveMQ). </li>
 * </ul>
 */
public class JMSAccessorService implements Service {
复制代码

直接看注释就知道这个类的功能了。

 

复制代码
    /**
     * Map of JMS connection info to established JMS Connection
     */
    private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap =
            new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
    /**
     * Map of JMS connection info to topic names to MessageReceiver
     */
    private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap =
            new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
复制代码

ConcurrentHashMap线程安全的,用来保存与JMS Provider的连接信息

 

synchronized (this) {
                if (jmsProducerConnContext == null || !jmsProducerConnContext.isConnectionInitialized()) {
                    try {
                        jmsProducerConnContext = getConnectionContextImpl();
                        jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
                        jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo,
复制代码
  private ConnectionContext getConnectionContextImpl() {
        Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, DefaultConnectionContext.class);
        ConnectionContext connCtx = null;
        if (defaultClazz == DefaultConnectionContext.class) {
            connCtx = new DefaultConnectionContext();
        }
        else {
            connCtx = (ConnectionContext) ReflectionUtils.newInstance(defaultClazz, null);
        }
        return connCtx;
    }
复制代码

创建 Producer 连接的上下文环境

 

DefaultConnectionContext.java  默认的连接上下文环境

复制代码
public class DefaultConnectionContext implements ConnectionContext {

    protected Connection connection;
    protected String connectionFactoryName;
    private static XLog LOG = XLog.getLog(ConnectionContext.class);

    @Override
    public void createConnection(Properties props) throws NamingException, JMSException {
        Context jndiContext = new InitialContext(props);
        connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
        if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
            connectionFactoryName = "ConnectionFactory";
        }
        ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
        LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
        try {
            connection = connectionFactory.createConnection();
            connection.start();
复制代码

 

创建生产者的方法:

    @Override
    public MessageProducer createProducer(Session session, String topicName) throws JMSException {
        Topic topic = session.createTopic(topicName);
        MessageProducer producer = session.createProducer(topic);
        return producer;
    }

它由org.apache.oozie.jms.JMSJobEventListener类中的 sendMessage()调用。

 

Oozie 配置中关于JMSAccessorService的配置如下:

 

再来看看:JMSTopicService.java

    static {
        ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
        ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
    }
复制代码
    public static enum TopicType {
        USER("${username}"), JOBID("${jobId}");

        private String value;

        TopicType(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }

    }
复制代码

可用的Topic名称有 ${username},也可以用jobId作为Topic名称,再看Oozie官方文档解释:

The topic is obtained by concatenating topic prefix and the substituted value for topic pattern. The topic pattern can be a constant value like workflow or coordinator which the administrator has configured or ${username}.

The getJMSTopicName API can be used if the job id is already known and will give the exact topic name to which the notifications for that job are published.

 

 private void parseTopicConfiguration() throws ServiceException {
        String topicName = conf.get(TOPIC_NAME, "default=" + TopicType.USER.value);
        if (topicName == null) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
        }

Topic默认是${username}

 

发送消息的实现类JMSJobEventListener.java  根据相应的作业事件发送作业的执行结果

复制代码
/**
 * Class to send JMS notifications related to job events.
 *
 */
public class JMSJobEventListener extends JobEventListener {
    private JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
    private JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
    private JMSConnectionInfo connInfo;
    public static final String JMS_CONNECTION_PROPERTIES = "oozie.jms.producer.connection.properties";
    public static final String JMS_SESSION_OPTS = "oozie.jms.producer.session.opts";
    public static final String JMS_DELIVERY_MODE = "oozie.jms.delivery.mode";
    public static final String JMS_EXPIRATION_DATE = "oozie.jms.expiration.date";
复制代码

 

连接方式、发送消息后是否自动回复、消息的生命周期,持久消息还是非持久消息...

复制代码
    public void init(Configuration conf) {
        LOG = XLog.getLog(getClass());
        String jmsProps = conf.get(JMS_CONNECTION_PROPERTIES);
        LOG.info("JMS producer connection properties [{0}]", jmsProps);
        connInfo = new JMSConnectionInfo(jmsProps);
        jmsSessionOpts = conf.getInt(JMS_SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
        jmsDeliveryMode = conf.getInt(JMS_DELIVERY_MODE, DeliveryMode.PERSISTENT);
        jmsExpirationDate = conf.getInt(JMS_EXPIRATION_DATE, 0);

    }
复制代码

 

发送消息的过程:

1)EventHandlerService ,里面有个内部类EventWoker线程,当有相应的作业事件发生时,Listener被触发

复制代码
**
 * Service class that handles the events system - creating events queue,
 * managing configured properties and managing and invoking various event
 * listeners via worker threads
 */
public class EventHandlerService implements Service {

//.....

public class EventWorker implements Runnable {

        @Override
        public void run() {
//.....other code
 while (iter.hasNext()) {
              try {
                     if (msgType == MessageType.JOB) {
                            invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);
                        }

 private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
            switch (event.getAppType()) {
                case WORKFLOW_JOB:
                    jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);
复制代码

 

相应的作业监听器被触发后,创建相应的作业,获得待发送的地址Topic,并序列化消息

    @Override
    public void onWorkflowJobEvent(WorkflowJobEvent event) {
        WorkflowJobMessage wfJobMessage = MessageFactory.createWorkflowJobMessage(event);
        serializeJMSMessage(wfJobMessage, getTopic(event));
    }

 

序列化后,调用send进行发送

    private void serializeJMSMessage(JobMessage jobMessage, String topicName) {
        MessageSerializer serializer = MessageFactory.getMessageSerializer();
        String messageBody = serializer.getSerializedObject(jobMessage);
        sendMessage(jobMessage.getMessageProperties(), messageBody, topicName, serializer.getMessageFormat());
    }

 

创建连接上下文、创建会话、创建消息、设置消息的属性、创建生产者、设置传送模式和消息的生命周期、然后send消息。

复制代码
    protected void sendMessage(Map<String, String> messageProperties, String messageBody, String topicName,
            String messageFormat) {
        jmsContext = jmsService.createProducerConnectionContext(connInfo);
        if (jmsContext != null) {
            try {
                Session session = jmsContext.createThreadLocalSession(jmsSessionOpts);
                TextMessage textMessage = session.createTextMessage(messageBody);
                for (Map.Entry<String, String> property : messageProperties.entrySet()) {
                    textMessage.setStringProperty(property.getKey(), property.getValue());
                }
                textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
                LOG.trace("Event related JMS text body [{0}]", textMessage.getText());
                LOG.trace("Event related JMS entire message [{0}]", textMessage.toString());
                MessageProducer producer = jmsContext.createProducer(session, topicName);
                producer.setDeliveryMode(jmsDeliveryMode);
                producer.setTimeToLive(jmsExpirationDate);
                producer.send(textMessage);
                producer.close();
            }
复制代码

 

WorkflowJobMessage.java展示了一条Workflow消息长什么样:

复制代码
    /**
     * Constructor for a workflow job message
     * @param eventStatus event status
     * @param workflowJobId the workflow job id
     * @param coordinatorActionId the parent coordinator action id
     * @param startTime start time of workflow
     * @param endTime end time of workflow
     * @param status status of workflow
     * @param user the user
     * @param appName appName of workflow
     * @param errorCode errorCode of the failed wf actions
     * @param errorMessage errorMessage of the failed wf action
     */
    public WorkflowJobMessage(EventStatus eventStatus, String workflowJobId,
            String coordinatorActionId, Date startTime, Date endTime, WorkflowJob.Status status, String user,
            String appName, String errorCode, String errorMessage) {
        super(eventStatus, AppType.WORKFLOW_JOB, workflowJobId, coordinatorActionId, startTime,
                endTime, user, appName);
        this.status = status;
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
复制代码

当提交的是Workflow Job,就会生成Workflow消息。

它有一个属性: @param coordinatorActionId the parent coordinator action id  (Coordinator Job里面的Action是Workflow Job)

看完了JMS消息体,再来看看消息头:

 

复制代码
/**
 *
 * Class holding constants used in JMS selectors
 */
public final class JMSHeaderConstants {
    // JMS Application specific properties for selectors
    public static final String EVENT_STATUS = "eventStatus";
    public static final String SLA_STATUS = "slaStatus";
    public static final String APP_NAME = "appName";
    public static final String USER = "user";
    public static final String MESSAGE_TYPE = "msgType";
    public static final String APP_TYPE = "appType";
    
    public static final String JOBID = "jobId";// add for my specific selectors
    // JMS Header property
    public static final String MESSAGE_FORMAT = "msgFormat";
}
复制代码

 

消息头里面的属性主要用来过滤。根据消息头里面的字段,使用JMS消息选择器对消息进行过滤。关于根据JobId进行过滤,可参考:Oozie JMS通知消息实现--根据作业ID来过滤消息

不知道有没有bug????

 

复制代码
/**
 * Message deserializer to convert from JSON to java object
 */
public class JSONMessageDeserializer extends MessageDeserializer {

    static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.

    static {
        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }
复制代码

消息的序列化机制,用的是jackson-mapper-asl jar包。因为消息要从生产者发给消息服务器,就需要序列化了。

 

有序列化就有反序列化:

复制代码
/**
 * Class to deserialize the jms message to java object
 */
public abstract class MessageDeserializer {

    /**
     * Constructs the event message from JMS message
     *
     * @param message the JMS message
     * @return EventMessage
     * @throws JMSException
     */
    @SuppressWarnings("unchecked")
    public <T extends EventMessage> T getEventMessage(Message message) throws JMSException {
        TextMessage textMessage = (TextMessage) message;
        String appTypeString = textMessage.getStringProperty(JMSHeaderConstants.APP_TYPE);
        String msgType = textMessage.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE);
复制代码

先根据消息的属性解析出消息的类型。

 

        if (MessageType.valueOf(msgType) == MessageType.JOB) {
            switch (AppType.valueOf(appTypeString)) {
                case WORKFLOW_JOB:
                    WorkflowJobMessage wfJobMsg = getDeserializedObject(messageBody, WorkflowJobMessage.class);
                    wfJobMsg.setProperties(textMessage);
                    eventMsg = (T) wfJobMsg;

再根据类型来构造对象。


本文转自hapjin博客园博客,原文链接:http://www.cnblogs.com/hapjin/p/5511598.html,如需转载请自行联系原作者

相关文章
|
20天前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
4月前
|
消息中间件 存储 Java
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
27 0
|
12月前
|
消息中间件 存储 缓存
远程调用RPC和消息MQ区别
远程调用RPC和消息MQ区别
94 0
|
消息中间件 算法 数据可视化
SpringBoot整合RocketMQ发送顺序消息
严格按照消息的发送顺序进行消费的消息。默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列,而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的,如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性
|
消息中间件 Java Spring
springboot整合mq接收消息队列
继上篇springboot整合mq发送消息队列 本篇主要在上篇基础上进行activiemq消息队列的接收springboot整合mq发送消息队列 第一步:新建marven项目,配置pom文件 4.
2729 0
|
消息中间件 数据库
RabbitMQ——使用事务控制消息的发送和接收
RabbitMQ——使用事务控制消息的发送和接收
RabbitMQ——使用事务控制消息的发送和接收
|
消息中间件 Java 中间件
SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息
|
消息中间件 Java
springboot之rabbit - producer的confirm和consumer的ack模式
本篇和大家分享的是关于rabbit的生产和消费方的一些实用的操作;正如文章标题,主要内容如producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失。
8786 0
|
消息中间件 监控
轻松搞定RabbitMQ5:主题机制与RPC调用
轻松搞定RabbitMQ(六)——主题 翻译地址:http://www.rabbitmq.com/tutorials/tutorial-five-java.html 在上一篇博文中,我们进一步改良了日志系统。
|
消息中间件 Java Kafka
Kafka是什么,JMS是什么,常见的类JMS消息服务器,为什么需要消息队列(来自学习笔记)
Kafka是什么,JMS是什么,常见的类JMS消息服务器,为什么需要消息队列(来自学习笔记) 1、Kafka是什么  Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
3353 0