DataX读取Hive Orc格式表丢失数据处理记录

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
大数据开发治理平台 DataWorks,不限时长
简介: DataX读取Hive Orc格式表丢失数据处理记录

问题

问题概述

DataX读取Hive Orc存储格式表数据丢失

问题详细描述

同步Hive表将数据发送到Kafka,Hive表A数据总量如下

SQL:select count(1) from A;
数量:19397281

使用DataX将表A数据发送到Kafka,最终打印读取数据量为12649450

任务总计耗时                    :               1273s
任务平均流量                    :            2.51MB/s
记录写入速度                    :           9960rec/s
读出记录总数                    :            12649450
读写失败总数                    :                   0

在kafka中查询发送的数据为12649449(有一条垃圾数据被我写自定义KafkaWriter过滤掉了,这里忽略即可)

image-20230511152742037

原因

DataX读取HDFS Orce文件,代码有bug,当读取Hive文件大于BlockSize时会丢失数据,问题代码如下:

InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);

代码位置:

hdfsreader模块,com.alibaba.datax.plugin.reader.hdfsreader.DFSUtil类334行代码(源码为v202210版本)

这里当文件大于BlockSize大小会将文件分为多个,但是下面只取了第一个文件splits[0],其他数据就会丢失

我们发现问题后,去验证一下,hive表存储目录查询文件存储大小如下

$ hdfs dfs -du -h  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05
....
518.4 K   1.5 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000171_0
669.7 M   2.0 G    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0
205.6 K   616.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000173_0
264.6 K   793.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000174_0
1.4 M     4.3 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000175_0
1.5 M     4.6 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000176_0
....

发现果然有文件669.7 M 2.0 G /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0大于BlockSize大小

解决方法

修改源码

修改后方法源码如下,直接替换DFSUtil.javaorcFileStartRead方法即可

public void orcFileStartRead(
            String sourceOrcFilePath,
            Configuration readerSliceConfig,
            RecordSender recordSender,
            TaskPluginCollector taskPluginCollector) {
   
   
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column =
                UnstructuredStorageReaderUtil.getListColumnEntry(
                        readerSliceConfig,
                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat =
                readerSliceConfig.getString(
                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
   
   
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
   
   
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
   
   
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
   
   
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
   
   
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
   
   
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector =
                        (StructObjectInspector) serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                // If the network disconnected, will retry 45 times, each time the retry interval
                // for 20 seconds
                // Each file as a split
                InputSplit[] splits = in.getSplits(conf, -1);
                for (InputSplit split : splits) {
   
   
                    {
   
   
                        RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
                        Object key = reader.createKey();
                        Object value = reader.createValue();
                        // 获取列信息
                        List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                        List<Object> recordFields;
                        while (reader.next(key, value)) {
   
   
                            recordFields = new ArrayList<Object>();

                            for (int i = 0; i <= columnIndexMax; i++) {
   
   
                                Object field = inspector.getStructFieldData(value, fields.get(i));
                                recordFields.add(field);
                            }
                            transportOneRecord(
                                    column,
                                    recordFields,
                                    recordSender,
                                    taskPluginCollector,
                                    isReadAllColumns,
                                    nullFormat);
                        }
                        reader.close();
                    }
                    // transportOneRecord(column, recordFields, recordSender,
                    //        taskPluginCollector, isReadAllColumns, nullFormat);
                }
                // reader.close();
            } catch (Exception e) {
   
   
                String message =
                        String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。", sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
   
   
            String message =
                    String.format(
                            "请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s",
                            JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

修改完成后将源码打包,打包后在hdfsreader模块下target目录有target/hdfsreader-0.0.1-SNAPSHOT.jar文件,将文件上传到部署服务器上的目录datax/plugin/reader/hdfsreader下,替换之前的包.


[hadoop@10 /datax/plugin/reader/hdfsreader]$ pwd
/datax/plugin/reader/hdfsreader
[hadoop@10 /datax/plugin/reader/hdfsreader]$ ll
total 52
-rw-rw-r-- 1 hadoop hadoop 28828 May 11 14:08 hdfsreader-0.0.1-SNAPSHOT.jar
drwxrwxr-x 2 hadoop hadoop  8192 Dec  9 15:06 libs
-rw-rw-r-- 1 hadoop hadoop   217 Oct 26  2022 plugin_job_template.json
-rw-rw-r-- 1 hadoop hadoop   302 Oct 26  2022 plugin.json

验证

重新启动DataX同步脚本,发现同步数据与Hive表保存数据一致。

目录
相关文章
|
2月前
|
存储 SQL Java
bigdata-18-Hive数据结构与存储格式
bigdata-18-Hive数据结构与存储格式
23 0
|
4月前
|
SQL 存储 HIVE
Hive中的表是如何定义的?请解释表的结构和数据类型。
Hive中的表是如何定义的?请解释表的结构和数据类型。
34 0
|
6月前
|
SQL HIVE
49 Hive修改表
49 Hive修改表
19 0
49 Hive修改表
|
14天前
|
SQL 数据库 HIVE
Hive【基础知识 05】常用DDL操作(数据库操作+创建表+修改表+清空删除表+其他命令)
【4月更文挑战第8天】Hive【基础知识 05】常用DDL操作(数据库操作+创建表+修改表+清空删除表+其他命令)
22 0
|
15天前
|
存储 SQL 算法
【Hive】ORC、Parquet等列式存储的优点
【4月更文挑战第14天】【Hive】ORC、Parquet等列式存储的优点
|
16天前
|
SQL 存储 分布式计算
Hive【基础 01】核心概念+体系架构+数据类型+内容格式+存储格式+内外部表(部分图片来源于网络)
【4月更文挑战第6天】Hive【基础 01】核心概念+体系架构+数据类型+内容格式+存储格式+内外部表(部分图片来源于网络)
31 1
|
3月前
|
SQL 消息中间件 Kafka
Flink部署问题之hive表没有数据如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
4月前
|
SQL 分布式计算 关系型数据库
Sqoop数据导入到Hive表的最佳实践
Sqoop数据导入到Hive表的最佳实践
|
5月前
|
SQL 存储 HIVE
❤️Hive的基本知识(二)Hive中的各种表❤️
❤️Hive的基本知识(二)Hive中的各种表❤️
31 0
|
5月前
|
SQL 存储 关系型数据库
Presto【实践 01】Presto查询性能优化(数据存储+SQL优化+无缝替换Hive表+注意事项)及9个实践问题分享
Presto【实践 01】Presto查询性能优化(数据存储+SQL优化+无缝替换Hive表+注意事项)及9个实践问题分享
105 0