Storm Topology 提交 总结---Kettle On Storm 实现

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介:

一,目的

在学习的过程中,需要用到 PDI ---一个开源的ETL软件。主要是用它来设计一些转换流程来处理数据。但是,在PDI中设计好的 transformation 是在本地的执行引擎中执行的,(参考源码中的 Trans.java ),现可以对DI加以改造:在DI中设计的转换,将之转换成Storm的Topology,然后再把该Topology提交到Storm集群中执行。这样,既可以利用DI强大的设计能力(因为在DI中可以设计各种各样的转换流程,这些用DI设计出来的 transformation流程是前人已经实现好的数据处理功能,把该 transformation 转换成 Storm Topology 可以避免自己编写实现数据处理功能的Storm Topology 代码);又可以利用Storm的分布式实时流处理数据的能力。

在Pentaho Lab 官网上有一个相应的开源项目,但是貌似已经不再更新了。参考: kettle-storm 的 github 以及 相关介绍

 

二,实现概述

①Storm 端的运行流程:

 

不管在DI中设计了何种转换,将转换 转化成 Storm 的Topology时,都只得到两种类型的Topology,即输入拓扑和处理拓扑。提交Topology的代码如下:

1 StormSubmitter.submitTopology(basictopo.getName(), config, basictopo.getInputTopology());//commit inputtopology
2 StormSubmitter.submitTopology(basictopo.getName(), config, basictopo.getProcessTopology());//commit processtopology

 

②PDI中的转换设计

一个典型的转换如下图:图片来源

 

 

设计好一个这样的转换流程图后,在运行该转换前,需要将设计好的转换保存成一个 .ktr 文件,PDI从该.ktr 文件中解析出 transMeta 对象,(transMeta对象 代表运行过程中的 某个转换)。transMeta对象记录了整个转换过程中需要的所有信息。而我们的目标就是将上图中的设计的transformation 自动 变成 Storm的Topology。从而,可以在Storm中执行如上图所示的 transformation了。

 

③transformation 到 Topology 的变换

由上图设计的转换可以看出,它是一个有向无环图。而Storm的Topology也是一个DAG图,因此可以通过算法将 transformation 变成 Topology。

 

三,细节及遇到的问题

transMeta 对象不能直接序列化,如何在Storm集群中获得transMeta对象,然后根据该对象构造Topology?

 主要用 .ktr 文件 来生成 TransMeta对象。当在本地运行时,TransMeta对象代表整个转换的执行,但当提交到集群中时,本地执行过程中的transMeta对象不可能直接提交到集群中去的。需要将之序列化,再发送到集群中。而,每个 .ktr文件就代表了一个转换,因此可以直接将 .ktr 文件打包上传到集群中,然后再解析jar包中的 .ktr 文件来生成 构成 InputSpout 和 Bolt 所需的transMeta 对象。关于序列化知识,可参考

InputSpout 解析transMeta对象:

URL fileURL = this.getClass().getResource("/test-random.ktr");
ktrFile = fileURL.toString();
transMeta = new TransMeta(ktrFile, (Repository) null);

 

TransactionalBolt解析transMeta对象:

复制代码
InputStream inputStream = this.getClass().getResourceAsStream("/test-random.ktr");
        if(transMeta == null )
        {
            try{
                KettleEnvironment.init();
                transMeta = new TransMeta(inputStream, (Repository)null, true, null, null);
            }catch(Exception e){
                e.printStackTrace();
                e.getMessage();
            }
        }
复制代码

 遇到了一个奇怪的问题:即InputSpout中解析transMeta对象是在本地完成的,也即在Topology的main函数所在的类执行时完成的。而TransactionalBolt.java中解析transMeta对象是在将Topology提交到集群上之后,在集群上执行时去读取 .ktr 文件并解析transMeta对象。

因此,InputSpout.java中读取 .ktr 文件解析transMeta对象时,是按照文件流的形式读取的。而在把Topology提交到集群后, .ktr 文件是随着Topology的jar包一起打包提交的,因此读取jar包中的 .ktr文件。 

 

Storm执行Topology时需要依赖DI中的jar包,依赖包问题如何解决?

