Flink and Iceberg Integration in Practice

Summary: hands-on code examples for integrating Flink and Spark with Apache Iceberg, covering CDC, Kafka, and OpenTSDB ingestion as well as small-file compaction.

Flink CDC PostgreSQL sink to Iceberg with the DataStream API

package com.zjyg.iceberg.flink;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.Schema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Pg2Iceberg {
    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()),
                    Types.NestedField.optional(3, "age", Types.IntegerType.get()),
                    Types.NestedField.optional(4, "sex", Types.StringType.get())
            );
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(parameterTool.get("checkpoint_base_dir")+"/"+parameterTool.get("catalog_name")+"."+parameterTool.get("iceberg_db_name")+"."+parameterTool.get("iceberg_tb_name"));
        checkpointConfig.setCheckpointInterval(60 * 1000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);
        checkpointConfig.setTolerableCheckpointFailureNumber(10);
        checkpointConfig.setCheckpointTimeout(120 * 1000L);
        DataStreamSource<RowData> src = env.addSource(getPgCdc(parameterTool));
        icebergSink_hadoop(src, parameterTool);
        env.execute(parameterTool.get("app_name"));
    }
    private static void icebergSink_hadoop(DataStream<RowData> src, ParameterTool tool) {
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hadoop");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse_base_dir")+"/"+tool.get("catalog_name"));
        CatalogLoader catalogLoader =
                CatalogLoader.hadoop(tool.get("catalog_name"), new Configuration(), properties);
        icebergSink(src, tool, catalogLoader);
    }
    private static void icebergSink(DataStream input, ParameterTool tool, CatalogLoader loader) {
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("iceberg_db_name")), tool.get("iceberg_tb_name"));
        Table table;
        if (catalog.tableExists(identifier)) {
            table = catalog.loadTable(identifier);
        } else {
            table =
                    catalog.buildTable(identifier, SCHEMA)
                            .withPartitionSpec(PartitionSpec.unpartitioned())
                            .create();
        }
        TableOperations operations = ((BaseTable) table).operations();
        TableMetadata metadata = operations.current();
        operations.commit(metadata, metadata.upgradeToFormatVersion(2));
        TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id"))
                .writeParallelism(1)
                .build();
    }
    private static SourceFunction<RowData> getPgCdc(ParameterTool tool) {
        TableSchema schema =
                TableSchema.builder()
                        .add(TableColumn.physical("id", DataTypes.INT()))
                        .add(TableColumn.physical("name", DataTypes.STRING()))
                        .add(TableColumn.physical("age", DataTypes.INT()))
                        .add(TableColumn.physical("sex", DataTypes.STRING()))
                        .build();
        RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
        DebeziumDeserializationSchema<RowData> deserializer =
                new RowDataDebeziumDeserializeSchema(
                        rowType,
                        createTypeInfo(schema.toRowDataType()),
                        (rowData, rowKind) -> {},
                        ZoneId.of("Asia/Shanghai"));
        Properties properties = new Properties();
        properties.setProperty("decimal.handling.mode", "string"); // convert DECIMAL and similar types to strings
        DebeziumSourceFunction<RowData> sourceFunction =
                PostgreSQLSource.<RowData>builder()
                        .hostname(tool.get("db_host"))
                        .port(Integer.parseInt(tool.get("db_port")))
                        .database(tool.get("db_name"))
                        .schemaList(tool.get("schema_name"))
                        .tableList(tool.get("schema_name") + "." + tool.get("tb_name"))
                        .username(tool.get("db_user"))
                        .password(tool.get("db_user_pwd"))
                        .decodingPluginName("pgoutput")
                        .slotName(tool.get("SLOTNAME4"))
                        .deserializer(deserializer)
                        .debeziumProperties(properties)
                        .build();
        return sourceFunction;
    }
    private static TypeInformation<RowData> createTypeInfo(DataType producedDataType) {
        final DataType internalDataType =
                DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
        return (TypeInformation<RowData>)
                TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
    }
}
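
When the CDC stream carries updates and deletes, newer Iceberg releases (0.12 and later) also expose an explicit upsert switch on the Flink sink builder, which turns change records into equality deletes plus inserts against a format v2 table. A minimal sketch of that variant, assuming iceberg-flink 0.12+; the helper class and method names are illustrative:

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class UpsertSinkSketch {
    // Hypothetical helper: wire a CDC RowData stream into an Iceberg v2 table in upsert mode.
    public static void appendUpsertSink(DataStream<RowData> input, Table table, TableLoader tableLoader) {
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id")) // primary key of the source table
                .upsert(true)                              // emit equality deletes + inserts instead of plain appends
                .writeParallelism(1)
                .build();
    }
}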

Flink Kafka sink to Iceberg with the DataStream API

import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.coomia.datalake.kafka.KafkaUtils;
public class FlinkWriteIcebergTest {
  public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "root");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(5000L);
    env.setParallelism(1);
    // iceberg catalog identification.
    Configuration conf = new Configuration();
    Catalog catalog = new HadoopCatalog(conf);
    // iceberg table identification.
    TableIdentifier name =
        TableIdentifier.of("default", "iceberg-tb-" + System.currentTimeMillis());
    // iceberg table schema identification.
    Schema schema = new Schema(
        Types.NestedField.required(1, "uid", Types.StringType.get()),
        Types.NestedField.required(2, "eventTime", Types.LongType.get()),
        Types.NestedField.required(3, "eventid", Types.StringType.get()),
        Types.NestedField.optional(4, "uuid", Types.StringType.get()),
        Types.NestedField.required(5, "ts", Types.TimestampType.withoutZone()));
    // iceberg table partition identification.
    // PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uid", 5).build();
    PartitionSpec spec = PartitionSpec.unpartitioned();
    // identify using orc format as storage.
    Map<String, String> props =
        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());
    Table table = null;
    // create an iceberg table if not exists, otherwise, load it.
    if (!catalog.tableExists(name))
      table = catalog.createTable(name, schema, spec, props);
    else
      table = catalog.loadTable(name);
    String topic = "arkevent";
    String servers = "kafka:9092";
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topic,
        new SimpleStringSchema(), KafkaUtils.consumeProps(servers, "flink-consumer"));
    consumer.setStartFromEarliest();
    SingleOutputStreamOperator<RowData> dataStream =
        env.addSource(consumer).map(new MapFunction<String, RowData>() {
          @Override
          public RowData map(String value) throws Exception {
            JSONObject dataJson = JSON.parseObject(value);
            GenericRowData row = new GenericRowData(5);
            row.setField(0, StringData.fromString(dataJson.getString("uid")));
            row.setField(1, dataJson.getLong("eventTime"));
            row.setField(2, StringData.fromString(dataJson.getString("eventid")));
            row.setField(3, StringData.fromString(dataJson.getString("uuid")));
            row.setField(4, TimestampData.fromEpochMillis(dataJson.getLong("eventTime")));
            return row;
          }
        });
    // uid is used for job restart or something when using savepoint.
    dataStream.uid("flink-consumer");
    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
    // sink data to iceberg table
    FlinkSink.forRowData(dataStream).table(table).tableLoader(tableLoader).writeParallelism(1)
        .overwrite(true)
        .build();
    //read and write to file.
    DataStream<RowData> batchData = FlinkSource.forRowData().env(env).tableLoader(tableLoader).build();
    batchData.print();
    // writeAsCsv only supports tuple streams, so write the RowData records out as plain text instead.
    batchData.writeAsText(tableLoader.loadTable().location().concat("/out/out.txt"), WriteMode.OVERWRITE);
    // Execute the program.
    env.execute("Test Iceberg DataStream");
  }
}
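
The batch read above scans the table once and finishes. The same FlinkSource builder can also tail the table and emit newly committed snapshots continuously; a minimal sketch of that incremental read, assuming a recent iceberg-flink release and a hypothetical Hadoop table path:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class IcebergStreamingReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Hypothetical table location; point this at the Hadoop table written by the job above.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs:///warehouse/default/iceberg-tb");
    // streaming(true) keeps the source running and emits rows from each new snapshot as it is committed.
    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(true)
        .build();
    stream.print();
    env.execute("Iceberg incremental read");
  }
}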

Spark Structured Streaming: Kafka sink to Iceberg

package com.zjyg.iceberg.main
import com.zjyg.iceberg.util.TimeUtil
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataTypes
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.IcebergSpark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
object Kafka2IcebergV3 {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String)
  import org.apache.spark.sql.functions._
  def main(args: Array[String]): Unit = {
    if(args.length < 3) {
      System.err.println(
        s"""
           |Usage: Kafka2IcebergV3 <brokers> <topics> <groupId>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more Kafka topics to consume from
           |  <groupId> is a Kafka consumer groupId
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointDir = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/spark"
    val checkpointDir_iceberg = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/icebergtb"
    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.iceberg.type","hadoop")
        .config("spark.sql.catalog.iceberg",classOf[SparkCatalog].getName)
        .config("spark.sql.catalog.iceberg.warehouse","hdfs:///warehouse/tablespace/external/iceberg")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir)
      // IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")
      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name","kafka")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()
      val data = lines.map(
        row => row.getAs[String]("value").toString()
      ).map(s => getLogs(s)).toDF()
      println("------PrintSchema data-------")
      data.printSchema()
      val tableIdentifier: String = "hdfs:///warehouse/tablespace/external/iceberg/iceberg_test/opentsdb_logsv3"
      val query = data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("path", tableIdentifier).option("checkpointLocation", checkpointDir_iceberg).start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }
  def getTags(tags:String):Tags ={
    var result = Tags("",0,"","","","")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    return result
  }
  def getLogs(logs:String):Logs = {
    var v_tags = new Tags("",0,"","","","")
    var result = new Logs("","",new Timestamp(0L),0,"","",v_tags,"","","",0,"")
    try {
      val json=JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val ts_l = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(ts_l)
      val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(ts_l)
      val step = json.getInteger("step")
      val l_value = json.getString("value")
      val countertype = json.getString("counterType")
      val j_tags = json.getString("tags")
      val tags = getTags(j_tags)
      val l_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric,endpoint,ts,step,l_value,countertype,tags,l_type,kafka,nodata,dt,hour)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    return result
  }
}

Flink Iceberg small-file compaction

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;
import java.util.HashMap;
import java.util.Map;
public class FlinkCompaction {
    public static void main(String[] args) throws Exception {
        ParameterTool tool = ParameterTool.fromArgs(args);
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hive");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse"));
        properties.put("uri", tool.get("uri"));
        if (tool.has("oss.endpoint")) {
            properties.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
            properties.put("oss.endpoint", tool.get("oss.endpoint"));
            properties.put("oss.access.key.id", tool.get("oss.access.key.id"));
            properties.put("oss.access.key.secret", tool.get("oss.access.key.secret"));
        }
        CatalogLoader loader =
                CatalogLoader.hive(tool.get("catalog"), new Configuration(), properties);
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("db")), tool.get("table"));
        Table table = catalog.loadTable(identifier);
        /**
         * Merge small data files (the core compaction logic).
         */
        Actions.forTable(table)
                .rewriteDataFiles()
                .maxParallelism(5)
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
}
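
On large tables it is usually cheaper to restrict the rewrite to recent partitions instead of scanning the whole table. A minimal sketch, assuming the table is partitioned by a dt column (the column name and helper are illustrative; the filter comes from org.apache.iceberg.expressions):

import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.actions.Actions;

public class PartitionScopedCompaction {
    // Hypothetical helper: compact only the data files that belong to a single dt partition.
    public static void compactPartition(Table table, int dt) {
        Actions.forTable(table)
                .rewriteDataFiles()
                .filter(Expressions.equal("dt", dt))
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();
    }
}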

Spark Iceberg small-file compaction

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
/**
 * @author: zhushang
 * @create: 2021-04-02 14:30
 */
public class SparkCompaction {
    public static void main(String[] args) {
        TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table");
        Map<String, String> config = new HashMap<>();
        config.put("type", "iceberg");
        config.put("catalog-type", "hive");
        config.put("property-version", "1");
        config.put("warehouse", "warehouse");
        config.put("uri", "thrift://local:9083");
        config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
        config.put("oss.endpoint", "https://xxx.aliyuncs.com");
        config.put("oss.access.key.id", "key");
        config.put("oss.access.key.secret", "secret");
        sparkSession();
        HiveCatalog hiveCatalog = new HiveCatalog(new Configuration());
        hiveCatalog.initialize("iceberg_hive_catalog", config);
        Table table = hiveCatalog.loadTable(identifier);
        Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
    private static void sparkSession() {
        SparkSession.builder()
                .master("local[*]")
                .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
                .config("spark.hadoop." + METASTOREURIS.varname, "localhost:9083")
                .config("spark.sql.warehouse.dir", "warehouse")
                .config("spark.executor.heartbeatInterval", "100000")
                .config("spark.network.timeoutInterval", "100000")
                .enableHiveSupport()
                .getOrCreate();
    }
}
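
Both compaction examples expire every snapshot older than the current one, which also removes the history needed for time travel and rollback. ExpireSnapshots can keep a tail of recent snapshots instead; a minimal sketch (the retention count of 5 is illustrative):

import org.apache.iceberg.Table;

public class SnapshotRetentionSketch {
    // Hypothetical helper: expire old snapshots but always keep the last 5 for rollback / time travel.
    public static void expireKeepingHistory(Table table, long olderThanMillis) {
        table.expireSnapshots()
                .expireOlderThan(olderThanMillis)
                .retainLast(5)
                .commit();
    }
}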

Spark Structured Streaming: OpenTSDB logs sink to Iceberg

package com.zjyg.iceberg.spark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
import com.zjyg.iceberg.util.TimeUtil
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object OpentsdbLog2Iceberg {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String,row_info: String)
  def main(args: Array[String]): Unit = {
    if(args.length < 9) {
      System.err.println(
        s"""
           |Usage: OpentsdbLog2Iceberg <brokers> <topics> <groupId> <checkpointBaseDir> ...
           |  <1. brokers> is a list of one or more Kafka brokers
           |  <2. topics> is a list of one or more Kafka topics to consume from
           |  <3. groupId> is a Kafka consumer groupId
           |  <4. checkpointBaseDir> is a checkpoint dir for job
           |  <5. warehouseBaseDir> is a warehouse base dir for iceberg
           |  <6. catalogName> is a catalogName for iceberg
           |  <7. dbName> is a DatabaseName for iceberg
           |  <8. tbName> is a TableName for iceberg
           |  <9. kafkaServiceName> is a Kafka Service Name
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointBaseDir = args(3) // hdfs:///user/bigdata/sparkStreamingCheckpoint
    val warehouseBaseDir = args(4)  // hdfs:///warehouse/tablespace/external
    val catalogName = args(5)       // iceberg
    val dbName = args(6)            // iceberg_test
    val tbName = args(7)            // opentsdb_logs
    val kafkaServiceName = args(8)  // hdp-kafka
    val checkpointDir_spark = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/spark"
    val checkpointDir_iceberg = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/iceberg"
    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config(s"spark.sql.catalog.${catalogName}.type","hadoop")
        .config(s"spark.sql.catalog.${catalogName}",classOf[SparkCatalog].getName)
        .config(s"spark.sql.catalog.${catalogName}.warehouse",s"${warehouseBaseDir}/${catalogName}")
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir_spark)
      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")
      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss","false")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name",s"${kafkaServiceName}")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()
      val data = lines.map(
        row => getLogs(row.getAs[Timestamp]("timestamp"),row.getAs[String]("value").toString())
      ).toDF().repartition($"dt", $"hour")
      println("------PrintSchema data-------")
      data.printSchema()
      val tableIdentifier: String = s"${warehouseBaseDir}/${catalogName}/${dbName}/${tbName}"
      val query = data.writeStream
        .format("iceberg")
        .partitionBy("dt","hour")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tableIdentifier)
        .option("fanout-enabled", "true")
        .option("checkpointLocation", checkpointDir_iceberg)
        .start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }
  def getTags(tags:String):Tags ={
    var result = Tags("",0,"","","","")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
      }
    }
    return result
  }
  def getLogs(kafkaTimestamp:Timestamp,logs:String):Logs = {
    var tags = new Tags("",0,"","","","")
    var result = new Logs("","",new Timestamp(0L),0,"","",tags,"","","",19700101,"00","")
    try {
      val json=JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val v_ts = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(v_ts)
      val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(v_ts)
      val step = json.getInteger("step")
      val v_value = json.getString("value")
      val countertype = json.getString("counterType")
      val tags = getTags(json.getString("tags"))
      val v_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric,endpoint,ts,step,v_value,countertype,tags,v_type,kafka,nodata,dt,hour,"")
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
        val (dt,hour) = TimeUtil.tsToDateHour(kafkaTimestamp)
        result = new Logs("","",new Timestamp(0L),0,"","",tags,"","","",-dt,hour,logs)
      }
    }
    return result
  }
}


