离线数仓(二)【用户行为日志采集平台搭建】(2)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 离线数仓(二)【用户行为日志采集平台搭建】

离线数仓(二)【用户行为日志采集平台搭建】(1)https://developer.aliyun.com/article/1532360

还有 sink:

  • hdfs:写出到 hdfs
  • kafka:写出到 kafka,相当于 kafka 消费者
  • avro:写出到 avro 系统(avro 是一个 rpc 数据序列化系统)

为了保证数据的可靠性(以及 Kafak 具有削峰解耦的功能),我们当然是选择 Kafak Channel 作为 channel ,那么对于 Kafak Channel 的三种结构,我们应该如何选择呢?

首先第一种结构 - 结合 source 和 sink,这种结构不可取。因为 Kafka Channel 有一个参数 parseAsFlumeEvent(默认为 true) ,它的意思是把日志数据以 Flume Event  的格式存储到 Kafka,这就包括了 Event Header 和 Event Body。对于离线数仓而言,因为它本来 sink 用的就是 flume 的 sink 从 kafka channel 中读取,它可以把 event 中的 body 解析出来。但是对于实时数仓,flink 是从 kafka 中直接读取数据的,所以它读到的就是包含 header 和 body 的完整 event 数据,而 header 对于实时数仓来说根本没有,所以没有必要存储。

既然 header 不需要存储我们能不能把 parseAsFlumeEvent 设为 false 然后继续使用这种结构呢,我们知道拦截器是在 source 和 channel 之间起作用的,如果不使用 flume event 的形式存储数据就不能使用拦截器,而 channel 和 sink 之间又不能设置拦截器。所以最好的办法就是使用第二种结构 - Kafak Channel 结合 Source 使用,同时设置 parseAsFlumeEvent 参数为 false ,这样存储进 Kafka 的数据就只有 body,虽然没有了 sink,但是我们可以把存储在 Kafak 中的数据当做一个数据源,下游再用一个 Source 读取,这样我们就可以再设置拦截器了。

2、作业脚本配置

编写 flume 配置文件

file_to_kafka.conf

#定义组件
a1.sources = r1
a1.channels = c1
 
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
 
#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
 
#组装 
a1.sources.r1.channels = c1

测试

# 启动 zookeeper 和 kafka
zk start
kf.sh start
# 开启flume作业
bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/file_to_kafka.conf -Dflume.root.logger=INFO,console
# 开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
# 开启日志生成器
mklog.sh

运行结果:

至此,flume 的 source -> kafka channel 配置完毕,接下来我们需要在 flume 这里进行一个简单的 ETL 数据清洗,把一些脏数据去除掉,而且这里不能做复杂的拦截器,比如添加多个拦截器,因为flume 毕竟只是一个传递数据的管道,如果这里的 ETL工作太复杂会导致数据堆积在拦截器这里,影响效率。

添加拦截器

引入依赖和打包插件:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Json 工具类:

package com.lyh.gmall.utils;
 
import com.alibaba.fastjson.JSONObject;
 
public class JSONUtil {
 
    // 通过异常捕获来校验json是否合法
    public static boolean isJsonValidate(String body) {
        try {
            JSONObject.parseObject(body);
            return true;
        }catch (Exception e){
            return false;
        }
    }
}

自定义拦截器:

package com.lyh.gmall.interceptor;
 
import com.lyh.gmall.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
 
public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() {
 
    }
 
    // 单个事件的拦截器
    @Override
    public Event intercept(Event event) {
        // 1. 获取body当中的数据
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // 2. 判断数据是否合法,如果合法直接返回,否则返回 null
        return JSONUtil.isJsonValidate(body)?event:null;
    }
 
    // 多个事件的拦截器
    @Override
    public List<Event> intercept(List<Event> list) {
        for (int i = 0; i < list.size(); i++) {
            if (intercept(list.get(i)) == null){
                list.remove(i);
                i--;
            }
        }
        return list;
    }
 
    @Override
    public void close() {
 
    }
 
    public static class Builder implements Interceptor.Builder {
 
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
 
        @Override
        public void configure(Context context) {
 
        }
    }
}

打包后会生成两个jar包,选择带依赖的放到 flume 的 lib 目录下,在 flume 作业配置文件中添加以下配置来添加拦截器:

a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.ETLInterceptor$Builder

测试:

