Hive实战UDF 外部依赖文件找不到的问题

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 其实这篇文章的起源是,我司有数据清洗时将ip转化为类似中国-湖北-武汉地区这种需求。由于ip服务商提供的Demo,只能在本地读取,我需要将ip库上传到HDFS分布式存储,每个计算节点再从HDFS下载到本地。那么到底能不能直接从HDFS读取呢?跟我强哥讲了这件事后,不服输的他把肝儿都熬黑了,终于给出了解决方案。

其实这篇文章的起源是,我司有数据清洗时将ip转化为类似中国-湖北-武汉地区这种需求。由于ip服务商提供的Demo,只能在本地读取,我需要将ip库上传到HDFS分布式存储,每个计算节点再从HDFS下载到本地。


那么到底能不能直接从HDFS读取呢?跟我强哥讲了这件事后,不服输的他把肝儿都熬黑了,终于给出了解决方案。


关于外部依赖文件找不到的问题

微信图片_20220426143447.png


其实我在上一篇的总结中也说过了你需要确定的上传的db 文件在那里,也就是你在hive 中调用add file之后 会出现添加后的文件路径或者使用list 命令来看一下


今天我们不讨论这个问题我们讨论另外一个问题,外部依赖的问题,当然这个问题的引入本来就很有意思,其实是一个很简单的事情。


为什么要使用外部依赖


重点强调一下我们的外部依赖并不是单单指的是jar包依赖,我们的程序或者是UDF 依赖的一切外部文件都可以算作是外部依赖。


使用外部依赖的的原因是我们的程序可能需要一些外部的文件,或者是其他的一些信息,例如我们这里的UDF 中的IP 解析库(DB 文件),或者是你需要在UDF 访问一些网络信息等等。


为什么idea 里面可以运行上线之后不行


我们很多如人的一个误区就是明明我在IDEA 里面都可以运行为什么上线或者是打成jar 包之后就不行,其实你在idea 可以运行之后不应该直接上线的,或者说是不应该直接创建UDF 的,而是先应该测试一下jar 是否可以正常运行,如果jar 都不能正常运行那UDF 坑定就运行报错啊。


接下来我们就看一下为什么idea 可以运行,但是jar 就不行,代码我们就不全部粘贴了,只粘贴必要的,完整代码可以看前面一篇文章

@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    converter = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    String dbPath = Ip2Region.class.getResource("/ip2region.db").getPath();
    File file = new File(dbPath);
    if (file.exists() == false) {
        System.out.println("Error: Invalid ip2region.db file");
        return null;
    }
    DbConfig config = null;
    try {
        config = new DbConfig();
        searcher = new DbSearcher(config, dbPath);
    } catch (DbMakerConfigException | FileNotFoundException e) {
        e.printStackTrace();
    }
    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}


这就是我们读取外部配置文件的方法,我们接下来写一个测试

@Test
public void ip2Region() throws HiveException {
    Ip2Region udf = new Ip2Region();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector[] init_args = {valueOI0};
    udf.initialize(init_args);
    String ip = "220.248.12.158";
    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(ip);
    GenericUDF.DeferredObject[] args = {valueObj0};
    Text res = (Text) udf.evaluate(args);
    System.out.println(res.toString());
}


我们发现是可以正常运行的,这里我们把它打成jar 包再运行一下,为了方便测试我们将这个测试方法改成main 方法,我们还是先在idea 里面运行一下

微信图片_20220426143459.png

我们发现还是可以正常运行,我们接下来打个jar包试一下

