开发者社区> 问答> 正文

请问flink sql 事件时间 Temporal Join 设计多个并行度不触发计算

环境说明:flink 1.15

场景描述:
left 表是一个来源于kafka的表且定义了水位线,right表是来自于mysql cdc的表,right表定义了主键和水位线,按flink官方说法right表就是一个版本表了。2个表按FOR SYSTEM_TIME AS OF事件时间的方式关联。且关联字段都是主键。在执行环境中设置了程序的并行度为3。

问题描述:right表本身有不少数据了,当left来一条数据的时候结果没有触发计算。但是当我把整个程序并行度设为1后又能正常触发执行。或者我把left表设为从文件读取数据且并行度=3也能触发计算。

请问有人遇到过这种问题吗?

展开
收起
游客fuzojzpl5x2bu 2024-03-25 08:38:29 101 0
4 条回答
写回答
取消 提交回答
  • 在 Flink 1.15 中要实现多个并行度的事件时间 Temporal Join,可以通过设置并行度和调整 watermark 策略来实现。以下是一个简单的示例:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;
    
    public class FlinkTemporalJoinExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2); // 设置并行度为2
    
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    
            // 创建第一个数据流
            DataStream<Row> stream1 = env.fromElements(
                    Row.of("A", 1L),
                    Row.of("B", 2L),
                    Row.of("C", 3L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Row>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> event.getField(1).asLong()));
    
            // 创建第二个数据流
            DataStream<Row> stream2 = env.fromElements(
                    Row.of("A", 4L),
                    Row.of("B", 5L),
                    Row.of("C", 6L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Row>forMonotonousTimestamps()
                    .withTimestampAssigner((event, timestamp) -> event.getField(1).asLong()));
    
            // 将数据流转换为表
            Table table1 = tableEnv.fromDataStream(stream1, "key, value");
            Table table2 = tableEnv.fromDataStream(stream2, "key, value");
    
            // 执行事件时间 Temporal Join
            Table result = tableEnv.sqlQuery("SELECT t1.key, t1.value, t2.value FROM " + table1 + " AS t1 JOIN " + table2 + " AS t2 ON t1.key = t2.key AND t1.value <= t2.value");
    
            // 将结果打印到控制台
            tableEnv.toAppendStream(result, Row.class).print();
    
            env.execute("Flink Temporal Join Example");
        }
    }
    

    在这个示例中,我们设置了两个并行度为2的数据流,并使用 WatermarkStrategy 为每个数据流分配了时间戳和水印。然后,我们将这两个数据流转换为表,并执行事件时间 Temporal Join。最后,我们将结果打印到控制台。

    2024-03-31 17:33:35
    赞同 展开评论 打赏
  • 在 Flink 1.15 中,要实现多个并行度的事件时间 Temporal Join,可以通过设置并行度和调整 watermark 策略来实现。以下是一个简单的示例:

    1. 首先,创建两个数据源,分别表示左表和右表。这里我们使用 fromElements 方法创建个简单的数据流,并设置相应的时间戳和 watrm
      ```java
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.descriptors.Schema;
      import org.apache.flink.table.descriptors.Watermark;
      import org.apache.flink.table.descriptors.WatermarkSpec;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    DataStream> leftStream = env.fromElements(
    Tuple2.of("A", 1000L),
    Tuple2.of("B", 2000L),
    Tuple2.of("C", 3000L)
    ).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.f1));

    DataStream> rightStream = env.fromElements(
    Tuple2.of("A", 1500L),
    Tuple2.of("B", 2500L),
    Tuple2.of("C", 3500L)
    ).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.f1));

    
    2. 将数据流转换为表,并注册到 TableEnvironment 中。
    
    ```java
    tableEnv.createTemporaryView("leftTable", leftStream, "key, value, rowtime.rowtime");
    tableEnv.createTemporaryView("rightTable", rightStream, "key, value, rowtime.rowtime");
    
    1. 使用 SQL 语句进行事件时间 Temporal Join。
    Table result = tableEnv.sqlQuery("SELECT l.key, l.value AS leftValue, r.value AS rightValue " +
        "FROM leftTable AS l " +
        "JOIN rightTable AS r ON l.key = r.key AND l.rowtime <= r.rowtime " +
        "WHERE l.rowtime >= '1000' AND l.rowtime <= '4000'");
    
    1. 将结果表转换为 DataStream,并设置并行度。
    DataStream<Row> resultStream = tableEnv.toAppendStream(result, Row.class).setParallelism(2);
    
    1. 执行任务。
    resultStream.print();
    env.execute("Flink SQL Event Time Temporal Join Example");
    

    在这个示例中,我们设置了两个并行度为 2 的子任务来处理事件时间 Temporal Join。这样,当数据量较大时,可以充分利用多核 CPU 资源进行计算。

    2024-03-31 16:04:29
    赞同 展开评论 打赏
  • 关于 Flink 1.15 中出现的问题,您描述的情况可能是由于并发执行过程中存在状态一致性或同步问题导致的。具体原因可能有以下几点:

    a. 状态一致性:在并行度为3的情况下,不同并行任务间可能存在状态不一致,如水位线或版本表状态更新未能及时同步至所有并行实例。检查是否正确配置了状态后端(如 RocksDB)以确保状态一致性,并确认状态 checkpoint 设置合理,能够及时保存和恢复状态。

    b. Watermark 传播与对齐:在多并行度下,确保 Watermark 正确传播和对齐至关重要。检查 Watermark 生成策略是否适应并发执行,以及是否正确处理乱序事件。同时,确保 Watermark 对齐机制(如 allowedLatenessalignedCheckpointTimeout)设置得当,以避免因 Watermark 不对齐导致的计算延迟。

    c. Kafka 与 MySQL CDC 消费不均衡:在高并行度下,Kafka 消费者可能面临分区分配不均,导致部分 Task 没有接收到 left 表的数据,或是 MySQL CDC 消费速度不均引发的状态差异。检查并调整 Kafka 分区分配策略,确保并行度与分区数匹配,同时关注 MySQL CDC 消费速率,避免消费不均衡。

    d. 版本表(right 表)处理逻辑:检查 right 表(版本表)在高并行度下的处理逻辑,确保主键关联与水位线判断正确处理并发情况,避免数据丢失或重复计算。

    为定位问题,建议通过日志分析、Flink Web UI 监控、以及调试模式运行(设置较低并行度,逐步增加至目标并行度)来观察任务执行情况,找出可能导致问题的具体环节。

    2024-03-25 10:16:10
    赞同 展开评论 打赏
  • 在 Flink 中,当你在一个流处理作业中同时处理来自 Kafka 和 MySQL CDC 的数据时,并且涉及到了基于事件时间窗口以及 FOR SYSTEM_TIME AS OF 语句的时态关联查询,确保正确触发计算的关键因素可能包括以下几个方面:

    • 事件时间对齐:

    在这种场景下,left 表(Kafka)和 right 表(MySQL CDC)都需按照事件时间进行处理,这意味着它们的数据必须根据事件时间戳正确排序和对齐,这样才能保证 Flink 能够依据正确的事件时间进行窗口计算和时态关联。

    • 水位线管理:

    水位线(watermarks)是 Flink 流处理中的一个重要概念,它用于推断事件时间的进度并决定是否可以触发窗口计算。确保两个数据源的水位线都能正确推进到能包含最新事件的时间点。

    • 并行处理与状态一致性:

    当并行度设置为 3 时,由于 Flink 的并行处理机制,每个并行任务会处理一部分分区的数据。对于有状态的操作(如时态关联),如果 Flink SQL 的执行计划导致某个算子的状态分布不均匀或者状态同步有问题,可能会出现部分并行任务无法正确关联到 right 表数据的情况。
    对于具有版本信息的 right 表(MySQL CDC),Flink 应该能够正确地管理和分发其历史版本数据到各个并行任务。如果并行任务间的状态没有正确共享或合并,就可能导致某些 left 表事件找不到对应的 right 表版本记录。

    • checkpoint 与恢复:

    如果 checkpoint 设置不当或者 state backend 存储的问题影响了状态的持久化和恢复,也有可能在高并行度下造成数据丢失或者计算不触发的现象。

    针对上述情况,请检查以下几点:

    • 确保左右两侧表的事件时间戳都被正确提取并且水位线正确推进。
    • 验证并调整 Flink SQL 的执行计划,查看时态关联操作是否被正确并行化,且状态管理符合预期。
    • 检查 checkpoint 配置是否合理,确认状态能否在并行任务之间正确地保存和恢复。
    • 如果使用了 KeyedStream 或者在 SQL 查询中指定了 ON KEY,确保关联键的哈希分布均匀,避免数据倾斜问题。

    调试建议:

    • 查看 Flink 作业的监控指标,观察水位线推进状况以及状态大小的变化。
    • 使用 Flink Web UI 查看作业图谱和任务状态,了解是否有任务未正常运行或状态异常。
    • 查看 Flink 的日志输出,定位是否存在任何关于状态管理、水位线推进或是其他错误的信息。
    2024-03-25 09:09:34
    赞同 2 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server 2017 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载