Clickhouse-Java使用JDBC连接大批量导入(本地文件2表)

简介: Clickhouse-Java使用JDBC连接大批量导入(本地文件2表); 导入:4856w耗时294秒约5分钟;导入:212w耗时12秒。

依赖配置pom.xml

<dependency>
    <groupId>cc.blynk.clickhouse</groupId>
    <artifactId>clickhouse4j</artifactId>
    <version>1.4.4</version>
</dependency>

CK基本信息

String driver = "cc.blynk.clickhouse.ClickHouseDriver";
String ip = "xxx.xxx.xxx.xxx";
String port = "8123";
String db = "db";
String user = "user";
String pwd = "pwd";
// 数据输出文件
String fileName = "/data/table1_cols123_20211125.txt.gz";

创建ClickHouse连接

Class.forName(driver);
StringBuffer urlSb = new StringBuffer()
        .append("jdbc:clickhouse://")
        .append(ip).append(":").append(port).append("/").append(db)
        .append("?characterEncoding=utf8&useSSL=false");
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(urlSb.toString());
ClickHouseConnection connection = dataSource.getConnection(user, pwd);
ClickHouseStatement statement = connection.createStatement();

数据推送模式

String query = new StringBuilder()
        .append("insert into cq_report_db.xxx (x1, x2, x3)")
        .append(" FORMAT TabSeparated")
        .toString();

(1)可以指定导入数据到表的指定列
(2)数据的列数必须等于指定的表的列,否则会导入异常
(3)支持多线性并发导入

批量推送数据

InputStream inputStream = new GZIPInputStream(new FileInputStream(fileName));
Scanner scanner = new Scanner(inputStream);
StringBuffer data = new StringBuffer();
int size = 0;
int maxSize = 20000;
while (scanner.hasNextLine()) {
    if(data.length() > 1) data.append("\n");
    data.append(scanner.nextLine());
    size++; 
    
    // 打包批量推送条件。
    if(size >= maxSize) {
        InputStream dataStream = new ByteArrayInputStream(data.toString().getBytes());
        statement.sendStreamSQL(dataStream, query);
        dataStream.close();
        // 重置参数
        size = 0;
        data = new StringBuffer();
    }
}

// 剩余部分数据
if(data.length() > 1) {
    InputStream dataStream = new ByteArrayInputStream(data.toString().getBytes());
    statement.sendStreamSQL(dataStream, query);
    dataStream.close();
    // 重置参数
    size = 0;
    data = new StringBuffer();
}

关闭相关连接

if(scanner != null) scanner.close(); 
if(inputStream != null) inputStream.close();
if(connection != null && !connection.isClosed()) connection.close();
if(statement != null && !statement.isClosed()) statement.close();
相关文章
|
3月前
|
SQL Java 关系型数据库
Java连接MySQL数据库环境设置指南
请注意,在实际部署时应该避免将敏感信息(如用户名和密码)硬编码在源码文件里面;应该使用配置文件或者环境变量等更为安全可靠地方式管理这些信息。此外,在处理大量数据时考虑使用PreparedStatement而不是Statement可以提高性能并防止SQL注入攻击;同时也要注意正确处理异常情况,并且确保所有打开过得资源都被正确关闭释放掉以防止内存泄漏等问题发生。
124 13
|
10月前
|
消息中间件 存储 NoSQL
java连接redis和基础操作命令
通过以上内容,您可以掌握在Java中连接Redis以及进行基础操作的基本方法,进而在实际项目中灵活应用。
520 30
|
11月前
|
前端开发 Java 数据库连接
Java后端开发-使用springboot进行Mybatis连接数据库步骤
本文介绍了使用Java和IDEA进行数据库操作的详细步骤,涵盖从数据库准备到测试类编写及运行的全过程。主要内容包括: 1. **数据库准备**:创建数据库和表。 2. **查询数据库**:验证数据库是否可用。 3. **IDEA代码配置**:构建实体类并配置数据库连接。 4. **测试类编写**:编写并运行测试类以确保一切正常。
472 2
|
11月前
|
Java Linux 数据库
java连接kerberos用户认证
java连接kerberos用户认证
305 22
|
Java 数据库连接 数据库
深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能
在Java应用开发中,数据库操作常成为性能瓶颈。本文通过问题解答形式,深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能。文章介绍了连接池的优势、选择和使用方法,以及优化配置的技巧。
221 1
|
SQL Java 数据库连接
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率。本文介绍了连接池的工作原理、优势及实现方法,并提供了HikariCP的示例代码。
243 3
|
Java 数据库连接 数据库
Java连接池在数据库性能优化中的重要作用。连接池通过预先创建和管理数据库连接,避免了频繁创建和关闭连接的开销
本文深入探讨了Java连接池在数据库性能优化中的重要作用。连接池通过预先创建和管理数据库连接,避免了频繁创建和关闭连接的开销,显著提升了系统的响应速度和吞吐量。文章介绍了连接池的工作原理,并以HikariCP为例,展示了如何在Java应用中使用连接池。通过合理配置和优化,连接池技术能够有效提升应用性能。
195 1
|
2月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
158 1
|
2月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
173 1
|
3月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案