Hive 拉链表详解及实例

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: 拉链表是一种数据仓库技术,用于处理持续增长且存在时间范围内的重复数据,以节省空间。它在Hive中通过列式存储ORC实现,适用于大规模数据场景,尤其当数据在有限时间内有多种状态变化。配置涉及事务管理和表合并选项。示例中展示了如何从原始订单表创建拉链表,通过聚合操作和动态分区减少数据冗余。增量数据可通过追加到原始表然后更新拉链表来处理。提供的Java代码用于生成模拟的订单增量数据,以演示拉链表的工作流程。

拉链表

  • 版本迭代:hive 0.14 slowly changing dimension => hive 2.6.0 merge 事务管理
    • 原来采用分区表,用户分区存储历史增量数据,缺点是重复数据太多
  • 定义:数仓用于解决持续增长且存在一定时间时间范围内重复的数据
  • 存储:创建拉链表时使用列式存储ORC
    不能使用load加载数据
    压缩比高 效率高
  • 场景:【数据规模庞大】,新数据【在有限的时间】内存在多种状态变化

  • 优点:节约空间(一份订单只有一条数据)

  • 举例:

    原始表订单:
          order_id,order_timestamp,user_id,order_status
          1,2024_01_21 16:12:37.259,87986321,0
          1,2024_01_21 16:12:47.003,87986321,1
          1,2024_01_22 09:00:28.022,87986321,2
          1,2024_01_24 15:00:00.123,87986321,3
          1,2024_02_01 00:30:00.227,87986321,4
          order_detail_id,fk_order_id,goods_id,buy_count,goods_price
    
          拉链表订单:
          order_id,user_id,order_create_timestamp,order_modify_timestamp,order_amount,order_current_status
          1,87986321,2024_01_21 16:12:37.259,2024_02_01 00:30:00.227,3242.66,4
    
  • 配置:

set hive.support.concurrency=true;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true; -- 表合并开启
set hive.compactor.worker.threads=1; -- 表合并线程必须为一
set hive.auto.convert.join=false; -- 关闭 mapjoin
set hive.merge.cardinality.check=false; -- 关闭检查数据列的基数(列值的差异性)
set mapreduce.job.reduces=4;
  • 拉链表实例:
// 创建原始表格
    create table yb12211_2.hive_zipper_order(
        order_id bigint,
        user_id bigint,
        order_modify_dt timestamp,
        order_money decimal(10,2),
        current_status int
    )
    row format delimited fields terminated by ',';
    // 将数据文件导入原始表格
    load data local inpath '/root/hive/data/course/order_record.log'
    overwrite into table yb12211_2.hive_zipper_order;

    // 创建拉链表
    // 操作历史全量数据用动态分区
    set hive.support.concurrency=true;
    set hive.enforce.bucketing=true;
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    set hive.compactor.initiator.on=true;
    set hive.compactor.worker.threads=1;
    set hive.auto.convert.join=false;
    set hive.merge.cardinality.check=false;
    set mapreduce.job.reduces=4;

    drop table if exists yb12211_2.hive_zipper_pc_order;
    create table yb12211_2.hive_zipper_pc_order(
        order_id bigint,
        user_id bigint,
        order_create_dt timestamp,
        order_modify_dt timestamp,
        order_money decimal(10,2),
        current_status int
    ) partitioned by(year int,month int,day int)
    clustered by(order_create_dt) into 4 buckets
    row format delimited fields terminated by ','
    stored as orc
    tblproperties("transactional"="true");

    // 对拉链表的数据进行聚合,获取订单信息的创建日期、修改日期和订单状态
    with zip_src as (
        select order_id,user_id,order_money,
            min(order_modify_dt) as order_create_dt,
            max(order_modify_dt) as order_modify_dt,
            max(current_status) as current_status
        from yb12211_2.hive_zipper_order
        group by order_id,user_id,order_money
    )

    // 将原始数据灌入拉链表
    insert overwrite table yb12211_2.hive_zipper_pc_order partition(year,month,day)
    select
        order_id,
        user_id,
        order_create_dt,
        order_modify_dt,
        order_money,
        current_status,
        year(order_create_dt) as year,
        month(order_create_dt) as month,
        day(order_create_dt) as day
    from zip_src;

    // 拉链表查询 查询之前必须先有这两句配置
    set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    set hive.support.concurrency=true;
    select * from yb12211_2.hive_zipper_pc_order
    where to_date(order_modify_dt)='2021-02-04'
    order by order_modify_dt desc;

    // 对于追加增量数据,将增量数据覆盖在原始数据表中
    load data local inpath '/root/hive/data/course/order_record_2021_02_05.log'
    overwrite into table yb12211_2.hive_zipper_order;    

    // 将原始数据表中的增量数据插入拉链表
    // 利用源数据和目标表的order_id进行匹配,若匹配则更新现有订单信息,若不匹配则插入新订单。
    merge into yb12211_2.hive_zipper_pc_order as O
    using (
        select
            order_id,
            user_id,
            order_create_dt,
            order_modify_dt,
            order_money,
            current_status,
            year(order_create_dt) as year,
            month(order_create_dt) as month,
            day(order_create_dt) as day
        from (
            select order_id,user_id,order_money,
                min(order_modify_dt) as order_create_dt,
                max(order_modify_dt) as order_modify_dt,
                max(current_status) as current_status
            from yb12211_2.hive_zipper_order
            group by order_id,user_id,order_money
        )T
    ) as H
    on O.order_id=H.order_id
    when matched then
    update set order_modify_dt=H.order_modify_dt,current_status=H.current_status
    when not matched then
    insert values(H.order_id,H.user_id,H.order_create_dt,H.order_modify_dt,H.order_money,H.current_status,H.year,H.month,H.day);

    // 验证拉链结果:最后修改时间是否大于创建时间
    select * from yb12211_2.hive_zipper_pc_order
    where to_date(order_modify_dt)>to_date(order_create_dt);
  • 验证数据变化的三种情况:
    • 新增数据,插入原始表中的所有字段信息。
    • 更改数据,更改修改时间|结束时间|数据状态。
    • 删除数据:只需将结束日期改为删除当天即可。

