请问,Flink CDC这种情况导致被压如何解决呢?insert into dws_order_

请问,Flink CDC这种情况导致被压如何解决呢?insert into dws_order_statistics_record_sinkselect a.sellerTenantId, a.customerId, a.deptId, a.creatorId, a.createDate, COUNT(a.orderNo) as totalNum, -- 订单总量 COUNT(a.orderNo) FILTER (WHERE a.orderStatus in(6, 7, 12, 16)) as cjTotalNum, -- 成交总量 SUM(IFNULL(a.payableAmount, 0.00)) AS payableTotalAmount -- 应收金额FROM oms_order_source awhere a.orderType = 2 and a.orderStatus <>4 and a.sellerTenantId<>-1 and a.deptId is not nullgroup by a.sellerTenantId, a.customerId, a.deptId, a.creatorId, a.createDate; CREATE TABLE if not exists oms_order_source( id bigint COMMENT 'ID', sellerTenantId bigint COMMENT '租户Id', orderNo STRING COMMENT '订单号', orderType tinyint COMMENT '订单类型(1:增值商品/2:普通商品/3:物流)', orderStatus tinyint COMMENT '订单状态()', deptId bigint COMMENT '部门Id', customerId bigint COMMENT '客户id', payableAmount DECIMAL(10, 2) COMMENT '应付金额', creatorId bigint COMMENT '下单人', createTime TIMESTAMP(3) COMMENT '下单时间', createDate as TO_DATE(DATE_FORMAT(createTime,'yyyy-MM-dd')) comment '下单日期', PRIMARY KEY (id) NOT ENFORCED) comment '订单主表映射信息'with ( 'connector' = 'mysql-cdc',-- 'scan.startup.mode'='latest-offset', 'hostname' = '*', 'port' = '3306', 'username' = '', 'password' = '*', 'database-name' =' ', 'table-name' = '*'); image.png

展开
收起
真的很搞笑 2023-08-01 13:57:30 86 分享 版权
1 条回答
写回答
取消 提交回答
  • 这种情况是由于 Flink CDC 的 insert into 语句生成的 SQL 语句太长导致的。

    Flink CDC 的 insert into 语句会在一条 SQL 语句中将所有数据插入到表中。如果数据量很大,那么这条 SQL 语句就会非常长。如果 SQL 语句太长,那么就会导致 SQL 引擎出现性能问题,最终导致任务被压。

    要解决这个问题,可以使用 Flink CDC 的 upsert into 语句。upsert into 语句会在每次执行时只更新一行数据。这样,SQL 语句就会比较短,不会导致 SQL 引擎出现性能问题。

    以下是使用 upsert into 语句的示例:

    insert into dws_order_item
    (
    order_item_id,
    order_id,
    product_id,
    quantity
    )
    values
    (
    123,
    456,
    789,
    10
    )
    on duplicate key update
    quantity = quantity + values(quantity)
    通过使用 upsert into 语句,可以避免 SQL 语句太长导致的问题。

    2023-09-25 18:01:19
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理