RichParallelSourceFunction中的水印-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

RichParallelSourceFunction中的水印

flink小助手 2018-12-06 17:15:22 1621

我正在实现一个SourceFunction,它从数据库中读取数据。如果停止或压碎(即保存点和检查点),数据只能处理一次,则应该能够恢复该作业。

到目前为止我所拥有的:

@SerialVersionUID(1L)
class JDBCSource(private val waitTimeMs: Long) extends
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging{

@transient var client: PostGreClient = _
@volatile var isRunning: Boolean = true
val DEFAULT_WAIT_TIME_MS = 1000

def this(clientConfig: Serializable) =
    this(clientConfig, DEFAULT_WAIT_TIME_MS)

override def stop(): Unit = {
    this.isRunning = false
}

override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    client = new JDBCClient
}

override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {

    while (isRunning){
       val statement = client.getConnection.createStatement()
       val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")

        while (resultSet.next()) {
            val event: String = resultSet.getString("name")
            val timestamp: Long = resultSet.getLong("timestamp")

            ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)

        }
    }
}

override def cancel(): Unit = {
    isRunning = false
}

}
如何确保只获取尚未处理的数据库行?我假设ctx变量会有一些关于当前水印的信息,以便我可以将我的查询更改为:

select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark

数据库 数据安全/隐私保护
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:18:36

    您必须实施CheckpointedFunction,以便您可以自己管理检查点。界面文档非常全面,但如果您需要一个示例,我建议您查看一个示例。

    本质上,您的函数必须实现CheckpointedFunction#snapshotState使用Flink的托管状态来存储您需要的状态,然后在执行恢复时,它将读取相同的状态CheckpointedFunction#initializeState。

    0 0
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

推荐文章
相似问题
推荐课程