flink 集成iceberg 实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: flink 集成iceberg 实践

flink cdc postgresql sink iceberg with datastream

package com.zjyg.iceberg.flink;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.Schema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Pg2Iceberg {
    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()),
                    Types.NestedField.optional(3, "age", Types.IntegerType.get()),
                    Types.NestedField.optional(4, "sex", Types.StringType.get())
            );
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(parameterTool.get("checkpoint_base_dir")+"/"+parameterTool.get("catalog_name")+"."+parameterTool.get("iceberg_db_name")+"."+parameterTool.get("iceberg_tb_name"));
        checkpointConfig.setCheckpointInterval(60 * 1000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);
        checkpointConfig.setTolerableCheckpointFailureNumber(10);
        checkpointConfig.setCheckpointTimeout(120 * 1000L);
        DataStreamSource<RowData> src = env.addSource(getPgCdc(parameterTool));
        icebergSink_hadoop(src, parameterTool);
        env.execute(parameterTool.get("app_name"));
    }
    private static void icebergSink_hadoop(DataStream<RowData> src, ParameterTool tool) {
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hadoop");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse_base_dir")+"/"+tool.get("catalog_name"));
        CatalogLoader catalogLoader =
                CatalogLoader.hadoop(tool.get("catalog_name"), new Configuration(), properties);
        icebergSink(src, tool, catalogLoader);
    }
    private static void icebergSink(DataStream input, ParameterTool tool, CatalogLoader loader) {
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("iceberg_db_name")), tool.get("iceberg_tb_name"));
        Table table;
        if (catalog.tableExists(identifier)) {
            table = catalog.loadTable(identifier);
        } else {
            table =
                    catalog.buildTable(identifier, SCHEMA)
                            .withPartitionSpec(PartitionSpec.unpartitioned())
                            .create();
        }
        TableOperations operations = ((BaseTable) table).operations();
        TableMetadata metadata = operations.current();
        operations.commit(metadata, metadata.upgradeToFormatVersion(2));
        TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id"))
                .writeParallelism(1)
                .build();
    }
    private static SourceFunction getPgCdc(ParameterTool tool) {
        TableSchema schema =
                TableSchema.builder()
                        .add(TableColumn.physical("id", DataTypes.INT()))
                        .add(TableColumn.physical("name", DataTypes.STRING()))
                        .add(TableColumn.physical("age", DataTypes.INT()))
                        .add(TableColumn.physical("sex", DataTypes.STRING()))
                        .build();
        RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
        DebeziumDeserializationSchema deserialer =
                new RowDataDebeziumDeserializeSchema(
                        rowType,
                        createTypeInfo(schema.toRowDataType()),
                        (rowData, rowKind) -> {},
                        ZoneId.of("Asia/Shanghai"));
        Properties properties = new Properties();                
        properties.setProperty("decimal.handling.mode", "string");   //decimal等类型转化为string
        DebeziumSourceFunction sourceFunction =
                PostgreSQLSource.<RowData>builder()
                        .hostname(tool.get("db_host"))
                        .port(Integer.parseInt(tool.get("db_port")))
                        .database(tool.get("db_name"))
                        .schemaList(tool.get("schema_name"))
                        .tableList(tool.get("schema_name") + "." + tool.get("tb_name"))
                        .username(tool.get("db_user"))
                        .password(tool.get("db_user_pwd"))
                        .decodingPluginName("pgoutput")
                        .slotName(parameterTool.get("SLOTNAME4"))                        .deserializer(deserialer)
                        .deserializer(deserialer)
                        .debeziumProperties(properties)
                        .build();
        return sourceFunction;
    }
    private static TypeInformation<RowData> createTypeInfo(DataType producedDataType) {
        final DataType internalDataType =
                DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
        return (TypeInformation<RowData>)
                TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
    }
}

flink kafka sink iceberg datastream

import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.coomia.datalake.kafka.KafkaUtils;
/**
 * Demo job: consumes JSON events from a Kafka topic, converts each one to a five-column
 * {@link RowData} and writes it to an ORC-backed Iceberg table in a Hadoop catalog, then
 * reads the table back and prints it.
 */
