需求 假设,我的hive表为tmp表,若干字段,如何以dt、hour、sid为分区,其中sid为渠道的含义。 我当前基于FlinkSQL从kafka表中读取数据,转写到hive表tmp中,采用流式写入,提交策略metastore、success-file,触发假设用process-time,delay为1h。 检查点每1min检查一次,连续2次检查点间隔10min,本质就是10min做一次检查点。
当前情况 由于数据量较大,kafka分区数量为60,因此我的任务并发可以选择60以内,假设并发也选了60。 那么对于每个时间点,dt肯定只有1个,hour也基本只有1个,sid的话假设有10个。 文件数情况为: 每10分钟,10(sid)*60(parallelism)= 600个。 每小时有6个10分钟(即6次检查点),那么就是6000个文件。 如上,每小时差不多6000个文件生成,只会多不会少,因为考虑到roll policy等。
目前我需要的是,由于不同sid的数据量不一样,我想能否对于小数据量的sid,只被1个subtask消费,这样对于这个sid对应的分区下,每10分钟的文件数量就是1个,而不是60个。 对于数据量大的sid,则多个并行subtask消费。 大概想法类似于datastream api中先keyBy sid(当然这里可能有数据倾斜,我可以自己去想法解决,比如将大流量sid分散为sid+randomInt),然后基于streamingSink来消费并写入hive。
请问如上想法datastream、以及 flinkSQL 是否都能实现呢?
目前我看insert into tmp select ... from kafka_tmp;这样的话,默认生成2个task,一个kafkaSouce+streamSink(chain在一起)+ partition commiter,这是不满足需要的肯定。*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink中,您的需求可以通过不同的方式实现,具体取决于您选择使用DataStream API还是Flink SQL。下面分别说明两种方法的可行性:
KeyBy + Partition Customization: 您的想法是可行的。首先,通过keyBy
操作将数据按照sdk
进行分区,这可以确保相同sdk
的数据被发送到同一个子任务上处理。然后,您可以自定义Partitioner
来控制如何将 keyed stream 分发到下游 operator 的多个实例上。对于小流量的sdk
,可以设计一个策略使得它们只分配给一个子任务,而对于大流量的sdk
则可以分配给多个子任务。
Custom Sink or Side Output: 在自定义sink阶段,可以根据sdk
的流量大小决定是直接写出文件(对于小流量sdk
)还是进一步并行化处理(对于大流量sdk
)。您甚至可以利用Side Output功能,为不同级别的流量定义不同的输出路径或处理逻辑。
Flink SQL本身不直接提供像DataStream API那样细粒度的控制能力,比如自定义分区器或者基于数据量动态调整并行度。但是,您可以通过一些间接的方式来尝试接近您的需求:
动态表与窗口聚合: 利用Flink SQL的窗口函数和动态表特性,可以在处理过程中先对数据按需聚合,减少写入Hive时的数据量。但这并不能直接解决您希望根据sdk
流量动态调整并行度的问题。
UDF与多sink: 通过用户自定义函数(UDF)来识别sdk
的流量大小,并根据判断结果将数据路由到不同的sink。每个sink可以配置不同的并行度,理论上可以模拟出类似DataStream API中的效果。但请注意,这种方式可能较为复杂且维护成本较高,且需要手动管理sink的输出逻辑以适应不同sdk
的写入需求。
外部系统辅助决策: 另一种思路是,在SQL作业执行前,通过外部系统分析各sdk
的数据量,然后动态生成或修改Flink SQL作业的配置(如并行度),以适应不同sdk
的需求。这种方法依赖于外部系统的智能调度和分析能力。
综上所述,虽然Flink SQL在原生支持上可能不如DataStream API灵活,但通过一些间接手段和技巧,部分需求仍然可以得到满足。而DataStream API提供了更底层、更灵活的控制能力,更适合实现您所描述的定制化需求。