从AnalyticDB写入原理分析,可以从三个方面提升AnalyticDB的写入能力:降低网络传输开销、减少与硬件设备io操作和尽量少消耗cpu资源。针对这三个特性本文将介绍如何对写入sql进行改造以达到最优性能。
- 采用批量写入(batch insert)模式,即每次在VALUES部分添加多行数据,一般建议每次批量写入数据量大约为16KB,以提高网络和磁盘吞吐。如下
INSERT INTO db_name.table_name (col1, col2, col3) VALUES ('xxx', 111, 'xxx'), ('xxx', 222, 'xxx'), ('xxx', 333, 'xxx');
- 如果对一行的所有列都进行插入,则去除col_name并保证values顺序与表结构中的col_name顺序一致,以降低网络带宽耗用。如下
INSERT INTO db_name.table_name VALUES ('xxx', 111, 'xxx'), ('xxx', 222, 'xxx'), ('xxx', 333, 'xxx');
- 保持主键相对有序。AnalyticDB的insert语句要求必须提供主键,且主键可以为复合主键。当确定复合主键时,根据业务含义调整复合主键中各个列的次序,从业务层面保证插入时主键是严格递增或近似递增的,也可以提升实时写入速度。
- 增加ignore关键字。执行不带ignore关键字的insert sql,当主键冲突时,后续数据会覆盖之前插入的数据;带上ignore关键字,则主键冲突时,会保留之前插入的数据而自动忽略新数据。如果业务层没有数据覆盖的语义要求,则建议所有insert sql都加上ignore关键字,以减小覆盖数据带来的性能开销。
- AnalyticDB需要对数据进行分区存储,当一次Batch insert中含有属于不同分区的多行数据时,将会耗费大量CPU资源进行分区号计算。因此建议在写入程序中提前计算好每行数据的分区号,并且将属于同一分区的多行数据组成一个批次,一次性插入。
实现聚合写入目前主要有两种途径:
- 用户自行实现该聚合方法,对分区号的计算规则为:partition_num = CRC32(hash_partition_column_value) mod m,其中hash_partition_column_value是分区列的值,m是分区总数。如下代码
public class HashInsert extends AbstractJavaSamplerClient{
private static Logger log = Logger.getLogger(HashInsert1M.class.getName());
private static AtomicLong idGen = new AtomicLong();
private int bufferSize =2000
private int batchSize = 20;
private int partitionCnt = 100;
public SampleResult runTest(JavaSamplerContext arg0) {
..........
..........
String sqls[] = new String[bufferSize];
int partNo[] = new int [bufferSize];
int sortedSqlIndex[] = new int [bufferSize];
int end = 100;
for(int i = 0; i < bufferSize; i++) {
long id = idGen.getAndIncrement();
boolean boolean_id = DataUtil.getBoolean_test(id);
int byte_id = DataUtil.getByte_test(id);
int short_id = DataUtil.getShort_test(id);
long user_id = DataUtil.getInt_test(id);
long seller_id = id;
float float_id = DataUtil.getFloat_test(id);
double double_id = DataUtil.getDouble_test(id);
String follow_id = DataUtil.getString_test(id);
String time_id = DataUtil.getTime_test(id);
String date_id = DataUtil.getDate_test(id);
String timestamp_id = DataUtil.getTimestamp_test(id);
String interest_flag = DataUtil.getMutilValue(id);
StringBuffer sb = new StringBuffer();
sb.append("(").append(boolean_id).append(",").append(byte_id).append(",").append(short_id).append(",").append(user_id)
.append(",").append(seller_id).append(",").append(float_id).append(",").append(double_id).append(",'").append(follow_id)
.append("','").append(time_id).append("','").append(date_id).append("','").append(timestamp_id).append("','").append(interest_flag)
.append("',");
for(int j=0;j<end-1;j++){
sb.append("'").append(follow_id).append("',");
}
sb.append("'").append(follow_id).append("')");
sqls[i] = sb.toString();
partNo[i] = getHashPartition("" + user_id, partitionCnt);
sortedSqlIndex[i] = i;
}
for(int i = 0; i < bufferSize - 1; i++) {
for(int j = i + 1; j < bufferSize; j++) {
if (partNo[sortedSqlIndex[i]] > partNo[sortedSqlIndex[j]]) {
int tmp = sortedSqlIndex[i];
sortedSqlIndex[i] = sortedSqlIndex[j];
sortedSqlIndex[j] = tmp;
}
}
}
batchSize = Integer.valueOf(AdsUtil.getBatchNum());
try {
.........
.........
String dbName = AdsUtil.getDBName();
String tableName = AdsUtil.getTableName();
String sql = "insert into " + dbName + "." + tableName + " values ";
for(int i = 0; i < bufferSize - batchSize; i+= batchSize) {
StringBuffer sb = new StringBuffer(sql);
for(int j = 0 ; j < batchSize; j++) {
if (j != 0)
sb.append(",");
sb.append(sqls[sortedSqlIndex[i + j]]);
}
..............
..............
}
res = true;
} catch (Exception e) {
...........
...........
} finally {
...........
...........
}
return ...;
}
public static int getHashPartition(String value, int totalHashPartitionNum) {
long crc32 = (value == null ? getCRC32("-1") : getCRC32(value));
return (int) (crc32 % totalHashPartitionNum);
}
private static long getCRC32(String value) {
Checksum checksum = new CRC32();
byte[] bytes = value.getBytes();
checksum.update(bytes, 0, bytes.length);
return checksum.getValue();
}
}
- 采用AnalyticDB搭配的同步工具”数据集成”进行实时数据同步。一般建议采用第二种方法。