开发者社区> 问答> 正文

RichParallelSourceFunction中的水印

我正在实现一个SourceFunction,它从数据库中读取数据。如果停止或压碎(即保存点和检查点),数据只能处理一次,则应该能够恢复该作业。

到目前为止我所拥有的:

@SerialVersionUID(1L)
class JDBCSource(private val waitTimeMs: Long) extends
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging{

@transient var client: PostGreClient = _
@volatile var isRunning: Boolean = true
val DEFAULT_WAIT_TIME_MS = 1000

def this(clientConfig: Serializable) =
    this(clientConfig, DEFAULT_WAIT_TIME_MS)

override def stop(): Unit = {
    this.isRunning = false
}

override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    client = new JDBCClient
}

override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {

    while (isRunning){
       val statement = client.getConnection.createStatement()
       val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")

        while (resultSet.next()) {
            val event: String = resultSet.getString("name")
            val timestamp: Long = resultSet.getLong("timestamp")

            ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)

        }
    }
}

override def cancel(): Unit = {
    isRunning = false
}

}
如何确保只获取尚未处理的数据库行?我假设ctx变量会有一些关于当前水印的信息,以便我可以将我的查询更改为:

select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark

展开
收起
flink小助手 2018-12-06 17:15:22 3703 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    您必须实施CheckpointedFunction,以便您可以自己管理检查点。界面文档非常全面,但如果您需要一个示例,我建议您查看一个示例。

    本质上,您的函数必须实现CheckpointedFunction#snapshotState使用Flink的托管状态来存储您需要的状态,然后在执行恢复时,它将读取相同的状态CheckpointedFunction#initializeState。

    2019-07-17 23:18:36
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载