"我有一个Flink流应用程序,需要能够在特定的键控流上“暂停”和“取消暂停”处理。“处理”意味着只对流执行一些简单的异常检测。
我们正在思考的流程是这样的:
命令流,或者ProcessCommand,PauseCommand或者ResumeCommand每个都有一个id用于的命令流KeyBy。
ProcessCommands 将检查密钥在处理之前是否暂停,如果没有则缓冲。
PauseCommands 将暂停键的处理。
ResumeCommands 将取消暂停键的处理并刷新缓冲区。
这种流程看起来是否合理,如果是这样,我能否使用像split运营商这样的东西来实现?
示例流,省略了各个记录时间戳:
[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]
Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=>
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=>
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off "
"这可以使用Flink的Window运算符来实现。首先,通过应用操作创建POJO或tuple基于流map。
然后,根据你的需要,你可以keyBy在该流上使用以获得keyedStream。
现在,通过使用基于时间的无限window,a trigger和a的组合window function,你可以实现命令流的切换行为。
基本上,你可以使用windows缓冲区,它在收到暂停记录后保存进程记录,直到收到恢复记录。你可以编写一个自定义触发器,根据你的方案驱逐窗口(缓冲区)。
以下是Trigger具有onElement()重写方法的自定义实现。
/**
@Override
public TriggerResult onElement(Tuple2 element, long timestamp, Window window,
TriggerContext context) throws Exception {
if (element.f1.equals(""pause"")) {
paused = true;
return TriggerResult.CONTINUE;
} else if (element.f1.equals(""resume"")) {
paused = false;
return TriggerResult.FIRE_AND_PURGE;
} else if (paused) // paused is a ValueState per keyed stream.
return TriggerResult.CONTINUE;
return TriggerResult.FIRE_AND_PURGE;
}
"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。