请问,Flink CDC这种情况导致被压如何解决呢?insert into dws_order_statistics_record_sinkselect a.sellerTenantId, a.customerId, a.deptId, a.creatorId, a.createDate, COUNT(a.orderNo) as totalNum, -- 订单总量 COUNT(a.orderNo) FILTER (WHERE a.orderStatus in(6, 7, 12, 16)) as cjTotalNum, -- 成交总量 SUM(IFNULL(a.payableAmount, 0.00)) AS payableTotalAmount -- 应收金额FROM oms_order_source awhere a.orderType = 2 and a.orderStatus <>4 and a.sellerTenantId<>-1 and a.deptId is not nullgroup by a.sellerTenantId, a.customerId, a.deptId, a.creatorId, a.createDate; CREATE TABLE if not exists oms_order_source
( id
bigint COMMENT 'ID', sellerTenantId
bigint COMMENT '租户Id', orderNo
STRING COMMENT '订单号', orderType
tinyint COMMENT '订单类型(1:增值商品/2:普通商品/3:物流)', orderStatus
tinyint COMMENT '订单状态()', deptId
bigint COMMENT '部门Id', customerId
bigint COMMENT '客户id', payableAmount
DECIMAL(10, 2) COMMENT '应付金额', creatorId
bigint COMMENT '下单人', createTime
TIMESTAMP(3) COMMENT '下单时间', createDate
as TO_DATE(DATE_FORMAT(createTime,'yyyy-MM-dd')) comment '下单日期', PRIMARY KEY (id
) NOT ENFORCED) comment '订单主表映射信息'with ( 'connector' = 'mysql-cdc',-- 'scan.startup.mode'='latest-offset', 'hostname' = '*', 'port' = '3306', 'username' = '', 'password' = '*', 'database-name' =' ', 'table-name' = '*');
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
这种情况是由于 Flink CDC 的 insert into 语句生成的 SQL 语句太长导致的。
Flink CDC 的 insert into 语句会在一条 SQL 语句中将所有数据插入到表中。如果数据量很大,那么这条 SQL 语句就会非常长。如果 SQL 语句太长,那么就会导致 SQL 引擎出现性能问题,最终导致任务被压。
要解决这个问题,可以使用 Flink CDC 的 upsert into 语句。upsert into 语句会在每次执行时只更新一行数据。这样,SQL 语句就会比较短,不会导致 SQL 引擎出现性能问题。
以下是使用 upsert into 语句的示例:
insert into dws_order_item
(
order_item_id,
order_id,
product_id,
quantity
)
values
(
123,
456,
789,
10
)
on duplicate key update
quantity = quantity + values(quantity)
通过使用 upsert into 语句,可以避免 SQL 语句太长导致的问题。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。