如何在E-MapReduce上提交Storm作业处理Kafka数据

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文演示如何在E-MapReduce上部署Storm集群和Kafka集群,并运行Storm作业消费Kafka数据。

0. 序言

本文演示如何在E-MapReduce上部署Storm集群和Kafka集群,并运行Storm作业消费Kafka数据。

1. 准备环境

这里我选择在杭州Region进行测试,版本选择EMR-3.8.0,本次测试需要的组件版本有:

  • Kafka:2.11_1.0.0
  • Storm: 1.0.1

E-MapReduce的集群管理界面地址:https://emr.console.aliyun.com/console#/cn-hangzhou/

1.1 创建Hadoop集群

由于Zookeeper和Storm组件默认不是必选的,所以在创建集群时需要记得勾选上,如下:
image

详细创建集群步骤,请参考E-MapReduce-用户指南-集群一节。

1.2 创建Kafka集群

接着创建Kafka集群,集群类型选择Kafka,如下:
image

注意:

  • 如果使用经典网络,请注意将Hadoop集群和Kafka集群放置在同一个安全组下面,这样可以省去配置安全组,避免网络不通的问题。
  • 如果使用VPC网络,请注意将Hadoop集群和Kafka集群放置在同一个VPC/VSwitch以及安全组下面,这样同样省去配置网路和安全组,避免网络不通。
  • 如果你熟悉ECS的网络和安全组,可以按需配置。

1.3 配置Storm环境

如果我们想在Storm上运行作业消费Kafka的话,集群初始环境下是会失败的,因为Storm运行环境缺少了不少必须的依赖包,如下:

以上版本依赖包经过测试可用,如果你再测试过程中引入了其他依赖,也一同添加在Storm lib中,如下操作:

image

上述操作需要在Kafka集群的每台机器执行一遍。执行完在E-MapReduce控制台重启Storm服务,如下:
image

查看操作历史,待Storm重启完毕:
image

2. 开发Storm+Kafka作业

E-MapReduce已经提供了现成的示例代码,直接使用即可,地址如下:

我们一般只要使用e-mapreduce-demo即可:

1. git clone git@github.com:aliyun/aliyun-emapreduce-demo.git
2. cd aliyun-emapreduce-demo
3. mvn clean package
4. 作业jar包在target/shaded/下面

3. Topic数据准备

登录到Kafka集群,按照如下步骤创建topic并准备一些数据:

1. 创建一个test topic,分区数10,副本数2
/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:/kafka-1.0.0 --topic test --create
2. 向test topic写入100条数据
/usr/lib/kafka-current/bin/kafka-producer-perf-test.sh --num-records 100 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test

以上命令在kafka集群的emr-header-1节点执行,当然也可以客户端机器上执行。

4. 运行Storm作业

登录到Hadoop集群,将第二步中编译得到的examples-1.1-shaded.jar拷贝到集群emr-header-1上,这里我放在root根目录下面。提交作业:

/usr/lib/storm-current/bin/storm jar examples-1.1-shaded.jar com.aliyun.emr.example.storm.StormKafkaSample test aaa.bbb.ccc.ddd hdfs://emr-header-1:9000 sample

5. 查看作业运行

5.1 查看Storm运行状态

查看集群上服务的WebUI有2种方式:

我这里选择了SSH隧道方式,访问地址:http://localhost:9999/index.html 。可以看到我们刚刚提交的Topology。点进去可以看到执行详情:
image

5.1 查看HDFS输出

[root@emr-header-1 ~]# hadoop fs -ls /foo/
-rw-r--r--   3 root hadoop     615000 2018-02-11 13:37 /foo/bolt-2-0-1518327393692.txt
-rw-r--r--   3 root hadoop     205000 2018-02-11 13:37 /foo/bolt-2-0-1518327441777.txt
[root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l
200

再向kafka写120条数据

[root@emr-header-1 ~]# /usr/lib/kafka-current/bin/kafka-producer-perf-test.sh --num-records 120 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test
120 records sent, 816.326531 records/sec (0.80 MB/sec), 35.37 ms avg latency, 134.00 ms max latency, 35 ms 50th, 39 ms 95th, 41 ms 99th, 134 ms 99.9th.

查看HDFS文件输出

[root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l
320

6. 小结

至此,我们成功实现了在E-MapReduce上部署一套Storm集群和一套Kafka集群,并运行Storm作业消费Kafka数据。当然,E-MapReduce也支持Spark Streaming和Flink组件,同样可以方便在Hadoop集群上运行,处理Kafka数据。

有一点需要注意:由于E-MapReduce没有单独的Storm集群类别,所以我们是创建的Hadoop集群,并安装了Storm组件。如果你在使用过程中用不到其他组件,可以很方便地在E-MapReduce管理控制台将那些组件停掉。这样,可以将Hadoop集群作为一个纯粹的Storm集群使用。

目录
相关文章
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
消息中间件 存储 分布式计算
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
|
SQL 存储 分布式计算
HADOOP MapReduce 处理 Spark 抽取的 Hive 数据【解决方案一】
开端: 今天咱先说问题,经过几天测试题的练习,我们有从某题库中找到了新题型,并且成功把我们干趴下,昨天今天就干了一件事,站起来。 沙问题? java mapeduce 清洗 hive 中的数据 ,清晰之后将driver代码 进行截图提交。
332 0
HADOOP MapReduce 处理 Spark 抽取的 Hive 数据【解决方案一】
|
存储 分布式计算 资源调度
MapReduce框架--InputFormat数据输入--切片优化(11)
MapReduce框架--InputFormat数据输入--切片优化(11)
298 0
MapReduce框架--InputFormat数据输入--切片优化(11)
|
分布式计算 资源调度 Java
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
Hadoop中的MapReduce概述、优缺点、核心思想、编程规范、进程、官方WordCount源码、提交到集群测试、常用数据序列化类型、WordCount案例实操
|
消息中间件 大数据 Kafka
Flume+Kafka+Storm实战:二、Flume与Kafka整合
Flume+Kafka+Storm实战:二、Flume与Kafka整合
175 0
Flume+Kafka+Storm实战:二、Flume与Kafka整合
|
消息中间件 Kafka 流计算
Flume+Kafka+Storm实战:一、Kakfa与Storm整合(下)
Flume+Kafka+Storm实战:一、Kakfa与Storm整合(下)
185 0
Flume+Kafka+Storm实战:一、Kakfa与Storm整合(下)
|
消息中间件 Java Kafka
Flume+Kafka+Storm实战:一、Kakfa与Storm整合(上)
Flume+Kafka+Storm实战:一、Kakfa与Storm整合(上)
152 0
Flume+Kafka+Storm实战:一、Kakfa与Storm整合(上)
|
SQL JSON druid
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )2
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )2
189 0
【Druid】(九)E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )2