如何提升AnalyticDB实时写入性能

简介: 从AnalyticDB写入原理分析,可以从三个方面提升AnalyticDB的写入能力:降低网络传输开销、减少与硬件设备io操作和尽量少消耗cpu资源。针对这三个特性本文将介绍如何对写入sql进行改造以达到最优性能。

从AnalyticDB写入原理分析,可以从三个方面提升AnalyticDB的写入能力:降低网络传输开销、减少与硬件设备io操作和尽量少消耗cpu资源。针对这三个特性本文将介绍如何对写入sql进行改造以达到最优性能。

  • 采用批量写入(batch insert)模式,即每次在VALUES部分添加多行数据,一般建议每次批量写入数据量大约为16KB,以提高网络和磁盘吞吐。如下
INSERT INTO db_name.table_name (col1, col2, col3) VALUES ('xxx', 111, 'xxx'), ('xxx', 222, 'xxx'), ('xxx', 333, 'xxx');
  • 如果对一行的所有列都进行插入,则去除col_name并保证values顺序与表结构中的col_name顺序一致,以降低网络带宽耗用。如下
INSERT INTO db_name.table_name VALUES ('xxx', 111, 'xxx'), ('xxx', 222, 'xxx'), ('xxx', 333, 'xxx');
  • 保持主键相对有序。AnalyticDB的insert语句要求必须提供主键,且主键可以为复合主键。当确定复合主键时,根据业务含义调整复合主键中各个列的次序,从业务层面保证插入时主键是严格递增或近似递增的,也可以提升实时写入速度。
  • 增加ignore关键字。执行不带ignore关键字的insert sql,当主键冲突时,后续数据会覆盖之前插入的数据;带上ignore关键字,则主键冲突时,会保留之前插入的数据而自动忽略新数据。如果业务层没有数据覆盖的语义要求,则建议所有insert sql都加上ignore关键字,以减小覆盖数据带来的性能开销。
  • AnalyticDB需要对数据进行分区存储,当一次Batch insert中含有属于不同分区的多行数据时,将会耗费大量CPU资源进行分区号计算。因此建议在写入程序中提前计算好每行数据的分区号,并且将属于同一分区的多行数据组成一个批次,一次性插入。

实现聚合写入目前主要有两种途径:

  • 用户自行实现该聚合方法,对分区号的计算规则为:partition_num = CRC32(hash_partition_column_value) mod m,其中hash_partition_column_value是分区列的值,m是分区总数。如下代码
public class HashInsert extends AbstractJavaSamplerClient{
    private static Logger log = Logger.getLogger(HashInsert1M.class.getName());
    private static AtomicLong idGen = new AtomicLong();
    private int bufferSize =2000 
    private int batchSize = 20;
    private int partitionCnt = 100;

    public SampleResult runTest(JavaSamplerContext arg0) {
        ..........
        ..........
        String sqls[] = new String[bufferSize];
        int partNo[] = new int [bufferSize];
        int sortedSqlIndex[] = new int [bufferSize];
        int end = 100;
        for(int i = 0; i < bufferSize; i++) {
            long id = idGen.getAndIncrement();
            boolean boolean_id = DataUtil.getBoolean_test(id);
            int byte_id = DataUtil.getByte_test(id);
            int short_id = DataUtil.getShort_test(id);
            long user_id = DataUtil.getInt_test(id);
            long seller_id = id;
            float float_id = DataUtil.getFloat_test(id);
            double double_id = DataUtil.getDouble_test(id);
            String follow_id = DataUtil.getString_test(id);
            String time_id = DataUtil.getTime_test(id);
            String date_id = DataUtil.getDate_test(id);
            String timestamp_id = DataUtil.getTimestamp_test(id);
            String interest_flag = DataUtil.getMutilValue(id);
               StringBuffer sb = new StringBuffer();
            sb.append("(").append(boolean_id).append(",").append(byte_id).append(",").append(short_id).append(",").append(user_id)
            .append(",").append(seller_id).append(",").append(float_id).append(",").append(double_id).append(",'").append(follow_id)
            .append("','").append(time_id).append("','").append(date_id).append("','").append(timestamp_id).append("','").append(interest_flag)
            .append("',");
            for(int j=0;j<end-1;j++){
                sb.append("'").append(follow_id).append("',");
            }
            sb.append("'").append(follow_id).append("')");
            sqls[i]  = sb.toString();

            partNo[i] = getHashPartition("" + user_id, partitionCnt);
            sortedSqlIndex[i] = i;
        }

        for(int i = 0; i < bufferSize - 1; i++) {
            for(int j = i + 1; j < bufferSize; j++) {
                if (partNo[sortedSqlIndex[i]] > partNo[sortedSqlIndex[j]]) {
                    int tmp = sortedSqlIndex[i];
                    sortedSqlIndex[i] = sortedSqlIndex[j];
                    sortedSqlIndex[j] = tmp;
                }
            }
        }

        batchSize =  Integer.valueOf(AdsUtil.getBatchNum());
        try {
            .........
            .........
            String dbName = AdsUtil.getDBName();
            String tableName = AdsUtil.getTableName();
            String sql = "insert into " + dbName + "." + tableName  + " values ";
            for(int i = 0; i < bufferSize  - batchSize; i+= batchSize) {
                StringBuffer sb = new StringBuffer(sql);
                for(int j = 0 ; j < batchSize; j++) {
                    if (j != 0)
                        sb.append(",");
                    sb.append(sqls[sortedSqlIndex[i + j]]);                
                }
                ..............
                ..............
            }
            res = true;
        } catch (Exception e) {
            ...........
            ...........
        } finally {
            ...........
            ...........
        }

        return ...;
    }

