Clickhouse-Java使用JDBC连接大批量导入(本地文件2表)

简介: Clickhouse-Java使用JDBC连接大批量导入(本地文件2表); 导入:4856w耗时294秒约5分钟;导入:212w耗时12秒。

依赖配置pom.xml

<dependency>
    <groupId>cc.blynk.clickhouse</groupId>
    <artifactId>clickhouse4j</artifactId>
    <version>1.4.4</version>
</dependency>

CK基本信息

String driver = "cc.blynk.clickhouse.ClickHouseDriver";
String ip = "xxx.xxx.xxx.xxx";
String port = "8123";
String db = "db";
String user = "user";
String pwd = "pwd";
// 数据输出文件
String fileName = "/data/table1_cols123_20211125.txt.gz";

创建ClickHouse连接

Class.forName(driver);
StringBuffer urlSb = new StringBuffer()
        .append("jdbc:clickhouse://")
        .append(ip).append(":").append(port).append("/").append(db)
        .append("?characterEncoding=utf8&useSSL=false");
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(urlSb.toString());
ClickHouseConnection connection = dataSource.getConnection(user, pwd);
ClickHouseStatement statement = connection.createStatement();

数据推送模式

String query = new StringBuilder()
        .append("insert into cq_report_db.xxx (x1, x2, x3)")
        .append(" FORMAT TabSeparated")
        .toString();

(1)可以指定导入数据到表的指定列
(2)数据的列数必须等于指定的表的列,否则会导入异常
(3)支持多线性并发导入

批量推送数据

InputStream inputStream = new GZIPInputStream(new FileInputStream(fileName));
Scanner scanner = new Scanner(inputStream);
StringBuffer data = new StringBuffer();
int size = 0;
int maxSize = 20000;
while (scanner.hasNextLine()) {
    if(data.length() > 1) data.append("\n");
    data.append(scanner.nextLine());
    size++; 
    
    // 打包批量推送条件。
    if(size >= maxSize) {
        InputStream dataStream = new ByteArrayInputStream(data.toString().getBytes());
        statement.sendStreamSQL(dataStream, query);
        dataStream.close();
        // 重置参数
        size = 0;
        data = new StringBuffer();
    }
}

// 剩余部分数据
if(data.length() > 1) {
    InputStream dataStream = new ByteArrayInputStream(data.toString().getBytes());
    statement.sendStreamSQL(dataStream, query);
    dataStream.close();
    // 重置参数
    size = 0;
    data = new StringBuffer();
}

关闭相关连接

if(scanner != null) scanner.close(); 
if(inputStream != null) inputStream.close();
if(connection != null && !connection.isClosed()) connection.close();
if(statement != null && !statement.isClosed()) statement.close();
相关文章
|
3月前
|
Java 关系型数据库 数据库连接
JDBC:Java与数据库的“黄金搭档”,为何它如此重要?
JDBC:Java与数据库的“黄金搭档”,为何它如此重要?
48 8
|
3月前
|
Java 数据库连接 API
JDBC:Java数据库连接的“黑科技”大揭秘
JDBC:Java数据库连接的“黑科技”大揭秘
38 7
|
3月前
|
SQL Java 数据库连接
为何JDBC是Java开发者的“心头好”?原因竟然这么简单!
为何JDBC是Java开发者的“心头好”?原因竟然这么简单!
40 3
|
3月前
|
Java 数据库连接
JDBC连接复习
JDBC连接复习
40 1
|
23天前
|
数据采集 存储 分布式计算
ClickHouse大规模数据导入优化:批处理与并行处理
【10月更文挑战第27天】在数据驱动的时代,高效的数据导入和处理能力是企业竞争力的重要组成部分。作为一位数据工程师,我在实际工作中经常遇到需要将大量数据导入ClickHouse的需求。ClickHouse是一款高性能的列式数据库系统,非常适合进行大规模数据的分析和查询。然而,如何优化ClickHouse的数据导入过程,提高导入的效率和速度,是我们面临的一个重要挑战。本文将从我个人的角度出发,详细介绍如何通过批处理、并行处理和数据预处理等技术优化ClickHouse的数据导入过程。
47 0
|
1月前
|
SQL Java 数据库连接
如何在 Java 脚本中有效地使用 JDBC
如何在 Java 脚本中有效地使用 JDBC
17 0
|
3月前
|
SQL Java 关系型数据库
【前端学java】JDBC快速入门
【8月更文挑战第12天】JDBC快速入门
34 2
【前端学java】JDBC快速入门
|
3月前
|
Java 数据库连接 网络安全
JDBC数据库编程(java实训报告)
这篇文章是关于JDBC数据库编程的实训报告,涵盖了实验要求、实验环境、实验内容和总结。文中详细介绍了如何使用Java JDBC技术连接数据库,并进行增删改查等基本操作。实验内容包括建立数据库连接、查询、添加、删除和修改数据,每个部分都提供了相应的Java代码示例和操作测试结果截图。作者在总结中分享了在实验过程中遇到的问题和解决方案,以及对Java与数据库连接操作的掌握情况。
JDBC数据库编程(java实训报告)
|
3月前
|
SQL Java 数据库连接
JDBC之旅:从陌生到熟悉的Java数据库连接之路
JDBC之旅:从陌生到熟悉的Java数据库连接之路
32 9
|
3月前
|
SQL Java 关系型数据库
探索Java数据库连接的奥秘:JDBC技术全攻略
探索Java数据库连接的奥秘:JDBC技术全攻略
58 8
下一篇
无影云桌面