将DI的执行引擎换成了Storm执行引擎,但不管是什么执行引擎,执行的代码还是会依赖PDI的lib目录下的一些依赖包。比如,PDI连接数据库时需要一个 mysql-connector-java-5.1.26.jar ,当把从 transformation中生成的Topology提交到Storm集群时,若transformation中有访问数据库的操作且storm 安装目录下的 lib 目录下没有相应的jar包时,Topology会执行失败。因此需要将mysql-connector-java-5.1.26.jar 复制到 storm 安装目录下的 lib 目录中。同理,其他的一些依赖jar包都需要复制到 storm 安装目录下的 lib 目录中。依赖的包如下:

复制代码
metrics-core-2.2.0.jar
scala-library-2.11.5.jar
kafka_2.11-0.8.2.0.jar
esapi-2.0.1.jar
source.jar
etl-core-5.0.jar
kafka-clients-0.8.2.0.jar
mysql-connector-java-5.1.26.jar
zookeeper-3.4.6.jar
commons-vfs-20100924-pentaho.jar
复制代码

 

Topology的构造需要依赖transMeta对象,进而需要依赖DI源码中的 .class 文件,如何正确打包 storm.jar 并自动提交(不是通过命令行 运行 storm jar 进行提交)?

命令行的storm jar 命令是Python 脚本实现的,它完成的功能主要是设置环境变量(将jar文件的路径设置成 "storm.jar" 环境变量STORM_JAR)并打包成jar文件,该部分功能可以在主类中实现,从而不需要在命令行运行 storm jar 命令进行Topology的提交。

在具体的学习中,直接把整个工程的 bin 目录下的所有编译好的文件都打进jar包,这样可以解决Topology运行时依赖DI中的代码而报的 java.lang.NoClassDefFoundError 错误。但是,这样做使得需要提交的jar包非常的大!

四,缺点

由于PDI中转换中的许多插件都不是针对分布式环境而设计的,因此,在PDI中设计的部分 transformation 是不能正常运行在 Storm之上的,可参考Pentaho Lab中关于Kettle On Storm的描述。

其次,PDI中已有的一些插件不能满足大数据集群的需求,需要重新开发新的PDI插件。

 本文转自hapjin博客园博客,原文链接:http://www.cnblogs.com/hapjin/,如需转载请自行联系原作者

相关文章
|
资源调度 Java 流计算
flink yarn-per-job提交报错
flink yarn-per-job提交这个是什么问题吗,主机端口都是正常的,就报了拒绝连接?image.png 一提交到yarn就报这个错,然后失败,提交命令 flink run -t yarn-per-job -c app.dwm.UVFilterDetail /root/gmall_flink/flink_app/gmall-start-try-self-do-1.0-SNAPSHOT-jar-with-dependencies.jar,yarn-session提交是正常的,就per-job有问题?
96 1
|
6月前
|
资源调度 关系型数据库 数据库
实时计算 Flink版产品使用合集之flink-cdc.sh xx.yaml提交到yarn 发现没有启动task manager的,怎么处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 资源调度 数据处理
实时计算 Flink版产品使用问题之-s参数在yarn-session.sh命令中是否有效
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
资源调度 分布式计算 Hadoop
实时计算 Flink版操作报错合集之perjob提交给yarn,报错显示无法连接yarn- Connecting to ResourceManager,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
资源调度 分布式计算 Oracle
实时计算 Flink版操作报错合集之flink on yarn job manager 可以启动, 但不给分配slot ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
175 0
|
6月前
|
存储 监控 调度
【Flink】怎么提交的实时任务,有多少Job Manager?
【4月更文挑战第18天】【Flink】怎么提交的实时任务,有多少Job Manager?
|
6月前
|
存储 Java Linux
Storm详细配置
Storm详细配置
83 0
|
存储 监控 安全
storm笔记:storm集群
Strom集群结构是有一个主节点(nimbus)和多个工作节点(supervisor)组成的主从结构,主节点通过配置静态指定(还有一种主从结构是在运行时动态选举,比如zookeeper)。通常这种主从结构存在出现单点故障的风险,Storm通过特殊处理规避这种风险,后面将解释Storm的半容错结构。
424 0
storm笔记:storm集群
|
消息中间件 大数据 测试技术
如何在E-MapReduce上提交Storm作业处理Kafka数据
本文演示如何在E-MapReduce上部署Storm集群和Kafka集群,并运行Storm作业消费Kafka数据。
2816 0
|
消息中间件 分布式计算 Hadoop
使用E-MapReduce提交Storm作业处理Kafka数据
本文演示如何在E-MapReduce上部署Storm集群和Kafka集群,并运行Storm作业消费Kafka数据。 环境准备 本文选择在杭州Region进行测试,版本选择EMR-3.
2186 0