如何用 sysbench 并行装载 PostgreSQL 测试数据

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 本文参考老唐的使用sysbench和sqlldr并行装载Oracle测试数据而成。http://blog.osdba.net/538.html sysbench原来自带的lua数据装载脚本是使用以下方式串行装载的,速度比较慢(比单条insert快,但是比COPY慢)。 insert int

本文参考老唐的使用sysbench和sqlldr并行装载Oracle测试数据而成。
http://blog.osdba.net/538.html

sysbench原来自带的lua数据装载脚本是使用以下方式串行装载的,速度比较慢(比单条insert快,但是比COPY慢)。

insert into table1 values (),(),()....    
insert into table2 values (),(),()....    
...
insert into tablen values (),(),()....    

使用prepare导入数据的用法举例

./sysbench_pg --test=lua/oltp.lua --db-driver=pgsql --pgsql-host=127.0.0.1 --pgsql-port=1921 --pgsql-user=postgres --pgsql-password=postgres --pgsql-db=postgres --oltp-tables-count=64 --oltp-table-size=1000000 --num-threads=64 prepare    

prepare 表示装载数据,但是它串行的。
sysbench0.5中可以在命令行中指定测试时启动的并行线程数,这个测试过程是使用run命令,而且是多线程并发的,所以我们可以使用sysbench的run命令来造数据,而不再使用其提供的prepare命令的方法来造数据。run命令会根据命令行参数--num-threads来指定并发线程数的多少。
在sysbench中自定义的lua脚本中要求实现以下几个函数:

function thread_init(thread_id): 此函数在线程创建后只被执行一次  
function event(thread_id): 每执行一次就会被调用一次。  

由上可以知道,本次造数据的脚本我们只需要实现thread_init()函数就可以了。

生成测试数据的脚本沿用老唐提供的代码:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <stdint.h>
#include <sys/time.h>
uint64_t my_rand(struct random_data * r1, struct random_data * r2)
{
    uint64_t rand_max = 100000000000LL;
    uint64_t result;
    uint32_t u1, u2;
    random_r(r1, &u1);
    random_r(r2, &u2);
    result = (int64_t)u1 * (int64_t)u2;
    result = result % rand_max;
    return result;
}
int main(int argc, char *argv[])
{
    struct timeval tpstart;
    struct random_data r1, r2;
    int i;
    int r;
    int max_value;
    char rand_state1[128];
    char rand_state2[128];
    if (argc !=2)
    {
        printf("Usage: %s <rownums>\n", argv[0]);
        return 1;
    }
    max_value = atoi(argv[1]);
    gettimeofday(&tpstart,NULL);
    initstate_r(tpstart.tv_usec,rand_state1,sizeof(rand_state1),&r1);
    srandom_r(tpstart.tv_usec, &r1);
    gettimeofday(&tpstart,NULL);
    initstate_r(tpstart.tv_usec,rand_state2,sizeof(rand_state1),&r2);
    srandom_r(tpstart.tv_usec, &r2);
    for (i=1; i<max_value+1; i++)
    {
        r = my_rand(&r1, &r2) % max_value; 
        printf("%d,%d,%011llu-%011llu-%011llu-%011llu-%011llu-%011llu-%011llu-%011llu-%011llu-%011llu,%011llu-%011llu-%011llu-%011llu-%011llu\n",
                i,
                r,
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2),
                 my_rand(&r1, &r2)
              );
    }
    return 0;
}

编译此C语言程序的方法如下:

gcc gendata.c -o gendata  

新建一个copy.lua的脚本,内容如下
调用 common.lua中的 set_vars() 继承来自 common.lua 的全局变量。
函数 copydata(table_id) : 创建表,创建管道,将管道数据传输到psql -c "copy ..."客户端的方式导入数据。
函数 create_index(table_id) : 创建索引,调整SEQUENCE next val。
注意咯, oltp_tables_count 必须是 num_threads 的倍数,在 thread_init 中, 以num_threads 为步调,以thread_id+1为起始值,设置i的值,并调用copydata(table_id)和create_index(table_id)。

$ vi lua/copy.lua
pathtest = string.match(test, "(.*/)") or ""

dofile(pathtest .. "common.lua")

function copydata(table_id)
  local query

  query = [[
CREATE UNLOGGED TABLE sbtest]] .. table_id .. [[ (
id SERIAL NOT NULL,
k INTEGER,
c CHAR(120) DEFAULT '' NOT NULL,
pad CHAR(60) DEFAULT '' NOT NULL,
PRIMARY KEY (id)
) ]]

  db_query(query)

  os.execute ('export PGPASSWORD=' .. pgsql_password)
  os.execute ('rm -f sbtest' .. table_id .. '.dat')
  os.execute ('mknod sbtest' .. table_id .. '.dat p')
  os.execute ('./gendata ' .. oltp_table_size .. ' >> sbtest'..table_id ..'.dat &')
  os.execute ('cat sbtest' .. table_id .. '.dat | psql -h ' .. pgsql_host .. ' -p ' .. pgsql_port .. ' -U ' .. pgsql_user .. ' -d ' .. pgsql_db .. ' -c "copy sbtest' .. table_id .. ' from stdin with csv"')
  os.execute ('rm -f sbtest' .. table_id .. '.dat')
end

function create_index(table_id)
  db_query("select setval('sbtest" .. table_id .. "_id_seq', " .. (oltp_table_size+1) .. ")" )
  db_query("CREATE INDEX k_" .. table_id .. " on sbtest" .. table_id .. "(k)")
