flink mysql 到 MySQL,哪位大神有样例?

flink mysql 到 MySQL,哪位大神有样例?

展开
收起
wenti 2023-02-13 14:15:02 179 分享 版权
1 条回答
写回答
取消 提交回答
  • 可以参考下面看看是否可以:

    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val dataStream: DataStream[SensorReading] = env.addSource(new MyDefSource)
        dataStream.addSink(new MyJdbcSinkFunction())
    
        env.execute()
    }
    
    class MyJdbcSinkFunction extends RichSinkFunction[SensorReading]{
      var connection: Connection =_
      var insertStmt: PreparedStatement=_
      var updateStmt: PreparedStatement=_
      override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
        updateStmt.setDouble(1,value.temperature)
        updateStmt.setString(2,value.id)
        updateStmt.execute()
    
        if(updateStmt.getUpdateCount == 0){
          insertStmt.setString(1,value.id)
          insertStmt.setDouble(2,value.temperature)
          insertStmt.execute()
        }
    
      }
    
      override def open(parameters: Configuration): Unit = {
        connection = DriverManager.getConnection("jdbc:mysql://192.168.91.180:3306/flinkdb?useSSL=false", "root", "123123")
        insertStmt = connection.prepareStatement("insert into sensor_temp(id,temp) value(?,?)")
        updateStmt = connection.prepareStatement("update sensor_temp set temp=? where id=?")
    
      }
    
      override def close(): Unit = {
        insertStmt.close()
        updateStmt.close()
        connection.close()
      }
    
    
    2023-02-17 08:10:46
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理