开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink中如何能够产生-D的change log的呢?

flink中如何能够产生-D的change log的呢?

实际的业务场景是,我这边有一个滑动窗口,但是我希望根据第二次滑动后窗口产生的数据和第一次滑动后窗口产生的数据做对比,按照主键做对比,新增数据做insert,消失的数据做delete,分别产生需要insert和delete的数据。

但是网上不管怎么搜,都只有产生+I -U +U类型的example,就是没有产生-D的example

展开
收起
三分钟热度的鱼 2023-12-06 19:20:30 120 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在Flink中,要产生-D的change log,可以通过自定义一个ChangelogProducer来实现。以下是一个简单的示例:

    首先,创建一个自定义的ChangelogProducer类,继承自org.apache.flink.streaming.api.functions.source.SourceFunction<ChangeEvent<T>>

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    public class CustomChangelogProducer<T> implements SourceFunction<ChangeEvent<T>> {
        private final Supplier<T> dataSupplier;
        private final long windowSize;
        private T currentData;
    
        public CustomChangelogProducer(Supplier<T> dataSupplier, long windowSize) {
            this.dataSupplier = dataSupplier;
            this.windowSize = windowSize;
        }
    
        @Override
        public void run(SourceContext<ChangeEvent<T>> ctx) throws Exception {
            while (true) {
                long currentTime = System.currentTimeMillis();
                T newData = dataSupplier.get();
                if (newData != null) {
                    currentData = newData;
                    ctx.collect(new ChangeEvent<>(currentTime, currentTime + windowSize, currentData));
                }
                Thread.sleep(1000);
            }
        }
    
        @Override
        public void cancel() {
            // No-op
        }
    
        @Override
        public boolean isBounded() {
            return false;
        }
    
        @Override
        public void onElement(ChangeEvent<T> element, Context context, Collector<ChangeEvent<T>> collector) throws Exception {
            // Do nothing
        }
    
        @Override
        public void onTimeout(long timeout, TimeWindow window, Context context) throws Exception {
            // Do nothing
        }
    
        @Override
        public void onWatermark(Watermark mark) throws Exception {
            // Do nothing
        }
    }
    

    然后,在你的Flink程序中使用这个自定义的ChangelogProducer:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.watermark.WatermarkStrategy;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 使用自定义的ChangelogProducer
            DataStream<String> inputStream = env.addSource(new CustomChangelogProducer<>(() -> "example", 1000L));
    
            // 对输入数据进行处理,例如按照主键进行对比,生成insert和delete的数据
            // ...
    
            env.execute("Flink Changelog Example");
        }
    }
    

    这样,你就可以在Flink程序中产生-D的change log了。注意,这个示例仅用于演示目的,实际应用中可能需要根据具体需求进行调整。

    2023-12-06 21:31:52
    赞同 展开评论 打赏
  • 在Apache Flink中,你可以使用Table API或SQL来处理数据流,并生成Change Log(变更日志)。这些变更日志可以包含INSERT、UPDATE和DELETE操作。通常情况下,Flink的CDC(Change Data Capture)功能是针对数据库源的数据变更捕获。

    要实现你所描述的场景,你需要将滑动窗口产生的结果转换为I/U/D形式的变更日志。以下是大致的步骤:

    1. 创建表结构
      首先,定义一个与你要比较的数据相对应的表结构。这个表结构应该包含主键列和其他属性列。

    2. 从数据源读取数据
      使用Table API或SQL从你的数据源读取数据,并将其注册为一个临时表。

    3. 应用滑动窗口
      使用GROUP BYTUMBLE等SQL语句对数据进行滑动窗口分组,并计算每个窗口内的数据。

    4. 计算差异
      通过对比两次滑动窗口的结果,找出新增、更新和删除的数据。这可能需要编写一些自定义的UDF(用户自定义函数),以便根据主键值进行比较。

    5. 产生变更日志
      将计算出的差异转换为INSERT、UPDATE和DELETE操作。对于INSERT和UPDATE,你可以直接使用INSERT INTOUPDATE SQL语句。对于DELETE操作,你需要找到上次窗口中存在的但当前窗口中不存在的数据,并生成相应的DELETE语句。

    6. 输出变更日志
      最后,将产生的变更日志写入到另一个表或者输出到外部系统。

    由于Flink没有直接提供这种场景的内置操作,你可能需要编写一些自定义代码来实现上述逻辑。这里是一个简化的示例,展示了如何在Flink Table API中使用row_number()窗口函数和聚合操作来找到前一次窗口与当前窗口之间的差异:

    // 假设我们有一个名为input的原始表,其中包含两个字段:id(主键)和value
    
    // 定义一个临时视图,用于存储前一次窗口的结果
    tableEnv.createTemporaryView("previous_window", previousWindowResult);
    
    // 对输入表应用滑动窗口,并与上一次窗口的结果进行JOIN
    Table result = input
        .window(Tumble.over(lit(5).minutes()).on($("timestamp")).as("w"))
        .groupBy($("w"), $("id"))
        .select(
            $("id"),
            $("value").as("current_value"),
            $("previous_window.value").as("previous_value")
        );
    
    // 对比当前窗口和上一次窗口的结果,找出新增、更新和删除的数据
    result
        .where($("previous_value").isNull())
        .insertInto("inserts");
    
    result
        .where($("previous_value").isNotNull())
        .where($("current_value").notEqual($("previous_value")))
        .insertInto("updates");
    
    result
        .where($("current_value").isNull())
        .insertInto("deletes");
    
    2023-12-06 20:09:11
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    PostgresChina2018_赖思超_PostgreSQL10_hash索引的WAL日志修改版final 立即下载
    Kubernetes下日志实时采集、存储与计算实践 立即下载
    日志数据采集与分析对接 立即下载