消费者端的Spring JMS 连接ActiveMQ接收生产者Oozie Server发送的Oozie作业执行结果-阿里云开发者社区

开发者社区> 技术mix呢> 正文

消费者端的Spring JMS 连接ActiveMQ接收生产者Oozie Server发送的Oozie作业执行结果

简介:
+关注继续查看

一,介绍

Oozie是一个Hadoop工作流服务器,接收Client提交的作业(MapReduce作业)请求,并把该作业提交给MapReduce执行。同时,Oozie还可以实现消息通知功能,只要配置好消息服务器,Oozie Server就可以把作业的执行结果发送到消息服务器上,而Client只需要订阅其感兴趣的消息即可。具体的配置参考这篇文章:Oozie 使用ActiveMQ实现 JMS通知

由于Spring内置了JMS相关的服务,因此这里记录在Spring中如何配置消费者连接ActiveMQ,从而接收生产者Oozie发送的消息。

 

二,Oozie Server作为生产者的相关配置

这主要在这篇文章Oozie 使用ActiveMQ实现 JMS通知 已经提到了。

其中Oozie的配置文件 oozie-default.xml中相关配置如下:

复制代码
 <property>
        <name>oozie.jms.producer.connection.properties</name>
        <value>java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://l
ocalhost:61616;connectionFactoryNames#ConnectionFactory</value>
 </property>

<!-- JMSAccessorService -->
    <property>
        <name>oozie.service.JMSAccessorService.connectioncontext.impl</name>
        <value>
        org.apache.oozie.jms.DefaultConnectionContext
        </value>
        <description>
        Specifies the Connection Context implementation
        </description>
    </property>
复制代码

 

Destination的相关配置如下,这里的Destination是一个Topic,即生产者发送消息的目的地,也是消费者取消息的地方。

复制代码
  <property>
        <name>oozie.service.JMSTopicService.topic.name</name>
        <value>
        default=${username}
        </value>
        <description>
        Topic options are ${username} or ${jobId} or a fixed string which can be specified as default or for a
        particular job type.
        For e.g To have a fixed string topic for workflows, coordinators and bundles,
        specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
        where job type can be WORKFLOW, COORDINATOR or BUNDLE.
        e.g. Following defines topic for workflow job, workflow action, coordinator job, coordinator action,
        bundle job and bundle action
        WORKFLOW=workflow,
        COORDINATOR=coordinator,
        BUNDLE=bundle
        For jobs with no defined topic, default topic will be ${username}
        </description>
    </property>
复制代码

 

三,在Spring中配置消费者的连接信息

这里采用JNDI连接ActiveMQ,连接信息配置如下:

复制代码
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
        <property name="environment">
            <props>
                <prop key="java.naming.factory.initial">
                    org.apache.activemq.jndi.ActiveMQInitialContextFactory
                </prop>
                <prop key="java.naming.provider.url">
                    tcp://192.168.121.35:61616
                </prop>
                <prop key="java.naming.security.principal">
                    system
                </prop>
                <prop key="java.naming.security.credentials">
                    manager
                </prop>
            </props>
        </property>
    </bean>
复制代码

 

配置连接工厂:

    <bean id="jndiTopicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiTemplate" ref="jndiTemplate" />
        <property name="jndiName" value="ConnectionFactory" />
    </bean>

我是怎么知道连接工厂的value="ConnectionFactory"的呢?由于我大部分采用的是Oozie的默认配置,根据Oozie官网提供的一个示例程序,调试出的Oozie使用的连接工厂的。

 

//获得Oozie中关于JMS的相关配置信息,如Transport Connectors
        OozieClient oc = new OozieClient("http://192.168.121.35:11000/oozie");
        JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo();

        Properties jndiProperties = jmsInfo.getJNDIProperties();
        Context jndiContext = new InitialContext(jndiProperties);

这段代码建立到ActiveMQ的连接上下文,调试上述代码可以看到下面的一些信息:

{java.naming.provider.url=tcp://192.168.121.35:61616, 
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory, 
connectionFactoryNames=ConnectionFactory}

 

配置Topic

    <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="cdhfive"></constructor-arg>
    </bean>

Topic就是Destination啊。由于从oozie-default.xml中得到生产者的Topic为 ${username},而我们这里的用户名为cdhfive ,故Topic的配置如上。

 

配置监听器

复制代码
<bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="topicConnectionFactory"></property>
        <property name="destinationResolver" ref="destinationResolver"></property>
        <property name="concurrentConsumers" value="1"></property>
        <property name="destination" ref="notifyTopic"></property>
        <property name="messageListener" ref="messageListener"></property>
    </bean>
复制代码

concurrentConsumers,表示消费者的数量。由于使用的是Pub/Sub模型,每个Consumer都会收到同样的消息。

destination,就是Topic的地址。

messageListener,就是监听器的实现bean,该bean 实现了 javax.jms.MessageListener接口

    <bean id="messageListener" class="com.schedule.tools.message.SimpleJMSReceiver" />

 

配置Spring 订阅者收到消息后,自动向ActiveMQ返回确认模式:一个有三种:①AUTO_ACKNOWLEDGE;②CLIENT_ACKNOWLEDGE;③DUPS_OK_ACKNOWLEDGE

