Apache Hudi实时入湖之DeltaStreamer最佳实践

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
全局流量管理 GTM,标准版 1个月
简介: Apache Hudi实时入湖之DeltaStreamer最佳实践

1. 背景

传统大数据平台的组织架构是针对离线数据处理需求设计的,常用的数据导入方式为采用sqoop定时作业批量导入。随着数据分析对实时性要求不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。

然而实时同步从一开始就面临如下几个挑战:

小文件问题。不论是spark的microbatch模式,还是flink的逐条处理模式,每次写入HDFS时都是几MB甚至几十KB的文件。长时间下来产生的大量小文件,会对HDFS namenode产生巨大的压力。对update操作的支持。HDFS系统本身不支持数据的修改,无法实现同步过程中对记录进行修改。事务性。不论是追加数据还是修改数据,如何保证事务性。即数据只在流处理程序commit操作时一次性写入HDFS,当程序rollback时,已写入或部分写入的数据能随之删除。

Hudi就是针对以上问题的解决方案之一。使用Hudi自带的DeltaStreamer工具写数据到Hudi,开启–enable-hive-sync 即可同步数据到hive表。

2. Hudi DeltaStreamer写入工具介绍

DeltaStreamer工具使用参考 https://hudi.apache.org/cn/docs/writing_data.html

HoodieDeltaStreamer实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。

从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件支持json、avro或自定义记录类型的传入数据管理检查点,回滚和恢复利用DFS或Confluent schema注册表的Avro模式。支持自定义转换操作

3. 场景说明

1.生产库数据通过CDC工具(debezium)实时录入到MRS集群中Kafka的指定topic里。2.通过Hudi提供的DeltaStreamer工具,读取Kafka指定topic里的数据并解析处理。3.同时使用DeltaStreamer工具将处理后的数据写入到MRS集群的hive里。

样例数据简介 生产库MySQL原始数据:

CDC工具debezium简介 对接步骤具体参考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/

完成对接后,针对MySQL生产库分别做增、改、删除操作对应的kafka消息

增加操作: insert into hudi.hudisource3 values (11,“蒋语堂”,“38”,“女”,“图”,“播放器”,“28732”);

对应kafka消息体:

更改操作:UPDATE hudi.hudisource3 SET uname=‘Anne Marie333’ WHERE uid=11;

对应kafka消息体:

删除操作:delete from hudi.hudisource3 where uid=11;

对应kafka消息体:

4. 调试步骤

4.1 华为MRS Hudi样例工程获取

根据实际MRS版本登录github获取样例代码:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0

打开工程SparkOnHudiJavaExample

4.2 样例代码修改及介绍

1. debeziumJsonParser

说明:对debezium的消息体进行解析,获取到op字段。

源码如下:

package com.huawei.bigdata.hudi.examples;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.fastjson.TypeReference;public class debeziumJsonParser {    public static String getOP(String message){        JSONObject json_obj = JSON.parseObject(message);        String op = json_obj.getJSONObject("payload").get("op").toString();        return  op;    }}

2. MyJsonKafkaSource

说明:DeltaStreamer默认使用org.apache.hudi.utilities.sources.JsonKafkaSource消费kafka指定topic的数据,如果消费阶段涉及数据的解析操作,则需要重写MyJsonKafkaSource进行处理。

以下是源码,增加注释

