基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

04:数据源

  • 目标了解数据源的格式及实现模拟数据的生成
  • 路径
  • step1:数据格式
  • step2:数据生成
  • 实施
  • 数据格式
消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号 收件人网络制式 收件人GPS 收件人性别 消息类型 双方距离 消息
msg_time sender_nickyname sender_account sender_sex sender_ip sender_os sender_phone_type sender_network sender_gps receiver_nickyname receiver_ip receiver_account receiver_os receiver_phone_type receiver_network receiver_gps receiver_sex msg_type distance message
2020/05/08 15:11:33 古博易 14747877194 48.147.134.255 Android 8.0 小米 Redmi K30 4G 94.704577,36.247553 莱优 97.61.25.52 17832829395 IOS 10.0 Apple iPhone 10 4G 84.034145,41.423804 TEXT 77.82KM 天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。

  • 数据生成
  • 创建原始文件目录
mkdir /export/data/momo_init
cd /export/data/momo_init
rz
  • 创建模拟数据目录
mkdir /export/data/momo_data
  • 运行程序生成数据
  • 语法
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
  • 测试:每500ms生成一条数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
500
  • 结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001

  • 小结
  • 了解数据源的格式及实现模拟数据的生成

05:技术架构及技术选型

  • 目标掌握实时案例的技术架构及技术选型
  • 路径
  • step1:需求分析
  • step2:技术选型
  • step3:技术架构
  • 实施
  • 需求分析
  • 离线存储计算
  • 提供离线T + 1的统计分析
  • 提供离线数据的即时查询
  • 实时存储计算
  • 提供实时统计分析
  • 技术选型
  • 离线
  • 数据采集:Flume
  • 离线存储:Hbase
  • 离线分析:Hive:复杂计算
  • 即时查询:Phoenix:高效查询
  • 实时
  • 数据采集:Flume
  • 实时存储:Kafka
  • 实时计算:Flink
  • 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
  • 技术架构
  • 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
  • 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
  • 保证数据一致性
  • 小结
  • 掌握实时案例的技术架构及技术选型

06:Flume的回顾及安装

  • 目标回顾Flume基本使用及实现Flume的安装测试
  • 路径
  • step1:Flume回顾
  • step2:Flume的安装
  • step3:Flume的测试
  • 实施
  • Flume的回顾
  • 功能:实时对文件或者网络端口进行数据流监听采集
  • 场景:文件实时采集
  • 开发
  • step1:先开发一个配置文件:properties【K=V】
  • step2:运行这个文件即可
  • 组成
  • Agent:一个Agent就是一个Flume程序
  • Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
  • Channel:负责临时存储Source发送过来的数据,供Sink来取数据
  • Sink:负责从Channel拉取数据写入目标地
  • Event:代表一条数据对象
  • head:Map集合[KV]
  • body:byte[]
  • Flume的安装
  • 上传安装包
cd /export/software/
rz
  • 解压安装
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin
  • 修改配置
#集成HDFS,拷贝HDFS配置文件
cd /export/server/flume-1.9.0-bin
cp /export/server/hadoop/etc/hadoop/core-site.xml  ./conf/
#修改Flume环境变量
cd /export/server/flume-1.9.0-bin/conf/
mv flume-env.sh.template flume-env.sh
vim flume-env.sh 
#修改22行
export JAVA_HOME=/export/server/jdk1.8.0_65
#修改34行
export HADOOP_HOME=/export/server/hadoop-3.3.0
  • 删除Flume自带的guava包,替换成Hadoop的
cd /export/server/flume-1.9.0-bin 
rm -rf lib/guava-11.0.2.jar
cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
  • 创建目录
cd /export/server/flume-1.9.0-bin
#程序配置文件存储目录
mkdir usercase
#Taildir元数据存储目录
mkdir position
  • Flume的测试
  • 需求:采集聊天数据,写入HDFS
  • 分析
  • Source:taildir:动态监听多个文件实现实时数据采集
  • Channel:mem:将数据缓存在内存
  • Sink:hdfs
  • 开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 102400
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = momo
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 启动HDFS
start-dfs.sh
  • 运行Flume
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
  • 运行模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
  • 查看结果

  • 小结
  • 回顾Flume基本使用及实现Flume的安装测试

07:Flume采集程序开发

  • 目标实现案例Flume采集程序的开发
  • 路径
  • step1:需求分析
  • step2:程序开发
  • step3:测试实现
  • 实施
  • 需求分析
  • 需求:采集聊天数据,实时写入Kafka
  • Source:taildir
  • Channel:mem
  • Sink:Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
  • 程序开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  • 测试实现
  • 启动Kafka
start-zk-all.sh
start-kafka.sh 
  • 创建Topic
kafka-topics.sh --create --topic MOMO_MSG  --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

注意:Kafka2.11版本用–zookeeper 替代

kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092

  • 列举
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
  • 启动消费者
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
  • 启动Flume程序
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
  • 启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
50
  • 观察结果

  • 小结
  • 实现案例Flume采集程序的开发


目录
相关文章
|
3月前
|
消息中间件 存储 Cloud Native
云消息队列 Kafka 版 V3 系列荣获信通院“云原生技术创新标杆案例”
2024 年 12 月 24 日,由中国信息通信研究院(以下简称“中国信通院”)主办的“2025 中国信通院深度观察报告会:算力互联网分论坛”,在北京隆重召开。本次论坛以“算力互联网 新质生产力”为主题,全面展示中国信通院在算力互联网产业领域的研究、实践与业界共识,与产业先行者共同探索算力互联网产业未来发展的方向。会议公布了“2024 年度云原生与应用现代化标杆案例”评选结果,“云消息队列 Kafka 版 V3 系列”荣获“云原生技术创新标杆案例”。
|
2天前
|
SQL 存储 分布式数据库
分布式存储数据恢复—hbase和hive数据库数据恢复案例
分布式存储数据恢复环境: 16台某品牌R730xd服务器节点,每台服务器节点上有数台虚拟机。 虚拟机上部署Hbase和Hive数据库。 分布式存储故障: 数据库底层文件被误删除,数据库不能使用。要求恢复hbase和hive数据库。
37 12
|
6月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
490 0
|
6月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
108 3
|
6月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
91 2
|
6月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
99 1
|
6月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
412 0
|
6月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
99 0
|
8月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
257 4
|
10月前
|
Oracle 关系型数据库 数据处理
实时计算 Flink版产品使用问题之如何进行Oracle到HBase的同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章