案例:Flume消费Kafka数据保存Hive

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 案例:Flume消费Kafka数据保存Hive

Flume消费Kafka数据保存到Hive

场景

通过Flume消费Kafka中数据,保存数据到ODS层,数据存储时标记消费时的元信息

创建Hive表

orc存储,snappy压缩,开启事务

ORC事务表

  • 只能是内部表
  • 必须创建桶
create TABLE hr.ods_internetbar_data
(
  k_topic string ,
  k_data string,
  k_partition int,
  k_offset int,
  k_key string,
  current_time bigint
)
  partitioned by (pt_dt int)
CLUSTERED BY(k_partition) into 5 buckets
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n' 
STORED AS orc 
TBLPROPERTIES('orc.compress'='SNAPPY','transactional'='true');

编写Interceptor

思路:我们将Kafka中的元信息(topic名称,partition等)通过Interceptor转换成以'\t'分隔的数据,然后将数据保存到Hive

拦截器com.jast.flume.ETLInterceptor代码如下:

package com.jast.flume;

import cn.hutool.json.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @author Jast
 * @description 自定义拦截器
 * @date 2022-03-17 09:20
 */
public class ETLInterceptor implements Interceptor {
   

    private final String separator = "\t";
    private final String defaultValue = "";
    @Override
    public void initialize() {
   

    }

