Flink CDC MySQL-to-Doris Sync: Handling Schema Changes
Flink CDC DataStream Dual-Stream Join
Flink CDC sync to Doris with upstream schema changes
Background:
In a real production environment, table schemas are modified frequently, and with whole-database synchronization the load on MySQL keeps growing as the business grows. Restarting the job and the cluster and re-consuming from scratch is clearly unreasonable, so a Flink CDC pipeline has to handle schema changes as well as adding and dropping tables.
Solution:
Option 1: enable schema change capture on the Flink CDC source and enable light schema change when creating the Doris tables; in addition, maintain a configuration table so tables can be added and removed dynamically.
## Flink CDC source options
.scanNewlyAddedTableEnabled(true)  // enable scanning newly added tables
.includeSchemaChanges(true)
## Doris table property, set at table creation
light_schema_change = true
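For reference, a minimal DDL sketch of what this looks like on the Doris side (table and column names here are illustrative, not taken from the original project): a business table created with light_schema_change enabled, plus the configuration table ("table A" in the steps below) that drives the dynamic database/table list.

-- Hypothetical business table: light_schema_change lets Doris apply ADD/DROP COLUMN quickly
CREATE TABLE IF NOT EXISTS example_db.orders (
    id BIGINT,
    name VARCHAR(300)
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "light_schema_change" = "true"
);

-- Hypothetical configuration table ("table A"): one row per database/table to sync
CREATE TABLE IF NOT EXISTS sync_config_db.sync_config_table (
    member_id VARCHAR(64) COMMENT 'source database name',
    sync_table VARCHAR(128) COMMENT 'source table name'
)
UNIQUE KEY(member_id, sync_table)
DISTRIBUTED BY HASH(member_id) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");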
- The program reads the list of tables to sync from MySQL and stores it in Doris table A with the fields member_id and sync_table.
- A script reads the rows of Doris table A, fetches the corresponding schemas from MySQL, converts them into Doris CREATE TABLE statements, and runs those statements against Doris.
- Cancel the Flink job and restart it. (A restart is only needed when a new database is added; new tables do not require a restart.)
- On each restart, the job reads Doris table A, collects the databases, and assembles databaseList and tableList. tableList uses regular expressions (database1.*, database2.*) so every table in each database is monitored; when a new table is added in MySQL it is automatically picked up by the sync.
- Doris already supports schema changes; you only need to capture the DDL changes from CDC yourself and execute them against Doris over JDBC. Because the SQL dialects differ slightly, the DDL has to be converted before it can run. Combined with new-table detection, a captured CREATE statement can be used to create the corresponding table in Doris.
- When loading data into Doris, loading too fast causes failures with error -235. This can be mitigated by limiting the number of binlog records read and batching writes with window aggregation.
- For example, if table B needs the rows {"id":1,"name":"小明"} and {"id":2,"name":"小红"}, issuing two separate PUTs is clearly wasteful; instead, send them as one JSON array [{"id":1,"name":"小明"},{"id":2,"name":"小红"}] in a single load (a standalone sketch of this batching appears right after this list).
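A minimal sketch of that batching idea, assuming the same net.sf.json library used in the project code below (the production version of this conversion lives inside DorisStreamLoad.loadJson; this just isolates it): rows that the window step concatenated with the "=-=-=" separator are merged back into one JSON array string so a single Stream Load request carries the whole batch.

import net.sf.json.{JSONArray, JSONObject}

object StreamLoadBatchSketch {
  // Merge rows previously joined with the "=-=-=" separator into one JSON array string,
  // suitable for a single Stream Load PUT with format=json and strip_outer_array=true.
  def toJsonArray(batched: String): String = {
    val arr = new JSONArray()
    batched.split("=-=-=").foreach { line =>
      arr.add(JSONObject.fromObject(line)) // each element is one row, e.g. {"id":1,"name":"小明"}
    }
    arr.toString // e.g. [{"id":1,"name":"小明"},{"id":2,"name":"小红"}]
  }

  def main(args: Array[String]): Unit = {
    println(toJsonArray("""{"id":1,"name":"小明"}=-=-={"id":2,"name":"小红"}"""))
  }
}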
FlinkCDCMysql2Doris.scala
import org.apache.flink.streaming.api.TimeCharacteristic
import com.zbkj.util.{DorisStreamLoad, FlinkCDCSyncETL, KafkaUtil, PropertiesManager, PropertiesUtil, SinkDoris, SinkSchema}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.kafka.connect.json.JsonConverterConfig
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters.asScalaIteratorConverter

object FlinkCDCMysql2Doris {

  PropertiesManager.initUtil()
  val props: PropertiesUtil = PropertiesManager.getUtil
  val log: Logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism
    env.setParallelism(props.parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    /**
     * Checkpoint settings
     */
    // Enable checkpointing and set the trigger interval in milliseconds (disabled by default)
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout, default 10 minutes
    env.getCheckpointConfig.setCheckpointTimeout(600000)
    // Minimum pause between two checkpoints, so that large state does not cause a backlog of
    // slow checkpoints that eats compute resources and hurts overall job performance (milliseconds)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    // By default only one checkpoint runs at a time; raise this to run several concurrently
    // env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    // Externalized checkpoints: keep checkpoint data in external storage when the job is cancelled,
    // so the job can later be restored from it
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Prefer restoring from a checkpoint even when a more recent savepoint exists
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // Number of tolerable checkpoint failures (default 0: none are tolerated)
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)

    /**
     * Restart strategy
     */
    // Restart 3 times, waiting 10000 ms after each failure
    // env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(3, TimeUnit.MINUTES), Time.of(30, TimeUnit.SECONDS)))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))

    /**
     * Read the sync configuration table (database, table)
     */
    val inputMysql = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://%s:%d/%s".format(props.doris_host, props.doris_port, props.sync_config_db))
      .setUsername(props.doris_user)
      .setPassword(props.doris_password)
      .setQuery("select member_id,sync_table from %s.%s".format(props.sync_config_db, props.sync_config_table))
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
      .finish()).uid("inputMysql")

    val databaseName: DataStream[String] = inputMysql.map(line => line.getField(0).toString).uid("databaseName")
    // Listen to every table of each database via a regex pattern
    val tableName: DataStream[String] = inputMysql.map(line => line.getField(0).toString + ".*").uid("tableName")

    val producer = KafkaUtil.getProducer

    val databaseIter = databaseName.executeAndCollect().asScala
    val databaseList = databaseIter.toSet.mkString(",")
    val tableIter = tableName.executeAndCollect().asScala
    val tableList = tableIter.toSet.mkString(",")
    println("databaseList:", databaseList)
    println("tableList:", tableList)

    val customConverterConfigs = new java.util.HashMap[String, Object] {
      put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric")
    }

    /**
     * MySQL source for Doris
     */
    val mySqlSource = MySqlSource.builder[String]()
      .hostname(props.rds_host)
      .port(props.rds_port)
      .databaseList(databaseList)
      .tableList(tableList)
      .username(props.rds_user)
      .password(props.rds_password)
      .serverId("11110")
      .splitSize(props.split_size)
      .fetchSize(props.fetch_size)
      // .startupOptions(StartupOptions.latest())
      // Full snapshot first, then binlog
      .startupOptions(StartupOptions.initial())
      .includeSchemaChanges(true)
      // Pick up newly added tables (they must match the patterns in tableList)
      .scanNewlyAddedTableEnabled(true)
      .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs))
      .build()

    val dataStreamSource: DataStreamSource[String] =
      env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")

    val ddlSqlStream: DataStream[String] = dataStreamSource
      .filter(line => line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("ddlSqlStream")
    val dmlStream: DataStream[String] = dataStreamSource
      .filter(line => !line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("dmlStream")

    val ddlDataStream = FlinkCDCSyncETL.ddlFormat(ddlSqlStream)
    val dmlDataStream = FlinkCDCSyncETL.binLogETL(dmlStream)

    // The producer notifies downstream analysis jobs after data has been synced
    val dorisStreamLoad = new DorisStreamLoad(props, producer)
    ddlDataStream.addSink(new SinkSchema(props)).name("ALTER TABLE TO DORIS").uid("SinkSchema")
    dmlDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("Data TO DORIS").uid("SinkDoris")

    env.execute("Flink CDC Mysql To Doris With Initial")
  }

  case class dataLine(merge_type: String, db: String, table: String, data: String)
}
FlinkCDCBinLogETL.scala
import net.sf.json.JSONObject
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.mutable.ArrayBuffer

object FlinkCDCSyncETL {

  def binLogETL(dataStreamSource: DataStream[String]): DataStream[Tuple4[String, String, String, String]] = {
    /**
     * Map each binlog record to the Doris load mode that matches its operation type
     */
    val tupleData: DataStream[Tuple4[String, String, String, String]] = dataStreamSource.map(line => {
      var data: JSONObject = null
      var mergetype = "APPEND"
      val lineObj = JSONObject.fromObject(line)
      val source = lineObj.getJSONObject("source")
      val db = source.getString("db")
      val table = source.getString("table")
      if ("d" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("before")
        mergetype = "DELETE"
      } else if ("u" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "MERGE"
      } else if ("c" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
      } else if ("r" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "APPEND"
      }
      new Tuple4[String, String, String, String](mergetype, db, table, data.toString)
    }).returns(TypeInformation.of(new TypeHint[Tuple4[String, String, String, String]] {}))

    /**
     * Window aggregation: concatenate the JSON rows that share the same load mode, db and table
     * into one long string so they can be loaded in a single batch
     */
    val byKeyData: DataStream[Tuple4[String, String, String, String]] = tupleData.keyBy(0, 1, 2)
      .timeWindow(Time.milliseconds(1000))
      .reduce((itemFirst, itemSecond) =>
        new Tuple4(itemFirst.f0, itemFirst.f1, itemFirst.f2, itemFirst.f3 + "=-=-=" + itemSecond.f3))
    byKeyData
  }

  def ddlFormat(ddlDataStream: DataStream[String]): DataStream[String] = {
    val ddlStrDataStream: DataStream[String] = ddlDataStream.map(line => {
      try {
        val lineObj = JSONObject.fromObject(line)
        val historyRecord = JSONObject.fromObject(lineObj.getString("historyRecord"))
        val tableChangesArray = historyRecord.getJSONArray("tableChanges")
        val tableChanges = JSONObject.fromObject(tableChangesArray.getJSONObject(0))
        val tableChangeType = tableChanges.getString("type")
        var ddlSql = ""
        val table = tableChanges.optJSONObject("table")
        val primaryKeyColumnNames = table.getString("primaryKeyColumnNames")
          .replace("[", "").replace("]", "").replace("\"", "")
        val columnsArray = table.getJSONArray("columns")

        // CREATE TABLE conversion
        if (tableChangeType == "CREATE") {
          val tableName = tableChanges.getString("id").replace("\"", "")
          val columnsArrayBuffer = ArrayBuffer[String]()
          columnsArray.forEach(line => {
            val columnJson = JSONObject.fromObject(line)
            val name = columnJson.getString("name")
            val typeName = columnJson.getString("typeName")
            val length = columnJson.optInt("length", 1)
            val scale = columnJson.optInt("scale", 2)
            val lastColumnType = matchColumnType(typeName, length, scale)
            val lastColumn = s"$name $lastColumnType"
            columnsArrayBuffer += lastColumn
          })
          // Re-order the columns so the key columns come first, otherwise Doris reports
          // "Key columns should be a ordered prefix of the scheme"
          val keys = primaryKeyColumnNames.split(",")
          for (indexOfCol <- keys.indices) {
            val col = keys(indexOfCol)
            var columnFormat = ""
            columnsArrayBuffer.foreach(column => {
              if (column.startsWith(col)) {
                columnFormat = column
              }
            })
            val index = columnsArrayBuffer.indexOf(columnFormat)
            columnsArrayBuffer.remove(index)
            columnsArrayBuffer.insert(indexOfCol, columnFormat)
          }
          val header = s"CREATE TABLE IF NOT EXISTS $tableName ("
          val end =
            s""") UNIQUE KEY($primaryKeyColumnNames) DISTRIBUTED BY HASH($primaryKeyColumnNames) BUCKETS 10 PROPERTIES ("replication_allocation" = "tag.location.default: 1")"""
          ddlSql = header + columnsArrayBuffer.mkString(",") + end
        } else if (tableChangeType == "ALTER") {
          var ddl = historyRecord.getString("ddl").replace("\r\n", " ")
          println(ddl)
          if (ddl.startsWith("RENAME")) {
            ddl = ddl.replace("`", "")
            val arr = ddl.split(" ")
            ddlSql = s"ALTER TABLE ${arr(2)} RENAME ${arr(4)}"
          } else if (ddl.contains("DROP COLUMN")) {
            ddlSql = ddl
          } else {
            val dbTableName = tableChanges.getString("id").replace("\"", "")
            val addColName = ddl.split(" ")(5).replace("`", "")
            var colTpe = ""
            columnsArray.forEach(line => {
              val columnJson = JSONObject.fromObject(line)
              if (columnJson.getString("name") == addColName) {
                val typeName = columnJson.getString("typeName")
                val length = columnJson.optInt("length", 1)
                val scale = columnJson.optInt("scale", 2)
                colTpe = matchColumnType(typeName, length, scale)
              }
            })
            if (ddl.contains("ADD COLUMN")) {
              ddlSql = s"ALTER TABLE $dbTableName ADD COLUMN $addColName $colTpe"
            } else {
              ddlSql = s"ALTER TABLE $dbTableName MODIFY COLUMN $addColName $colTpe"
            }
          }
        }
        println(ddlSql)
        ddlSql
      } catch {
        case ex: Exception =>
          println(ex)
          "select 1"
      }
    })
    ddlStrDataStream
  }

  def showCapital(x: Option[String]): String = x match {
    case Some(s) => s
    case None => "?"
  }

  /** Map MySQL column types to Doris column types */
  def matchColumnType(columnType: String, length: Int, scale: Int): String = {
    columnType match {
      case "INT UNSIGNED" => s"INT($length)"
      case "INT" => s"INT($length)"
      case "TINYINT" => s"TINYINT($length)"
      case "VARCHAR" => s"VARCHAR(${length * 3})"
      case "BIGINT" => s"BIGINT($length)"
      case "TINYTEXT" => "TINYTEXT"
      case "LONGTEXT" => "STRING"
      case "TEXT" => "STRING"
      case "DECIMAL" => s"DECIMAL($length,$scale)"
      case "VARBINARY" => "STRING"
      case "TIMESTAMP" => "STRING"
      case "ENUM" => "TINYINT"
      case "MEDIUMINT" => "INT"
      case "SMALLINT" => "SMALLINT"
      case "MEDIUMTEXT" => "STRING"
      case _ => "STRING"
    }
  }
}
DorisStreamLoad.scala
import net.sf.json.{JSONArray, JSONObject}
import org.apache.commons.codec.binary.Base64
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpPut
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.apache.kafka.clients.producer.KafkaProducer

import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.UUID

class DorisStreamLoad(props: PropertiesUtil, producer: KafkaProducer[String, String]) extends Serializable {

  lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
    override protected def isRedirectable(method: String): Boolean = {
      // If the connection target is FE, the 307 redirect to a BE needs to be followed.
      true
    }
  })

  def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = try {
    val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
    // The window step joins rows with "=-=-="; split them back and build one JSON array
    val arr = jsonData.split("=-=-=")
    val jsonArray = new JSONArray()
    for (line <- arr) {
      try {
        val js = JSONObject.fromObject(line)
        jsonArray.add(js)
      } catch {
        case e: Exception =>
          println(e)
          println(line)
      }
    }
    val jsonArrayStr = jsonArray.toString()
    val client = httpClientBuilder.build
    val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
    try {
      val put = new HttpPut(loadUrlStr)
      put.removeHeaders(HttpHeaders.CONTENT_LENGTH)
      put.removeHeaders(HttpHeaders.TRANSFER_ENCODING)
      put.setHeader(HttpHeaders.EXPECT, "100-continue")
      put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader)
      val label = UUID.randomUUID.toString
      // Stream Load properties are set in the header: label, merge type, format, etc.
      put.setHeader("label", label)
      put.setHeader("merge_type", mergeType)
      put.setHeader("two_phase_commit", "true")
      put.setHeader("column_separator", ",")
      put.setHeader("format", "json")
      put.setHeader("strip_outer_array", "true")
      put.setHeader("exec_mem_limit", "6442450944")
      val entity = new StringEntity(jsonArrayStr, "UTF-8")
      put.setEntity(entity)
      try {
        val response = client.execute(put)
        try {
          var loadResult = ""
          if (response.getEntity != null) {
            loadResult = EntityUtils.toString(response.getEntity)
          }
          val statusCode = response.getStatusLine.getStatusCode
          if (statusCode != 200) {
            throw new IOException("Stream load failed. status: %s load result: %s".format(statusCode, loadResult))
          }
        } finally if (response != null) {
          response.close()
        }
      }
    } finally if (client != null) client.close()
  }

  /**
   * Build the authentication header; Doris uses Basic Auth here.
   */
  def basicAuthHeader: String = {
    val tobeEncode = props.doris_user + ":" + props.doris_password
    val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
    "Basic " + new String(encoded)
  }
}
SinkSchema.scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import java.sql.{Connection, PreparedStatement}

class SinkSchema(props: PropertiesUtil) extends RichSinkFunction[String] {

  var conn: Connection = _
  var ps: PreparedStatement = _
  var mysqlPool: MysqlPool = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    mysqlPool = MysqlManager.getMysqlPool
    conn = mysqlPool.getConnection
    conn.setAutoCommit(false)
  }

  override def close(): Unit = {
    super.close()
    if (conn != null) {
      conn.close()
    }
    if (ps != null) {
      ps.close()
    }
  }

  override def invoke(sql: String, context: SinkFunction.Context): Unit = {
    super.invoke(sql, context)
    if (sql != "" && sql.nonEmpty) {
      ps = conn.prepareStatement(sql)
      try {
        ps.execute()
      } catch {
        case ex: Exception => println(ex)
      }
      conn.commit()
    }
  }
}
SinkDoris.scala
import org.apache.flink.api.java.tuple.Tuple4
import net.sf.json.JSONObject
import org.apache.flink.streaming.api.functions.sink.SinkFunction

class SinkDoris(dorisStreamLoad: DorisStreamLoad) extends SinkFunction[Tuple4[String, String, String, String]] {

  /**
   * invoke() is called once for every element that is written to the sink.
   */
  override def invoke(value: Tuple4[String, String, String, String]): Unit = {
    // value = (mergeType, db, table, jsonData)
    dorisStreamLoad.loadJson(value.f3, value.f0, value.f1, value.f2)
    // Notify downstream analysis jobs that this table has received new data
    val producer = KafkaUtil.getProducer
    val json = new JSONObject()
    json.put("db", value.f1)
    json.put("table", value.f2)
    KafkaUtil.sendKafkaMsg(producer, json.toString, "sync_table")
  }
}
PropertiesUtil.scala
import java.io.FileInputStream
import java.util.Properties

/**
 * PropertiesUtil: loads the job configuration from a properties file.
 */
class PropertiesUtil extends Serializable {

  private val props = new Properties()

  var doris_host = ""
  var doris_port = 0
  var doris_user = ""
  var doris_password = ""
  var doris_load_host = ""
  var database_list = ""
  var table_list = ""
  var rds_host = ""
  var rds_port = 0
  var rds_user = ""
  var rds_password = ""
  var rds_database = ""
  var sync_config_db = ""
  var sync_config_table = ""
  var address_table = ""
  var parallelism = 0
  var split_size = 0
  var fetch_size = 0
  var bootstrap_servers = ""
  var topic = ""
  var group_id = ""
  var offset_mode = ""

  def init(filePath: String): Unit = {
    props.load(new FileInputStream(filePath))
    doris_host = props.getProperty("doris.host")
    doris_port = props.getProperty("doris.port").toInt
    doris_user = props.getProperty("doris.user")
    doris_password = props.getProperty("doris.password")
    doris_load_host = props.getProperty("doris.load.host")
    database_list = props.getProperty("database.list")
    table_list = props.getProperty("table.list")
    rds_host = props.getProperty("rds.host")
    rds_port = props.getProperty("rds.port").toInt
    rds_user = props.getProperty("rds.user")
    rds_password = props.getProperty("rds.password")
    rds_database = props.getProperty("rds.database")
    sync_config_db = props.getProperty("sync.config.db")
    sync_config_table = props.getProperty("sync.config.table")
    address_table = props.getProperty("address.table")
    parallelism = props.getProperty("parallelism").toInt
    split_size = props.getProperty("split.size").toInt
    fetch_size = props.getProperty("fetch.size").toInt
    bootstrap_servers = props.getProperty("bootstrap.servers")
    topic = props.getProperty("topic")
    group_id = props.getProperty("group.id")
    offset_mode = props.getProperty("offset.mode")
  }

  def stringToInt(prop: String): Int = {
    try {
      prop.toInt
    } catch {
      case ex: Exception => 0
    }
  }
}

// Lazy singleton: the PropertiesUtil instance is only created when it is first needed
object PropertiesManager {

  @volatile private var propertiesUtil: PropertiesUtil = _

  def getUtil: PropertiesUtil = {
    propertiesUtil
  }

  def initUtil(): Unit = {
    var filePath = "config.properties"
    // filePath = this.getClass.getResource("/").toString.replace("file:", "") + "config.properties"
    filePath = "/opt/flink-1.13.6/job/mysql2doris/config.properties"
    if (propertiesUtil == null) {
      propertiesUtil = new PropertiesUtil
    }
    propertiesUtil.init(filePath)
  }
}
Option 2: implement a custom deserialization schema and do the schema parsing and conversion yourself.
1: Implement your own DebeziumDeserializationSchema, which means implementing the deserialize and getProducedType methods. deserialize holds the conversion logic, and getProducedType defines the output type. Here the output carries two values: a Boolean that marks the record as an upsert or a delete, and the converted JSON string, which contains the columns and values as they look after the schema change.
2: If .serverTimeZone("Asia/Shanghai") set at startup does not seem to take effect, a look at the source shows that the underlying Debezium does not implement the serverTimeZone option; the corresponding conversion is done in RowDataDebeziumDeserializeSchema, so the custom schema below handles the time conversion itself.
interface DeserializationRuntimeConverter extends Serializable {
    Object convert(Object dbzObj, Schema schema);
}
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.time.Date;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.stream.Collectors;

public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<Tuple2<Boolean, String>> {

    // Offset in hours applied when formatting timestamps (0 means UTC)
    int zoneOffset;

    private final DeserializationRuntimeConverter runtimeConverter;

    public JsonStringDebeziumDeserializationSchema() {
        // Converter that turns Debezium temporal types into readable strings
        this.runtimeConverter = (dbzObj, schema) -> {
            if (schema.name() != null) {
                switch (schema.name()) {
                    case Timestamp.SCHEMA_NAME:
                        return TimestampData.fromEpochMillis((Long) dbzObj)
                                .toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset))
                                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                    case MicroTimestamp.SCHEMA_NAME:
                        long micro = (long) dbzObj;
                        return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000))
                                .toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset))
                                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                    case NanoTimestamp.SCHEMA_NAME:
                        long nano = (long) dbzObj;
                        return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000))
                                .toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset))
                                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                    case Date.SCHEMA_NAME:
                        return TemporalConversions.toLocalDate(dbzObj).format(DateTimeFormatter.ISO_LOCAL_DATE);
                }
            }
            return dbzObj;
        };
    }

    @Override
    public void deserialize(SourceRecord record, Collector<Tuple2<Boolean, String>> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            String insert = extractAfterRow(value, valueSchema);
            out.collect(new Tuple2<>(true, insert));
        } else if (op == Envelope.Operation.DELETE) {
            String delete = extractBeforeRow(value, valueSchema);
            out.collect(new Tuple2<>(false, delete));
        } else {
            String after = extractAfterRow(value, valueSchema);
            out.collect(new Tuple2<>(true, after));
        }
    }

    private Map<String, Object> getRowMap(Struct row) {
        // Apply the runtime converter to every field while building the map
        return row.schema().fields().stream()
                .collect(Collectors.toMap(Field::name, f -> runtimeConverter.convert(row.get(f), f.schema())));
    }

    private String extractAfterRow(Struct value, Schema valueSchema) throws Exception {
        Struct after = value.getStruct(Envelope.FieldName.AFTER);
        Map<String, Object> rowMap = getRowMap(after);
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.writeValueAsString(rowMap);
    }

    private String extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
        Map<String, Object> rowMap = getRowMap(before);
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.writeValueAsString(rowMap);
    }

    @Override
    public TypeInformation<Tuple2<Boolean, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Boolean, String>>() {});
    }
}
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySQLCDC {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Disable operator chaining so the execution graph is easier for beginners to read
        env.disableOperatorChaining();
        env.setParallelism(1);
        // Checkpoint configuration
        env.enableCheckpointing(params.getInt("checkpointInterval", 60000));
        env.getCheckpointConfig().setCheckpointTimeout(5000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname(params.get("hostname", "127.0.0.1"))
                .port(params.getInt("port", 3306))
                .username(params.get("username", "root"))
                .password(params.get("password", ""))
                .serverTimeZone("Asia/Shanghai")
                // Use our own deserialization schema
                .deserializer(new JsonStringDebeziumDeserializationSchema())
                .databaseList(params.get("databaseList", "test"))
                .tableList(params.get("tableList", "test.my_test"))
                .build();

        // Define the source
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // 4 parallel source tasks
                .setParallelism(4)
                // parallelism 1 for the print sink to keep message ordering
                .print().setParallelism(1);

        env.execute(MySQLCDC.class.getSimpleName());
    }
}
Flink CDC dual-stream join
Three common Flink CDC join scenarios:
MySQL data preparation
# Create the database flinkcdc_etl_test
create database flinkcdc_etl_test;
# Use the database flinkcdc_etl_test
use flinkcdc_etl_test;

# Create the teacher table
-- ----------------------------
-- Table structure for teacher
-- ----------------------------
DROP TABLE IF EXISTS `teacher`;
CREATE TABLE `teacher` (
  `t_id` varchar(3) NOT NULL COMMENT '主键',
  `t_name` varchar(10) NOT NULL COMMENT '教师名称',
  PRIMARY KEY (`t_id`) USING BTREE
) COMMENT = '教师表';

-- ----------------------------
-- Records of teacher
-- ----------------------------
INSERT INTO `teacher` VALUES ('001', '张三');
INSERT INTO `teacher` VALUES ('002', '李四');
INSERT INTO `teacher` VALUES ('003', '王五');

-- ----------------------------
-- Table structure for course
-- ----------------------------
DROP TABLE IF EXISTS `course`;
CREATE TABLE `course` (
  `c_id` varchar(3) NOT NULL COMMENT '主键',
  `c_name` varchar(20) NOT NULL COMMENT '课程名称',
  `c_tid` varchar(3) NOT NULL COMMENT '教师表主键',
  PRIMARY KEY (`c_id`) USING BTREE,
  INDEX `c_tid`(`c_tid`) USING BTREE
) COMMENT = '课程表';

-- ----------------------------
-- Records of course
-- ----------------------------
INSERT INTO `course` VALUES ('1', '语文', '001');
INSERT INTO `course` VALUES ('2', '数学', '002');
Code implementation:
1: Base class
/**
 * CDC operation types
 */
public enum OpEnum {
    /** insert */
    CREATE("c", "create", "新增"),
    /** update */
    UPDATA("u", "update", "更新"),
    /** delete */
    DELETE("d", "delete", "删除"),
    /** initial snapshot read */
    READ("r", "read", "首次读");

    /** dictionary code */
    private String dictCode;
    /** translated value of the code */
    private String dictValue;
    /** description of the code */
    private String description;

    OpEnum(String dictCode, String dictValue, String description) {
        this.dictCode = dictCode;
        this.dictValue = dictValue;
        this.description = description;
    }

    public String getDictCode() {
        return dictCode;
    }

    public String getDictValue() {
        return dictValue;
    }

    public String getDescription() {
        return description;
    }
}
2: Utility class
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * Transformation utility class
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:25
 */
public class TransformUtil {

    /**
     * Format the extracted record:
     * strip the redundant before / after / source wrappers and keep only the row fields.
     *
     * @param extractData raw record extracted by CDC
     * @return flattened JSON object
     */
    public static JSONObject formatResult(String extractData) {
        JSONObject formatDataObj = new JSONObject();
        JSONObject rawDataObj = JSONObject.parseObject(extractData);
        formatDataObj.putAll(rawDataObj);
        formatDataObj.remove("before");
        formatDataObj.remove("after");
        formatDataObj.remove("source");
        String op = rawDataObj.getString("op");
        if (OpEnum.DELETE.getDictCode().equals(op)) {
            // For deletes, take the fields from the "before" struct
            formatDataObj.putAll(rawDataObj.getJSONObject("before"));
        } else {
            // Otherwise take the fields from the "after" struct
            formatDataObj.putAll(rawDataObj.getJSONObject("after"));
        }
        return formatDataObj;
    }

    static MySqlSource<String> getStringMySqlSource(String dbName, String tableName) {
        return MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList(dbName)
                .tableList(dbName + "." + tableName)
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
    }

    static DataStreamSource<String> getStringDataStreamSource(MySqlSource<String> dataStream, StreamExecutionEnvironment env) {
        return env.fromSource(
                dataStream,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String extractData, long l) {
                                // Use the Debezium ts_ms field as the event timestamp
                                return JSONObject.parseObject(extractData).getLong("ts_ms");
                            }
                        }
                ),
                "DataStreamWithWatermark Source"
        );
    }
}
3: Window inner join based on event time
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import static org.apache.doris.flink.test.TransformUtil.*;

/**
 * Window inner join based on event time: joins the teacher and course streams.
 * A record is printed only when both streams have matching data inside the window.
 */
public class WindowInnerJoinByEventTimeTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the Flink-MySQL-CDC sources
        MySqlSource<String> teacherSource = getStringMySqlSource("flinkcdc_etl_test", "teacher");
        MySqlSource<String> courseSource = getStringMySqlSource("flinkcdc_etl_test", "course");

        // 2. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 3. Assign watermarks
        DataStreamSource<String> teacherDataStreamSource = getStringDataStreamSource(teacherSource, env);
        DataStreamSource<String> courseDataStreamSource = getStringDataStreamSource(courseSource, env);

        // 4. Convert to the flattened JSON format
        DataStream<JSONObject> teacherDataStream = teacherDataStreamSource.map(rawData -> formatResult(rawData));
        DataStream<JSONObject> courseDataStream = courseDataStreamSource.map(rawData -> formatResult(rawData));

        // 5. Window-join the teacher and course streams and print the result
        windowInnerJoinAndPrint(teacherDataStream, courseDataStream);

        // 6. Run the job
        env.execute("WindowInnerJoinByEventTimeTest Job");
    }

    /**
     * Window join and print.
     * Only inner join is supported: records that find a match inside the window are emitted,
     * unmatched records are dropped. For an outer join on a window, use the coGroup operator.
     *
     * @param teacherDataStream teacher stream
     * @param courseDataStream  course stream
     */
    private static void windowInnerJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                                DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .join(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
                .apply(
                        new JoinFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public JSONObject join(JSONObject first, JSONObject second) {
                                // Merge the matched pair
                                first.putAll(second);
                                return first;
                            }
                        }
                );
        teacherCourseDataStream.print("Window Inner Join By Event Time");
    }
}
4: Window outer join based on event time
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import static org.apache.doris.flink.test.TransformUtil.*;

/**
 * Window outer join based on event time: joins the teacher and course streams
 * with coGroup, so records that find no match are still emitted.
 */
public class WindowOuterJoinByEventTimeTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the Flink-MySQL-CDC sources
        MySqlSource<String> teacherSource = getStringMySqlSource("flinkcdc_etl_test", "teacher");
        MySqlSource<String> courseSource = getStringMySqlSource("flinkcdc_etl_test", "course");

        // 2. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 3. Assign watermarks
        DataStreamSource<String> teacherDataStreamSource = getStringDataStreamSource(teacherSource, env);
        DataStreamSource<String> courseDataStreamSource = getStringDataStreamSource(courseSource, env);

        // 4. Convert to the flattened JSON format
        DataStream<JSONObject> teacherDataStream = teacherDataStreamSource.map(rawData -> formatResult(rawData));
        DataStream<JSONObject> courseDataStream = courseDataStreamSource.map(rawData -> formatResult(rawData));

        // 5. Window-outer-join the teacher and course streams and print the result
        windowOuterJoinAndPrint(teacherDataStream, courseDataStream);

        // 6. Run the job
        env.execute("WindowOuterJoinByEventTimeTest Job");
    }

    /**
     * Window outer join and print.
     * Uses the coGroup operator, so records without a match are emitted as well.
     *
     * @param teacherDataStream teacher stream
     * @param courseDataStream  course stream
     */
    private static void windowOuterJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                                DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .coGroup(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
                .apply(
                        new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public void coGroup(Iterable<JSONObject> teachers, Iterable<JSONObject> courses,
                                                Collector<JSONObject> collector) {
                                // Merge whatever showed up on either side within the window
                                JSONObject result = new JSONObject();
                                for (JSONObject jsonObject : teachers) {
                                    result.putAll(jsonObject);
                                }
                                for (JSONObject jsonObject : courses) {
                                    result.putAll(jsonObject);
                                }
                                collector.collect(result);
                            }
                        }
                );
        teacherCourseDataStream.print("Window Outer Join By Event Time");
    }
}
5: Interval join
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import static org.apache.doris.flink.test.TransformUtil.*;

/**
 * Interval join: joins the teacher and course streams.
 * Interval joins only support event time, not processing time.
 */
public class InteralJoinByEventTimeTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the Flink-MySQL-CDC sources
        MySqlSource<String> teacherSource = getStringMySqlSource("flinkcdc_etl_test", "teacher");
        MySqlSource<String> courseSource = getStringMySqlSource("flinkcdc_etl_test", "course");

        // 2. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 3. Assign watermarks
        DataStreamSource<String> teacherDataStreamSource = getStringDataStreamSource(teacherSource, env);
        DataStreamSource<String> courseDataStreamSource = getStringDataStreamSource(courseSource, env);

        // 4. Convert to the flattened JSON format
        DataStream<JSONObject> teacherDataStream = teacherDataStreamSource.map(rawData -> formatResult(rawData));
        DataStream<JSONObject> courseDataStream = courseDataStreamSource.map(rawData -> formatResult(rawData));

        // 5. Interval-join the teacher and course streams and print the result
        intervalJoinAndPrint(teacherDataStream, courseDataStream);

        // 6. Run the job
        env.execute("TeacherJoinCourseTest Job");
    }

    /**
     * Interval join and print.
     *
     * @param teacherDataStream teacher stream
     * @param courseDataStream  course stream
     */
    private static void intervalJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                             DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .keyBy(teacher -> teacher.getString("t_id"))
                .intervalJoin(
                        courseDataStream.keyBy(course -> course.getString("c_tid"))
                )
                .between(
                        Time.seconds(-5), Time.seconds(5)
                )
                .process(
                        new ProcessJoinFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public void processElement(JSONObject left, JSONObject right, Context ctx,
                                                       Collector<JSONObject> out) {
                                // Merge the matched pair
                                left.putAll(right);
                                out.collect(left);
                            }
                        }
                );
        teacherCourseDataStream.print("Interval Join By Event Time");
    }
}