    public static int getHashPartition(String value, int totalHashPartitionNum) {
        long crc32 = (value == null ? getCRC32("-1") : getCRC32(value));
        return (int) (crc32 % totalHashPartitionNum);
    }

    private static long getCRC32(String value) {
        Checksum checksum = new CRC32();
        byte[] bytes = value.getBytes();
        checksum.update(bytes, 0, bytes.length);
        return checksum.getValue();
    }
}
  • 采用AnalyticDB搭配的同步工具”数据集成”进行实时数据同步。一般建议采用第二种方法。
相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
SQL 存储 数据挖掘
快速入门:利用AnalyticDB构建实时数据分析平台
【10月更文挑战第22天】在大数据时代,实时数据分析成为了企业和开发者们关注的焦点。传统的数据仓库和分析工具往往无法满足实时性要求,而AnalyticDB(ADB)作为阿里巴巴推出的一款实时数据仓库服务,凭借其强大的实时处理能力和易用性,成为了众多企业的首选。作为一名数据分析师,我将在本文中分享如何快速入门AnalyticDB,帮助初学者在短时间内掌握使用AnalyticDB进行简单数据分析的能力。
949 2
|
存储 缓存 监控
ClickHouse 架构原理及核心特性详解
ClickHouse 是由 Yandex 开发的开源列式数据库,专为 OLAP 场景设计,支持高效的大数据分析。其核心特性包括列式存储、字段压缩、丰富的数据类型、向量化执行和分布式查询。ClickHouse 通过多种表引擎(如 MergeTree、ReplacingMergeTree、SummingMergeTree)优化了数据写入和查询性能,适用于电商数据分析、日志分析等场景。然而,它在事务处理、单条数据更新删除及内存占用方面存在不足。
4727 21
|
关系型数据库 MySQL 大数据
AnalyticDB for MySQL 3.0基础版重磅发布
AnalyticDB for MySQL3.0基础版(以下简称ADB for MySQL3.0)是在总结ADB for MySQL2.0产品研发与应用经验的基础上,匠心打磨推出的新一代分析型数据库。
8054 1
|
4月前
|
存储 人工智能 缓存
AI问诊系统开发架构解析:大模型 + 医疗知识库如何落地
本文详解可商用AI问诊系统落地实践:摒弃纯对话模式,采用“大模型+医疗知识库(RAG)+分诊规则引擎+业务系统”四层架构,解决幻觉、不可控、非结构化、合规风险等核心痛点,涵盖架构设计、知识检索、症状抽取、智能分诊与生产级部署关键代码与经验。(239字)
|
SQL 存储 关系型数据库
深入分析 Flink SQL 工作机制
本文首先会介绍推动这些优化背后的思考,展示统一的架构如何更好地处理流式和批式查询,其次将深入剖析 Flink SQL 的编译及优化过程。
深入分析 Flink SQL 工作机制
|
JSON 关系型数据库 MySQL
MySQL:json字段查询:数组、对象、成员检查
MySQL:json字段查询:数组、对象、成员检查
1676 0
|
人工智能 监控 数据可视化
Agent TARS:一键让AI托管电脑!字节开源PC端多模态AI助手,无缝集成浏览器与系统操作
Agent TARS 是一款开源的多模态AI助手,能够通过视觉解析网页并无缝集成命令行和文件系统,帮助用户高效完成复杂任务。
5350 13
Agent TARS:一键让AI托管电脑!字节开源PC端多模态AI助手,无缝集成浏览器与系统操作
|
存储 缓存 NoSQL
redis4.0之MEMORY命令详解
在过去,查看redis的内存使用状态只有info memory命令,而且也只有一些基础信息,想要获取全局信息就有些困难。4.0开始redis提供了`MEMORY`命令,一切都变得简单起来。
10487 153
|
SQL 分布式数据库 Apache
网易游戏 x Apache Doris:湖仓一体架构演进之路
网易游戏 Apache Doris 集群超 20 个 ,总节点数百个,已对接内部 200+ 项目,日均查询量超过 1500 万,总存储数据量 PB 级别。
1212 3
网易游戏 x Apache Doris:湖仓一体架构演进之路
|
机器学习/深度学习 人工智能 自然语言处理
16CODEIPPROMPT:顶会ICML’23 从GitHub到AI,探索代码生成的侵权风险与缓解策略的最新进展:训练数据`有限制性许可;模型微调+动态Token过滤【网安AIGC专题11.8】
16CODEIPPROMPT:顶会ICML’23 从GitHub到AI,探索代码生成的侵权风险与缓解策略的最新进展:训练数据`有限制性许可;模型微调+动态Token过滤【网安AIGC专题11.8】
552 2

热门文章

最新文章

相关产品

  • 云原生数据仓库AnalyticDB MySQL版