Clickhouse-Java使用JDBC连接大批量导入(本地文件2表)

简介: Clickhouse-Java使用JDBC连接大批量导入(本地文件2表); 导入:4856w耗时294秒约5分钟;导入:212w耗时12秒。

依赖配置pom.xml

<dependency>
    <groupId>cc.blynk.clickhouse</groupId>
    <artifactId>clickhouse4j</artifactId>
    <version>1.4.4</version>
</dependency>

CK基本信息

String driver = "cc.blynk.clickhouse.ClickHouseDriver";
String ip = "xxx.xxx.xxx.xxx";
String port = "8123";
String db = "db";
String user = "user";
String pwd = "pwd";
// 数据输出文件
String fileName = "/data/table1_cols123_20211125.txt.gz";

创建ClickHouse连接

Class.forName(driver);
StringBuffer urlSb = new StringBuffer()
        .append("jdbc:clickhouse://")
        .append(ip).append(":").append(port).append("/").append(db)
        .append("?characterEncoding=utf8&useSSL=false");
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(urlSb.toString());
ClickHouseConnection connection = dataSource.getConnection(user, pwd);
ClickHouseStatement statement = connection.createStatement();

数据推送模式

String query = new StringBuilder()
        .append("insert into cq_report_db.xxx (x1, x2, x3)")
        .append(" FORMAT TabSeparated")
        .toString();

(1)可以指定导入数据到表的指定列
(2)数据的列数必须等于指定的表的列,否则会导入异常
(3)支持多线性并发导入

批量推送数据

InputStream inputStream = new GZIPInputStream(new FileInputStream(fileName));
Scanner scanner = new Scanner(inputStream);
StringBuffer data = new StringBuffer();
int size = 0;
int maxSize = 20000;
while (scanner.hasNextLine()) {
    if(data.length() > 1) data.append("\n");
    data.append(scanner.nextLine());
    size++; 
    
    // 打包批量推送条件。
    if(size >= maxSize) {
        InputStream dataStream = new ByteArrayInputStream(data.toString().getBytes());
        statement.sendStreamSQL(dataStream, query);
        dataStream.close();
        // 重置参数
        size = 0;
        data = new StringBuffer();
    }
}

// 剩余部分数据
if(data.length() > 1) {
    InputStream dataStream = new ByteArrayInputStream(data.toString().getBytes());
    statement.sendStreamSQL(dataStream, query);
    dataStream.close();
    // 重置参数
    size = 0;
    data = new StringBuffer();
}

关闭相关连接

if(scanner != null) scanner.close(); 
if(inputStream != null) inputStream.close();
if(connection != null && !connection.isClosed()) connection.close();
if(statement != null && !statement.isClosed()) statement.close();
相关文章
|
1月前
|
NoSQL Java MongoDB
java连接MongoDB
java连接MongoDB
|
2月前
|
数据采集 Java 关系型数据库
Java代码高效连接数据库
Java代码高效连接数据库
23 2
|
1月前
|
NoSQL Java API
Redis官方推荐的Java连接开发工具Jedis
Redis官方推荐的Java连接开发工具Jedis
|
5天前
|
监控 安全 Java
Java与物联网:连接与控制设备
Java与物联网:连接与控制设备
19 0
|
2天前
|
前端开发 Java 应用服务中间件
【异常解决】java程序连接MinIO报错The request signature we calculated does not match the signature you provided.
【异常解决】java程序连接MinIO报错The request signature we calculated does not match the signature you provided.
10 0
|
3天前
|
Oracle Java 关系型数据库
【服务器】python通过JDBC连接到位于Linux远程服务器上的Oracle数据库
【服务器】python通过JDBC连接到位于Linux远程服务器上的Oracle数据库
14 6
|
3天前
|
SQL Java 关系型数据库
【JAVA基础篇教学】第十六篇:Java连接和操作MySQL数据库
【JAVA基础篇教学】第十六篇:Java连接和操作MySQL数据库
|
5天前
|
SQL Java 数据库连接
Java数据库编程实践:连接与操作数据库
Java数据库编程实践:连接与操作数据库
9 0
|
13天前
|
NoSQL Java 关系型数据库
Java基础教程(21)-Java连接MongoDB
【4月更文挑战第21天】MongoDB是开源的NoSQL数据库,强调高性能和灵活性。Java应用通过MongoDB Java驱动与之交互,涉及MongoClient、MongoDatabase、MongoCollection和Document等组件。连接MongoDB的步骤包括:配置连接字符串、创建MongoClient、选择数据库和集合。伪代码示例展示了如何建立连接、插入和查询数据。
|
14天前
|
Java 关系型数据库 MySQL
Java基础教程(20)-Java连接mysql数据库CURD
【4月更文挑战第19天】MySQL是流行的关系型数据库管理系统,支持SQL语法。在IDEA中加载jar包到项目类路径:右击项目,选择“Open Module Settings”,添加库文件。使用JDBC连接MySQL,首先下载JDBC驱动,然后通过`Class.forName()`加载驱动,`DriverManager.getConnection()`建立连接。执行CRUD操作,例如创建表、插入数据和查询,使用`Statement`或`PreparedStatement`,并确保正确关闭数据库资源。