Flink CDC in Practice (Part 2)


Flink CDC: MySQL-to-Doris sync with schema changes

Flink CDC DataStream dual-stream joins


 

Flink CDC sync to Doris: handling upstream schema changes


 

Background:

In a real production environment, table schemas are modified frequently, and when syncing an entire database the load on MySQL keeps growing as the business grows. Restarting the job and the cluster and re-consuming everything from scratch is clearly unreasonable, so a Flink CDC pipeline has to handle schema changes as well as the addition and removal of tables.


Solutions:

Option 1: enable schema-change capture when reading with Flink CDC, create the Doris tables with light schema change enabled, and maintain a configuration table so that tables can be added or removed dynamically.

## Flink CDC source options
.scanNewlyAddedTableEnabled(true) // enable scanning of newly added tables
.includeSchemaChanges(true)
## Doris table property (set when creating the table)
light_schema_change=true
  • A program reads the tables that need to be synced from MySQL and stores them in a Doris table A with the columns member_id (the source database) and sync_table (the source table); see the DDL sketch after this list.
  • A script reads table A in Doris, fetches the corresponding schemas from MySQL, converts them into Doris CREATE TABLE statements, and executes those statements against Doris.
  • Cancel the Flink job and restart it (a restart is only needed when a new database is added; new tables do not require a restart).

  • On every restart, connect to Doris table A, read the databases, and assemble databaseList and tableList. tableList uses regular expressions such as database1.*, database2.* so that every table in each database is monitored; this way, when a new table is created in MySQL it is automatically added to the sync.
  • Doris already supports schema changes, but capturing the DDL changes from CDC has to be implemented yourself; the DDL SQL is then executed against Doris over JDBC. Because the SQL dialects differ slightly, the statements must be converted before they can be executed. Combined with new MySQL tables, the CREATE statements captured from the DDL can also be used to create the corresponding tables in Doris.
  • When loading data into Doris, loading too fast causes load failures (error -235). This can be mitigated by limiting how much binlog is read at a time and using window aggregation to load in batches.
  • For example, if the rows to load into table B are {"id":1,"name":"小明"} and {"id":2,"name":"小红"}, issuing two separate PUTs is clearly unreasonable; instead they can be loaded in a single request as a JSON array: [{"id":1,"name":"小明"},{"id":2,"name":"小红"}].
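
Below is a minimal DDL sketch of what this setup assumes on the Doris side: the sync-configuration table A and one example target table created with light schema change enabled. The database and table names and the extra columns are illustrative only; member_id and sync_table are the columns the job queries later.

-- Sketch of the sync-configuration table ("table A"); the real name comes from sync.config.db / sync.config.table
CREATE TABLE IF NOT EXISTS sync_config.sync_table_config (
  member_id   VARCHAR(64)  COMMENT 'source database to sync',
  sync_table  VARCHAR(128) COMMENT 'source table to sync',
  create_time DATETIME     COMMENT 'time the entry was added (illustrative column)'
) UNIQUE KEY(member_id, sync_table)
DISTRIBUTED BY HASH(member_id) BUCKETS 10
PROPERTIES ("replication_allocation" = "tag.location.default: 1");

-- Sketch of a target table created with light schema change enabled (column list is illustrative)
CREATE TABLE IF NOT EXISTS demo_db.demo_table (
  id   BIGINT      COMMENT 'primary key',
  name VARCHAR(96) COMMENT 'example value column'
) UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1",
  "light_schema_change" = "true"
);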


FlinkCDCMysql2Doris.scala

import org.apache.flink.streaming.api.TimeCharacteristic
import com.zbkj.util.{DorisStreamLoad, FlinkCDCSyncETL, KafkaUtil, PropertiesManager, PropertiesUtil, SinkDoris, SinkSchema}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import com.zbkj.util.KafkaUtil.proper
import net.sf.json.JSONObject
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, DataStreamUtils}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.kafka.connect.json.JsonConverterConfig
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters.asScalaIteratorConverter
object FlinkCDCMysql2Doris {
  PropertiesManager.initUtil()
  val props: PropertiesUtil = PropertiesManager.getUtil
  val log: Logger = LoggerFactory.getLogger(this.getClass)
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set job parallelism
    env.setParallelism(props.parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    /**
     * Checkpoint settings
     */
    // Enable checkpointing and set the trigger interval in milliseconds (checkpointing is disabled by default)
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout (default is 10 minutes)
    env.getCheckpointConfig.setCheckpointTimeout(600000)
    /** Minimum pause between two checkpoints, to avoid a backlog of checkpoints when state is large and
     * checkpoints take a long time, which would consume resources and hurt overall job performance (milliseconds) */
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    // By default only one checkpoint may be in flight at a time;
    // allowing several concurrent checkpoints can improve overall checkpointing throughput
    //    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    /** Externalized checkpoints:
     * checkpoint data is retained in external storage when the job is cancelled and can later be used to restore the job */
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    /** Prefer checkpoints for recovery even when a more recent savepoint exists */
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // Tolerable number of checkpoint failures (default 0 means no failure is tolerated)
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
    /**
     * Restart strategy
     */
    // Restart up to 3 times, waiting 10000 ms after each failure
    //    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(3, TimeUnit.MINUTES), Time.of(30, TimeUnit.SECONDS)))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
    /**
     * Read the sync-table configuration (database, table) from the config table in Doris
     * (Doris FE is MySQL-protocol compatible, so the standard MySQL JDBC driver is used)
     */
    val inputMysql = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://%s:%d/%s".format(props.doris_host, props.doris_port, props.sync_config_db))
      .setUsername(props.doris_user)
      .setPassword(props.doris_password)
      .setQuery("select member_id,sync_table from %s.%s".format(props.sync_config_db, props.sync_config_table))
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
      .finish()).uid("inputMysql")
    val databaseName: DataStream[String] = inputMysql.map(line => line.getField(0).toString).uid("databaseName")
    // fuzzy matching: monitor every table in each database (db.*)
    val tableName: DataStream[String] = inputMysql.map(line => line.getField(0).toString + ".*").uid("tableName")
    val producer = KafkaUtil.getProducer
    val databaseIter = databaseName.executeAndCollect().asScala
    val databaseList = databaseIter.toSet.mkString(",")
    val tableIter = tableName.executeAndCollect().asScala
    val tableList = tableIter.toSet.mkString(",")
    println("databaseList:", databaseList)
    println("tableList:", tableList)
    val customConverterConfigs = new java.util.HashMap[String, Object] {
      put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric")
    }
    /**
     * mysql source for doris
     */
    val mySqlSource = MySqlSource.builder[String]()
      .hostname(props.rds_host)
      .port(props.rds_port)
      .databaseList(databaseList)
      .tableList(tableList)
      .username(props.rds_user)
      .password(props.rds_password)
      .serverId("11110")
      .splitSize(props.split_size)
      .fetchSize(props.fetch_size)
      //       .startupOptions(StartupOptions.latest())
      // take a full snapshot first, then continue from the binlog
      .startupOptions(StartupOptions.initial())
      .includeSchemaChanges(true)
      // newly created tables are picked up and added to the sync, provided they are matched by tableList
      .scanNewlyAddedTableEnabled(true)
      .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs)).build()
    val dataStreamSource: DataStreamSource[String] = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
    val ddlSqlStream: DataStream[String] = dataStreamSource.filter(line => line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("ddlSqlStream")
    val dmlStream: DataStream[String] = dataStreamSource.filter(line => !line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("dmlStream")
    val ddlDataStream = FlinkCDCSyncETL.ddlFormat(ddlSqlStream)
    val dmlDataStream = FlinkCDCSyncETL.binLogETL(dmlStream) 
    // the Kafka producer is used to notify downstream analysis jobs once data has been synced
    val dorisStreamLoad = new DorisStreamLoad(props, producer)
    ddlDataStream.addSink(new SinkSchema(props)).name("ALTER TABLE TO DORIS").uid("SinkSchema")
    dmlDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("Data TO DORIS").uid("SinkDoris")
    env.execute("Flink CDC Mysql To Doris With Initial")
  }
  case class dataLine(merge_type: String, db: String, table: String, data: String)
}

FlinkCDCSyncETL.scala

import net.sf.json.JSONObject
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex
object FlinkCDCSyncETL {
  def binLogETL(dataStreamSource: DataStream[String]): DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = {
    /**
     * Map each binlog record to the Doris load method (merge type) it requires
     */
    val tupleData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = dataStreamSource.map(line => {
      var data: JSONObject = null
      var mergetype = "APPEND"
      val lineObj = JSONObject.fromObject(line)
      val source = lineObj.getJSONObject("source")
      val db = source.getString("db")
      val table = source.getString("table")
      if ("d" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("before")
        mergetype = "DELETE"
      } else if ("u" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "MERGE"
      } else if ("c" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
      } else if ("r" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "APPEND"
      }
      new Tuple4[String, String, String, String](mergetype, db, table, data.toString)
    }).returns(TypeInformation.of(new TypeHint[Tuple4[String, String, String, String]] {}))
    /**
     * Window aggregation: concatenate the JSON of records sharing the same load method, db and table into one long string
     */
    val byKeyData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = tupleData.keyBy(0, 1, 2)
      .timeWindow(Time.milliseconds(1000))
      .reduce((itemFirst, itemSecond) => new Tuple4(itemFirst.f0, itemFirst.f1, itemFirst.f2, itemFirst.f3 + "=-=-=" + itemSecond.f3))
    byKeyData
  }
  def ddlFormat(ddlDataStream: DataStream[String]): DataStream[String] = {
    val ddlStrDataStream: DataStream[String] = ddlDataStream.map(line => {
      try {
        val lineObj = JSONObject.fromObject(line)
        val historyRecord = JSONObject.fromObject(lineObj.getString("historyRecord"))
        val tableChangesArray = historyRecord.getJSONArray("tableChanges")
        val tableChanges = JSONObject.fromObject(tableChangesArray.getJSONObject(0))
        val tableChangeType = tableChanges.getString("type")
        var ddlSql = ""
        val table = tableChanges.optJSONObject("table")
        val primaryKeyColumnNames = table.getString("primaryKeyColumnNames").replace("[", "").replace("]", "").replace("\"", "")
        val columnsArray = table.getJSONArray("columns")
        // CREATE TABLE conversion
        if (tableChangeType == "CREATE") {
          val tableName = tableChanges.getString("id").replace("\"", "")
          val columnsArrayBuffer = ArrayBuffer[String]()
          columnsArray.forEach(line => {
            val columnJson = JSONObject.fromObject(line)
            val name = columnJson.getString("name")
            val typeName = columnJson.getString("typeName")
            val length = columnJson.optInt("length", 1)
            val scale = columnJson.optInt("scale", 2)
            val lastColumnType = matchColumnType(typeName, length, scale)
            val lastColumn = s"$name $lastColumnType"
            columnsArrayBuffer.+=(lastColumn)
          })
          // Reorder the columns so the primary key columns come first, avoiding the Doris error "Key columns should be a ordered prefix of the scheme"
          val keys = primaryKeyColumnNames.split(",")
          for (indexOfCol <- 0 until keys.length) {
            val col = keys(indexOfCol)
            var columnFormat = ""
            columnsArrayBuffer.foreach(column => {
              if (column.startsWith(col)) {
                columnFormat = column
              }
            })
            val index = columnsArrayBuffer.indexOf(columnFormat)
            columnsArrayBuffer.remove(index)
            columnsArrayBuffer.insert(indexOfCol, columnFormat)
          }
          val header = s"CREATE TABLE IF NOT EXISTS $tableName ("
          val end = s""") UNIQUE KEY($primaryKeyColumnNames)  DISTRIBUTED BY HASH($primaryKeyColumnNames) BUCKETS 10  PROPERTIES ("replication_allocation" = "tag.location.default: 1")"""
          ddlSql = header + columnsArrayBuffer.mkString(",") + end
        } else if (tableChangeType == "ALTER") {
          var ddl = historyRecord.getString("ddl").replace("\r\n", " ")
          println(ddl)
          if (ddl.startsWith("RENAME")) {
            ddl = ddl.replace("`", "")
            val arr = ddl.split(" ")
            ddlSql = s"ALTER TABLE ${arr(2)} RENAME ${arr(4)}"
          } else if (ddl.contains("DROP COLUMN")) {
            ddlSql = ddl
          } else {
            val dbTableName = tableChanges.getString("id").replace("\"", "")
            val addColName = ddl.split(" ")(5).replace("`", "")
            var colTpe = ""
            columnsArray.forEach(line => {
              val columnJson = JSONObject.fromObject(line)
              if (columnJson.getString("name") == addColName) {
                val typeName = columnJson.getString("typeName")
                val length = columnJson.optInt("length", 1)
                val scale = columnJson.optInt("scale", 2)
                colTpe = matchColumnType(typeName, length, scale)
              }
            })
            if (ddl.contains("ADD COLUMN")) {
              ddlSql = s"ALTER TABLE $dbTableName ADD COLUMN $addColName $colTpe"
            } else {
              ddlSql = s"ALTER TABLE $dbTableName MODIFY COLUMN $addColName $colTpe"
            }
          }
        }
        println(ddlSql)
        ddlSql
      }
      catch {
        case ex: Exception => println(ex)
          "select 1"
      }
    })
    ddlStrDataStream
  }
  def showCapital(x: Option[String]): String = x match {
    case Some(s) => s
    case None => "?"
  }
  def matchColumnType(columnType: String, length: Int, scale: Int): String = {
    var returnColumnType = "VARCHAR(255)"
    columnType match {
      case "INT UNSIGNED" => returnColumnType = s"INT($length)"
      case "INT" => returnColumnType = s"INT($length)"
      case "TINYINT" => returnColumnType = s"TINYINT($length)"
      case "VARCHAR" => returnColumnType = s"VARCHAR(${length * 3})"
      case "BIGINT" => returnColumnType = s"BIGINT(${length})"
      case "TINYTEXT" => returnColumnType = s"TINYTEXT"
      case "LONGTEXT" => returnColumnType = s"STRING"
      case "TEXT" => returnColumnType = s"STRING"
      case "DECIMAL" => returnColumnType = s"DECIMAL($length,$scale)"
      case "VARBINARY" => returnColumnType = s"STRING"
      case "TIMESTAMP" => returnColumnType = s"STRING"
      case "ENUM" => returnColumnType = s"TINYINT"
      case "MEDIUMINT" => returnColumnType = s"INT"
      case "SMALLINT" => returnColumnType = s"SMALLINT"
      case "MEDIUMTEXT" => returnColumnType = s"STRING"
      case _ => returnColumnType = s"STRING"
    }
    returnColumnType
  }
}
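
To make the conversion performed by ddlFormat concrete, here is a hedged before/after example; the table and column names are invented, and the generated length follows the VARCHAR rule in matchColumnType (length * 3):

-- DDL captured from the MySQL binlog
ALTER TABLE test_db.student ADD COLUMN nickname VARCHAR(20);
-- Statement generated for Doris by ddlFormat
ALTER TABLE test_db.student ADD COLUMN nickname VARCHAR(60);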

DorisStreamLoad.scala

import net.sf.json.JSONObject
import net.sf.json.JSONArray
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpPut
import org.apache.http.entity.StringEntity
import org.apache.http.entity.BufferedHttpEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}
import org.apache.commons.codec.binary.Base64
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.UUID
class DorisStreamLoad(props: PropertiesUtil,producer:KafkaProducer[String, String]) extends Serializable {
  lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
    override protected def isRedirectable(method: String): Boolean = {
      // If the connection target is FE, you need to deal with the 307 redirect.
      true
    }
  })
  def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = try {
    val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
    val arr = jsonData.split("=-=-=")
    val jsonArray = new JSONArray()
    for (line <- arr) {
      try {
        val js = JSONObject.fromObject(line)
        jsonArray.add(js)
      } catch {
        case e: Exception =>
          println(e)
          println(line)
      }
    }
    val jsonArrayStr = jsonArray.toString()
    val client = httpClientBuilder.build
    val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
    try {
      val put = new HttpPut(loadUrlStr)
      put.removeHeaders(HttpHeaders.CONTENT_LENGTH)
      put.removeHeaders(HttpHeaders.TRANSFER_ENCODING)
      put.setHeader(HttpHeaders.EXPECT, "100-continue")
      put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader)
      val label = UUID.randomUUID.toString
      // You can set stream load related properties in the Header, here we set label and column_separator.
      put.setHeader("label", label)
      put.setHeader("merge_type", mergeType)
      put.setHeader("two_phase_commit", "true")
      put.setHeader("column_separator", ",")
      put.setHeader("format", "json")
      put.setHeader("strip_outer_array", "true")
      put.setHeader("exec_mem_limit", "6442450944")
      val entity = new StringEntity(jsonArrayStr, "UTF-8")
      put.setEntity(entity)
      try {
        val response = client.execute(put)
        try {
          var loadResult = ""
          if (response.getEntity != null) {
            loadResult = EntityUtils.toString(response.getEntity)
          }
          val statusCode = response.getStatusLine.getStatusCode
          if (statusCode != 200) {
            throw new IOException("Stream load failed. status: %s load result: %s".format(statusCode, loadResult))
          }
        } finally if (response != null) {
          response.close()
        }
      }
    }
    finally
      if (client != null) client.close()
  }
  /**
   * Construct authentication information, the authentication method used by doris here is Basic Auth
   *
   */
  def basicAuthHeader: String = {
    val tobeEncode = props.doris_user + ":" + props.doris_password
    val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
    "Basic " + new String(encoded)
  }
}

SinkSchema.scala

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement}
class SinkSchema(props:PropertiesUtil) extends RichSinkFunction[String]  {
  var conn: Connection = _
  var ps : PreparedStatement  = _
  var mysqlPool: MysqlPool = _
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    mysqlPool = MysqlManager.getMysqlPool
    conn = mysqlPool.getConnection
    conn.setAutoCommit(false)
  }
  override def close(): Unit = {
    super.close()
    if (conn != null) {
      conn.close()
    }
    if (ps != null) {
      ps.close()
    }
  }
  override def invoke(sql: String, context: SinkFunction.Context): Unit = {
    super.invoke(sql, context)
    if (sql.nonEmpty) {
      ps = conn.prepareStatement(sql)
      try {
        ps.execute()
      }catch {
        case ex:Exception=>println(ex)
      }
      conn.commit()
    }
  }
}

SinkDoris.scala

import org.apache.flink.api.java.tuple.Tuple4
import com.zbkj.mysql2doris.FlinkCDCMysql2Doris.dataLine
import net.sf.json.JSONObject
import org.apache.flink.streaming.api.functions.sink.SinkFunction
class SinkDoris(dorisStreamLoad:DorisStreamLoad) extends SinkFunction[Tuple4[String, String, String, String]]  {
  /**
   * invoke() is called once for every element written to the sink
   */
  override def invoke(value:Tuple4[String, String, String, String]): Unit = {
    dorisStreamLoad.loadJson(value.f3,value.f0,value.f1,value.f2)
    val producer = KafkaUtil.getProducer
    val json = new JSONObject()
    json.put("db",value.f2)
    json.put("table",value.f3)
    KafkaUtil.sendKafkaMsg(producer, json.toString, "sync_table")
  }
}

PropertiesUtil.scala

import java.io.FileInputStream
import java.util.Properties
/**
 * propertiesUtil
 *
 */
class PropertiesUtil extends Serializable {
  private val props = new Properties()
  var doris_host = ""
  var doris_port = 0
  var doris_user = ""
  var doris_password = ""
  var doris_load_host = ""
  var database_list = ""
  var table_list = ""
  var rds_host = ""
  var rds_port = 0
  var rds_user = ""
  var rds_password = ""
  var rds_database = ""
  var sync_config_db = ""
  var sync_config_table = ""
  var address_table = ""
  var parallelism = 0
  var split_size = 0
  var fetch_size = 0
  var bootstrap_servers = ""
  var topic = ""
  var group_id = ""
  var offset_mode = ""
  def init(filePath: String): Unit = {
    props.load(new FileInputStream(filePath))
    // doris
    doris_host = props.getProperty("doris.host")
    doris_port = props.getProperty("doris.port").toInt
    doris_user = props.getProperty("doris.user")
    doris_password = props.getProperty("doris.password")
    database_list = props.getProperty("database.list")
    table_list = props.getProperty("table.list")
    doris_load_host = props.getProperty("doris.load.host")
    rds_host = props.getProperty("rds.host")
    rds_port = props.getProperty("rds.port").toInt
    rds_user = props.getProperty("rds.user")
    rds_password = props.getProperty("rds.password")
    rds_database = props.getProperty("rds.database")
    sync_config_db = props.getProperty("sync.config.db")
    sync_config_table = props.getProperty("sync.config.table")
    address_table = props.getProperty("address.table")
    parallelism = props.getProperty("parallelism").toInt
    split_size = props.getProperty("split.size").toInt
    fetch_size = props.getProperty("fetch.size").toInt
    bootstrap_servers = props.getProperty("bootstrap.servers")
    topic = props.getProperty("topic")
    group_id = props.getProperty("group.id")
    offset_mode = props.getProperty("offset.mode")
  }
  def stringToInt(prop: String): Int = {
    try {
      prop.toInt
    } catch {
      case ex: Exception => {
        0
      }
    }
  }
}
// Lazy singleton: the PropertiesUtil is only initialized when first used
object PropertiesManager {
  @volatile private var propertiesUtil: PropertiesUtil = _
  def getUtil: PropertiesUtil = {
    propertiesUtil
  }
  def initUtil(): Unit = {
    var filePath = "config.properties"
//        filePath = this.getClass.getResource("/").toString.replace("file:", "") + "config.properties"
    filePath = "/opt/flink-1.13.6/job/mysql2doris/config.properties"
    if (propertiesUtil == null) {
      propertiesUtil = new PropertiesUtil
    }
    propertiesUtil.init(filePath)
  }
}

Option 2: implement a custom deserialization schema that parses and converts the schema yourself.

1: Implement your own DebeziumDeserializationSchema, which requires implementing the deserialize and getProducedType methods. deserialize contains the conversion logic; getProducedType defines the produced type. Here two values are returned: a Boolean flag indicating whether the record is an upsert or a delete, and the converted JSON string, which contains the columns and values after a schema change.

2: If the .serverTimeZone("Asia/Shanghai") configured at startup does not take effect, a look at the source code shows that the underlying Debezium does not implement the serverTimeZone option; the corresponding conversion is implemented in RowDataDebeziumDeserializeSchema, which is why the custom deserializer below applies its own time offset.

interface DeserializationRuntimeConverter extends Serializable {
    Object convert(Object dbzObj, Schema schema);
}
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.time.Date;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.stream.Collectors;
public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<Tuple2<Boolean, String>> {
    // hour offset applied when formatting temporal values (0 by default)
    int zoneOffset;
    @Override
    public void deserialize(SourceRecord record, Collector<Tuple2<Boolean, String>> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            String insert = extractAfterRow(value, valueSchema);
            out.collect(new Tuple2<>(true, insert));
        } else if (op == Envelope.Operation.DELETE) {
            String delete = extractBeforeRow(value, valueSchema);
            out.collect(new Tuple2<>(false, delete));
        } else {
            String after = extractAfterRow(value, valueSchema);
            out.collect(new Tuple2<>(true, after));
        }
    }
    public JsonStringDebeziumDeserializationSchema() {
        // converter used to format temporal (date/time) values with the configured zone offset
        this.runtimeConverter = (dbzObj, schema) -> {
            if (schema.name() != null) {
                switch (schema.name()) {
                    case Timestamp.SCHEMA_NAME:
                        return TimestampData.fromEpochMillis((Long) dbzObj).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                    case MicroTimestamp.SCHEMA_NAME:
                        long micro = (long) dbzObj;
                        return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000)).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                    case NanoTimestamp.SCHEMA_NAME:
                        long nano = (long) dbzObj;
                        return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000)).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                    case Date.SCHEMA_NAME:
                        return TemporalConversions.toLocalDate(dbzObj).format(DateTimeFormatter.ISO_LOCAL_DATE);
                }
            }
            return dbzObj;
        };
    }
    private final DeserializationRuntimeConverter runtimeConverter;
    private Map<String, Object> getRowMap(Struct after) {
        // apply the runtime converter to every field value
        return after.schema().fields().stream().collect(Collectors.toMap(Field::name, f -> runtimeConverter.convert(after.get(f), f.schema())));
    }
    private String extractAfterRow(Struct value, Schema valueSchema) throws Exception {
        Struct after = value.getStruct(Envelope.FieldName.AFTER);
        Map<String,Object> rowMap = getRowMap(after);
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.writeValueAsString(rowMap);
    }
    private String extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
        Map<String, Object> rowMap = getRowMap(before);
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.writeValueAsString(rowMap);
    }
    @Override
    public TypeInformation<Tuple2<Boolean, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Boolean, String>>() {});
    }
}
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MySQLCDC {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // disable operator chaining so the job graph is easier for beginners to read
        env.disableOperatorChaining();
        env.setParallelism(1);
        // checkpoint configuration
        env.enableCheckpointing(params.getInt("checkpointInterval",60000));
        env.getCheckpointConfig().setCheckpointTimeout(5000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        MySqlSource<Tuple2<Boolean, String>> source = MySqlSource.<Tuple2<Boolean, String>>builder()
                .hostname(params.get("hostname", "127.0.0.1"))
                .port(params.getInt("port", 3306))
                .username(params.get("username", "root"))
                .password(params.get("password", ""))
                .serverTimeZone("Asia/Shanghai")
                // use our custom deserialization schema
                .deserializer(new JsonStringDebeziumDeserializationSchema())
                .databaseList(params.get("databaseList", "test"))
                .tableList(params.get("tableList", "test.my_test"))
                .build();
        // build the pipeline from the CDC source
        env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
        env.execute(MySQLCDC.class.getSimpleName());
    }
}


 


Flink CDC dual-stream joins



Three common Flink CDC join scenarios are covered below: a window (inner) join, a window outer join implemented with coGroup, and an interval join.

MySQL data preparation
# create the database flinkcdc_etl_test
create database flinkcdc_etl_test;
# switch to flinkcdc_etl_test
use flinkcdc_etl_test;
# create the teacher table
-- ----------------------------
-- Table structure for teacher
-- ----------------------------
DROP TABLE IF EXISTS `teacher`;
CREATE TABLE `teacher`  (
  `t_id` varchar(3) NOT NULL COMMENT 'primary key',
  `t_name` varchar(10)  NOT NULL COMMENT 'teacher name',
  PRIMARY KEY (`t_id`) USING BTREE
)COMMENT = 'teacher table';
-- ----------------------------
-- Records of teacher
-- ----------------------------
INSERT INTO `teacher` VALUES ('001', '张三');
INSERT INTO `teacher` VALUES ('002', '李四');
INSERT INTO `teacher` VALUES ('003', '王五');
-- ----------------------------
-- Table structure for course
-- ----------------------------
DROP TABLE IF EXISTS `course`;
CREATE TABLE `course`  (
  `c_id` varchar(3)  NOT NULL COMMENT 'primary key',
  `c_name` varchar(20)  NOT NULL COMMENT 'course name',
  `c_tid` varchar(3)  NOT NULL COMMENT 'teacher table primary key',
  PRIMARY KEY (`c_id`) USING BTREE,
  INDEX `c_tid`(`c_tid`) USING BTREE
)COMMENT = 'course table';
-- ----------------------------
-- Records of course
-- ----------------------------
INSERT INTO `course` VALUES ('1', '语文', '001');
INSERT INTO `course` VALUES ('2', '数学', '002');
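
For reference, the relationship the streaming joins below reproduce corresponds to this batch query (the join keys t_id and c_tid are the ones used in the code):

SELECT t.t_id, t.t_name, c.c_id, c.c_name
FROM teacher t
JOIN course c ON c.c_tid = t.t_id;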

Code implementation:
1: Base enum class

/**
 * CDC operation (op) types
 *
 */
public enum OpEnum {
    /**
     * insert
     */
    CREATE("c", "create", "insert"),
    /**
     * update
     */
    UPDATE("u", "update", "update"),
    /**
     * delete
     */
    DELETE("d", "delete", "delete"),
    /**
     * snapshot read
     */
    READ("r", "read", "initial snapshot read");
    /**
     * dictionary code
     */
    private String dictCode;
    /**
     * dictionary value
     */
    private String dictValue;
    /**
     * description of the code
     */
    private String description;
    OpEnum(String dictCode, String dictValue, String description) {
        this.dictCode = dictCode;
        this.dictValue = dictValue;
        this.description = description;
    }
    public String getDictCode() {
        return dictCode;
    }
    public String getDictValue() {
        return dictValue;
    }
    public String getDescription() {
        return description;
    }
}

2: Utility class

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
/**
 * Conversion utilities
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:25
 */
public class TransformUtil {
    /**
     * Flatten the extracted record:
     * remove the redundant before/after/source wrappers and keep only the row fields
     *
     * @param extractData the raw record captured by CDC
     * @return the flattened JSON object
     */
    public static JSONObject formatResult(String extractData) {
        JSONObject formatDataObj = new JSONObject();
        JSONObject rawDataObj = JSONObject.parseObject(extractData);
        formatDataObj.putAll(rawDataObj);
        formatDataObj.remove("before");
        formatDataObj.remove("after");
        formatDataObj.remove("source");
        String op = rawDataObj.getString("op");
        if (OpEnum.DELETE.getDictCode().equals(op)) {
            // for deletes, take the before struct
            formatDataObj.putAll(rawDataObj.getJSONObject("before"));
        } else {
            // otherwise take the after struct
            formatDataObj.putAll(rawDataObj.getJSONObject("after"));
        }
        return formatDataObj;
    }
    static MySqlSource<String> getStringMySqlSource(String dbName, String tableName) {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList(dbName)
                .tableList(dbName + "." + tableName)
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        return mySqlSource;
    }
    static DataStreamSource<String> getStringDataStreamSource(MySqlSource<String> dataStream, StreamExecutionEnvironment env) {
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                dataStream,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String extractData, long l) {
                                return JSONObject.parseObject(extractData).getLong("ts_ms");
                            }
                        }
                ),
                "DataStreamWithWatermark Source"
        );
        return mysqlDataStreamSource;
    }
}

3: Window inner join based on event time

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import static org.apache.doris.flink.test.TransformUtil.*;
/**
 * Window inner join on event time: join the teacher and course tables.
 * <p>
 * A record is printed only when both streams have matching data within the window.
 *
 */
public class WindowInnerJoinByEventTimeTest {
    public static void main(String[] args) throws Exception {
        // 1. create the Flink-MySQL-CDC sources
        MySqlSource<String> teacherSource = getStringMySqlSource("flinkcdc_etl_test", "teacher");
        MySqlSource<String> courseSource = getStringMySqlSource("flinkcdc_etl_test", "course");
        // 2. create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 3. assign watermarks
        DataStreamSource<String> teacherDataStreamSource = getStringDataStreamSource(teacherSource, env);
        DataStreamSource<String> courseDataStreamSource = getStringDataStreamSource(courseSource, env);
        // 4. convert to the flattened format
        DataStream<JSONObject> teacherDataStream = teacherDataStreamSource.map(rawData -> formatResult(rawData));
        DataStream<JSONObject> courseDataStream = courseDataStreamSource.map(rawData -> formatResult(rawData));
        // 5. window-join the teacher and course streams and print the result
        windowInnerJoinAndPrint(teacherDataStream, courseDataStream);
        // 6. run the job
        env.execute("WindowInnerJoinByEventTimeTest Job");
    }
    /**
     * Window join and print.
     * Only inner join is supported: records matched within the window are emitted, unmatched records are simply dropped.
     * For a window outer join, use the coGroup operator instead.
     * @param teacherDataStream teacher stream
     * @param courseDataStream  course stream
     */
    private static void windowInnerJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                                DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .join(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
                .apply(
                        new JoinFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public JSONObject join(JSONObject jsonObject,
                                                   JSONObject jsonObject2) {
                                // merge the two sides
                                jsonObject.putAll(jsonObject2);
                                return jsonObject;
                            }
                        }
                );
        teacherCourseDataStream.print("Window Inner Join By Event Time");
    }
}

4: Window outer join based on event time

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import static org.apache.doris.flink.test.TransformUtil.*;
/**
 * Window outer join based on event time:
 * join the teacher and course tables; records that find no match are also emitted
 *
 */
public class WindowOuterJoinByEventTimeTest {
    public static void main(String[] args) throws Exception {
        // 1. create the Flink-MySQL-CDC sources
        MySqlSource<String> teacherSource = getStringMySqlSource("flinkcdc_etl_test", "teacher");
        MySqlSource<String> courseSource = getStringMySqlSource("flinkcdc_etl_test", "course");
        // 2. create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 3. assign watermarks
        DataStreamSource<String> teacherDataStreamSource = getStringDataStreamSource(teacherSource, env);
        DataStreamSource<String> courseDataStreamSource = getStringDataStreamSource(courseSource, env);
        // 4. convert to the flattened format
        DataStream<JSONObject> teacherDataStream = teacherDataStreamSource.map(rawData -> formatResult(rawData));
        DataStream<JSONObject> courseDataStream = courseDataStreamSource.map(rawData -> formatResult(rawData));
        // 5. window-cogroup the teacher and course streams and print the result
        windowOuterJoinAndPrint(teacherDataStream, courseDataStream);
        // 6. run the job
        env.execute("WindowOuterJoinByEventTimeTest Job");
    }
    /**
     * Window outer join and print.
     * A window outer join is implemented with the coGroup operator; unmatched records are also emitted.
     *
     * @param teacherDataStream teacher stream
     * @param courseDataStream  course stream
     */
    private static void windowOuterJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                                DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .coGroup(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
                .apply(
                        new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public void coGroup(Iterable<JSONObject> iterable,
                                                Iterable<JSONObject> iterable1,
                                                Collector<JSONObject> collector) {
                                JSONObject result = new JSONObject();
                                for (JSONObject jsonObject : iterable) {
                                    result.putAll(jsonObject);
                                }
                                for (JSONObject jsonObject : iterable1) {
                                    result.putAll(jsonObject);
                                }
                                collector.collect(result);
                            }
                        }
                );
        teacherCourseDataStream.print("Window Outer Join By Event Time");
    }
}

5: Interval join

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import static org.apache.doris.flink.test.TransformUtil.*;
/**
 * Interval join: join the teacher and course tables.
 * Interval joins only support event time, not processing time.
 *
 */
public class IntervalJoinByEventTimeTest {
    public static void main(String[] args) throws Exception {
        // 1. create the Flink-MySQL-CDC sources
        MySqlSource<String> teacherSource = getStringMySqlSource("flinkcdc_etl_test", "teacher");
        MySqlSource<String> courseSource = getStringMySqlSource("flinkcdc_etl_test", "course");
        // 2. create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 3. assign watermarks
        DataStreamSource<String> teacherDataStreamSource = getStringDataStreamSource(teacherSource, env);
        DataStreamSource<String> courseDataStreamSource = getStringDataStreamSource(courseSource, env);
        // 4. convert to the flattened format
        DataStream<JSONObject> teacherDataStream = teacherDataStreamSource.map(rawData -> formatResult(rawData));
        DataStream<JSONObject> courseDataStream = courseDataStreamSource.map(rawData -> formatResult(rawData));
        // 5. interval-join the teacher and course streams and print the result
        intervalJoinAndPrint(teacherDataStream, courseDataStream);
        // 6. run the job
        env.execute("IntervalJoinByEventTimeTest Job");
    }
    /**
     * Interval join and print
     *
     * @param teacherDataStream teacher stream
     * @param courseDataStream  course stream
     */
    private static void intervalJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                             DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .keyBy(teacher -> teacher.getString("t_id"))
                .intervalJoin(
                        courseDataStream.keyBy(course -> course.getString("c_tid"))
                )
                .between(
                        Time.seconds(-5),
                        Time.seconds(5)
                )
                .process(
                        new ProcessJoinFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public void processElement(JSONObject left, JSONObject right,
                                                       Context ctx, Collector<JSONObject> out) {
                                left.putAll(right);
                                out.collect(left);
                            }
                        }
                );
        teacherCourseDataStream.print("Interval Join By Event Time");
    }
}
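
In SQL terms, the interval join above is roughly equivalent to the pattern below, where ts stands for each record's event time (taken from the ts_ms field in these examples); this is only an illustrative analogue, not code from the job:

SELECT t.*, c.*
FROM teacher t, course c
WHERE c.c_tid = t.t_id
  AND c.ts BETWEEN t.ts - INTERVAL 5 SECOND AND t.ts + INTERVAL 5 SECOND;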
