案例:Flume消费Kafka数据保存Hive

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
大数据开发治理平台 DataWorks,不限时长
简介: 案例:Flume消费Kafka数据保存Hive

Flume消费Kafka数据保存到Hive

场景

通过Flume消费Kafka中数据,保存数据到ODS层,数据存储时标记消费时的元信息

创建Hive表

orc存储,snappy压缩,开启事务

ORC事务表

  • 只能是内部表
  • 必须创建桶
create TABLE hr.ods_internetbar_data
(
  k_topic string ,
  k_data string,
  k_partition int,
  k_offset int,
  k_key string,
  current_time bigint
)
  partitioned by (pt_dt int)
CLUSTERED BY(k_partition) into 5 buckets
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n' 
STORED AS orc 
TBLPROPERTIES('orc.compress'='SNAPPY','transactional'='true');

编写Interceptor

思路:我们将Kafka中的元信息(topic名称,partition等)通过Interceptor转换成以'\t'分隔的数据,然后将数据保存到Hive

拦截器com.jast.flume.ETLInterceptor代码如下:

package com.jast.flume;

import cn.hutool.json.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @author Jast
 * @description 自定义拦截器
 * @date 2022-03-17 09:20
 */
public class ETLInterceptor implements Interceptor {
   

    private final String separator = "\t";
    private final String defaultValue = "";
    @Override
    public void initialize() {
   

    }