public class FlinkWriteIcebergTest {
  /**
   * Builds and executes the pipeline.
   *
   * @param args unused
   * @throws Exception if table setup or job execution fails
   */
  public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "root");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(5000L);
    env.setParallelism(1);
    // iceberg catalog identification.
    Configuration conf = new Configuration();
    Catalog catalog = new HadoopCatalog(conf);
    // iceberg table identification.
    TableIdentifier name =
        TableIdentifier.of("default", "iceberg-tb-" + System.currentTimeMillis());
    // iceberg table schema identification.
    // All five fields must be inside the Schema constructor: the mapper below emits
    // five-column rows, with the timestamp in position 4.
    Schema schema = new Schema(Types.NestedField.required(1, "uid", Types.StringType.get()),
        Types.NestedField.required(2, "eventTime", Types.LongType.get()),
        Types.NestedField.required(3, "eventid", Types.StringType.get()),
        Types.NestedField.optional(4, "uuid", Types.StringType.get()),
        Types.NestedField.required(5, "ts", Types.TimestampType.withoutZone()));
    // iceberg table partition identification.
    // PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uid", 5).build();
    PartitionSpec spec = PartitionSpec.unpartitioned();
    // identify using orc format as storage.
    Map<String, String> props =
        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());
    Table table;
    // create an iceberg table if not exists, otherwise, load it.
    if (!catalog.tableExists(name)) {
      table = catalog.createTable(name, schema, spec, props);
    } else {
      table = catalog.loadTable(name);
    }
    String topic = "arkevent";
    String servers = "kafka:9092";
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topic,
        new SimpleStringSchema(), KafkaUtils.consumeProps(servers, "flink-consumer"));
    consumer.setStartFromEarliest();
    SingleOutputStreamOperator<RowData> dataStream =
        env.addSource(consumer).map(new MapFunction<String, RowData>() {
          @Override
          public RowData map(String value) throws Exception {
            JSONObject dataJson = JSON.parseObject(value);
            GenericRowData row = new GenericRowData(5);
            // StringData.fromString is charset-independent and maps a missing JSON key
            // (null) to a null column, which the optional "uuid" field needs.
            row.setField(0, StringData.fromString(dataJson.getString("uid")));
            row.setField(1, dataJson.getLong("eventTime"));
            row.setField(2, StringData.fromString(dataJson.getString("eventid")));
            row.setField(3, StringData.fromString(dataJson.getString("uuid")));
            row.setField(4, TimestampData.fromEpochMillis(dataJson.getLong("eventTime")));
            return row;
          }
        });
    // uid is used for job restart or something when using savepoint.
    dataStream.uid("flink-consumer");
    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
    // sink data to iceberg table
    FlinkSink.forRowData(dataStream).table(table).tableLoader(tableLoader).writeParallelism(1)
        .overwrite(true)
        .build();
    //read and write to file.
    DataStream<RowData> batchData = FlinkSource.forRowData().env(env).tableLoader(tableLoader).build();
    batchData.print();
    // NOTE(review): writeAsCsv is documented for tuple streams; confirm it accepts RowData
    // on the Flink version in use.
    batchData.writeAsCsv(tableLoader.loadTable().location().concat("/out/out.csv"), WriteMode.OVERWRITE, "\n", " ");
    // Execute the program.
    env.execute("Test Iceberg DataStream");
  }
}

spark kafka sink iceberg

package com.zjyg.iceberg.main
import com.zjyg.iceberg.util.TimeUtil
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataTypes
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.IcebergSpark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
/**
 * Spark Structured Streaming job that reads JSON log records from Kerberos-secured Kafka
 * and appends them, once per minute, to an Iceberg table addressed by its HDFS path.
 *
 * Arguments: `<brokers> <topics> <groupId>`.
 */