Error: Invalid ip2region.db file
java.io.FileNotFoundException: file: /Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar!/ip2region.db (No such file or directory)
        at java.io.RandomAccessFile.open0(Native Method)
        at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:124)
        at org.lionsoul.ip2region.DbSearcher.<init>(DbSearcher.java:58)
        at com.kingcall.bigdata.HiveUDF.Ip2Region.main((Ip2Region.java:42)
Exception in thread "main" java.lang.NullPointerException
        at com.kingcall.bigdata.HiveUDF.Ip2Region.main(Ip2Region.java:48)


我们发现jar 包已经报错了,那你的UDF 肯定运行不了了啊,其实如果你仔细看的话就知道为什么报错了/Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar!/ip2region.db 其实就是这个路径,我们很明显看到这个路径是不对的,所以这就是我们UDF报错的原因


依赖文件直接打包在jar 包里面不香吗


上面找到了这个问题,现在我们就看一下如何解决这个问题,出现这个问题的原因就是打包后的路径不对,导致我们的不能找到这个依赖文件,那我们为什要这个路径呢。这个主要是因为我们使用的API 的原因

DbConfig config = new DbConfig();
DbSearcher searcher = new DbSearcher(config, dbPath);


也就是说我们的new DbSearcher(config, dbPath) 第二个参数传的是DB 的路径,所以我们很自然的想到看一下源码是怎么使用这个路径的,能不能传一个其他特定的路径进去,其实我们从idea 里面可以运行就知道,我们是可以传入一个本地路径的。

这里我们以memorySearch 方法作为入口

// 构造方法
    public DbSearcher(DbConfig dbConfig, String dbFile) throws FileNotFoundException {
        this.dbConfig = dbConfig;
        this.raf = new RandomAccessFile(dbFile, "r");
    }
    // 构造方法
    public DbSearcher(DbConfig dbConfig, byte[] dbBinStr) {
        this.dbConfig = dbConfig;
        this.dbBinStr = dbBinStr;
        this.firstIndexPtr = Util.getIntLong(dbBinStr, 0);
        this.lastIndexPtr = Util.getIntLong(dbBinStr, 4);
        this.totalIndexBlocks = (int)((this.lastIndexPtr - this.firstIndexPtr) / (long)IndexBlock.getIndexBlockLength()) + 1;
    }
    // memorySearch 方法
    public DataBlock memorySearch(long ip) throws IOException {
        int blen = IndexBlock.getIndexBlockLength();
        // 读取文件到内存数组
        if (this.dbBinStr == null) {
            this.dbBinStr = new byte[(int)this.raf.length()];
            this.raf.seek(0L);
            this.raf.readFully(this.dbBinStr, 0, this.dbBinStr.length);
            this.firstIndexPtr = Util.getIntLong(this.dbBinStr, 0);
            this.lastIndexPtr = Util.getIntLong(this.dbBinStr, 4);
            this.totalIndexBlocks = (int)((this.lastIndexPtr - this.firstIndexPtr) / (long)blen) + 1;
        }
        int l = 0;
        int h = this.totalIndexBlocks;
        long dataptr = 0L;
        int m;
        int p;
        while(l <= h) {
            m = l + h >> 1;
            p = (int)(this.firstIndexPtr + (long)(m * blen));
            long sip = Util.getIntLong(this.dbBinStr, p);
            if (ip < sip) {
                h = m - 1;
            } else {
                long eip = Util.getIntLong(this.dbBinStr, p + 4);
                if (ip <= eip) {
                    dataptr = Util.getIntLong(this.dbBinStr, p + 8);
                    break;
                }
                l = m + 1;
            }
        }
        if (dataptr == 0L) {
            return null;
        } else {
            m = (int)(dataptr >> 24 & 255L);
            p = (int)(dataptr & 16777215L);
            int city_id = (int)Util.getIntLong(this.dbBinStr, p);
            String region = new String(this.dbBinStr, p + 4, m - 4, "UTF-8");
            return new DataBlock(city_id, region, p);
        }
    }


其实我们看到memorySearch 方法首先是读取DB 文件到内存的字节数组然后使用,而且我们看到有这样一个字节数组的构造方法DbSearcher(DbConfig dbConfig, byte[] dbBinStr)


既然读取文件不行,那我们能不能直接传入字节数组呢?其实可以的

DbSearcher searcher=null;
DbConfig config = new DbConfig();
try {
    config = new DbConfig();
} catch (DbMakerConfigException e) {
    e.printStackTrace();
}
InputStream inputStream = Ip2Region.class.getResourceAsStream("/ip2region.db");
ByteArrayOutputStream output = new ByteArrayOutputStream();
byte[] buffer = new byte[4096];
int n = 0;
while (-1 != (n = inputStream.read(buffer))) {
    output.write(buffer, 0, n);
}
byte[] bytes = output.toByteArray();
searcher = new DbSearcher(config, bytes);
// 只能使用memorySearch 方法
DataBlock block = searcher.memorySearch(ip);
//打印位置信息(格式:国家|大区|省份|城市|运营商)
System.out.println(block.getRegion());


我们还是先在Idea 里面测试,我们发现是可以运行的,然后我们还是打成jar包进行测试,这次我们发现还是可以运行中国|0|上海|上海市|联通


也就是说我们已经把这个问题解决了,有没有什么问题呢?有那就是DB 文件在jar 包里面,不能单独更新,前面我们将分词的时候也水果,停用词库是随着公司的业务发展需要更新的 DB库也是一样的。


也就是说可以这样解决但是不完美,我看到有的人是这样做的他使用getResourceAsStream 把数据读取到内存,然后再写出成本地临时文件,然后再使用,我只想说这个解决方式也太不友好了吧


  1. 文件不能更新


  1. 需要写临时文件(权限问题,如果被删除了还得重写)


只能使用memorySearch 方法


这个原因值得说明一下,因为你使用其他两个search 方法的时候都会抛出异常Exception in thread "main" java.lang.NullPointerException

这主要是因为其他两个方法都是涉及到从文件读取数据进来,但是我们的raf 是null


学会独立思考并且解决问题


上面我们的UDF 其实已经可以正常使用了,但是有不足之处,这里我们就处理一下这个问题,前面我们说过了其实在IDEA 里的路径参数可以使用,那就说明传入本地文件是可以的,但是有一个问题就是我们的UDF 是可能在所有节点上运行的,所以传入本地路径的前提是需要保证所有节点上这个本地路径都可用,但是这样维护成本也很高,还不如直接将依赖放在jar 包里面。


继承DbSearcher


其实我们是可以将这个依赖放在OSS或者是HDFS 上的,但是这个时候你传入路径之后,还是有问题,因为构造方法里面读取文件的时候默认的是本地方法,其实这个时候你可以继承DbSearcher 方法,然后添加新的构造方法,完成从HDFS 上读取文件。

// 构造方法
public DbSearcher(DbConfig dbConfig, byte[] dbBinStr) {
    this.dbConfig = dbConfig;
    this.dbBinStr = dbBinStr;
    this.firstIndexPtr = Util.getIntLong(dbBinStr, 0);
    this.lastIndexPtr = Util.getIntLong(dbBinStr, 4);
    this.totalIndexBlocks = (int)((this.lastIndexPtr - this.firstIndexPtr) / (long)IndexBlock.getIndexBlockLength()) + 1;
}



读取文件传入字节数组


还有一个方法就是我们直接使用第二个构造方法,dbBinStr 就是我们读取进来的字节数组,这个时候不论这个依赖是在HDFS 还是OSS 上你只要调用相关的API 就可以了,其实这个方法我们在读取jar包里面的文件的时候已经使用过了


下面的ctx就是OSS的上下问,用来从OSS上读取数据,同理你可以从任何你需要的地方读取数据。

DbConfig config = null;
try {
    config = new DbConfig();
} catch (DbMakerConfigException e) {
    e.printStackTrace();
}
InputStream inputStream = ctx.readResourceFileAsStream("ip2region.db");
ByteArrayOutputStream output = new ByteArrayOutputStream();
byte[] buffer = new byte[4096];
int n = 0;
while (-1 != (n = inputStream.read(buffer))) {
    output.write(buffer, 0, n);
}
byte[] bytes = output.toByteArray();
searcher = new DbSearcher(config, bytes);


总结


  1. Idea 里面使用文件路径是可以的,但是jar里面不行,要使用也是本地文件或者是使用getResourceAsStream 获取InputStream;


  1. 存储在HDFS或者OSS 上的文件也不能使用路径,因为默认是读取本地文件的;


  1. 多思考,为什么,看看源码,最后请你思考一下怎么在外部依赖的情况下使用binarySearch或者是btreeSearch方法;
相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
22天前
|
SQL 存储 Java
Hive UDF UDTF UDAF 自定义函数详解
Hive UDF UDTF UDAF 自定义函数详解
42 2
Hive UDF UDTF UDAF 自定义函数详解
|
8天前
|
SQL Java 程序员
Hive反射函数的使用-程序员是怎么学UDF函数的
Hive反射函数的使用-程序员是怎么学UDF函数的
4 0
|
11天前
|
SQL 缓存 Java
Hive 之 UDF 运用(包会的)
Hive的UDF允许用户自定义数据处理函数,扩展其功能。`reflect()`函数通过Java反射调用JDK中的方法,如静态或实例方法。例如,调用`MathUtils.addNumbers()`进行加法运算。要创建自定义UDF,可以继承`GenericUDF`,实现`initialize`、`evaluate`和`getDisplayString`方法。在`initialize`中检查参数类型,在`evaluate`中执行业务逻辑。最后,打包项目成JAR,上传到HDFS,并在Hive中注册以供使用。
|
22天前
|
SQL 数据采集 存储
Hive实战 —— 电商数据分析(全流程详解 真实数据)
关于基于小型数据的Hive数仓构建实战,目的是通过分析某零售企业的门店数据来进行业务洞察。内容涵盖了数据清洗、数据分析和Hive表的创建。项目需求包括客户画像、消费统计、资源利用率、特征人群定位和数据可视化。数据源包括Customer、Transaction、Store和Review四张表,涉及多个维度的聚合和分析,如按性别、国家统计客户、按时间段计算总收入等。项目执行需先下载数据和配置Zeppelin环境,然后通过Hive进行数据清洗、建表和分析。在建表过程中,涉及ODS、DWD、DWT、DWS和DM五层,每层都有其特定的任务和粒度。最后,通过Hive SQL进行各种业务指标的计算和分析。
148 1
Hive实战 —— 电商数据分析(全流程详解 真实数据)
|
22天前
|
SQL Java 数据处理
【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
【4月更文挑战第17天】【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
|
22天前
|
SQL 存储 算法
【Hive】Hive 小文件过多怎么解决?
【4月更文挑战第16天】【Hive】Hive 小文件过多怎么解决?
|
22天前
|
SQL 存储 分布式计算
Hive【基础知识 02-2】【Hive CLI 命令行工具使用】【详细举例-包含测试脚本文件】
【4月更文挑战第7天】Hive【基础知识 02-2】【Hive CLI 命令行工具使用】【详细举例-包含测试脚本文件】
26 0
|
22天前
|
SQL Java HIVE
Flink依赖问题之connector hive依赖冲突如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
22天前
|
SQL 消息中间件 Apache
Flink报错问题之使用hive udf函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
22天前
|
SQL 搜索推荐 Java
Hive中的UDF是什么?请解释其作用和使用方法。
Hive中的UDF是什么?请解释其作用和使用方法。
50 0