开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

实时计算 Flink版为什么 我测试出来不是这样的?

实时计算 Flink版为什么 我测试出来不是这样的?6f31a362ed2956a8960a351959a671af.png

展开
收起
真的很搞笑 2023-10-18 17:36:05 64 0
6 条回答
写回答
取消 提交回答
  • 十分耕耘,一定会有一分收获!

    楼主你好,对于阿里云实时计算Flink版,可能会出现一些特定的情况导致你的测试结果与期望不符,可能是版本兼容性,请确保你使用的Flink版本与阿里云实时计算Flink版所支持的版本相匹配,如果版本不匹配,可能会发生一些兼容性问题。

    还有就是配置错误,请检查你的配置是否正确,确保你正确设置了表的格式、连接器和其他必需的参数。

    2024-01-27 15:56:52
    赞同 展开评论 打赏
  • 在一般场景下,这种写入分区策略不会有太多问题,但是如果下游kafka有多个flink写入,举个例子:a,b...f作业都同时把数据写入到topic1中,每个flink并发度都是1,而topic1的分区数是10,这样就会导致所有的flink作业都会把数据写入到0分区中,1-9号分区没有数据,造成kafka的数据倾斜,这种情况下,只能我们自己自定义分区策略,可以简单的定义一个轮盘转方式的分区策略:

    public class FlinkRebalancePartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = -3785320239953858777L;
        private int parallelInstanceId;
        private int nextPartitionToSendTo;
    
        public FlinkRebalancePartitioner(){
    
        }
    
        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            Preconditions.checkArgument(
                    parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
            Preconditions.checkArgument(
                    parallelInstances > 0, "Number of subtasks must be larger than 0.");
    
            this.parallelInstanceId = parallelInstanceId;
            nextPartitionToSendTo = ThreadLocalRandom.current().nextInt(parallelInstances);
        }
    
        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
            nextPartitionToSendTo = (nextPartitionToSendTo + 1) % partitions.length;
            return partitions[nextPartitionToSendTo];
        }
    
        @Override
        public boolean equals(Object o) {
            return this == o || o instanceof FlinkRebalancePartitioner;
        }
    
        @Override
        public int hashCode() {
            return FlinkRebalancePartitioner.class.hashCode();
        }
    }
    

    ——参考链接

    2024-01-21 22:51:52
    赞同 1 展开评论 打赏
  • 深耕大数据和人工智能

    实时计算 Flink版的性能和稳定性在不同的测试环境和条件下可能会有所不同。测试结果可能受到硬件配置、网络环境、数据规模、负载类型和大小等多种因素的影响。因此,如果你测试出来的结果与官方发布的结果不一致,可能有以下几个原因:

    测试环境差异:你的测试环境和官方发布的结果可能存在差异,包括硬件配置、网络环境、数据规模等。这些差异可能导致测试结果有所不同。
    负载类型和大小:实时计算 Flink版在不同负载类型和大小下的性能表现可能会有所不同。如果你的测试负载类型和大小与官方测试的负载类型和大小不同,这可能导致测试结果存在差异。
    版本差异:实时计算 Flink版的不同版本之间可能存在差异,包括功能、性能和稳定性等方面。如果你正在使用的版本与官方测试的版本不同,可能会导致测试结果有所不同。
    操作和配置:在测试过程中,操作和配置的差异也可能影响测试结果。例如,不同的启动参数、配置文件设置等都可能影响实时计算 Flink版的性能和稳定性。
    因此,如果你测试出来的结果与官方发布的结果不一致,建议仔细检查你的测试环境和配置,并尝试调整负载类型和大小等因素,以获得更准确的测试结果。同时,你也可以参考实时计算 Flink版的官方文档和社区资源,了解更多关于性能和稳定性方面的信息和技术支持。

    2024-01-21 20:54:01
    赞同 展开评论 打赏
  • 您提到的Flink Fixed Partitioner类是用于将数据分发到Kafka分区中的固定策略,并非实际的数据轮转写入方式。

    在Apache Kafka中,每个生产者实例可以指定一个或多个目标分区来发送消息。如果指定了单个分区,则该消息总是被路由到这个特定的分区;而如果没有明确指定分区号(即使用默认值),则会根据某种算法自动分配给某个可用的分区。

    对于Flink Sink操作符来说,在SinkFunction接口中有两个方法:open()processElement(). 在open()方法中,我们可以设置Sink的操作模式,例如是否开启多线程等配置。而在processElement()方法中,我们处理接收到的消息并将其转换为下游系统可接受的形式进行消费。

    如果您想实现类似“轮转”写入的方式,请考虑以下方案:

    • 使用Flink的Windowing API创建滚动窗口。
    • 将这些窗口与不同的Kafka主题关联起来。
    • 每当新的元素到达时,它会被添加到当前活动的主题上,然后从下一个主题开始一个新的窗口。

    这样就可以达到类似于"轮转"的效果,但是请注意这需要对源代码有深入的理解以及一些额外的工作量才能完成。这种方式可能会导致性能下降或者资源消耗增加,因此请确保权衡好业务需求和技术限制之间的关系。

    2024-01-15 10:27:12
    赞同 展开评论 打赏
  • 某政企事业单位安全运维工程师,主要从事系统运维及网络安全工作,多次获得阿里云、华为云、腾讯云征文比赛一二等奖;CTF选手,白帽,全国交通行业网络安全大赛二等奖,全国数信杯数据安全大赛银奖,手握多张EDU、CNVD、CNNVD证书。

    Flink Fixed Partitioning是一种分区策略,它会在每个批次中固定数量的记录分配到固定的分区。这意味着在一个批处理周期内,同一份记录会被分派到相同的分区,除非重新启动Flink实例或更改源表的分区键。

    在这种情况下,假设您有一个源表S,每条记录都有一个唯一的标识符id,而且源表被分为三个并发消费的消费者C1,C2,C3。同时,Kafka主题T有十个分区p0-p9。FlinkFixedPartitioner会选择哪些分区来投递数据?

    C1 -> p0,p1
    C2 -> p2,p3
    C3 -> p4,p5

    这是因为FlinkFixedPartitioner按顺序轮询分区列表,直到遍历完为止。也就是说,第一次迭代,它会选取前两个分区(p0,p1); 第二次迭代,它会选取第三个和第四个分区(p2,p3); 最后一次迭代,它会选取第五个和第六个分区(p4,p5).

    如果我们改变消费者的数量,比如说只有两个消费者(C1,C2),会发生什么变化呢?

    C1 -> p0,p1
    C2 -> p2,p3

    这就解释了为何您看到的结果并非均匀分布。原因在于,FlinkFixedPartitioner不会动态地适应新的消费者数量,而是始终保持原有的分区映射规则不变。

    总结起来,Flink Fixed Partitioning策略的主要特点是保证每个批次内的记录总是被分配到相同数量的分区。这对于减少下游分区负载不平衡的情况是有益的,但也可能导致资源利用率降低。如果您的业务场景需要更灵活的分区策略,可以考虑使用其他的分区算法,如Round-robin partitioning。

    2024-01-13 17:31:13
    赞同 1 展开评论 打赏
  • 北京阿里云ACE会长

    Flink 写入 Kafka 默认采用的分区策略的代码实现在 FlinkFixedPartitioner 这个类中,并不是我们理解的轮盘转方式写入下游分区,而是每个并发固定的写入到 Kafka 的某个分区,举个例子: Flink 有 3 个 Sink 并发写入 Kafka,而 Kafka 有 10 个分区,那么数据只会写入 Kafka 的 0-2 号分区中,其他分区不会有数据。代码的实现逻辑如下:

    public class FlinkFixedPartitioner extends Partitioner {
    private final int partitionCount;
    private final int currentPartition;
    public FlinkFixedPartitioner(int partitionCount, int currentPartition) {
    this.partitionCount = partitionCount;
    this.currentPartition = currentPartition;
    }
    @Override
    public int partition(Object key, int numPartitions) {
    return currentPartition % partitionCount;
    }
    @Override
    public void close() {
    }
    }

    可以看到,FlinkFixedPartitioner 类的 partition 方法只是将当前 partition 编号 modulo 分区总数,得到的余数就是目标分区编号。因此,如果有多个并发写入 Kafka,每个并发只会写入固定的几个分区,而不是轮盘转方式写入所有分区。

    2024-01-12 21:40:28
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载