Flink integration with Iceberg in practice

Summary: hands-on code examples for writing to Apache Iceberg from Flink and Spark, plus small-file compaction.

Flink CDC PostgreSQL sink to Iceberg with the DataStream API

package com.zjyg.iceberg.flink;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.Schema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Pg2Iceberg {
    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()),
                    Types.NestedField.optional(3, "age", Types.IntegerType.get()),
                    Types.NestedField.optional(4, "sex", Types.StringType.get())
            );
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(parameterTool.get("checkpoint_base_dir")+"/"+parameterTool.get("catalog_name")+"."+parameterTool.get("iceberg_db_name")+"."+parameterTool.get("iceberg_tb_name"));
        checkpointConfig.setCheckpointInterval(60 * 1000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);
        checkpointConfig.setTolerableCheckpointFailureNumber(10);
        checkpointConfig.setCheckpointTimeout(120 * 1000L);
        DataStreamSource<RowData> src = env.addSource(getPgCdc(parameterTool));
        icebergSink_hadoop(src, parameterTool);
        env.execute(parameterTool.get("app_name"));
    }
    private static void icebergSink_hadoop(DataStream<RowData> src, ParameterTool tool) {
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hadoop");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse_base_dir")+"/"+tool.get("catalog_name"));
        CatalogLoader catalogLoader =
                CatalogLoader.hadoop(tool.get("catalog_name"), new Configuration(), properties);
        icebergSink(src, tool, catalogLoader);
    }
    private static void icebergSink(DataStream<RowData> input, ParameterTool tool, CatalogLoader loader) {
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("iceberg_db_name")), tool.get("iceberg_tb_name"));
        Table table;
        if (catalog.tableExists(identifier)) {
            table = catalog.loadTable(identifier);
        } else {
            table =
                    catalog.buildTable(identifier, SCHEMA)
                            .withPartitionSpec(PartitionSpec.unpartitioned())
                            .create();
        }
        TableOperations operations = ((BaseTable) table).operations();
        TableMetadata metadata = operations.current();
        operations.commit(metadata, metadata.upgradeToFormatVersion(2));
        TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id"))
                .writeParallelism(1)
                .build();
    }
    private static SourceFunction<RowData> getPgCdc(ParameterTool tool) {
        TableSchema schema =
                TableSchema.builder()
                        .add(TableColumn.physical("id", DataTypes.INT()))
                        .add(TableColumn.physical("name", DataTypes.STRING()))
                        .add(TableColumn.physical("age", DataTypes.INT()))
                        .add(TableColumn.physical("sex", DataTypes.STRING()))
                        .build();
        RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
        DebeziumDeserializationSchema<RowData> deserializer =
                new RowDataDebeziumDeserializeSchema(
                        rowType,
                        createTypeInfo(schema.toRowDataType()),
                        (rowData, rowKind) -> {},
                        ZoneId.of("Asia/Shanghai"));
        Properties properties = new Properties();
        properties.setProperty("decimal.handling.mode", "string"); // convert DECIMAL and similar types to string
        DebeziumSourceFunction<RowData> sourceFunction =
                PostgreSQLSource.<RowData>builder()
                        .hostname(tool.get("db_host"))
                        .port(Integer.parseInt(tool.get("db_port")))
                        .database(tool.get("db_name"))
                        .schemaList(tool.get("schema_name"))
                        .tableList(tool.get("schema_name") + "." + tool.get("tb_name"))
                        .username(tool.get("db_user"))
                        .password(tool.get("db_user_pwd"))
                        .decodingPluginName("pgoutput")
                        .slotName(tool.get("SLOTNAME4"))
                        .deserializer(deserializer)
                        .debeziumProperties(properties)
                        .build();
        return sourceFunction;
    }
    private static TypeInformation<RowData> createTypeInfo(DataType producedDataType) {
        final DataType internalDataType =
                DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
        return (TypeInformation<RowData>)
                TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
    }
}
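
The job is driven entirely by --key value program arguments parsed with ParameterTool (database host/port/name, schema and table names, user and password, the replication slot name, checkpoint and warehouse base directories, and the Iceberg catalog/database/table names). Note that equalityFieldColumns relies on equality deletes, which require an Iceberg format v2 table; the code above upgrades an existing v1 table through TableOperations. A minimal alternative sketch, assuming an Iceberg release that honors the format-version property at create time, sets it when the table is first created:

        // Sketch only: create the table directly as format v2 instead of upgrading it afterwards
        // (assumes the "format-version" table property is honored at creation time).
        table = catalog.buildTable(identifier, SCHEMA)
                .withPartitionSpec(PartitionSpec.unpartitioned())
                .withProperty("format-version", "2")
                .create();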

Flink Kafka sink to Iceberg with the DataStream API

import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.coomia.datalake.kafka.KafkaUtils;
public class FlinkWriteIcebergTest {
  public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "root");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(5000L);
    env.setParallelism(1);
    // iceberg catalog identification.
    Configuration conf = new Configuration();
    Catalog catalog = new HadoopCatalog(conf);
    // iceberg table identification.
    TableIdentifier name =
        TableIdentifier.of("default", "iceberg-tb-" + System.currentTimeMillis());
    // iceberg table schema identification.
    Schema schema = new Schema(Types.NestedField.required(1, "uid", Types.StringType.get()),
        Types.NestedField.required(2, "eventTime", Types.LongType.get()),
        Types.NestedField.required(3, "eventid", Types.StringType.get()),
        Types.NestedField.optional(4, "uuid", Types.StringType.get()),
        Types.NestedField.required(5, "ts", Types.TimestampType.withoutZone()));
    // iceberg table partition identification.
    // PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uid", 5).build();
    PartitionSpec spec = PartitionSpec.unpartitioned();
    // identify using orc format as storage.
    Map<String, String> props =
        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());
    Table table = null;
    // create an iceberg table if not exists, otherwise, load it.
    if (!catalog.tableExists(name))
      table = catalog.createTable(name, schema, spec, props);
    else
      table = catalog.loadTable(name);
    String topic = "arkevent";
    String servers = "kafka:9092";
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topic,
        new SimpleStringSchema(), KafkaUtils.consumeProps(servers, "flink-consumer"));
    consumer.setStartFromEarliest();
    SingleOutputStreamOperator<RowData> dataStream =
        env.addSource(consumer).map(new MapFunction<String, RowData>() {
          @Override
          public RowData map(String value) throws Exception {
            JSONObject dataJson = JSON.parseObject(value);
            GenericRowData row = new GenericRowData(5);
            row.setField(0, StringData.fromString(dataJson.getString("uid")));
            row.setField(1, dataJson.getLong("eventTime"));
            row.setField(2, StringData.fromString(dataJson.getString("eventid")));
            row.setField(3, StringData.fromString(dataJson.getString("uuid")));
            row.setField(4, TimestampData.fromEpochMillis(dataJson.getLong("eventTime")));
            return row;
          }
        });
    // uid is used for job restart or something when using savepoint.
    dataStream.uid("flink-consumer");
    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
    // sink data to iceberg table
    FlinkSink.forRowData(dataStream).table(table).tableLoader(tableLoader).writeParallelism(1)
        .overwrite(true)
        .build();
    // read the table back and write the rows out to a text file
    // (writeAsCsv only supports Tuple types, so plain text output is used for RowData).
    DataStream<RowData> batchData = FlinkSource.forRowData().env(env).tableLoader(tableLoader).build();
    batchData.print();
    batchData.writeAsText(tableLoader.loadTable().location().concat("/out/out.txt"), WriteMode.OVERWRITE);
    // Execute the program.
    env.execute("Test Iceberg DataStream");
  }
}
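
KafkaUtils.consumeProps comes from an external package (com.coomia.datalake.kafka) that is not included in this article. A minimal, hypothetical stand-in, assuming only standard Kafka consumer settings, would look like this:

import java.util.Properties;

// Hypothetical stand-in for com.coomia.datalake.kafka.KafkaUtils (not part of the original article).
public class KafkaUtils {
  public static Properties consumeProps(String bootstrapServers, String groupId) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    props.setProperty("group.id", groupId);
    // Record deserialization is handled by SimpleStringSchema in the FlinkKafkaConsumer above,
    // so only connection-level settings are needed here.
    return props;
  }
}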

Spark Structured Streaming Kafka sink to Iceberg

package com.zjyg.iceberg.main
import com.zjyg.iceberg.util.TimeUtil
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataTypes
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.IcebergSpark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
object Kafka2IcebergV3 {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String)
  import org.apache.spark.sql.functions._
  def main(args: Array[String]): Unit = {
    if(args.length < 3) {
      System.err.println(
        s"""
           |Usage: Kafka2Iceberg <brokers> <topics> <groupId>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more Kafka topics to consume from
           |  <groupId> is a Kafka consumer groupId
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointDir = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/spark"
    val checkpointDir_iceberg = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/icebergtb"
    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.iceberg.type","hadoop")
        .config("spark.sql.catalog.iceberg",classOf[SparkCatalog].getName)
        .config("spark.sql.catalog.iceberg.warehouse","hdfs:///warehouse/tablespace/external/iceberg")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir)
      // IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")
      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name","kafka")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()
      val data = lines.map(
        row => row.getAs[String]("value").toString()
      ).map(s => getLogs(s)).toDF()
      println("------PrintSchema data-------")
      data.printSchema()
      val tableIdentifier: String = "hdfs:///warehouse/tablespace/external/iceberg/iceberg_test/opentsdb_logsv3"
      val query = data.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tableIdentifier)
        .option("checkpointLocation", checkpointDir_iceberg)
        .start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }
  def getTags(tags:String):Tags ={
    var result = Tags("",0,"","","","")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    return result
  }
  def getLogs(logs:String):Logs = {
    var v_tags = new Tags("",0,"","","","")
    var result = new Logs("","",new Timestamp(0L),0,"","",v_tags,"","","",0,"")
    try {
      val json=JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val ts_l = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(ts_l)
      val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(ts_l)
      val step = json.getInteger("step")
      val l_value = json.getString("value")
      val countertype = json.getString("counterType")
      val j_tags = json.getString("tags")
      val tags = getTags(j_tags)
      val l_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric,endpoint,ts,step,l_value,countertype,tags,l_type,kafka,nodata,dt,hour)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    return result
  }
}

Flink Iceberg small-file compaction

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;
import java.util.HashMap;
import java.util.Map;
public class FlinkCompaction {
    public static void main(String[] args) throws Exception {
        ParameterTool tool = ParameterTool.fromArgs(args);
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hive");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse"));
        properties.put("uri", tool.get("uri"));
        if (tool.has("oss.endpoint")) {
            properties.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
            properties.put("oss.endpoint", tool.get("oss.endpoint"));
            properties.put("oss.access.key.id", tool.get("oss.access.key.id"));
            properties.put("oss.access.key.secret", tool.get("oss.access.key.secret"));
        }
        CatalogLoader loader =
                CatalogLoader.hive(tool.get("catalog"), new Configuration(), properties);
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("db")), tool.get("table"));
        Table table = catalog.loadTable(identifier);
        /**
         * Rewrite (compact) small data files -- the core of this job.
         */
        Actions.forTable(table)
                .rewriteDataFiles()
                .maxParallelism(5)
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
}
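
Expiring every snapshot older than the current one, as above, also removes the table's time-travel history. A variant sketch, assuming the same table handle, keeps the most recent snapshots while still cleaning up old files:

        // Sketch: retain the last 5 snapshots for time travel while expiring older ones.
        Snapshot current = table.currentSnapshot();
        if (current != null) {
            table.expireSnapshots()
                    .expireOlderThan(current.timestampMillis())
                    .retainLast(5)
                    .commit();
        }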

Spark Iceberg small-file compaction

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
/**
 * @author: zhushang
 * @create: 2021-04-02 14:30
 */
public class SparkCompaction {
    public static void main(String[] args) {
        TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table");
        Map<String, String> config = new HashMap<>();
        config.put("type", "iceberg");
        config.put("catalog-type", "hive");
        config.put("property-version", "1");
        config.put("warehouse", "warehouse");
        config.put("uri", "thrift://local:9083");
        config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
        config.put("oss.endpoint", "https://xxx.aliyuncs.com");
        config.put("oss.access.key.id", "key");
        config.put("oss.access.key.secret", "secret");
        sparkSession();
        HiveCatalog hiveCatalog = new HiveCatalog(new Configuration());
        hiveCatalog.initialize("iceberg_hive_catalog", config);
        Table table = hiveCatalog.loadTable(identifier);
        Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
    private static void sparkSession() {
        SparkSession.builder()
                .master("local[*]")
                .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
                .config("spark.hadoop." + METASTOREURIS.varname, "localhost:9083")
                .config("spark.sql.warehouse.dir", "warehouse")
                .config("spark.executor.heartbeatInterval", "100000")
                .config("spark.network.timeoutInterval", "100000")
                .enableHiveSupport()
                .getOrCreate();
    }
}
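
For large tables it is usually cheaper to compact one partition at a time rather than rewriting everything. A sketch under the assumption that the target table has a dt partition column (the table in this example is only a placeholder):

import org.apache.iceberg.expressions.Expressions;

// Sketch: restrict the rewrite to a single (hypothetical) dt partition.
Actions.forTable(table)
        .rewriteDataFiles()
        .filter(Expressions.equal("dt", 20210401))
        .targetSizeInBytes(128 * 1024 * 1024)
        .execute();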

Spark OpenTSDB log sink to Iceberg

package com.zjyg.iceberg.spark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
import com.zjyg.iceberg.util.TimeUtil
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object OpentsdbLog2Iceberg {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String,row_info: String)
  def main(args: Array[String]): Unit = {
    if(args.length < 9) {
      System.err.println(
        s"""
           |Usage: OpentsdbLog2Iceberg <brokers> <topics> <groupId> <checkpointBaseDir> ...
           |  <1. brokers> is a list of one or more Kafka brokers
           |  <2. topics> is a list of one or more Kafka topics to consume from
           |  <3. groupId> is a Kafka consumer groupId
           |  <4. checkpointBaseDir> is a checkpoint dir for job
           |  <5. warehouseBaseDir> is a warehouse base dir for iceberg
           |  <6. catalogName> is a catalogName for iceberg
           |  <7. dbName> is a DatabaseName for iceberg
           |  <8. tbName> is a TableName for iceberg
           |  <9. kafkaServiceName> is a Kafka Service Name
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointBaseDir = args(3) // hdfs:///user/bigdata/sparkStreamingCheckpoint
    val warehouseBaseDir = args(4)  // hdfs:///warehouse/tablespace/external
    val catalogName = args(5)       // iceberg
    val dbName = args(6)            // iceberg_test
    val tbName = args(7)            // opentsdb_logs
    val kafkaServiceName = args(8)  // hdp-kafka
    val checkpointDir_spark = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/spark"
    val checkpointDir_iceberg = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/iceberg"
    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config(s"spark.sql.catalog.${catalogName}.type","hadoop")
        .config(s"spark.sql.catalog.${catalogName}",classOf[SparkCatalog].getName)
        .config(s"spark.sql.catalog.${catalogName}.warehouse",s"${warehouseBaseDir}/${catalogName}")
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir_spark)
      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")
      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss","false")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name",s"${kafkaServiceName}")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()
      val data = lines.map(
        row => getLogs(row.getAs[Timestamp]("timestamp"),row.getAs[String]("value").toString())
      ).toDF().repartition($"dt", $"hour")
      println("------PrintSchema data-------")
      data.printSchema()
      val tableIdentifier: String = s"${warehouseBaseDir}/${catalogName}/${dbName}/${tbName}"
      val query = data.writeStream
        .format("iceberg")
        .partitionBy("dt","hour")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tableIdentifier)
        .option("fanout-enabled", "true")
        .option("checkpointLocation", checkpointDir_iceberg)
        .start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }
  def getTags(tags:String):Tags ={
    var result = Tags("",0,"","","","")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
      }
    }
    return result
  }
  def getLogs(kafkaTimestamp:Timestamp,logs:String):Logs = {
    var tags = new Tags("",0,"","","","")
    var result = new Logs("","",new Timestamp(0L),0,"","",tags,"","","",19700101,"00","")
    try {
      val json=JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val v_ts = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(v_ts)
      val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(v_ts)
      val step = json.getInteger("step")
      val v_value = json.getString("value")
      val countertype = json.getString("counterType")
      val tags = getTags(json.getString("tags"))
      val v_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric,endpoint,ts,step,v_value,countertype,tags,v_type,kafka,nodata,dt,hour,"")
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
        val (dt,hour) = TimeUtil.tsToDateHour(kafkaTimestamp)
        result = new Logs("","",new Timestamp(0L),0,"","",tags,"","","",-dt,hour,logs)
      }
    }
    return result
  }
}
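
Once the streaming job is running, the Hadoop-catalog table can be read back by path for a quick sanity check. A minimal batch-read sketch using the Spark Java API; the class name and app name are made up for illustration, and the path argument mirrors the tableIdentifier built in the job above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical verification job: count rows per (dt, hour) partition of the Iceberg table.
public class VerifyOpentsdbLogs {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("verify-opentsdb-logs")
        .getOrCreate();
    // args[0] = <warehouseBaseDir>/<catalogName>/<dbName>/<tbName>, i.e. the path the streaming job writes to.
    Dataset<Row> df = spark.read().format("iceberg").load(args[0]);
    df.groupBy("dt", "hour").count().show(false);
    spark.stop();
  }
}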


