2-网站日志分析案例-日志采集:Flume-Kafka-Flume-HDFS

简介: 文章目录2-网站日志分析案例-日志采集:Flume-Kafka-Flume-HDFS环境安装虚拟机安装安装hadoop安装zookeeper安装过程基本命令

2-网站日志分析案例-日志采集:Flume-Kafka-Flume-HDFS

hadoop2.7.3+ kafka_2.11-2.1.0

环境安装

虚拟机安装

安装hadoop

参考:https://blog.csdn.net/m0_38139250/article/details/121155903


安装zookeeper

参考:https://blog.csdn.net/m0_38139250/article/details/121284886


安装过程

1.下载:

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz

2.解压

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/apps

3.创建软连接

ln -s apache-zookeeper-3.5.7-bin zookeeper

4.修改文件夹权限

chown -R root:root /opt/apps/apache-zookeeper-3.5.7-bin
chmod -R 700 /opt/apps/apache-zookeeper-3.5.7-bin

5.配置环境变量

vi /etc/profile 
#添加zookeeper环境变量
export ZOOKEEPER_HOME=/opt/apps/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/sbin:$PATH
# 生效配置文件
source /etc/profile

6.创建zk的data0目录,并修改zk的zoo.cfg配置文件

cd /opt/apps/zookeeper/
mkdir data
touch data/myid
echo 1 > data/myid
cd conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# 修改配置文件如下:
# dataDir=/tmp/zookeeper 注释掉这句默认配置,然后添加下面的配置
dataDir=/home/hadoop/opt/app/zookeeper/data

基本命令

7.启动命令:

zkServer.sh start /opt/apps/zookeeper/conf/zoo.cfg #启动zk启动状态
zkServer.sh status /opt/apps/zookeeper/conf/zoo.cfg #查看zk启动状态
zkCli.sh -server localhost:2181 # 通过客户端访问zkServer

安装flume

参考:https://blog.csdn.net/m0_38139250/article/details/121392150

安装过程

1.下载flume

wget http://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2.解压

tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/apps

3.添加配置文件

cd /opt/apps/apache-flume-1.7.0-bin/conf
vi netcat2logger.conf

内容如下

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面的配置文件定义了一个agent的name为a1,a1的source监听6666端口,并且读取6666端口传过来的数据, a1的channel 采用内存作为缓存,a1的sink 类型为logs,具体含义可以参考官网,或是留言。

基本命令

在flume的安装目录下执行如下命令,即可使用flume采集数据:

$ bin/flume-ng agent -n a1 -c conf -f conf/netcat2logger.conf -Dflume.root.logger=INFO,console

flume-ng agent :表示flume的启动一个agent,ng是表示这是new的版本命令

-n a1:-n 表示name ,a1表示agent的名字为a1 对应配置文件中的a1

-c conf :表示flume的配置文件目录所在位置

-f conf/netcat2logger.conf: 表示自定义的数据采集配置文件位置。

-Dflume.root.logger=INFO,console:表示我们制定flume的日志格式,并且输出到控制台。

我们再开启一个新的终端,通过telnet 或 nc命令发送socket数据。

telnet 127.0.0.1 6666
nc 127.0.0.1 6666

,然后输入hello world,会看到反馈的信息ok。

安装kafka

安装过程

1.下载kafka安装包

wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz

2.解压kafka

tar -zxvf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 /opt/apps
cd /opt/apps

3.配置软连接

cd /opt/apps
ln -s kafka_2.11-2.0.0 kafka

4.配置环境变量

vi /etc/profile 
#添加zookeeper环境变量
export KAFKA_HOME=/opt/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
# 生效配置文件
source /etc/profile

5.修改配置文件

cd kafka/config
vi server.properties

修改内容如下:

