flink mysql 到 MySQL,哪位大神有样例?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
可以参考下面看看是否可以:
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[SensorReading] = env.addSource(new MyDefSource)
dataStream.addSink(new MyJdbcSinkFunction())
env.execute()
}
class MyJdbcSinkFunction extends RichSinkFunction[SensorReading]{
var connection: Connection =_
var insertStmt: PreparedStatement=_
var updateStmt: PreparedStatement=_
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1,value.temperature)
updateStmt.setString(2,value.id)
updateStmt.execute()
if(updateStmt.getUpdateCount == 0){
insertStmt.setString(1,value.id)
insertStmt.setDouble(2,value.temperature)
insertStmt.execute()
}
}
override def open(parameters: Configuration): Unit = {
connection = DriverManager.getConnection("jdbc:mysql://192.168.91.180:3306/flinkdb?useSSL=false", "root", "123123")
insertStmt = connection.prepareStatement("insert into sensor_temp(id,temp) value(?,?)")
updateStmt = connection.prepareStatement("update sensor_temp set temp=? where id=?")
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
connection.close()
}
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。