构造增量数据

此处提供了订单日增量数据的自动生成代码,读者可利用代码实现对增量数据文件的生成,以便于体验拉链表的作用。

  • HiveZipMaker类
package cn.ybg;

import java.io.*;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;


public class HiveZipTableMaker {
   
    static Properties pro = new Properties();
    static Random rand = new Random();
    static Calendar calendar = Calendar.getInstance(Locale.CHINA);
    static SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd.SSS");
    static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    static long orderId = 0;
    static long timestamp = 0;
    static String order_log = "order/order_log.properties";
    static String order_record = "order/order_record_2021_02_05.log";
    static String order_list = "order/order_list.log";
    static BufferedWriter bw;
    static long size;
    static LinkedList<Order> orders;

    static {
   
        try {
   
            pro.load(new FileReader(order_log));
            timestamp = Long.parseLong(pro.getProperty("timestamp"));
            calendar.setTimeInMillis(timestamp);
            orderId = Long.parseLong(pro.getProperty("orderId"));
            size = new File(order_record).length();
            bw = new BufferedWriter(new FileWriter(order_record,true));
            File orderList = new File(order_list);
            if (orderList.exists() && orderList.length()>0){
   
                ObjectInputStream ois = new ObjectInputStream(new FileInputStream(orderList));
                orders = (LinkedList<Order>) ois.readObject();
                ois.close();
            }else{
   
                orders = new LinkedList<>();
            }
        } catch (Exception e) {
   
            e.printStackTrace();
            System.exit(-1);
        }
    }

    static int userId(){
   
        return rand.nextInt(10000000)+1;
    }

    static long orderId(){
   
        return ++orderId;
    }

    static Date now(){
   
        calendar.add(Calendar.MILLISECOND,rand.nextInt(5000));
        return calendar.getTime();
    }

    static void write(String line) throws IOException {
   
        if (orderId>1){
   
            bw.newLine();
        }
        bw.write(line);
    }

