基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

04:数据源

  • 目标了解数据源的格式及实现模拟数据的生成
  • 路径
  • step1:数据格式
  • step2:数据生成
  • 实施
  • 数据格式
消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号 收件人网络制式 收件人GPS 收件人性别 消息类型 双方距离 消息
msg_time sender_nickyname sender_account sender_sex sender_ip sender_os sender_phone_type sender_network sender_gps receiver_nickyname receiver_ip receiver_account receiver_os receiver_phone_type receiver_network receiver_gps receiver_sex msg_type distance message
2020/05/08 15:11:33 古博易 14747877194 48.147.134.255 Android 8.0 小米 Redmi K30 4G 94.704577,36.247553 莱优 97.61.25.52 17832829395 IOS 10.0 Apple iPhone 10 4G 84.034145,41.423804 TEXT 77.82KM 天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。

  • 数据生成
  • 创建原始文件目录
mkdir /export/data/momo_init
cd /export/data/momo_init
rz
  • 创建模拟数据目录
mkdir /export/data/momo_data
  • 运行程序生成数据
  • 语法
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
  • 测试:每500ms生成一条数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
500
  • 结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001

  • 小结
  • 了解数据源的格式及实现模拟数据的生成

05:技术架构及技术选型

  • 目标掌握实时案例的技术架构及技术选型
  • 路径
  • step1:需求分析
  • step2:技术选型
  • step3:技术架构
  • 实施
  • 需求分析
  • 离线存储计算
  • 提供离线T + 1的统计分析
  • 提供离线数据的即时查询
  • 实时存储计算
  • 提供实时统计分析
  • 技术选型
  • 离线
  • 数据采集:Flume
  • 离线存储:Hbase
  • 离线分析:Hive:复杂计算
  • 即时查询:Phoenix:高效查询
  • 实时
  • 数据采集:Flume
  • 实时存储:Kafka
  • 实时计算:Flink
  • 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
  • 技术架构
  • 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
  • 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
  • 保证数据一致性
  • 小结
  • 掌握实时案例的技术架构及技术选型

06:Flume的回顾及安装

  • 目标回顾Flume基本使用及实现Flume的安装测试
  • 路径
  • step1:Flume回顾
  • step2:Flume的安装
  • step3:Flume的测试
  • 实施
  • Flume的回顾
  • 功能:实时对文件或者网络端口进行数据流监听采集
  • 场景:文件实时采集
  • 开发
  • step1:先开发一个配置文件:properties【K=V】
  • step2:运行这个文件即可
  • 组成
  • Agent:一个Agent就是一个Flume程序
  • Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
  • Channel:负责临时存储Source发送过来的数据,供Sink来取数据
  • Sink:负责从Channel拉取数据写入目标地
  • Event:代表一条数据对象
  • head:Map集合[KV]
  • body:byte[]
  • Flume的安装
  • 上传安装包
cd /export/software/
rz
  • 解压安装
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin
  • 修改配置
#集成HDFS,拷贝HDFS配置文件
cd /export/server/flume-1.9.0-bin
cp /export/server/hadoop/etc/hadoop/core-site.xml  ./conf/
#修改Flume环境变量
cd /export/server/flume-1.9.0-bin/conf/
mv flume-env.sh.template flume-env.sh
vim flume-env.sh 
#修改22行
export JAVA_HOME=/export/server/jdk1.8.0_65
#修改34行
export HADOOP_HOME=/export/server/hadoop-3.3.0
  • 删除Flume自带的guava包,替换成Hadoop的
cd /export/server/flume-1.9.0-bin 
rm -rf lib/guava-11.0.2.jar
cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
  • 创建目录
cd /export/server/flume-1.9.0-bin
#程序配置文件存储目录
mkdir usercase
#Taildir元数据存储目录
mkdir position
  • Flume的测试
  • 需求:采集聊天数据,写入HDFS
  • 分析
  • Source:taildir:动态监听多个文件实现实时数据采集
  • Channel:mem:将数据缓存在内存
  • Sink:hdfs
  • 开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 102400
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = momo
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 启动HDFS
start-dfs.sh
  • 运行Flume
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
  • 运行模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
  • 查看结果

  • 小结
  • 回顾Flume基本使用及实现Flume的安装测试

07:Flume采集程序开发

  • 目标实现案例Flume采集程序的开发
  • 路径
  • step1:需求分析
  • step2:程序开发
  • step3:测试实现
  • 实施
  • 需求分析
  • 需求:采集聊天数据,实时写入Kafka
  • Source:taildir
  • Channel:mem
  • Sink:Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
  • 程序开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 测试实现
  • 启动Kafka
start-zk-all.sh
start-kafka.sh 
  • 创建Topic
kafka-topics.sh --create --topic MOMO_MSG  --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

注意:Kafka2.11版本用–zookeeper 替代

kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092

  • 列举
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
  • 启动消费者
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
  • 启动Flume程序
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
  • 启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
50
  • 观察结果

  • 小结
  • 实现案例Flume采集程序的开发


目录
相关文章
|
2月前
|
SQL 消息中间件 关系型数据库
Flink数据源问题之读取mysql报错如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
51 0
|
2月前
|
消息中间件 关系型数据库 MySQL
Flink数据源问题之转换异常如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
33 2
|
2月前
|
消息中间件 SQL Kafka
Flink数据源问题之定时扫描key如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
40 0
|
2月前
|
存储 Oracle 关系型数据库
Flink CDC 数据源问题之连接释放冲突如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
69 0
|
2月前
|
消息中间件 关系型数据库 MySQL
Flink CDC 数据源问题之数据变动如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
33 1
|
2月前
|
SQL Java 数据库连接
Flink CDC 数据源问题之数据源连接如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
41 0
|
2月前
|
Oracle NoSQL 关系型数据库
Flink CDC 数据源问题之定时扫描key如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
25 0
|
1月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
52 3
|
2月前
|
SQL 机器学习/深度学习 HIVE
Flink数据源问题之无法写入数据如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
35 2
|
2月前
|
SQL 关系型数据库 流计算
Flink数据源问题之脏数据如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
41 2

热门文章

最新文章