Flink导入mysql数据到doris

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 经过各种实践,发现比较适合中小公司的方式。分为全量和增量。

经过各种实践,发现比较适合中小公司的方式。分为全量和增量。

1、参考链接:

1、Flink Doris Connector:https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector

2、JSON格式数据导入:https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/load-json-format

2、全量导入

通过外部表同步数据

3、增量导入

1、dataocean监听kafka然后insert,这种方式,官方不建议,大量的insert会把doris-be打崩溃。(目前正在用的方式)。

2、binlog load的方式,以前旧版本尝试有问题,使用新版本尝试看看。经过新版本的尝试,发现这种方式不适合,详见:binlog方式调研。

3、Flink CDC,但是这种方式得学习和搭建Flink,使得成本变高了。准备尝试这种方式了,后期使用这种方式。

4、Flink的方式

1、订阅kafka的方式

canal发个kafka的消息,可以通过如下方式同步给doris。

但是问题是:无法控制同步哪些数据到哪个表

CREATE ROUTINE LOAD ocean.test1 ON cloud_uc_company
COLUMNS(id, name, delete_at)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.id\",\"$.name\",\"$.delete_at\"]",
    "strip_outer_array" = "true",
    "json_root" = "$.data"
)
FROM KAFKA
(
    "kafka_broker_list" = "10.20.1.26:9092",
    "kafka_topic" = "topic-periodfour",
    'property.group.id' = 'doris_ods_log',
    'property.client.id' = 'doris_ods_log',
    'property.kafka_default_offsets' = 'OFFSET_BEGINNING'
);

2、Flink代码可以拿到kafka消息

2.1、原理:

这种方式是拿到canal发送给里的kafka消json消息,然后取出其中的data部分,然后再写到doris里去。这种方式其实是通过doris的json序列化器,将data部分序列化成json字符串,然后再写入doris。

2.2、碰到的问题:

1、问题描述:

DorisRuntimeException: stream load error: [ANALYSIS_ERROR]errCode = 2, detailMessage = current running txns on db 11008 is 100, larger than limit 100, see more in null

解决方式:

这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 max_running_txn_num_per_db 来解决。
执行sql:admin set frontend config ('max_running_txn_num_per_db' = '1000');

2、问题描述:

DorisRuntimeException: stream load error: [INTERNAL_ERROR]too many filtered rows, see more in http://10.20.0.92:8040/api/_load_error_log?file=__shard_182/error_log_insert_stmt_8f4f21b2fa64f604-8fbb0ec0fba8129d_8f4f21b2fa64f604_8fbb0ec0fba8129d
打开链接,看到了错误信息:Reason: JSON data is array-object, strip_outer_array must be TRUE。

解决方式:

需要增加代码:设置pro.setProperty("strip_outer_array", "true");

2.3、代码:

package com.bm001.flinkdemo;

import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;

import java.io.IOException;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

/**
 * 这种方法是将kafka的json消息,获取其中的data部分,
 * 然后再通过doris的json序列化器,将data部分序列化成json字符串,然后再写入doris
 * <p>
 * 每个同步的表都需要增加一个任务
 */
public class Kafka2DorisDataStreamDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.1.26:9092");
        props.put("group.id", "group-flinkdemo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //source config
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic-periodfour", new SimpleStringSchema(), props);

        //sink config
        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        DorisOptions dorisOptions = dorisBuilder.setFenodes("10.20.0.91:8030")
                .setTableIdentifier("ocean.cloud_uc_company")
                .setUsername("root")
                .setPassword("root12345")
                .build();

        Properties pro = new Properties();
        //json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        pro.setProperty("strip_outer_array", "true"); //如果serialize返回的是json数组格式,此处必须要设置为true
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().setLabelPrefix("label-doris" + UUID.randomUUID()) //streamload label prefix,
                .setStreamLoadProp(pro).build();

        builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions)
//                .setSerializer(new SimpleStringSerializer()) //serialize according to string
                .setSerializer(new DorisRecordSerializer<String>() {
                    @Override
                    public byte[] serialize(String s) throws IOException {
                        CanalMessage canalMessage = new Gson().fromJson(s, CanalMessage.class);
                        String type = canalMessage.getType();
                        if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                            if ("uc_company".equals(canalMessage.getTable())) {
//                                Map<String, Object> dataMap = canalMessage.getData().get(0);
//                                return new Gson().toJson(dataMap).getBytes();
                                return new Gson().toJson(canalMessage.getData()).getBytes();
                            }
                        }
                        return null;
                    }
                }).setDorisOptions(dorisOptions);


        //build stream
        DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);
        dataStreamSource.sinkTo(builder.build());

        env.execute("flink kafka to doris by datastream");
    }
}
相关文章
|
3天前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
161 1
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
1月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
210 0
|
29天前
|
存储 缓存 数据挖掘
Flink + Doris 实时湖仓解决方案
本文整理自SelectDB技术副总裁陈明雨在Flink Forward Asia 2024的分享,聚焦Apache Doris与湖仓一体解决方案。内容涵盖三部分:一是介绍Apache Doris,一款高性能实时分析数据库,支持多场景应用;二是基于Doris、Flink和Paimon的湖仓解决方案,解决批流融合与数据一致性挑战;三是Doris社区生态及云原生发展,包括存算分离架构与600多位贡献者的活跃社区。文章深入探讨了Doris在性能、易用性及场景支持上的优势,并展示了其在多维分析、日志分析和湖仓分析中的实际应用案例。
158 17
Flink + Doris 实时湖仓解决方案
|
1月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
2月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
358 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
1月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
|
1月前
|
关系型数据库 MySQL Java
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
|
1月前
|
关系型数据库 MySQL 数据库连接
docker拉取MySQL后数据库连接失败解决方案
通过以上方法,可以解决Docker中拉取MySQL镜像后数据库连接失败的常见问题。关键步骤包括确保容器正确启动、配置正确的环境变量、合理设置网络和权限,以及检查主机防火墙设置等。通过逐步排查,可以快速定位并解决连接问题,确保MySQL服务的正常使用。
302 82
|
2天前
|
负载均衡 算法 关系型数据库
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL集群架构负载均衡故障排除与解决方案
本文深入探讨 MySQL 集群架构负载均衡的常见故障及排除方法。涵盖请求分配不均、节点无法响应、负载均衡器故障等现象,介绍多种负载均衡算法及故障排除步骤,包括检查负载均衡器状态、调整算法、诊断修复节点故障等。还阐述了预防措施与确保系统稳定性的方法,如定期监控维护、备份恢复策略、团队协作与知识管理等。为确保 MySQL 数据库系统高可用性提供全面指导。
|
7天前
|
SQL 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。

热门文章

最新文章

下一篇
oss创建bucket