end

function thread_init(thread_id)
   set_vars()

   print("thread prepare"..thread_id)

   for i=thread_id+1, oltp_tables_count, num_threads  do
     copydata(i)
     create_index(i)
   end
end

function event(thread_id)
   os.exit()
end

用法,必须把psql放到路径中,因为lua中需要用到psql命令

export PATH=/home/digoal/pgsql9.5/bin:$PATH

生成数据,速度比以前快多了

./sysbench_pg --test=lua/copy.lua \
  --db-driver=pgsql \
  --pgsql-host=127.0.0.1 \
  --pgsql-port=1921 \
  --pgsql-user=postgres \
  --pgsql-password=postgres \
  --pgsql-db=postgres \
  --oltp-tables-count=64 \
  --oltp-table-size=1000000 \
  --num-threads=64 \
  run

清除数据, drop table

./sysbench_pg --test=lua/copy.lua \
  --db-driver=pgsql \
  --pgsql-host=127.0.0.1 \
  --pgsql-port=1921 \
  --pgsql-user=postgres \
  --pgsql-password=postgres \
  --pgsql-db=postgres \
  --oltp-tables-count=64 \
  --oltp-table-size=1000000 \
  --num-threads=64 \
  cleanup

lua全局变量代码:

sysbench/scripting/lua/src/lua.h:#define lua_register(L,n,f) (lua_pushcfunction(L, (f)), lua_setglobal(L, (n)))
sysbench/scripting/lua/src/lua.h:#define lua_setglobal(L,s)     lua_setfield(L, LUA_GLOBALSINDEX, (s))
sysbench/scripting/lua/src/lbaselib.c:  lua_setglobal(L, "_G");
sysbench/scripting/lua/src/lbaselib.c:  lua_setglobal(L, "_VERSION");  /* set global _VERSION */
sysbench/scripting/lua/src/lbaselib.c:  lua_setglobal(L, "newproxy");  /* set global `newproxy' */
sysbench/scripting/script_lua.c:    lua_setglobal(state, opt->name);
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_uniq");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rnd");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_str");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_uniform");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_gaussian");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_special");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_connect");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_disconnect");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_query");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bulk_insert_init");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bulk_insert_next");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bulk_insert_done");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_prepare");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bind_param");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bind_result");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_execute");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_close");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_store_results");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_free_results");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "DB_ERROR_NONE");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "DB_ERROR_DEADLOCK");
sysbench/scripting/script_lua.c:  lua_setglobal(state, "DB_ERROR_FAILED");
sysbench/scripting/script_lua.c:  lua_setglobal(L, "db_driver");

传入参数,可以把sysbench_pg的参数-替换成_在lua脚本中使用这些变量,例子

--pgsql-host=127.0.0.1  -> 对应lua中的变量名 pgsql_host
--pgsql-port=1921   -> 对应lua中的变量名 pgsql_port
--pgsql-user=postgres   -> 对应lua中的变量名 pgsql_user
--pgsql-password=postgres   -> 对应lua中的变量名 pgsql_password
--pgsql-db=postgres   -> 对应lua中的变量名 pgsql_db
--oltp-tables-count=64   -> 对应lua中的变量名 oltp_tables_count
--oltp-table-size=1000000   -> 对应lua中的变量名 oltp_table_size
--num-threads=64  -> 对应lua中的变量名 num_threads
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
1323 0
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
299 0
|
12月前
|
SQL 关系型数据库 PostgreSQL
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
|
12月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】从PostgreSQL迁移到YashanDB如何进行数据行数比对
本文介绍了通过Oracle视图`v$sql`和`v$sql_plan`分析SQL性能的方法。首先,可通过`plan_hash_value`从`v$sql_plan`获取SQL执行计划,结合示例展示了具体查询方式。文章还创建了一个UDF函数`REPEAT`用于格式化输出,便于阅读复杂执行计划。最后,通过实例展示了如何根据`plan_hash_value`获取SQL文本及其内存中的执行计划,帮助优化性能问题。
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的数据文件
PostgreSQL的物理存储结构主要包括数据文件、日志文件等。数据文件按oid命名,超过1G时自动拆分。通过查询数据库和表的oid,可定位到具体的数据文件。例如,查询数据库oid后,再查询特定表的oid及relfilenode,即可找到该表对应的数据文件位置。
371 1
|
关系型数据库 MySQL 测试技术
【赵渝强老师】MySQL的基准测试与sysbench
本文介绍了MySQL数据库的基准测试及其重要性,并详细讲解了如何使用sysbench工具进行测试。内容涵盖sysbench的安装、基本使用方法,以及具体测试MySQL数据库的步骤,包括创建测试数据库、准备测试数据、执行测试和清理测试数据。通过这些步骤,可以帮助读者掌握如何有效地评估MySQL数据库的性能。
554 5
|
关系型数据库 MySQL OLTP
性能工具之 MySQL OLTP Sysbench BenchMark 测试示例
【8月更文挑战第6天】使用 pt-query-digest 工具分析 MySQL 慢日志性能工具之 MySQL OLTP Sysbench BenchMark 测试示例
1033 0
性能工具之 MySQL OLTP Sysbench BenchMark 测试示例
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
1979 0
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
关系型数据库 5G PostgreSQL
postgreSQL 导出数据、导入
postgreSQL 导出数据、导入
210 1

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版
  • 推荐镜像

    更多