package com.huawei.bigdata.hudi.examples;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.fastjson.parser.Feature;import org.apache.hudi.common.config.TypedProperties;import org.apache.hudi.common.util.Option;import org.apache.hudi.config.HoodieWriteConfig;import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;import org.apache.hudi.utilities.schema.SchemaProvider;import org.apache.hudi.utilities.sources.InputBatch;import org.apache.hudi.utilities.sources.JsonSource;import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.SparkSession;import org.apache.spark.streaming.kafka010.KafkaUtils;import org.apache.spark.streaming.kafka010.LocationStrategies;import org.apache.spark.streaming.kafka010.OffsetRange;import java.util.Map;/** * Read json kafka data. */public class MyJsonKafkaSource extends JsonSource {    private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class);    private final KafkaOffsetGen offsetGen;    private final HoodieDeltaStreamerMetrics metrics;    public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,                             SchemaProvider schemaProvider) {        super(properties, sparkContext, sparkSession, schemaProvider);        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder();        this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build());        properties.put("key.deserializer", StringDeserializer.class);        properties.put("value.deserializer", StringDeserializer.class);        offsetGen = new KafkaOffsetGen(properties);    }    @Override    protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {        OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);        long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);        LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());        if (totalNewMsgs <= 0) {            return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));        }        JavaRDD<String> newDataRDD = toRDD(offsetRanges);        return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));    }    private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {        return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).filter((x)->{            //过滤空行和脏数据            String msg = (String)x.value();            if (msg == null) {                return false;            }            try{                String op = debeziumJsonParser.getOP(msg);            }catch (Exception e){                return false;            }            return true;        }).map((x) -> {            //将debezium接进来的数据解析写进map,在返回map的tostring, 这样结构改动最小            String msg = (String)x.value();            String op = debeziumJsonParser.getOP(msg);            JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField);            Boolean is_delete = false;            String out_str = "";            Object out_obj = new Object();            if(op.equals("c")){                out_obj =  json_obj.getJSONObject("payload").get("after");            }            else if(op.equals("u")){                out_obj =   json_obj.getJSONObject("payload").get("after");            }            else {                is_delete = true;                out_obj =   json_obj.getJSONObject("payload").get("before");            }            Map out_map = (Map)out_obj;            out_map.put("_hoodie_is_deleted",is_delete);            out_map.put("op",op);            return out_map.toString();        });    }}

3. TransformerExample

说明:入湖hudi表或者hive表时候需要指定的字段

以下是源码,增加注释

package com.huawei.bigdata.hudi.examples;import org.apache.hudi.common.config.TypedProperties;import org.apache.hudi.utilities.transform.Transformer;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.io.Serializable;import java.util.ArrayList;import java.util.List;/** * 功能描述 * 对获取的数据进行format */public class TransformerExample implements Transformer, Serializable {    /**     * format data     *     * @param JavaSparkContext jsc     * @param SparkSession sparkSession     * @param Dataset<Row> rowDataset     * @param TypedProperties properties     * @return Dataset<Row>     */    @Override    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,        TypedProperties properties) {        JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD();        List<Row> rowList = new ArrayList<>();        for (Row row : rowJavaRdd.collect()) {            Row one_row = buildRow(row);            rowList.add(one_row);        }        JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);        List<StructField> fields = new ArrayList<>();        builFields(fields);        StructType schema = DataTypes.createStructType(fields);        Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema);        return dataFrame;    }    private void builFields(List<StructField> fields) {        fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true));        fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true));        fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));        fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));        fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));        fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));        fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));        fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));        fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));    }    private Row buildRow(Row row) {        Integer uid = row.getInt(0);        String uname = row.getString(1);        String age = row.getString(2);        String sex = row.getString(3);        String mostlike = row.getString(4);        String lastview = row.getString(5);        String totalcost = row.getString(6);        Boolean _hoodie_is_deleted = row.getBoolean(7);        String op = row.getString(8);        Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op);        return returnRow;    }}

4. DataSchemaProviderExample

说明:分别指定MyJsonKafkaSource返回的数据格式为source schema,TransformerExample写入的数据格式为target schema

以下是源码

package com.huawei.bigdata.hudi.examples;import org.apache.avro.Schema;import org.apache.hudi.common.config.TypedProperties;import org.apache.hudi.utilities.schema.SchemaProvider;import org.apache.spark.api.java.JavaSparkContext;/** * 功能描述 * 提供sorce和target的schema */public class DataSchemaProviderExample extends SchemaProvider {    public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) {        super(props, jssc);    }    /**     * source schema     *     * @return Schema     */    @Override    public Schema getSourceSchema() {        Schema avroSchema = new Schema.Parser().parse(                "{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");        return avroSchema;    }    /**     * target schema     *     * @return Schema     */    @Override    public Schema getTargetSchema() {        Schema avroSchema = new Schema.Parser().parse(            "{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");        return avroSchema;    }}

