04:数据源
- 目标:了解数据源的格式及实现模拟数据的生成
- 路径
- step1:数据格式
- step2:数据生成
- 实施
- 数据格式
消息时间 | 发件人昵称 | 发件人账号 | 发件人性别 | 发件人IP | 发件人系统 | 发件人手机型号 | 发件人网络制式 | 发件人GPS | 收件人昵称 | 收件人IP | 收件人账号 | 收件人系统 | 收件人手机型号 | 收件人网络制式 | 收件人GPS | 收件人性别 | 消息类型 | 双方距离 | 消息 |
msg_time | sender_nickyname | sender_account | sender_sex | sender_ip | sender_os | sender_phone_type | sender_network | sender_gps | receiver_nickyname | receiver_ip | receiver_account | receiver_os | receiver_phone_type | receiver_network | receiver_gps | receiver_sex | msg_type | distance | message |
2020/05/08 15:11:33 | 古博易 | 14747877194 | 男 | 48.147.134.255 | Android 8.0 | 小米 Redmi K30 | 4G | 94.704577,36.247553 | 莱优 | 97.61.25.52 | 17832829395 | IOS 10.0 | Apple iPhone 10 | 4G | 84.034145,41.423804 | 女 | TEXT | 77.82KM | 天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。 |
- 数据生成
- 创建原始文件目录
mkdir /export/data/momo_init
- 上传模拟数据程序
cd /export/data/momo_init rz
- 创建模拟数据目录
mkdir /export/data/momo_data
- 运行程序生成数据
- 语法
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
- 测试:每500ms生成一条数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 500
- 结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001
- 小结
- 了解数据源的格式及实现模拟数据的生成
05:技术架构及技术选型
- 目标:掌握实时案例的技术架构及技术选型
- 路径
- step1:需求分析
- step2:技术选型
- step3:技术架构
- 实施
- 需求分析
- 离线存储计算
- 提供离线T + 1的统计分析
- 提供离线数据的即时查询
- 实时存储计算
- 提供实时统计分析
- 技术选型
- 离线
- 数据采集:Flume
- 离线存储:Hbase
- 离线分析:Hive:复杂计算
- 即时查询:Phoenix:高效查询
- 实时
- 数据采集:Flume
- 实时存储:Kafka
- 实时计算:Flink
- 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
- 技术架构
- 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
- 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
- 保证数据一致性
- 小结
- 掌握实时案例的技术架构及技术选型
06:Flume的回顾及安装
- 目标:回顾Flume基本使用及实现Flume的安装测试
- 路径
- step1:Flume回顾
- step2:Flume的安装
- step3:Flume的测试
- 实施
- Flume的回顾
- 功能:实时对文件或者网络端口进行数据流监听采集
- 场景:文件实时采集
- 开发
- step1:先开发一个配置文件:properties【K=V】
- step2:运行这个文件即可
- 组成
- Agent:一个Agent就是一个Flume程序
- Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
- Channel:负责临时存储Source发送过来的数据,供Sink来取数据
- Sink:负责从Channel拉取数据写入目标地
- Event:代表一条数据对象
- head:Map集合[KV]
- body:byte[]
- Flume的安装
- 上传安装包
cd /export/software/ rz
- 解压安装
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/ cd /export/server mv apache-flume-1.9.0-bin flume-1.9.0-bin
- 修改配置
#集成HDFS,拷贝HDFS配置文件 cd /export/server/flume-1.9.0-bin cp /export/server/hadoop/etc/hadoop/core-site.xml ./conf/ #修改Flume环境变量 cd /export/server/flume-1.9.0-bin/conf/ mv flume-env.sh.template flume-env.sh vim flume-env.sh
#修改22行 export JAVA_HOME=/export/server/jdk1.8.0_65 #修改34行 export HADOOP_HOME=/export/server/hadoop-3.3.0
- 删除Flume自带的guava包,替换成Hadoop的
cd /export/server/flume-1.9.0-bin rm -rf lib/guava-11.0.2.jar cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
- 创建目录
cd /export/server/flume-1.9.0-bin #程序配置文件存储目录 mkdir usercase #Taildir元数据存储目录 mkdir position
- Flume的测试
- 需求:采集聊天数据,写入HDFS
- 分析
- Source:taildir:动态监听多个文件实现实时数据采集
- Channel:mem:将数据缓存在内存
- Sink:hdfs
- 开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1 a1.sources = s1 a1.channels = c1 a1.sinks = k1 #define s1 a1.sources.s1.type = TAILDIR #指定一个元数据记录文件 a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json #将所有需要监控的数据源变成一个组 a1.sources.s1.filegroups = f1 #指定了f1是谁:监控目录下所有文件 a1.sources.s1.filegroups.f1 = /export/data/momo_data/.* #指定f1采集到的数据的header中包含一个KV对 a1.sources.s1.headers.f1.type = momo a1.sources.s1.fileHeader = true #define c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 #define k1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d a1.sinks.k1.hdfs.fileType = DataStream #指定按照时间生成文件,一般关闭 a1.sinks.k1.hdfs.rollInterval = 0 #指定文件大小生成文件,一般120 ~ 125M对应的字节数 a1.sinks.k1.hdfs.rollSize = 102400 #指定event个数生成文件,一般关闭 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = momo a1.sinks.k1.hdfs.fileSuffix = .log a1.sinks.k1.hdfs.useLocalTimeStamp = true #bound a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
- 启动HDFS
start-dfs.sh
- 运行Flume
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
- 运行模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 100
- 查看结果
- 小结
- 回顾Flume基本使用及实现Flume的安装测试
07:Flume采集程序开发
- 目标:实现案例Flume采集程序的开发
- 路径
- step1:需求分析
- step2:程序开发
- step3:测试实现
- 实施
- 需求分析
- 需求:采集聊天数据,实时写入Kafka
- Source:taildir
- Channel:mem
- Sink:Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy
- 程序开发
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1 a1.sources = s1 a1.channels = c1 a1.sinks = k1 #define s1 a1.sources.s1.type = TAILDIR #指定一个元数据记录文件 a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json #将所有需要监控的数据源变成一个组 a1.sources.s1.filegroups = f1 #指定了f1是谁:监控目录下所有文件 a1.sources.s1.filegroups.f1 = /export/data/momo_data/.* #指定f1采集到的数据的header中包含一个KV对 a1.sources.s1.headers.f1.type = momo a1.sources.s1.fileHeader = true #define c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 #define k1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = MOMO_MSG a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092 a1.sinks.k1.kafka.flumeBatchSize = 10 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 100 #bound a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
- 测试实现
- 启动Kafka
start-zk-all.sh start-kafka.sh
- 创建Topic
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
注意:Kafka2.11版本用–zookeeper 替代
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092
- 列举
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
- 启动消费者
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
- 启动Flume程序
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
- 启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 50
- 观察结果
- 小结
- 实现案例Flume采集程序的开发