开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink es connector7丢数据

目前我司通过flinkcdc将mysql表同步至es过程中出现数据丢失,但是丢失数据更新任意字段又能同步上去,sql如下:


CREATE TABLE prod_erp_goods_cdc (
    id INT,
   ...
    created_at TIMESTAMP(3),
    updated_at TIMESTAMP(3),
    deleted_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '',
    'port' = '',
    'username' = '',
    'password' = '',
    'database-name' = '',
    'table-name' = '',
    'scan.incremental.snapshot.enabled'='true'
);
CREATE TABLE esprod_erp_goods_sink (
    id INT,
    ...
    created_at timestamp_ltz(3),
    updated_at timestamp_ltz(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = '',
    'index' = '',
    'sink.bulk-flush.max-actions' = '1000',  -- 每条数据直接flush到ES
    -- 'sink.bulk-flush.max-size' = '1mb',   -- 数据大小达到1mb时flush到ES
    'sink.bulk-flush.interval' = '1s',     -- 每1秒钟进行一次flush
    -- 'connection.timeout' = '10000',
    -- 'socket.timeout' = '30000',
    'format' = 'json', -- 非常重要的配置
    'json.timestamp-format.standard' = 'ISO-8601' -- 非常重要的配置
);
insert into esprod_erp_goods_sink
select id ,
...
JSON_VALUE(img_url_list, '$[0]'),
...
cast(created_at as timestamp_ltz(3)),
cast(updated_at as timestamp_ltz(3))
from prod_erp_goods_cdc
where deleted_at is null;

展开
收起
游客zzkzlyjzlewpy 2024-08-07 17:25:45 56 0
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载