Flink导入mysql数据到doris

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: 经过各种实践,发现比较适合中小公司的方式。分为全量和增量。

经过各种实践,发现比较适合中小公司的方式。分为全量和增量。

1、参考链接:

1、Flink Doris Connector:https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector

2、JSON格式数据导入:https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/load-json-format

2、全量导入

通过外部表同步数据

3、增量导入

1、dataocean监听kafka然后insert,这种方式,官方不建议,大量的insert会把doris-be打崩溃。(目前正在用的方式)。

2、binlog load的方式,以前旧版本尝试有问题,使用新版本尝试看看。经过新版本的尝试,发现这种方式不适合,详见:binlog方式调研。

3、Flink CDC,但是这种方式得学习和搭建Flink,使得成本变高了。准备尝试这种方式了,后期使用这种方式。

4、Flink的方式

1、订阅kafka的方式

canal发个kafka的消息,可以通过如下方式同步给doris。

但是问题是:无法控制同步哪些数据到哪个表

CREATE ROUTINE LOAD ocean.test1 ON cloud_uc_company
COLUMNS(id, name, delete_at)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.id\",\"$.name\",\"$.delete_at\"]",
    "strip_outer_array" = "true",
    "json_root" = "$.data"
)
FROM KAFKA
(
    "kafka_broker_list" = "10.20.1.26:9092",
    "kafka_topic" = "topic-periodfour",
    'property.group.id' = 'doris_ods_log',
    'property.client.id' = 'doris_ods_log',
    'property.kafka_default_offsets' = 'OFFSET_BEGINNING'
);

2、Flink代码可以拿到kafka消息

2.1、原理:

这种方式是拿到canal发送给里的kafka消json消息,然后取出其中的data部分,然后再写到doris里去。这种方式其实是通过doris的json序列化器,将data部分序列化成json字符串,然后再写入doris。

2.2、碰到的问题:

1、问题描述:

DorisRuntimeException: stream load error: [ANALYSIS_ERROR]errCode = 2, detailMessage = current running txns on db 11008 is 100, larger than limit 100, see more in null

解决方式:

这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 max_running_txn_num_per_db 来解决。
执行sql:admin set frontend config ('max_running_txn_num_per_db' = '1000');

2、问题描述:

DorisRuntimeException: stream load error: [INTERNAL_ERROR]too many filtered rows, see more in http://10.20.0.92:8040/api/_load_error_log?file=__shard_182/error_log_insert_stmt_8f4f21b2fa64f604-8fbb0ec0fba8129d_8f4f21b2fa64f604_8fbb0ec0fba8129d
打开链接,看到了错误信息:Reason: JSON data is array-object, strip_outer_array must be TRUE。

解决方式:

需要增加代码:设置pro.setProperty("strip_outer_array", "true");

2.3、代码:

package com.bm001.flinkdemo;

import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;

import java.io.IOException;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

/**
 * 这种方法是将kafka的json消息,获取其中的data部分,
 * 然后再通过doris的json序列化器,将data部分序列化成json字符串,然后再写入doris
 * <p>
 * 每个同步的表都需要增加一个任务
 */
public class Kafka2DorisDataStreamDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.1.26:9092");
        props.put("group.id", "group-flinkdemo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //source config
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic-periodfour", new SimpleStringSchema(), props);

        //sink config
        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        DorisOptions dorisOptions = dorisBuilder.setFenodes("10.20.0.91:8030")
                .setTableIdentifier("ocean.cloud_uc_company")
                .setUsername("root")
                .setPassword("root12345")
                .build();

        Properties pro = new Properties();
        //json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        pro.setProperty("strip_outer_array", "true"); //如果serialize返回的是json数组格式,此处必须要设置为true
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().setLabelPrefix("label-doris" + UUID.randomUUID()) //streamload label prefix,
                .setStreamLoadProp(pro).build();

        builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions)
//                .setSerializer(new SimpleStringSerializer()) //serialize according to string
                .setSerializer(new DorisRecordSerializer<String>() {
                    @Override
                    public byte[] serialize(String s) throws IOException {
                        CanalMessage canalMessage = new Gson().fromJson(s, CanalMessage.class);
                        String type = canalMessage.getType();
                        if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                            if ("uc_company".equals(canalMessage.getTable())) {
//                                Map<String, Object> dataMap = canalMessage.getData().get(0);
//                                return new Gson().toJson(dataMap).getBytes();
                                return new Gson().toJson(canalMessage.getData()).getBytes();
                            }
                        }
                        return null;
                    }
                }).setDorisOptions(dorisOptions);


        //build stream
        DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);
        dataStreamSource.sinkTo(builder.build());

        env.execute("flink kafka to doris by datastream");
    }
}
相关文章
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
200 61
|
25天前
|
存储 关系型数据库 MySQL
mysql怎么查询longblob类型数据的大小
通过本文的介绍,希望您能深入理解如何查询MySQL中 `LONG BLOB`类型数据的大小,并结合优化技术提升查询性能,以满足实际业务需求。
90 6
|
2月前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
163 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
1月前
|
SQL 关系型数据库 MySQL
mysql分页读取数据重复问题
在服务端开发中,与MySQL数据库进行数据交互时,常因数据量大、网络延迟等因素需分页读取数据。文章介绍了使用`limit`和`offset`参数实现分页的方法,并针对分页过程中可能出现的数据重复问题进行了详细分析,提出了利用时间戳或确保排序规则绝对性等解决方案。
|
2月前
|
关系型数据库 MySQL 数据库
GBase 数据库如何像MYSQL一样存放多行数据
GBase 数据库如何像MYSQL一样存放多行数据
|
2月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
75 14
|
16天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
41 3
|
16天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
43 3
|
16天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE &#39;log_%&#39;;`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
59 2
|
29天前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
198 15