# borker的编号,如果集群中有多个,则每个borker需设置不同的编号
broker.id=0
#broker对外提供服务入口的端口(默认9092)
listeners=PLAINTEXT://localhost:9092
#存放消息日志文件地址
log.dirs=/opt/apps/kafka/kafkaData/kafka-logs
# kafka所需zookeeper集群地址
zookeeper.connect=localhost:2181
# 添加
delete.topic.enable=true

常用命令

6.在kafka安装目录下执行启动命令

6.1启动关闭

# 启动kafka
bin/kafka-server-start.sh config/server.properties
# 后台启动
JMX_PORT=9991 kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties
# 关闭kafka
kafka-server-stop.sh

6.2topic相关

# 查看Kafka Topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 创建Kafka Topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test
# 描述topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
# 查看Kafka Topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 增加分区数
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5 
# 删除Kafka Topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
# 查看 topic 指定分区 offset 的最大值或最小值 time 为 -1 时表示最大值,为 -2 时表示最小值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 127.0.0.1:9092 --partitions 0 

6.3生成消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
• 1

6.4消费消息

# 从头开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 从尾部开始取数据,必需要指定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0
# 指定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0
# 取指定个数
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0 --max-messages 1 

6.5 消费者 Group

#指定 Group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning
# 消费者 Group 列表
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看 Group 详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe
# 删除 Group 中 Topic
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete
# 删除 Group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete

6.6 平衡 leader

bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

6.7自带压测工具

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --produce

案例过程

总体架构、



flume配置

把日志放在指定位置

在/tmp/logs下生成文件

/tmp/logs/app-2022-01-02.log

追加数据到该文件中:

for ((i=0; i<=1000; i++))
do
echo "toms" >> /tmp/logs/app-2022-01-02.log
sleep 2
done

第1个flume-把数据从linux采集到kafka中

文件名 file-flume-kafka.conf

cd /opt/apps/apache-flume-1.7.0-bin/conf
vi file-flume-kafka.conf

编辑内容如下:

#describe component
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#config component source,we choose the taildir source,because it can breakpoint continuation
a1.sources.r1.type = exec
#监控文件夹下的test.log文件
a1.sources.r1.command = tail -F /tmp/logs/app-2022-01-02.log
#component bind
a1.sources.r1.channels = c1
#对于sink的配置描述 使用kafka做数据的消费
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

启动:

bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf
• 1

kafka消费者观看数据

到kafka目录下

kafka-topics.sh --zookeeper localhost:2181 --list
kafka-console-cosumer --bootstrap-server localhost:9092 --topic flume_kafka --from-beginning

第2个flume-把数据从kafka采集到hdfs

采集event日志:文件名 kafka-flume-hdfs.conf

a1.sources=r1
a1.channels=c1
a1.sinks=k1
#config source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers =localhost:9092
a1.sources.r1.kafka.topics=flume_kafka 
#config channel
a1.channels.c1.type = memory
#config sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logeventa1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
#bind component
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

启动命令

bin/flume-ng agent --name a1 --conf-file conf/kafka-flume-hdfs.conf