object Kafka2IcebergV3 {
  /** Values parsed out of the comma-separated `key=value` tag string of a record. */
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  /** One parsed log record; `dt` and `hour` are derived from the record timestamp. */
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String)
  import org.apache.spark.sql.functions._

  /**
   * Validates the arguments, builds the Spark session with a Hadoop-type Iceberg catalog
   * and runs the Kafka-to-Iceberg streaming query until it terminates.
   *
   * @param args brokers, topics and consumer group id, in that order
   */
  def main(args: Array[String]): Unit = {
    if(args.length < 3) {
      // The third positional argument is the consumer group id (read below as args(2)).
      System.err.println(
        s"""
           |Usage: Kafka2Iceberg <brokers> <topics> <groupId>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupId> is a kafka consumer groupId
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointDir = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/spark"
    val checkpointDir_iceberg = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/icebergtb"
    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.iceberg.type","hadoop")
        .config("spark.sql.catalog.iceberg",classOf[SparkCatalog].getName)
        .config("spark.sql.catalog.iceberg.warehouse","hdfs:///warehouse/tablespace/external/iceberg")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir)
      // IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
      // JAAS and krb5 files are resolved relative to the working directory.
      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")
      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name","kafka")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()
      val data = lines.map(
        row => row.getAs[String]("value").toString()
      ).map(s => getLogs(s)).toDF()
      println("------PrintSchema data-------")
      data.printSchema()
      // Despite its name this is the table's filesystem path, passed as the "path" option.
      val tableIdentifier: String = "hdfs:///warehouse/tablespace/external/iceberg/iceberg_test/opentsdb_logsv3"
      val query = data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("path", tableIdentifier).option("checkpointLocation", checkpointDir_iceberg).start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }

  /**
   * Parses a tag string of six comma-separated `key=value` pairs, taking the value of
   * each pair positionally.
   *
   * @param tags the raw tag string
   * @return the parsed tags
   */
  def getTags(tags:String):Tags ={
    var result = Tags("",0,"","","","")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // NOTE(review): this is called from getLogs, which runs inside Dataset.map, so the
        // exit terminates whichever JVM is processing the record — confirm this is intended.
        System.exit(1)
      }
    }
    return result
  }

  /**
   * Parses one JSON log record. The `timestamp` field is multiplied by 1000 before being
   * turned into a `Timestamp` and into the `dt`/`hour` values.
   *
   * @param logs the raw JSON text of the record
   * @return the parsed record
   */
  def getLogs(logs:String):Logs = {
    var v_tags = new Tags("",0,"","","","")
    var result = new Logs("","",new Timestamp(0L),0,"","",v_tags,"","","",0,"")
    try {
      val json=JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val ts_l = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(ts_l)
      val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(ts_l)
      val step = json.getInteger("step")
      val l_value = json.getString("value")
      val countertype = json.getString("counterType")
      val j_tags = json.getString("tags")
      val tags = getTags(j_tags)
      val l_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric,endpoint,ts,step,l_value,countertype,tags,l_type,kafka,nodata,dt,hour)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // NOTE(review): a single malformed record exits the JVM running this map task.
        System.exit(1)
      }
    }
    return result
  }
}

flink sink iceberg compaction

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;
import java.util.HashMap;
import java.util.Map;
/**
 * Command-line tool that compacts the data files of one Iceberg table in a Hive catalog
 * using the Flink rewrite action, then expires the snapshots older than the current one.
 *
 * <p>Arguments ({@code --key value}): {@code warehouse}, {@code uri}, {@code catalog},
 * {@code db}, {@code table}, and optionally {@code oss.endpoint},
 * {@code oss.access.key.id}, {@code oss.access.key.secret}.
 */
public class FlinkCompaction {
    /**
     * Loads the table, rewrites its data files and expires older snapshots.
     *
     * @param args command-line arguments
     * @throws Exception if the catalog cannot be loaded or the rewrite fails
     */
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        Map<String, String> catalogProps = catalogProperties(params);

        CatalogLoader catalogLoader =
                CatalogLoader.hive(params.get("catalog"), new Configuration(), catalogProps);
        Catalog hiveCatalog = catalogLoader.loadCatalog();
        Namespace database = Namespace.of(params.get("db"));
        Table target = hiveCatalog.loadTable(TableIdentifier.of(database, params.get("table")));

        // Compact small files — this is the core of the tool.
        Actions.forTable(target)
                .rewriteDataFiles()
                .maxParallelism(5)
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();

        Snapshot latest = target.currentSnapshot();
        if (latest == null) {
            return;
        }
        // Drop every snapshot older than the one that is current after compaction.
        target.expireSnapshots().expireOlderThan(latest.timestampMillis()).commit();
    }

    /**
     * Assembles the Hive catalog properties; the OSS file-IO settings are added only when
     * {@code oss.endpoint} was supplied.
     */
    private static Map<String, String> catalogProperties(ParameterTool params) {
        Map<String, String> props = new HashMap<>();
        props.put("type", "iceberg");
        props.put("catalog-type", "hive");
        props.put("property-version", "1");
        props.put("warehouse", params.get("warehouse"));
        props.put("uri", params.get("uri"));
        if (!params.has("oss.endpoint")) {
            return props;
        }
        props.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
        for (String key : new String[] {"oss.endpoint", "oss.access.key.id", "oss.access.key.secret"}) {
            props.put(key, params.get(key));
        }
        return props;
    }
}

spark sink iceberg compaction

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
/**
 * @author: zhushang
 * @create: 2021-04-02 14:30
 */
public class SparkCompaction {
    public static void main(String[] args) {
        TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table");
        Map<String, String> config = new HashMap<>();
        config.put("type", "iceberg");
        config.put("catalog-type", "hive");
        config.put("property-version", "1");
        config.put("warehouse", "warehouse");
        config.put("uri", "thrift://local:9083");
        config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
        config.put("oss.endpoint", "https://xxx.aliyuncs.com");
        config.put("oss.access.key.id", "key");
        config.put("oss.access.key.secret", "secret");
        sparkSession();
        HiveCatalog hiveCatalog = new HiveCatalog(new Configuration());
        hiveCatalog.initialize("iceberg_hive_catalog", config);
        Table table = hiveCatalog.loadTable(identifier);
        Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
    private static void sparkSession() {
        SparkSession.builder()
                .master("local[*]")
                .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
                .config("spark.hadoop." + METASTOREURIS.varname, "localhost:9083")
                .config("spark.sql.warehouse.dir", "warehouse")
                .config("spark.executor.heartbeatInterval", "100000")
                .config("spark.network.timeoutInterval", "100000")
                .enableHiveSupport()
                .getOrCreate();
    }
}

spark Opentsdb sink iceberg

package com.zjyg.iceberg.spark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
import com.zjyg.iceberg.util.TimeUtil
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
/**
 * Spark Structured Streaming job that reads JSON log records from Kerberos-secured Kafka
 * and appends them, once per minute, to an Iceberg table partitioned by `dt` and `hour`.
 * Records that fail to parse are kept: they are written with a negated `dt` and the raw
 * text in `row_info`.
 */
object OpentsdbLog2Iceberg {
  /** Values parsed out of the comma-separated `key=value` tag string of a record. */
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  /** One log record; `row_info` carries the raw input when parsing failed, otherwise "". */
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String,row_info: String)

  /**
   * Validates the nine positional arguments, builds the Spark session with a Hadoop-type
   * Iceberg catalog and runs the streaming query until it terminates.
   *
   * @param args see the usage text printed when fewer than nine are supplied
   */
  def main(args: Array[String]): Unit = {
    if(args.length < 9) {
      System.err.println(
        s"""
           |Usage: Kafka2IcebergV2 <brokers> <topics> <groupId> <checkpointBaseDir> ...
           |  <1. brokers> is a list of one or more Kafka brokers
           |  <2. topics> is a list of one or more kafka topics to consume from
           |  <3. groupId> is a kafka consumer gorupId
           |  <4. checkpointBaseDir> is a checkpoint dir for job
           |  <5. warehouseBaseDir> is a warehouse base dir for iceberg
           |  <6. catalogName> is a catalogName for iceberg
           |  <7. dbName> is a DatabaseName for iceberg
           |  <8. tbName> is a TableName for iceberg
           |  <9. kafkaServiceName> is a Kafka Service Name
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointBaseDir = args(3) // hdfs:///user/bigdata/sparkStreamingCheckpoint
    val warehouseBaseDir = args(4)  // hdfs:///warehouse/tablespace/external
    val catalogName = args(5)       // iceberg
    val dbName = args(6)            // iceberg_test
    val tbName = args(7)            // opentsdb_logs
    val kafkaServiceName = args(8)  // hdp-kafka

    // Both checkpoint locations live under <base>/<catalog>.<db>.<table>/
    val checkpointRoot = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}"
    val sparkCheckpoint = s"${checkpointRoot}/spark"
    val icebergCheckpoint = s"${checkpointRoot}/iceberg"
    val catalogKey = s"spark.sql.catalog.${catalogName}"
    val catalogWarehouse = s"${warehouseBaseDir}/${catalogName}"

    try {
      val session = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config(s"${catalogKey}.type","hadoop")
        .config(catalogKey,classOf[SparkCatalog].getName)
        .config(s"${catalogKey}.warehouse",catalogWarehouse)
        .getOrCreate()
      session.sparkContext.setCheckpointDir(sparkCheckpoint)
      // JAAS and krb5 files are resolved relative to the working directory.
      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")
      import session.implicits._

      val kafkaStream = session.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss","false")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name",s"${kafkaServiceName}")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
      val lines = kafkaStream
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()

      // The Kafka record timestamp is passed along so unparseable rows can still be
      // assigned a dt/hour partition.
      val parsed = lines.map { row =>
        val kafkaTs = row.getAs[Timestamp]("timestamp")
        val payload = row.getAs[String]("value").toString()
        getLogs(kafkaTs, payload)
      }
      val data = parsed.toDF().repartition($"dt", $"hour")
      println("------PrintSchema data-------")
      data.printSchema()

      // Filesystem path of the table, passed to the writer as its "path" option.
      val tablePath: String = s"${catalogWarehouse}/${dbName}/${tbName}"
      val writer = data.writeStream
        .format("iceberg")
        .partitionBy("dt","hour")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tablePath)
        .option("fanout-enabled", "true")
        .option("checkpointLocation", icebergCheckpoint)
      val query = writer.start()
      query.awaitTermination()
      session.close()
    } catch {
      case e: Exception =>
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
    }
  }

  /**
   * Parses a tag string of six comma-separated `key=value` pairs, taking the value of
   * each pair positionally. Any failure is logged to stderr and yields the empty
   * default `Tags("",0,"","","","")`.
   *
   * @param tags the raw tag string
   * @return the parsed tags, or the default on failure
   */
  def getTags(tags:String):Tags ={
    try {
      val pairs = tags.split(',')
      // Value part of the idx-th "key=value" pair.
      def valueAt(idx: Int): String = pairs(idx).split('=')(1)
      val entrustProp = valueAt(0)
      val entrustStatus = valueAt(1).toInt
      val exchangeType = valueAt(2)
      val opEntrustWay = valueAt(3)
      val ywlx = valueAt(4)
      val ywzl = valueAt(5)
      Tags(entrustProp, entrustStatus, exchangeType, opEntrustWay, ywlx, ywzl)
    } catch {
      case e: Exception =>
        System.err.println("Exception is:" + e)
        // System.exit(1)
        Tags("",0,"","","","")
    }
  }

  /**
   * Parses one JSON log record. The `timestamp` field is multiplied by 1000 before being
   * turned into a `Timestamp` and into the `dt`/`hour` values.
   *
   * If parsing fails, the error is logged and a placeholder record is returned whose
   * `dt` is the negated date derived from `kafkaTimestamp`, whose `hour` comes from the
   * same timestamp, and whose `row_info` holds the raw input.
   *
   * @param kafkaTimestamp timestamp of the Kafka record, used only on the failure path
   * @param logs           the raw JSON text of the record
   * @return the parsed record, or the placeholder described above
   */
  def getLogs(kafkaTimestamp:Timestamp,logs:String):Logs = {
    val emptyTags = Tags("",0,"","","","")
    try {
      val json=JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val epochMillis = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(epochMillis)
      val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(epochMillis)
      val step = json.getInteger("step")
      val value = json.getString("value")
      val counterType = json.getString("counterType")
      val parsedTags = getTags(json.getString("tags"))
      val logType = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      Logs(metric,endpoint,ts,step,value,counterType,parsedTags,logType,kafka,nodata,dt,hour,"")
    } catch {
      case e: Exception =>
        System.err.println("Exception is:" + e)
        // System.exit(1)
        val (dt,hour) = TimeUtil.tsToDateHour(kafkaTimestamp)
        Logs("","",new Timestamp(0L),0,"","",emptyTags,"","","",-dt,hour,logs)
    }
  }
}



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
2月前
|
机器学习/深度学习 人工智能 jenkins
软件测试中的自动化与持续集成实践
在快速迭代的软件开发过程中,自动化测试和持续集成(CI)是确保代码质量和加速产品上市的关键。本文探讨了自动化测试的重要性、常见的自动化测试工具以及如何将自动化测试整合到持续集成流程中,以提高软件测试的效率和可靠性。通过案例分析,展示了自动化测试和持续集成在实际项目中的应用效果,并提供了实施建议。
|
13天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
313 2
探索Flink动态CEP:杭州银行的实战案例
|
26天前
|
DataWorks 数据挖掘 大数据
方案实践测评 | DataWorks集成Hologres构建一站式高性能的OLAP数据分析
DataWorks在任务开发便捷性、任务运行速度、产品使用门槛等方面都表现出色。在数据处理场景方面仍有改进和扩展的空间,通过引入更多的智能技术、扩展数据源支持、优化任务调度和可视化功能以及提升团队协作效率,DataWorks将能够为企业提供更全面、更高效的数据处理解决方案。
|
27天前
|
流计算 开发者
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
|
2月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
2月前
|
运维 Devops jenkins
DevOps实践:持续集成与持续部署在现代软件开发中的作用
【10月更文挑战第42天】在快节奏的软件开发世界里,DevOps已经成为一种提升效率、确保质量和加速交付的重要方法。本文将深入探讨DevOps的核心组成部分—持续集成(CI)和持续部署(CD)—并展示它们如何通过自动化流程优化开发周期。我们将从基础概念讲起,逐步过渡到实际操作,最终通过一个简单代码示例来演示这一过程。文章旨在为读者提供清晰的指导,帮助他们理解和实现CI/CD流程,从而在软件开发领域取得竞争优势。
|
2月前
|
Devops jenkins 测试技术
DevOps实践:自动化部署与持续集成的融合之旅
【10月更文挑战第41天】在软件开发的世界中,快速迭代和高效交付是企业竞争力的关键。本文将带你走进DevOps的核心实践——自动化部署与持续集成,揭示如何通过它们提升开发流程的效率与质量。我们将从DevOps的基本理念出发,逐步深入到具体的技术实现,最终展示一个实际的代码示例,让理论与实践相结合,为你的开发旅程提供清晰的指引。
61 4
|
2月前
|
存储 监控 Devops
DevOps实践:持续集成/持续部署(CI/CD)的实战指南
DevOps实践:持续集成/持续部署(CI/CD)的实战指南
|
2月前
|
运维 Devops jenkins
DevOps实践之持续集成与持续交付
【10月更文挑战第32天】在软件开发的快节奏世界中,DevOps已经成为提升效率和质量的关键策略。通过将开发(Development)和运维(Operations)紧密结合,DevOps促进了更快速的软件发布和更高的可靠性。本文将深入探讨DevOps的核心组成部分——持续集成(CI)和持续交付(CD),并展示如何通过实际代码示例实现它们,以帮助团队构建更加高效和稳定的软件发布流程。

热门文章

最新文章