将工程打包(hudi-security-examples-0.7.0.jar)以及json解析包(fastjson-1.2.4.jar)上传至MRS客户端

5. DeltaStreamer启动命令

登录客户端执行一下命令获取环境变量以及认证

source /opt/hadoopclient/bigdata_envkinit developusersource /opt/hadoopclient/Hudi/component_env

DeltaStreamer启动命令如下:

spark-submit --master yarn-client \--jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \--driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \spark-internal --props file:///opt/hudi-demo2/kafka-source.properties \--target-base-path /tmp/huditest/delta_demo2 \--table-type COPY_ON_WRITE  \--target-table delta_demo2  \--source-ordering-field uid \--source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \--enable-hive-sync --continuous

kafka.properties配置

// hudi配置hoodie.datasource.write.recordkey.field=uidhoodie.datasource.write.partitionpath.field=hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGeneratorhoodie.datasource.write.hive_style_partitioning=truehoodie.delete.shuffle.parallelism=10hoodie.upsert.shuffle.parallelism=10hoodie.bulkinsert.shuffle.parallelism=10hoodie.insert.shuffle.parallelism=10hoodie.finalize.write.parallelism=10hoodie.cleaner.parallelism=10hoodie.datasource.write.precombine.field=uidhoodie.base.path = /tmp/huditest/delta_demo2hoodie.timeline.layout.version = 1`// hive confighoodie.datasource.hive_sync.table=delta_demo2hoodie.datasource.hive_sync.partition_fields=hoodie.datasource.hive_sync.assume_date_partitioning=falsehoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractorhoodie.datasource.hive_sync.use_jdbc=false// Kafka Source topichoodie.deltastreamer.source.kafka.topic=hudisource// checkpointhoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/// Kafka propsbootstrap.servers=172.16.9.117:21005auto.offset.reset=earliestgroup.id=a5offset.rang.limit=10000

注意:kafka服务端配置 allow.everyone.if.no.acl.found 为true

5. 使用Spark查询

spark-shell --master yarnval roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*")roViewDF.createOrReplaceTempView("hudi_ro_table")spark.sql("select * from  hudi_ro_table").show()

Mysql增加操作对应spark中hudi表查询结果:

Mysql更新操作对应spark中hudi表查询结果:

删除操作:

6. 使用Hive查询

beelineselect * from delta_demo2;

Mysql增加操作对应hive表中查询结果:

Mysql更新操作对应hive表中查询结果:

Mysql删除操作对应hive表中查询结果:

目录
相关文章
|
7月前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
122 0
|
1月前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
84 4
|
7月前
|
Apache
Apache Hudi Rollback实现分析
Apache Hudi Rollback实现分析
103 0
|
5月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
6月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
129 0
|
7月前
|
存储 SQL 分布式计算
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
228 0
|
7月前
|
存储 分布式计算 Hadoop
一文了解Apache Hudi架构、工具和最佳实践
一文了解Apache Hudi架构、工具和最佳实践
1308 0
|
7月前
|
SQL 分布式计算 NoSQL
使用Apache Hudi和Debezium构建健壮的CDC管道
使用Apache Hudi和Debezium构建健壮的CDC管道
81 0
|
7月前
|
存储 SQL 消息中间件
Apache Hudi:统一批和近实时分析的存储和服务
Apache Hudi:统一批和近实时分析的存储和服务
109 0
|
7月前
|
分布式计算 API Apache
解锁Apache Hudi删除记录新姿势
解锁Apache Hudi删除记录新姿势
218 0

推荐镜像

更多