Apache Flink:如何使用SourceFunction以指定的时间间隔执行任务?-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Apache Flink:如何使用SourceFunction以指定的时间间隔执行任务?

flink小助手 2018-11-28 16:33:00 3381

"
我需要我的flink作业以指定的时间间隔从数据库中提取记录并在处理后将其归档。我已经实现了SourceFunction来从数据库中获取所需的记录,并添加了SourceFunction作为StreamExecutionEnvironment的源。如何指定StreamExecutionEnvironment需要每隔10分钟使用SourceFunction从数据库中获取记录?

SourceFunction:

public class MongoDBSourceFunction implements SourceFunction>{

public void cancel() {
    // TODO Auto-generated method stub
}

public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

> context) throws Exception {

    List<Book> books = getBooks();

    context.collect(books);

}

public List<Book> getBooks() {
    List<Book> books = new ArrayList<Book>();

    //fetch all books from database     
    return books;
}

}
处理器:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArchiveJob {

public static void main(String[] args) {

    final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new MongoDBSourceFunction()).print();
}

}"

Apache 数据库 流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:16:52

    "你需要将此功能添加到MongoDBSourceFunction自身。例如,您可以ScheduledExecutorService在open方法中实例化a 并使用此执行程序安排读取任务。

    请注意,在发出记录时保持检查点锁定很重要。"

    0 0
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

推荐文章
相似问题