"
我需要我的flink作业以指定的时间间隔从数据库中提取记录并在处理后将其归档。我已经实现了SourceFunction来从数据库中获取所需的记录,并添加了SourceFunction作为StreamExecutionEnvironment的源。如何指定StreamExecutionEnvironment需要每隔10分钟使用SourceFunction从数据库中获取记录?
SourceFunction:
public class MongoDBSourceFunction implements SourceFunction>{
public void cancel() {
// TODO Auto-generated method stub
}
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> context) throws Exception {
List<Book> books = getBooks();
context.collect(books);
}
public List<Book> getBooks() {
List<Book> books = new ArrayList<Book>();
//fetch all books from database
return books;
}
}
处理器:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ArchiveJob {
public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MongoDBSourceFunction()).print();
}
}"
"你需要将此功能添加到MongoDBSourceFunction自身。例如,您可以ScheduledExecutorService在open方法中实例化a 并使用此执行程序安排读取任务。
请注意,在发出记录时保持检查点锁定很重要。"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。