    @Override
    public Event intercept(Event event) {
   
        //拦截kafka时,这里获取到的Head,是Kafka元信息,包括key,topic,partition,offset等信息,e.g. {topic=ZW_WB_1010000009, partition=0, offset=5425705, key={"dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"3e9fc439e42c4059b46e5d5664d08bf8","lineNum":17,"subFullLinkId":"77dc3a9680ee4d1caeb6719d87e8d3e5","totalCount":50}, timestamp=1648642188763}
        Map<String, String> headers = event.getHeaders();
        String topic = headers.getOrDefault("topic",defaultValue);
        String partition = headers.getOrDefault("partition",defaultValue);
        String offset = headers.getOrDefault("offset",defaultValue);
        String key = headers.getOrDefault("key",defaultValue);
        long currentTimeMillis = System.currentTimeMillis();
        //System.out.println("拦截器head:"+headers);
        byte[] body = event.getBody();
        String text = new String(body, StandardCharsets.UTF_8);
        if(JSONUtil.isJson(text)){
   
            text = topic+separator+text+separator+partition+separator+offset+separator+key+separator+currentTimeMillis;
            System.out.println("text:"+text);
            event.setBody(text.getBytes());
            return event;
        }
        //System.out.println("非json格式过滤掉");
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
   
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()){
   
            Event next = iterator.next();
            if(intercept(next)==null){
   
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
   

    }


    public static class Builder implements Interceptor.Builder{
   

        @Override
        public Interceptor build() {
   
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {
   

        }
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.19</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

打包后生成flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar文件

将生成的包放到flume lib 目录中

  • 自己部署的Flume:

    需要先将打好的包放入到flume/lib文件夹下面

  • CDH版本Flume:

    需要先将打好的包放入到flume/lib文件夹下面

​ 具体的目录/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/

cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib

编写Flume配置文件

需要注意的是a1.sinks.k1.serializer.fieldnamesa1.sources.r1.interceptors.i1.type

a1.sinks.k1.serializer.fieldnames:拦截器\t处理后在这里与Hive表的字段对应上

a1.sources.r1.interceptors.i1.type:拦截器配置

配置文件k2h.config内容如下

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 每批次数量
a1.sources.r1.batchSize = 1000
# 将批写入通道之前的最长时间(毫秒)。每当达到第一个大小和时间时,将写入批。
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.60.16:9092
# kafka消费主题,多个主题逗号分隔
a1.sources.r1.kafka.topics=ZW_WB_1010000009
# 默认:PLAINTEXT,如果使用某种安全级别写入 Kafka,则设置为 SASL_PLAINTEXT、SASL_SSL 或 SSL。
a1.sources.r1.kafka.consumer.security.protocol=PLAINTEXT
# 配置拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jast.flume.ETLInterceptor$Builder

## channel1
# 基于文件存储的Channel
a1.channels.c1.type = file
# checkpoint 保存目录
a1.channels.c1.checkpointDir = /var/lib/hadoop-hdfs/flume/checkpoint/behavior1
# 用于存储日志文件的目录的逗号分隔列表。 在不同磁盘上使用多个目录可以提高文件通道性能
a1.channels.c1.dataDirs = /var/lib/hadoop-hdfs/flume/data/behavior1/
# 最大事务大小
a1.channels.c1.transactionCapacity=10000
# 通道最大容量
a1.channels.c1.capacity=1000000


a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://192.168.60.14:9083
# hive 库名
a1.sinks.k1.hive.database = hr
# hive表名称
a1.sinks.k1.hive.table = ods_internetbar_data
# 分区使用字段,转义字符介绍在文章下面
a1.sinks.k1.hive.partition = %y%m%d
# 在替换转义序列时使用本地时间(而不是事件标头中的时间戳)。
a1.sinks.k1.useLocalTimeStamp = false
# ============================================================
# e.g. 每6小时产生一个新文件,比如把24小时分成4份,假设现在的时间是15:40,如果这时候有新的日志到来,那么hdfs 会创建一个新的hdfs文件,文件名称是2015102012 ,就是15:40 是分布在12-18这个区间的
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
# collector1.sinks.sink_hdfs.hdfs.rollSize = 2048000000 
# collector1.sinks.sink_hdfs.hdfs.rollCount = 0 
# collector1.sinks.sink_hdfs.hdfs.rollInterval = 21600
# rollsize 的配置表示到2G大小的时候回滚到下一个文件,也就是到了这个时间 hdfs就会rename正在写的文件到已经写完
# rollInterval 的配置表示每个6小时回滚到下一个文件
# ============================================================


# 数据解析方式,目前支持DELIMITED(分隔符),JSON(每行为单层Json)和REGEX(正则表达式)
a1.sinks.k1.serializer = DELIMITED
# 数据字段分割符,如果要使用特殊字符需要添加双引号,例如"\t"
a1.sinks.k1.serializer.delimiter = "\t"
# 数据解析的正则表达式,每个字段的数据被解析成一个group
# a1.sinks.k1.serializer.regex = 
# (类型:字符)自定义底层 serde 使用的分隔符。 如果 serializer.fieldnames 中的字段与表列的顺序相同,serializer.delimiter 与 serializer.serdeSeparator 相同,并且 serializer.fieldnames 中的字段数小于或等于表数,则效率会有所提高 列,因为传入事件正文中的字段不需要重新排序以匹配表列的顺序。 对特殊字符使用单引号,例如“\t”。 确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将其设置为相同的字符 
a1.sinks.k1.serializer.serdeSeparator = '\t'
# 输入数据字段到datahub字段的映射,以输入的顺序标示字段,如果要跳过某个字段, 不指定列名即可,例如 c1,c2,,c3,表示将输入数据的第一、二、四字段和hive的c1,c2,c3字段进行匹配。
a1.sinks.k1.serializer.fieldnames = k_topic,k_data,k_partition,k_offset,k_key,current_time
# 单个配置单元事务中写入配置单元的最大事件数,默认15000
a1.sinks.k1.batchSize=10000


## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

上面配置中我们使用到了转义序列a1.sinks.k1.hive.partition = %y%m%d

支持的转义序列:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)

启动Flume收集数据

 flume-ng agent --conf-file k2h.config --name a1

查看数据

select k_topic,k_partition,k_offset,k_key,current_time,length(k_data) from ods_internetbar_data limit 10;
----

ZW_WB_1010000009        0       5441712 {
  "dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"f3bc635822ca4af099755b9b339e8e4c","lineNum":24,"subFullLinkId":"c2e3704534e745c0a779b16f059e6411","totalCount":50}    1648644001736   52105
ZW_WB_1010000009        0       5441713 {
  "dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"f3bc635822ca4af099755b9b339e8e4c","lineNum":25,"subFullLinkId":"c0cec18006ce40848ea3a0494a991999","totalCount":50}    1648644001736   35837

这里使用length(k_data)是为了方便展示,该字段值太大了

目录
相关文章
|
16天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
32 0
|
1月前
|
SQL 关系型数据库 MySQL
Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
【2月更文挑战第9天】Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
95 7
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
72 2
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
43 1
|
1月前
|
XML 数据格式
Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)
【2月更文挑战第19天】Flume【付诸实践 01】flume1.9.0版 配置格式说明+常用案例分享(ExecSource+SpoolingDirectorySource+HDFSSink+AvroSourceSink)
29 1
|
1月前
|
SQL HIVE 索引
Hive窗口函数案例总结
Hive窗口函数案例总结
24 2
|
2月前
|
SQL 消息中间件 Kafka
Flink部署问题之hive表没有数据如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
分布式计算 资源调度 Hadoop
Flink报错问题之Sql往kafka表写聚合数据报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
机器学习/深度学习 消息中间件 人工智能
机器学习PAI报错问题之读取kafka数据报错如何解决
人工智能平台PAI是是面向开发者和企业的机器学习/深度学习工程平台,提供包含数据标注、模型构建、模型训练、模型部署、推理优化在内的AI开发全链路服务;本合集将收录PAI常见的报错信息和解决策略,帮助用户迅速定位问题并采取相应措施,确保机器学习项目的顺利推进。
|
2月前
|
SQL 消息中间件 关系型数据库
Flink CDC数据同步问题之向kafka同步数据报错如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。