相关文章
|
6月前
|
数据采集 存储 大数据
大数据之路:阿里巴巴大数据实践——日志采集与数据同步
本资料全面介绍大数据处理技术架构,涵盖数据采集、同步、计算与服务全流程。内容包括Web/App端日志采集方案、数据同步工具DataX与TimeTunnel、离线与实时数仓架构、OneData方法论及元数据管理等核心内容,适用于构建企业级数据平台体系。
|
存储 运维 开发工具
警惕日志采集失败的 6 大经典雷区:从本地管理反模式到 LoongCollector 标准实践
本文探讨了日志管理中的常见反模式及其潜在问题,强调科学的日志管理策略对系统可观测性的重要性。文中分析了6种反模式:copy truncate轮转导致的日志丢失或重复、NAS/OSS存储引发的采集不一致、多进程写入造成的日志混乱、创建文件空洞释放空间的风险、频繁覆盖写带来的数据完整性问题,以及使用vim编辑日志文件导致的重复采集。针对这些问题,文章提供了最佳实践建议,如使用create模式轮转日志、本地磁盘存储、单线程追加写入等方法,以降低日志采集风险,提升系统可靠性。最后总结指出,遵循这些实践可显著提高故障排查效率和系统性能。
1299 21
|
7月前
|
存储 运维 开发工具
警惕日志采集失败的 6 大经典雷区:从本地管理反模式到 LoongCollector 标准实践
本文总结了日志管理中的六大反模式及优化建议,涵盖日志轮转、存储选择、并发写入等常见问题,帮助提升日志采集的完整性与系统可观测性,适用于运维及开发人员优化日志管理策略。
280 7
|
3月前
|
数据采集 缓存 大数据
【赵渝强老师】大数据日志采集引擎Flume
Apache Flume 是一个分布式、可靠的数据采集系统,支持从多种数据源收集日志信息,并传输至指定目的地。其核心架构由Source、Channel、Sink三组件构成,通过Event封装数据,保障高效与可靠传输。
287 1
|
4月前
|
存储 Kubernetes 监控
Kubernetes日志管理:使用Loki进行日志采集
通过以上步骤,在Kubernetes环境下利用LoKi进行有效率且易于管理地logs采集变成可能。此外,在实施过程中需要注意版本兼容性问题,并跟进社区最新动态以获取功能更新或安全补丁信息。
356 16
|
8月前
|
存储 缓存 Apache
StarRocks+Paimon 落地阿里日志采集:万亿级实时数据秒级查询
本文介绍了阿里集团A+流量分析平台的日志查询优化方案,针对万亿级日志数据的写入与查询挑战,提出基于Flink、Paimon和StarRocks的技术架构。通过Paimon存储日志数据,结合StarRocks高效计算能力,实现秒级查询性能。具体包括分桶表设计、数据缓存优化及文件大小控制等措施,解决高并发、大数据量下的查询效率问题。最终,日志查询耗时从分钟级降至秒级,显著提升业务响应速度,并为未来更低存储成本、更高性能及更多业务场景覆盖奠定基础。
|
5月前
|
存储 缓存 Apache
StarRocks+Paimon 落地阿里日志采集:万亿级实时数据秒级查询
A+流量分析平台是阿里集团统一的全域流量数据分析平台,致力于通过埋点、采集、计算构建流量数据闭环,助力业务提升流量转化。面对万亿级日志数据带来的写入与查询挑战,平台采用Flink+Paimon+StarRocks技术方案,实现高吞吐写入与秒级查询,优化存储成本与扩展性,提升日志分析效率。
734 1
|
6月前
|
JSON 安全 网络安全
LoongCollector 安全日志接入实践:企业级防火墙场景的日志标准化采集
LoonCollector 是一款轻量级日志采集工具,支持多源安全日志的标准化接入,兼容 Syslog、JSON、CSV 等格式,适用于长亭 WAF、FortiGate、Palo Alto 等主流安全设备。通过灵活配置解析规则,LoonCollector 可将原始日志转换为结构化数据,写入阿里云 SLS 日志库,便于后续查询分析、威胁检测与合规审计,有效降低数据孤岛问题,提升企业安全运营效率。
|
8月前
|
消息中间件 存储 JSON
日志采集 Agent 性能大比拼——LoongCollector 性能深度测评
为了展现 LoongCollector 的卓越性能,本文通过纵向(LoongCollector 与 iLogtail 产品升级对比)和横向(LoongCollector 与其他开源日志采集 Agent 对比)两方面对比,深度测评不同采集 Agent 在常见的日志采集场景下的性能。
846 35
|
8月前
|
SQL 监控 关系型数据库
MySQL日志分析:binlog、redolog、undolog三大日志的深度探讨。
数据库管理其实和写小说一样,需要规划,需要修订,也需要有能力回滚。理解这些日志的作用与优化,就像把握写作工具的使用与运用,为我们的数据库保驾护航。
359 23