DataX读取Hive Orc格式表丢失数据处理记录

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: DataX读取Hive Orc格式表丢失数据处理记录

问题

问题概述

DataX读取Hive Orc存储格式表数据丢失

问题详细描述

同步Hive表将数据发送到Kafka,Hive表A数据总量如下

SQL:select count(1) from A;
数量:19397281

使用DataX将表A数据发送到Kafka,最终打印读取数据量为12649450

任务总计耗时                    :               1273s
任务平均流量                    :            2.51MB/s
记录写入速度                    :           9960rec/s
读出记录总数                    :            12649450
读写失败总数                    :                   0

在kafka中查询发送的数据为12649449(有一条垃圾数据被我写自定义KafkaWriter过滤掉了,这里忽略即可)

image-20230511152742037

原因

DataX读取HDFS Orce文件,代码有bug,当读取Hive文件大于BlockSize时会丢失数据,问题代码如下:

InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);

代码位置:

hdfsreader模块,com.alibaba.datax.plugin.reader.hdfsreader.DFSUtil类334行代码(源码为v202210版本)

这里当文件大于BlockSize大小会将文件分为多个,但是下面只取了第一个文件splits[0],其他数据就会丢失

我们发现问题后,去验证一下,hive表存储目录查询文件存储大小如下

$ hdfs dfs -du -h  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05
....
518.4 K   1.5 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000171_0
669.7 M   2.0 G    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0
205.6 K   616.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000173_0
264.6 K   793.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000174_0
1.4 M     4.3 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000175_0
1.5 M     4.6 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000176_0
....

发现果然有文件669.7 M 2.0 G /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0大于BlockSize大小

解决方法

修改源码

修改后方法源码如下,直接替换DFSUtil.javaorcFileStartRead方法即可

public void orcFileStartRead(
            String sourceOrcFilePath,
            Configuration readerSliceConfig,
            RecordSender recordSender,
            TaskPluginCollector taskPluginCollector) {
   
   
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column =
                UnstructuredStorageReaderUtil.getListColumnEntry(
                        readerSliceConfig,
                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat =
                readerSliceConfig.getString(
                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
   
   
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
   
   
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
   
   
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
   
   
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
   
   
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
   
   
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector =
                        (StructObjectInspector) serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                // If the network disconnected, will retry 45 times, each time the retry interval
                // for 20 seconds
                // Each file as a split
                InputSplit[] splits = in.getSplits(conf, -1);
                for (InputSplit split : splits) {
   
   
                    {
   
   
                        RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
                        Object key = reader.createKey();
                        Object value = reader.createValue();
                        // 获取列信息
                        List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                        List<Object> recordFields;
                        while (reader.next(key, value)) {
   
   
                            recordFields = new ArrayList<Object>();

                            for (int i = 0; i <= columnIndexMax; i++) {
   
   
                                Object field = inspector.getStructFieldData(value, fields.get(i));
                                recordFields.add(field);
                            }
                            transportOneRecord(
                                    column,
                                    recordFields,
                                    recordSender,
                                    taskPluginCollector,
                                    isReadAllColumns,
                                    nullFormat);
                        }
                        reader.close();
                    }
                    // transportOneRecord(column, recordFields, recordSender,
                    //        taskPluginCollector, isReadAllColumns, nullFormat);
                }
                // reader.close();
            } catch (Exception e) {
   
   
                String message =
                        String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。", sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
   
   
            String message =
                    String.format(
                            "请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s",
                            JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

修改完成后将源码打包,打包后在hdfsreader模块下target目录有target/hdfsreader-0.0.1-SNAPSHOT.jar文件,将文件上传到部署服务器上的目录datax/plugin/reader/hdfsreader下,替换之前的包.


[hadoop@10 /datax/plugin/reader/hdfsreader]$ pwd
/datax/plugin/reader/hdfsreader
[hadoop@10 /datax/plugin/reader/hdfsreader]$ ll
total 52
-rw-rw-r-- 1 hadoop hadoop 28828 May 11 14:08 hdfsreader-0.0.1-SNAPSHOT.jar
drwxrwxr-x 2 hadoop hadoop  8192 Dec  9 15:06 libs
-rw-rw-r-- 1 hadoop hadoop   217 Oct 26  2022 plugin_job_template.json
-rw-rw-r-- 1 hadoop hadoop   302 Oct 26  2022 plugin.json

验证

重新启动DataX同步脚本,发现同步数据与Hive表保存数据一致。

目录
相关文章
|
7月前
|
存储 SQL Java
bigdata-18-Hive数据结构与存储格式
bigdata-18-Hive数据结构与存储格式
71 0
|
7月前
|
SQL 存储 HIVE
Hive中的表是如何定义的?请解释表的结构和数据类型。
Hive中的表是如何定义的?请解释表的结构和数据类型。
109 0
|
4月前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
64 6
|
6月前
|
SQL 缓存 关系型数据库
ClickHouse(19)ClickHouse集成Hive表引擎详细解析
Hive引擎允许对HDFS Hive表执行 `SELECT` 查询。目前它支持如下输入格式: -文本:只支持简单的标量列类型,除了 `Binary` - ORC:支持简单的标量列类型,除了`char`; 只支持 `array` 这样的复杂类型 - Parquet:支持所有简单标量列类型;只支持 `array` 这样的复杂类型
226 1
|
7月前
|
SQL 关系型数据库 MySQL
Hive 表注释乱码解决
Hive元数据在MySQL默认使用`latin1`字符集导致注释乱码。可通过修改MySQL配置文件`/etc/my.cnf`,在`[mysqld]`和末尾添加`character-set-server=utf8`等设置,重启MySQL。然后在Hive数据库中调整表字段、分区字段、索引注释的字符集。注意,这仅对新表生效。测试创建带注释的Hive表,问题解决。
101 0
|
7月前
|
SQL 存储 分布式计算
【Hive】hive内部表和外部表的区别
【4月更文挑战第14天】【Hive】hive内部表和外部表的区别
|
7月前
|
SQL HIVE
Hive表删除数据不支持使用Delete From...
Hive表删除数据不支持使用Delete From...
325 0
|
7月前
|
存储 SQL 算法
【Hive】ORC、Parquet等列式存储的优点
【4月更文挑战第14天】【Hive】ORC、Parquet等列式存储的优点
|
7月前
|
SQL 存储 分布式计算
Hive【基础 01】核心概念+体系架构+数据类型+内容格式+存储格式+内外部表(部分图片来源于网络)
【4月更文挑战第6天】Hive【基础 01】核心概念+体系架构+数据类型+内容格式+存储格式+内外部表(部分图片来源于网络)
147 1