    @Override
    public Event intercept(Event event) {
   
        //拦截kafka时,这里获取到的Head,是Kafka元信息,包括key,topic,partition,offset等信息,e.g. {topic=ZW_WB_1010000009, partition=0, offset=5425705, key={"dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"3e9fc439e42c4059b46e5d5664d08bf8","lineNum":17,"subFullLinkId":"77dc3a9680ee4d1caeb6719d87e8d3e5","totalCount":50}, timestamp=1648642188763}
        Map<String, String> headers = event.getHeaders();
        String topic = headers.getOrDefault("topic",defaultValue);
        String partition = headers.getOrDefault("partition",defaultValue);
        String offset = headers.getOrDefault("offset",defaultValue);
        String key = headers.getOrDefault("key",defaultValue);
        long currentTimeMillis = System.currentTimeMillis();
        //System.out.println("拦截器head:"+headers);
        byte[] body = event.getBody();
        String text = new String(body, StandardCharsets.UTF_8);
        if(JSONUtil.isJson(text)){
   
            text = topic+separator+text+separator+partition+separator+offset+separator+key+separator+currentTimeMillis;
            System.out.println("text:"+text);
            event.setBody(text.getBytes());
            return event;
        }
        //System.out.println("非json格式过滤掉");
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
   
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()){
   
            Event next = iterator.next();
            if(intercept(next)==null){
   
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
   

    }


    public static class Builder implements Interceptor.Builder{
   

        @Override
        public Interceptor build() {
   
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {
   

        }
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.19</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

打包后生成flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar文件

将生成的包放到flume lib 目录中

  • 自己部署的Flume:

    需要先将打好的包放入到flume/lib文件夹下面

  • CDH版本Flume:

    需要先将打好的包放入到flume/lib文件夹下面

​ 具体的目录/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/

cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib

编写Flume配置文件

需要注意的是a1.sinks.k1.serializer.fieldnamesa1.sources.r1.interceptors.i1.type

a1.sinks.k1.serializer.fieldnames:拦截器\t处理后在这里与Hive表的字段对应上

a1.sources.r1.interceptors.i1.type:拦截器配置

配置文件k2h.config内容如下

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 每批次数量
a1.sources.r1.batchSize = 1000
# 将批写入通道之前的最长时间(毫秒)。每当达到第一个大小和时间时,将写入批。
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.60.16:9092
# kafka消费主题,多个主题逗号分隔
a1.sources.r1.kafka.topics=ZW_WB_1010000009
# 默认:PLAINTEXT,如果使用某种安全级别写入 Kafka,则设置为 SASL_PLAINTEXT、SASL_SSL 或 SSL。
a1.sources.r1.kafka.consumer.security.protocol=PLAINTEXT
# 配置拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jast.flume.ETLInterceptor$Builder

## channel1
# 基于文件存储的Channel
a1.channels.c1.type = file
# checkpoint 保存目录
a1.channels.c1.checkpointDir = /var/lib/hadoop-hdfs/flume/checkpoint/behavior1
# 用于存储日志文件的目录的逗号分隔列表。 在不同磁盘上使用多个目录可以提高文件通道性能
a1.channels.c1.dataDirs = /var/lib/hadoop-hdfs/flume/data/behavior1/
# 最大事务大小
a1.channels.c1.transactionCapacity=10000
# 通道最大容量
a1.channels.c1.capacity=1000000


a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://192.168.60.14:9083
# hive 库名
a1.sinks.k1.hive.database = hr
# hive表名称
a1.sinks.k1.hive.table = ods_internetbar_data
# 分区使用字段,转义字符介绍在文章下面
a1.sinks.k1.hive.partition = %y%m%d
# 在替换转义序列时使用本地时间(而不是事件标头中的时间戳)。
a1.sinks.k1.useLocalTimeStamp = false
# ============================================================
# e.g. 每6小时产生一个新文件,比如把24小时分成4份,假设现在的时间是15:40,如果这时候有新的日志到来,那么hdfs 会创建一个新的hdfs文件,文件名称是2015102012 ,就是15:40 是分布在12-18这个区间的
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
# collector1.sinks.sink_hdfs.hdfs.rollSize = 2048000000 
# collector1.sinks.sink_hdfs.hdfs.rollCount = 0 
# collector1.sinks.sink_hdfs.hdfs.rollInterval = 21600
# rollsize 的配置表示到2G大小的时候回滚到下一个文件,也就是到了这个时间 hdfs就会rename正在写的文件到已经写完
# rollInterval 的配置表示每个6小时回滚到下一个文件
# ============================================================


# 数据解析方式,目前支持DELIMITED(分隔符),JSON(每行为单层Json)和REGEX(正则表达式)
a1.sinks.k1.serializer = DELIMITED
# 数据字段分割符,如果要使用特殊字符需要添加双引号,例如"\t"
a1.sinks.k1.serializer.delimiter = "\t"
# 数据解析的正则表达式,每个字段的数据被解析成一个group
# a1.sinks.k1.serializer.regex = 
# (类型:字符)自定义底层 serde 使用的分隔符。 如果 serializer.fieldnames 中的字段与表列的顺序相同,serializer.delimiter 与 serializer.serdeSeparator 相同,并且 serializer.fieldnames 中的字段数小于或等于表数,则效率会有所提高 列,因为传入事件正文中的字段不需要重新排序以匹配表列的顺序。 对特殊字符使用单引号,例如“\t”。 确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将其设置为相同的字符 
a1.sinks.k1.serializer.serdeSeparator = '\t'
# 输入数据字段到datahub字段的映射,以输入的顺序标示字段,如果要跳过某个字段, 不指定列名即可,例如 c1,c2,,c3,表示将输入数据的第一、二、四字段和hive的c1,c2,c3字段进行匹配。
a1.sinks.k1.serializer.fieldnames = k_topic,k_data,k_partition,k_offset,k_key,current_time
# 单个配置单元事务中写入配置单元的最大事件数,默认15000
a1.sinks.k1.batchSize=10000


## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

上面配置中我们使用到了转义序列a1.sinks.k1.hive.partition = %y%m%d

支持的转义序列:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)

启动Flume收集数据

 flume-ng agent --conf-file k2h.config --name a1

查看数据

select k_topic,k_partition,k_offset,k_key,current_time,length(k_data) from ods_internetbar_data limit 10;
----

ZW_WB_1010000009        0       5441712 {
  "dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"f3bc635822ca4af099755b9b339e8e4c","lineNum":24,"subFullLinkId":"c2e3704534e745c0a779b16f059e6411","totalCount":50}    1648644001736   52105
ZW_WB_1010000009        0       5441713 {
  "dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"f3bc635822ca4af099755b9b339e8e4c","lineNum":25,"subFullLinkId":"c0cec18006ce40848ea3a0494a991999","totalCount":50}    1648644001736   35837

这里使用length(k_data)是为了方便展示,该字段值太大了

目录
相关文章
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
38 4
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
50 3
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
43 2
|
1月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
33 2
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
84 0
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
268 9
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
134 0