    static void close(){
   
        try {
   
            bw.close();
            pro.setProperty("timestamp",String.valueOf(timestamp));
            pro.setProperty("orderId",String.valueOf(orderId));
            pro.store(new FileWriter(order_log,false),timeFormat.format(new Date()));
            if (orders.size()>0){
   
                ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(order_list,false));
                oos.writeObject(orders);
                oos.close();
            }
        } catch (IOException e) {
   
            e.printStackTrace();
        }
    }

    static double randMoney(){
   
        return new BigDecimal(rand.nextInt(500000)/100+10)
                .setScale(2, RoundingMode.HALF_UP)
                .doubleValue();
    }

    static boolean percentage(){
   
        return rand.nextInt(100)<65;
    }

    public static void main(String[] args) throws IOException, ParseException {
   
        /**
         * 订单编号:
         * 用户编号:
         * 操作日期:
         * 订单金额:
         * 当前状态:0(suspending),1(paid),2(signed),3(completed),4(return)
         */

        final long TWO_WEEKS = 14*24*60*60*1000;
        String format = dateFormat.format(now());
        Order order;
        Date now;
        Date dateEnd = dateFormat.parse("2021-02-06");
        while ((now = now()).before(dateEnd)){
   
            timestamp = now.getTime();
            String nowFormat = dateFormat.format(now);
            if (!format.equals(nowFormat)){
   
                format = nowFormat;
                if (orders.size()>0) {
   
                    Date finalNow = now;
                    orders.removeIf(o2-> finalNow.getTime()-o2.getModifyDate().getTime()>TWO_WEEKS);
                    orders.remove(rand.nextInt(orders.size()));
                }
            }
            if (percentage() && orders.size()>=20+rand.nextInt(21)) {
   
                for (int j = 0; j < rand.nextInt(orders.size()/15) ; j++){
   
                    int index = rand.nextInt(orders.size());
                    order = orders.get(index);
                    if (order.getStatus()<2){
   
                        order.setStatus(order.getStatus()+1);
                    }else{
   
                        order.setStatus(percentage()?3:4);
                        orders.remove(index);
                    }
                    order.setModifyDate(now);
                    write(order.toString());
                }
            }else{
   
                orderId = orderId();
                int userId = userId();
                double money = randMoney();
                order = new Order(orderId, userId, now, money, 0);
                orders.add(order);
                write(order.toString());
            }
        }
        close();
    }
}
  • Order类
package cn.ybg;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 订单编号:
 * 用户编号:
 * 操作日期:
 * 订单金额:
 * 当前状态:0(suspending),1(paid),2(signed),3(completed),4(return)
 */
public class Order implements Serializable {
   
    static SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd.SSS");
    private long orderId;
    private int userId;
    private Date modifyDate;
    private double money;
    private int status;

    public Order(long orderId, int userId, Date modifyDate, double money, int status) {
   
        this.orderId = orderId;
        this.userId = userId;
        this.modifyDate = modifyDate;
        this.money = money;
        this.status = status;
    }

    public long getOrderId() {
   
        return orderId;
    }

    public void setOrderId(long orderId) {
   
        this.orderId = orderId;
    }

    public int getUserId() {
   
        return userId;
    }

    public void setUserId(int userId) {
   
        this.userId = userId;
    }

    public Date getModifyDate() {
   
        return modifyDate;
    }

    public void setModifyDate(Date modifyDate) {
   
        this.modifyDate = modifyDate;
    }

    public double getMoney() {
   
        return money;
    }

    public void setMoney(double money) {
   
        this.money = money;
    }

    public int getStatus() {
   
        return status;
    }

    public void setStatus(int status) {
   
        this.status = status;
    }

    @Override
    public String toString() {
   
        return String.format("%d,%d,%s,%.2f,%d",orderId,userId,timeFormat.format(modifyDate),money,status);
    }
}
目录
相关文章
|
6月前
|
SQL 监控 HIVE
Hive 全量表、增量表、拉链表 解析
Hive 全量表、增量表、拉链表 解析
780 0
|
SQL 存储 关系型数据库
大数据Hive拉链表的设计与实现
大数据Hive拉链表的设计与实现
221 2
|
SQL 分布式计算 Java
hive java 实例
      下载  jdo2-api-2.3-ec hive hdfs 所需jar  http://download.csdn.net/download/knight_black_bob/9725194   常见命令 hive 常见命令 create table tes...
890 0
|
SQL 分布式计算 Java
hive 增删查改 实例
  pom org.apache.hive hive-jdbc 0.11.0 org.apache.
813 0
|
SQL 数据库 HIVE
bboss持久层操作hive实例
先在应用中导入bboss 持久层和hive驱动(bboss persistent版本号5.0.1,以实际为准:查看最新版本号): maven坐标     com.bbossgroups     bboss-persistent     5.
688 0
|
SQL 存储 Java
Hive的数据类型解析和表的操作实例
另外一篇关于Hive的表,外部表,分区,桶的理解的博客:点击阅读 本文所需要的HIveQL源码和所需的测试用例github地址为:点击查看 一:Hive中的数据类型 Hive支持两种数据类型,一类叫原子数据类型,一类叫复杂数据类型。
2812 0
|
SQL 存储 Shell
Hive任意命令/代码执行漏洞+渗透实例
Author: kindle Date: 2013-02-9 Hive是建立在 Hadoop 上的数据仓库基础构架。
2236 0
|
6月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
191 1
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
34 0