开发者社区> 问答> 正文

在Flink KeyedStream上暂停处理

"我有一个Flink流应用程序,需要能够在特定的键控流上“暂停”和“取消暂停”处理。“处理”意味着只对流执行一些简单的异常检测。

我们正在思考的流程是这样的:

命令流,或者ProcessCommand,PauseCommand或者ResumeCommand每个都有一个id用于的命令流KeyBy。

ProcessCommands 将检查密钥在处理之前是否暂停,如果没有则缓冲。

PauseCommands 将暂停键的处理。

ResumeCommands 将取消暂停键的处理并刷新缓冲区。

这种流程看起来是否合理,如果是这样,我能否使用像split运营商这样的东西来实现?

示例流,省略了各个记录时间戳:

[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]

Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=>
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=>
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off "

展开
收起
flink小助手 2018-11-28 16:21:02 3127 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "这可以使用Flink的Window运算符来实现。首先,通过应用操作创建POJO或tuple基于流map。

    然后,根据你的需要,你可以keyBy在该流上使用以获得keyedStream。

    现在,通过使用基于时间的无限window,a trigger和a的组合window function,你可以实现命令流的切换行为。

    基本上,你可以使用windows缓冲区,它在收到暂停记录后保存进程记录,直到收到恢复记录。你可以编写一个自定义触发器,根据你的方案驱逐窗口(缓冲区)。

    以下是Trigger具有onElement()重写方法的自定义实现。

    /**

    • We trigger the window processing as per command inside the record. The
    • process records are buffered when a pause record is received and the
    • buffer is evicted once resume record is received. If no pause record is
    • received earlier, then for each process record the buffer is evicted.
      */

    @Override
    public TriggerResult onElement(Tuple2 element, long timestamp, Window window,

        TriggerContext context) throws Exception {
    if (element.f1.equals(""pause"")) {
        paused = true;
        return TriggerResult.CONTINUE;
    } else if (element.f1.equals(""resume"")) {
        paused = false;
        return TriggerResult.FIRE_AND_PURGE;
    } else if (paused) // paused is a ValueState per keyed stream.
        return TriggerResult.CONTINUE;
    return TriggerResult.FIRE_AND_PURGE;

    }
    "

    2019-07-17 23:16:50
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
《Apache Flink-实时即未来》 立即下载
实时即未来-Apache Flink 年度最佳实践 立即下载
实时即未来-Apache Flink年度最佳实践 立即下载