设置DefaultMessageListenerContainer类的sessionAcknowledgeMode属性来配置确认模式。关于这三种确认模式在何时进行确认呢?

复制代码
AUTO_ACKNOWLEDGE
Automatic message acknowledgment before listener execution; no redelivery in case of exception thrown.

CLIENT_ACKNOWLEDGE
Automatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown.

DUPS_OK_ACKNOWLEDGE
Lazy message acknowledgment during or after listener execution; potential redelivery in case of exception thrown.
复制代码

可以看出:AUTO_ACKNOWLEDGE是在 onMessage方法调用之前,Spring就已经给ActiveMQ确认消息,并且若在onMessage方法中抛出异常了,消息不会重发。

CLIENT_ACKNOWLEDGE是在onMessage方法成功执行之后,Spring才向ActiveMQ确认消息,若在onMessage方法中抛出异常了,消息不会重发。

DUPS_OK_ACKNOWLEDGE是在onMessage方法成功执行之后,Spring才向ActiveMQ确认消息(会有延迟确认),若在onMessage方法中抛出异常了,消息可能会重发(potential redelivery)。

 

至此,大部分的配置已经完成了。

 

四,实现监听器MessageListener接口,接收消息

当有消息推送给订阅者时,javax.jms.MessageListener接口的onMessage()方法被自动调用,就可以在该方法中处理收到的消息了。

复制代码
@Override
    public void onMessage(Message message) {
        String parentJobId = null;
        String jobId = null;
        String errorMessage = null;
        String status = null;
        Date startTime = null;
        Date endTime = null;
        long runTime = -1;//-1 means job run error
        
        try {
            // 普通用户作业和解释作业
            
            if (message.getStringProperty(JMSHeaderConstants.APP_TYPE).equals(
                    AppType.WORKFLOW_JOB.name())) {
                
                WorkflowJobMessage wfJobMessage = JMSMessagingUtils
                        .getEventMessage(message);

                // 是普通作业
                jobId = wfJobMessage.getId();
                errorMessage = wfJobMessage.getErrorMessage();
                status = wfJobMessage.getStatus().toString();
                startTime = wfJobMessage.getStartTime();
                endTime = wfJobMessage.getEndTime();
                
                
                if(endTime != null){
                    runTime = endTime.getTime() - startTime.getTime();
                    System.out.println(jobId + "执行了:" + (endTime.getTime()-startTime.getTime())/1000 + "s");                    
                }

//other code.....

 

五,参考资料

《JAVA消息服务》电子工业出版社

https://oozie.apache.org/docs/4.0.0/DG_JMSNotifications.html

复制代码本文转自hapjin博客园博客,原文链接:http://www.cnblogs.com/hapjin/p/5486284.html,如需转载请自行联系原作者


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
AsyncHttpClient放在子线程执行时抛出异常的解决方法
AsyncHttpClient放在子线程执行时抛出异常的解决方法
51 0
在与SQL Server建立连接时出现与网络相关的或特定于实例的错误
        向往前一样,学习牛腩新闻发布系统的视频,敲代码,打开数据库,出现一个框框,详细内容如下:                 数据库连接不上,所有的工作都要歇班,捣鼓了会儿,简单总结一下解决该问题的方法。
1236 0
[ActiveMQ]发送消息与接收消息测试Demo
ubuntu下: 安装并启动ActiveMQ 1.官网http://activemq.apache.org/下载最新版本的ActiveMQ,并解压; 2.
1350 0
js之接收函数返回的多个结果
在下面的代码中,我们从/post中获取一个帖子,然后在/comments中获取相关评论。由于我们使用的是async/await,函数把返回值放在一个数组中。而我们使用数组解构后就可以把返回值直接赋给相应的变量
4936 0
MSSQL SERVER 2008 R2 无法连接到数据库,用户sa登录失败,错误:18456
原因:勾选了强制实施密码策略,但是设置的密码很简单依然可以,比如:123456 这是为什么?原来,这个功能要用到NetValidatePasswordPolicy() API这个函数。 (该功能只有在安全要求较高的时候才用) 解决方法:在运行里输入 gpedit.msc 打开 “本地策略”编辑器依次 展开“计算机配置” “Windows设置”-“安全设置”-“帐户策略”-“密码策略",“密码必须符合复杂性要求”应该是禁用状态, 改为已启用,之后再创建SQL Server用户即可。
1489 0
JSP第四篇【EL表达式介绍、获取各类数据、11个内置对象、执行运算、回显数据、自定义函数、fn方法库】
什么是EL表达式? 表达式语言(Expression Language,EL),EL表达式是用"${}"括起来的脚本,用来更方便的读取对象! EL表达式主要用来读取数据,进行内容的显示! 为什么要使用EL表达式? 为什么要使用EL表达式,我们先来看一下没有EL表达式是怎么样读取对象数据的吧! 在1.
1147 0
Asp.net Mvc Framework 七 (Filter及其执行顺序)
应用于Action的Filter 在Asp.netMvc中当你有以下及类似以下需求时你可以使用Filter功能判断登录与否或用户权限,决策输出缓存,防盗链,防蜘蛛,本地化设置,实现动态Action filter是一种声明式编程方式,在Asp.
694 0
+关注
2969
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载