基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

04:数据源

  • 目标了解数据源的格式及实现模拟数据的生成
  • 路径
  • step1:数据格式
  • step2:数据生成
  • 实施
  • 数据格式
消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号 收件人网络制式 收件人GPS 收件人性别 消息类型 双方距离 消息
msg_time sender_nickyname sender_account sender_sex sender_ip sender_os sender_phone_type sender_network sender_gps receiver_nickyname receiver_ip receiver_account receiver_os receiver_phone_type receiver_network receiver_gps receiver_sex msg_type distance message
2020/05/08 15:11:33 古博易 14747877194 48.147.134.255 Android 8.0 小米 Redmi K30 4G 94.704577,36.247553 莱优 97.61.25.52 17832829395 IOS 10.0 Apple iPhone 10 4G 84.034145,41.423804 TEXT 77.82KM 天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。

  • 数据生成
  • 创建原始文件目录
mkdir /export/data/momo_init
cd /export/data/momo_init
rz
  • 创建模拟数据目录
mkdir /export/data/momo_data
  • 运行程序生成数据
  • 语法
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
  • 测试:每500ms生成一条数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
500
  • 结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001

  • 小结
  • 了解数据源的格式及实现模拟数据的生成

05:技术架构及技术选型

  • 目标掌握实时案例的技术架构及技术选型
  • 路径
  • step1:需求分析
  • step2:技术选型
  • step3:技术架构
  • 实施
  • 需求分析
  • 离线存储计算
  • 提供离线T + 1的统计分析
  • 提供离线数据的即时查询
  • 实时存储计算
  • 提供实时统计分析
  • 技术选型
  • 离线
  • 数据采集:Flume
  • 离线存储:Hbase
  • 离线分析:Hive:复杂计算
  • 即时查询:Phoenix:高效查询
  • 实时
  • 数据采集:Flume
  • 实时存储:Kafka
  • 实时计算:Flink
  • 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
  • 技术架构
  • 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
  • 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
  • 保证数据一致性
  • 小结
  • 掌握实时案例的技术架构及技术选型

06:Flume的回顾及安装

  • 目标回顾Flume基本使用及实现Flume的安装测试
  • 路径
  • step1:Flume回顾
  • step2:Flume的安装
  • step3:Flume的测试
  • 实施
  • Flume的回顾
  • 功能:实时对文件或者网络端口进行数据流监听采集
  • 场景:文件实时采集
  • 开发
  • step1:先开发一个配置文件:properties【K=V】
  • step2:运行这个文件即可
  • 组成
  • Agent:一个Agent就是一个Flume程序
  • Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
  • Channel:负责临时存储Source发送过来的数据,供Sink来取数据
  • Sink:负责从Channel拉取数据写入目标地
  • Event:代表一条数据对象
  • head:Map集合[KV]
  • body:byte[]
  • Flume的安装
  • 上传安装包
cd /export/software/
rz
  • 解压安装
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin
  • 修改配置
#集成HDFS,拷贝HDFS配置文件
cd /export/server/flume-1.9.0-bin
cp /export/server/hadoop/etc/hadoop/core-site.xml  ./conf/
#修改Flume环境变量
cd /export/server/flume-1.9.0-bin/conf/
mv flume-env.sh.template flume-env.sh
vim flume-env.sh 
#修改22行
export JAVA_HOME=/export/server/jdk1.8.0_65
#修改34行
export HADOOP_HOME=/export/server/hadoop-3.3.0
  • 删除Flume自带的guava包,替换成Hadoop的
cd /export/server/flume-1.9.0-bin 
rm -rf lib/guava-11.0.2.jar
cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
  • 创建目录
cd /export/server/flume-1.9.0-bin
#程序配置文件存储目录
mkdir usercase
#Taildir元数据存储目录
mkdir position
  • Flume的测试
  • 需求:采集聊天数据,写入HDFS
  • 分析
  • Source:taildir:动态监听多个文件实现实时数据采集
  • Channel:mem:将数据缓存在内存
  • Sink:hdfs
  • 开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 102400
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = momo
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 启动HDFS
start-dfs.sh
  • 运行Flume
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
  • 运行模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
  • 查看结果

  • 小结
  • 回顾Flume基本使用及实现Flume的安装测试

07:Flume采集程序开发

  • 目标实现案例Flume采集程序的开发
  • 路径
  • step1:需求分析
  • step2:程序开发
  • step3:测试实现
  • 实施
  • 需求分析
  • 需求:采集聊天数据,实时写入Kafka
  • Source:taildir
  • Channel:mem
  • Sink:Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
  • 程序开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 测试实现
  • 启动Kafka
start-zk-all.sh
start-kafka.sh 
  • 创建Topic
kafka-topics.sh --create --topic MOMO_MSG  --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

注意:Kafka2.11版本用–zookeeper 替代

kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092

  • 列举
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
  • 启动消费者
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
  • 启动Flume程序
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
  • 启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
50
  • 观察结果

  • 小结
  • 实现案例Flume采集程序的开发


目录
相关文章
|
2月前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
2月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
2月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
城市大脑 流计算 计算机视觉
Flink+HBase场景化解决方案
在中国HBase技术社区第十届Meetup杭州站上,阿里巴巴高级产品工程师高旸为大家分享了实时计算技术相关的发展背景,并介绍了基于Flink+HBase的实时计算场景化解决方案,并对于在线教育、城市大脑、实时风控等典型的实时计算方案应用场景进行了介绍。
4453 0
|
11天前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
3月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
731 7
阿里云实时计算Flink在多行业的应用和实践
|
2月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
存储 运维 监控
阿里云实时计算Flink版的评测
阿里云实时计算Flink版的评测
44 15
|
12天前
|
运维 分布式计算 监控
评测报告:阿里云实时计算Flink版
本评测主要针对阿里云实时计算Flink版在用户行为分析中的应用。作为一名数据分析师,我利用该服务处理了大量日志数据,包括用户点击流和登录行为。Flink的强大实时处理能力让我能够迅速洞察用户行为变化,及时调整营销策略。此外,其卓越的性能和稳定性显著降低了运维负担,提升了项目效率。产品文档详尽且易于理解,但建议增加故障排查示例。
下一篇
无影云桌面