因为 flume 监听的是 applog/log/app-log-xxx.log 文件,所以我们直接 echo 写入来模拟错误日志(不完整的 json 信息):

可以看到,不合法的 json 数据被过滤掉了,至此,我们的数据采集通道可用了。根据我们的项目架构图可以看到,我们需要在 hadoop102 和 hadoop103 上采集,所以 hadoop103 也许要配置:

flume 采集脚本

#!/bin/bash
 
case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " -------- $i flume开始采集 -------"
                ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/job/warehouse/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " -------- $i flume停止采集-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done
};;
esac

总结

       至此,用户行为日志的采集环境搭建好了,只要输入命令 f1.sh start 就可以让在 hadoop102 和 hadoop103 上的 flume开启采集日志了。接下来就是业务数据的采集了。

       这一部分还算简单,对于 flume 的操作无非就是拦截器的编写,逻辑也没多复杂,flume 的 source 使用了 taildir 监听由脚本不断生成的日志文件,channel 使用了 Kafka source ,sink 估计不会采用 flume 的 sink。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
1月前
|
存储 运维 监控
超越传统模型:从零开始构建高效的日志分析平台——基于Elasticsearch的实战指南
【10月更文挑战第8天】随着互联网应用和微服务架构的普及,系统产生的日志数据量日益增长。有效地收集、存储、检索和分析这些日志对于监控系统健康状态、快速定位问题以及优化性能至关重要。Elasticsearch 作为一种分布式的搜索和分析引擎,以其强大的全文检索能力和实时数据分析能力成为日志处理的理想选择。
105 6
|
2月前
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
21天前
|
SQL 存储 数据挖掘
快速入门:利用AnalyticDB构建实时数据分析平台
【10月更文挑战第22天】在大数据时代,实时数据分析成为了企业和开发者们关注的焦点。传统的数据仓库和分析工具往往无法满足实时性要求,而AnalyticDB(ADB)作为阿里巴巴推出的一款实时数据仓库服务,凭借其强大的实时处理能力和易用性,成为了众多企业的首选。作为一名数据分析师,我将在本文中分享如何快速入门AnalyticDB,帮助初学者在短时间内掌握使用AnalyticDB进行简单数据分析的能力。
32 2
|
1月前
|
人工智能 自然语言处理 关系型数据库
阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成
近日,阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成。
|
1月前
|
DataWorks 数据挖掘 关系型数据库
基于hologres搭建轻量OLAP分析平台解决方案评测
一文带你详细了解基于hologres搭建轻量OLAP分析平台解决方案的优与劣
197 8
|
2月前
|
数据可视化 数据挖掘 OLAP
基于 Hologres 搭建轻量 OLAP 分析平台评测报告
【9月更文第6天】开作为互联网手游公司的产品经理和项目经理,数据分析对于我们的业务至关重要。我们一直在寻找高效、可靠的数据分析解决方案,以更好地了解玩家行为、优化游戏体验和提升运营效率。近期,我们体验并部署了《基于 Hologres 搭建轻量 OLAP 分析平台》解决方案,以下是我们对该方案的评测报告。
85 12
基于 Hologres 搭建轻量 OLAP 分析平台评测报告
|
2月前
|
SQL 人工智能 DataWorks
【云栖实录】DataWorks:新一代智能湖仓一体数据开发与治理平台
在9月21日的云栖大会上,DataWorks发布了新一代智能湖仓一体数据开发与治理平台。DataWorks历经Kubernetes改造与云原生调度系统的优化,实现了资源组全面Serverless化,降低了使用成本,最高可节省40%。新推出的DataWorks Data Studio,支持多种计算引擎,提供更开放的云原生WebIDE,提升开发效率。DataWorks Copilot智能助手也得到升级,支持多种SQL方言和Python代码生成,平均提升数据开发效率35%。此外,DataWorks还推出了全方位的数据资产治理体系,涵盖业务和技术视角,助力企业实现数据智能化管理和转型。
334 0
【云栖实录】DataWorks:新一代智能湖仓一体数据开发与治理平台
|
3月前
|
搜索推荐 OLAP 流计算
OneSQL OLAP实践问题之基于 Flink 打造流批一体的数据计算平台如何解决
OneSQL OLAP实践问题之基于 Flink 打造流批一体的数据计算平台如何解决
55 1
|
3月前
|
消息中间件 监控 关系型数据库
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
|
3月前
|
监控 Java Serverless
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作