我正在实现一个SourceFunction,它从数据库中读取数据。如果停止或压碎(即保存点和检查点),数据只能处理一次,则应该能够恢复该作业。
到目前为止我所拥有的:
@SerialVersionUID(1L)
class JDBCSource(private val waitTimeMs: Long) extends
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging{
@transient var client: PostGreClient = _
@volatile var isRunning: Boolean = true
val DEFAULT_WAIT_TIME_MS = 1000
def this(clientConfig: Serializable) =
this(clientConfig, DEFAULT_WAIT_TIME_MS)
override def stop(): Unit = {
this.isRunning = false
}
override def open(parameters: Configuration): Unit = {
super.open(parameters)
client = new JDBCClient
}
override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
while (isRunning){
val statement = client.getConnection.createStatement()
val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")
while (resultSet.next()) {
val event: String = resultSet.getString("name")
val timestamp: Long = resultSet.getLong("timestamp")
ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)
}
}
}
override def cancel(): Unit = {
isRunning = false
}
}
如何确保只获取尚未处理的数据库行?我假设ctx变量会有一些关于当前水印的信息,以便我可以将我的查询更改为:
select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark
您必须实施CheckpointedFunction,以便您可以自己管理检查点。界面文档非常全面,但如果您需要一个示例,我建议您查看一个示例。
本质上,您的函数必须实现CheckpointedFunction#snapshotState使用Flink的托管状态来存储您需要的状态,然后在执行恢复时,它将读取相同的状态CheckpointedFunction#initializeState。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。