Flink CDC PostgreSQL sink to Iceberg (DataStream API)
package com.zjyg.iceberg.flink;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class Pg2Iceberg {

    private static final Schema SCHEMA = new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()),
            Types.NestedField.optional(3, "age", Types.IntegerType.get()),
            Types.NestedField.optional(4, "sex", Types.StringType.get()));

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(
                parameterTool.get("checkpoint_base_dir") + "/" + parameterTool.get("catalog_name")
                        + "." + parameterTool.get("iceberg_db_name") + "." + parameterTool.get("iceberg_tb_name"));
        checkpointConfig.setCheckpointInterval(60 * 1000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);
        checkpointConfig.setTolerableCheckpointFailureNumber(10);
        checkpointConfig.setCheckpointTimeout(120 * 1000L);

        DataStreamSource<RowData> src = env.addSource(getPgCdc(parameterTool));
        icebergSink_hadoop(src, parameterTool);
        env.execute(parameterTool.get("app_name"));
    }

    private static void icebergSink_hadoop(DataStream<RowData> src, ParameterTool tool) {
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hadoop");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse_base_dir") + "/" + tool.get("catalog_name"));
        CatalogLoader catalogLoader =
                CatalogLoader.hadoop(tool.get("catalog_name"), new Configuration(), properties);
        icebergSink(src, tool, catalogLoader);
    }

    private static void icebergSink(DataStream<RowData> input, ParameterTool tool, CatalogLoader loader) {
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("iceberg_db_name")), tool.get("iceberg_tb_name"));
        Table table;
        if (catalog.tableExists(identifier)) {
            table = catalog.loadTable(identifier);
        } else {
            table = catalog.buildTable(identifier, SCHEMA)
                    .withPartitionSpec(PartitionSpec.unpartitioned())
                    .create();
        }

        // Upgrade the table to format version 2 so equality deletes (needed for CDC upserts) are supported.
        TableOperations operations = ((BaseTable) table).operations();
        TableMetadata metadata = operations.current();
        operations.commit(metadata, metadata.upgradeToFormatVersion(2));

        TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id"))
                .writeParallelism(1)
                .build();
    }

    private static SourceFunction<RowData> getPgCdc(ParameterTool tool) {
        TableSchema schema = TableSchema.builder()
                .add(TableColumn.physical("id", DataTypes.INT()))
                .add(TableColumn.physical("name", DataTypes.STRING()))
                .add(TableColumn.physical("age", DataTypes.INT()))
                .add(TableColumn.physical("sex", DataTypes.STRING()))
                .build();
        RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
        DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(
                rowType,
                createTypeInfo(schema.toRowDataType()),
                (rowData, rowKind) -> {},
                ZoneId.of("Asia/Shanghai"));

        Properties properties = new Properties();
        properties.setProperty("decimal.handling.mode", "string"); // convert decimal and similar types to string

        DebeziumSourceFunction<RowData> sourceFunction = PostgreSQLSource.<RowData>builder()
                .hostname(tool.get("db_host"))
                .port(Integer.parseInt(tool.get("db_port")))
                .database(tool.get("db_name"))
                .schemaList(tool.get("schema_name"))
                .tableList(tool.get("schema_name") + "." + tool.get("tb_name"))
                .username(tool.get("db_user"))
                .password(tool.get("db_user_pwd"))
                .decodingPluginName("pgoutput")
                .slotName(tool.get("SLOTNAME4"))
                .deserializer(deserializer)
                .debeziumProperties(properties)
                .build();
        return sourceFunction;
    }

    private static TypeInformation<RowData> createTypeInfo(DataType producedDataType) {
        final DataType internalDataType =
                DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
        return (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
    }
}
Flink Kafka sink to Iceberg (DataStream API)
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.coomia.datalake.kafka.KafkaUtils;

public class FlinkWriteIcebergTest {

  public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "root");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(5000L);
    env.setParallelism(1);

    // Iceberg catalog.
    Configuration conf = new Configuration();
    Catalog catalog = new HadoopCatalog(conf);

    // Iceberg table identifier.
    TableIdentifier name = TableIdentifier.of("default", "iceberg-tb-" + System.currentTimeMillis());

    // Iceberg table schema. The fifth field ("ts") must be part of the Schema,
    // because the mapper below emits five columns.
    Schema schema = new Schema(
        Types.NestedField.required(1, "uid", Types.StringType.get()),
        Types.NestedField.required(2, "eventTime", Types.LongType.get()),
        Types.NestedField.required(3, "eventid", Types.StringType.get()),
        Types.NestedField.optional(4, "uuid", Types.StringType.get()),
        Types.NestedField.required(5, "ts", Types.TimestampType.withoutZone()));

    // Iceberg table partitioning.
    // PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uid", 5).build();
    PartitionSpec spec = PartitionSpec.unpartitioned();

    // Use ORC as the storage format.
    Map<String, String> props =
        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());

    // Create the Iceberg table if it does not exist, otherwise load it.
    Table table;
    if (!catalog.tableExists(name)) {
      table = catalog.createTable(name, schema, spec, props);
    } else {
      table = catalog.loadTable(name);
    }

    String topic = "arkevent";
    String servers = "kafka:9092";

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
        topic, new SimpleStringSchema(), KafkaUtils.consumeProps(servers, "flink-consumer"));
    consumer.setStartFromEarliest();

    SingleOutputStreamOperator<RowData> dataStream =
        env.addSource(consumer).map(new MapFunction<String, RowData>() {
          @Override
          public RowData map(String value) throws Exception {
            JSONObject dataJson = JSON.parseObject(value);
            GenericRowData row = new GenericRowData(5);
            row.setField(0, StringData.fromString(dataJson.getString("uid")));
            row.setField(1, dataJson.getLong("eventTime"));
            row.setField(2, StringData.fromString(dataJson.getString("eventid")));
            row.setField(3, StringData.fromString(dataJson.getString("uuid")));
            row.setField(4, TimestampData.fromEpochMillis(dataJson.getLong("eventTime")));
            return row;
          }
        });
    // uid identifies the operator when the job is restarted from a savepoint.
    dataStream.uid("flink-consumer");

    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());

    // Sink the stream into the Iceberg table.
    FlinkSink.forRowData(dataStream)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(1)
        .overwrite(true)
        .build();

    // Read the table back, print it, and write it out to a file.
    // (writeAsCsv only supports tuple streams, so plain text output is used for RowData.)
    DataStream<RowData> batchData = FlinkSource.forRowData().env(env).tableLoader(tableLoader).build();
    batchData.print();
    batchData.writeAsText(table.location().concat("/out/out.csv"), WriteMode.OVERWRITE);

    // Execute the program.
    env.execute("Test Iceberg DataStream");
  }
}
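The example imports com.coomia.datalake.kafka.KafkaUtils, which is not shown. A minimal sketch of what consumeProps presumably builds is given below; treat it as an assumption, not the original helper. Only the broker list and consumer group are strictly needed here, since Flink's Kafka consumer handles byte deserialization itself and applies the SimpleStringSchema passed to its constructor.

import java.util.Properties;

// Hypothetical stand-in for com.coomia.datalake.kafka.KafkaUtils, which the example
// above imports but does not show. It fills in only the settings the consumer relies on.
public class KafkaUtils {
  public static Properties consumeProps(String bootstrapServers, String groupId) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    props.setProperty("group.id", groupId);
    return props;
  }
}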
Spark Kafka sink to Iceberg
package com.zjyg.iceberg.main

import com.zjyg.iceberg.util.TimeUtil
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataTypes
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.IcebergSpark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON

object Kafka2IcebergV3 {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type: String, kafka: String, nodata: String, dt: Int, hour: String)

  import org.apache.spark.sql.functions._

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: Kafka2Iceberg <brokers> <topics> <groupId>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more Kafka topics to consume from
           |  <groupId> is a Kafka consumer groupId
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointDir = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/spark"
    val checkpointDir_iceberg = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/icebergtb"

    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.iceberg.type", "hadoop")
        .config("spark.sql.catalog.iceberg", classOf[SparkCatalog].getName)
        .config("spark.sql.catalog.iceberg.warehouse", "hdfs:///warehouse/tablespace/external/iceberg")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir)
      // IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)

      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")

      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("kafka.security.protocol", "SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism", "GSSAPI")
        .option("kafka.sasl.kerberos.service.name", "kafka")
        .option("kafka.group.id", groupId)
        .option("subscribe", topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()

      val data = lines.map(row => row.getAs[String]("value").toString()).map(s => getLogs(s)).toDF()
      println("------PrintSchema data-------")
      data.printSchema()

      val tableIdentifier: String = "hdfs:///warehouse/tablespace/external/iceberg/iceberg_test/opentsdb_logsv3"
      val query = data.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tableIdentifier)
        .option("checkpointLocation", checkpointDir_iceberg)
        .start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }

  def getTags(tags: String): Tags = {
    var result = Tags("", 0, "", "", "", "")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = tags_columns(0).split('=')(1).toString
      val entrust_status = tags_columns(1).split('=')(1).toInt
      val exchange_type = tags_columns(2).split('=')(1).toString
      val op_entrust_way = tags_columns(3).split('=')(1).toString
      val ywlx = tags_columns(4).split('=')(1).toString
      val ywzl = tags_columns(5).split('=')(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    result
  }

  def getLogs(logs: String): Logs = {
    val v_tags = Tags("", 0, "", "", "", "")
    var result = Logs("", "", new Timestamp(0L), 0, "", "", v_tags, "", "", "", 0, "")
    try {
      val json = JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val ts_l = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(ts_l)
      val (dt: Int, hour: String) = TimeUtil.getTsYmdHour(ts_l)
      val step = json.getInteger("step")
      val l_value = json.getString("value")
      val countertype = json.getString("counterType")
      val j_tags = json.getString("tags")
      val tags = getTags(j_tags)
      val l_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = Logs(metric, endpoint, ts, step, l_value, countertype, tags, l_type, kafka, nodata, dt, hour)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    result
  }
}
Flink Iceberg compaction
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;

import java.util.HashMap;
import java.util.Map;

public class FlinkCompaction {

  public static void main(String[] args) throws Exception {
    ParameterTool tool = ParameterTool.fromArgs(args);

    Map<String, String> properties = new HashMap<>();
    properties.put("type", "iceberg");
    properties.put("catalog-type", "hive");
    properties.put("property-version", "1");
    properties.put("warehouse", tool.get("warehouse"));
    properties.put("uri", tool.get("uri"));
    if (tool.has("oss.endpoint")) {
      properties.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
      properties.put("oss.endpoint", tool.get("oss.endpoint"));
      properties.put("oss.access.key.id", tool.get("oss.access.key.id"));
      properties.put("oss.access.key.secret", tool.get("oss.access.key.secret"));
    }

    CatalogLoader loader = CatalogLoader.hive(tool.get("catalog"), new Configuration(), properties);
    Catalog catalog = loader.loadCatalog();
    TableIdentifier identifier = TableIdentifier.of(Namespace.of(tool.get("db")), tool.get("table"));
    Table table = catalog.loadTable(identifier);

    // Compact small files: the core of this job.
    Actions.forTable(table)
        .rewriteDataFiles()
        .maxParallelism(5)
        .targetSizeInBytes(128 * 1024 * 1024)
        .execute();

    // Expire snapshots older than the current one so the rewritten files can be cleaned up.
    Snapshot snapshot = table.currentSnapshot();
    if (snapshot != null) {
      table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
    }
  }
}
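FlinkCompaction is likewise driven only by ParameterTool keys. A hypothetical argument list is sketched below; the key names come from the code, the values are placeholders, and the oss.* keys are only needed when the table lives on Aliyun OSS.

// Hypothetical arguments for FlinkCompaction; adjust the values to your Hive metastore and table.
String[] args = new String[] {
    "--catalog", "hive_catalog",
    "--warehouse", "hdfs:///warehouse/iceberg",
    "--uri", "thrift://hive-metastore:9083",
    "--db", "iceberg_db",
    "--table", "person"
    // Optional: "--oss.endpoint", "...", "--oss.access.key.id", "...", "--oss.access.key.secret", "..."
};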
Spark Iceberg compaction
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;

import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;

/**
 * @author: zhushang
 * @create: 2021-04-02 14:30
 */
public class SparkCompaction {

  public static void main(String[] args) {
    TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table");

    Map<String, String> config = new HashMap<>();
    config.put("type", "iceberg");
    config.put("catalog-type", "hive");
    config.put("property-version", "1");
    config.put("warehouse", "warehouse");
    config.put("uri", "thrift://local:9083");
    config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
    config.put("oss.endpoint", "https://xxx.aliyuncs.com");
    config.put("oss.access.key.id", "key");
    config.put("oss.access.key.secret", "secret");

    // Create the active SparkSession that the Actions API below will pick up.
    sparkSession();

    HiveCatalog hiveCatalog = new HiveCatalog(new Configuration());
    hiveCatalog.initialize("iceberg_hive_catalog", config);
    Table table = hiveCatalog.loadTable(identifier);

    // Compact small files into files of roughly 128 MB.
    Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).execute();

    // Expire snapshots older than the current one.
    Snapshot snapshot = table.currentSnapshot();
    if (snapshot != null) {
      table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
    }
  }

  private static void sparkSession() {
    SparkSession.builder()
        .master("local[*]")
        .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
        .config("spark.hadoop." + METASTOREURIS.varname, "localhost:9083")
        .config("spark.sql.warehouse.dir", "warehouse")
        .config("spark.executor.heartbeatInterval", "100000")
        .config("spark.network.timeoutInterval", "100000")
        .enableHiveSupport()
        .getOrCreate();
  }
}
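On newer Iceberg releases (Spark 3 with IcebergSparkSessionExtensions enabled and a catalog registered through spark.sql.catalog.* properties), the same maintenance can also be triggered through Iceberg's stored procedures instead of the Actions API. A hedged sketch, assuming a registered catalog named "iceberg" and a table db.table:

// Sketch only: assumes a Spark 3 session with Iceberg's SQL extensions and a catalog named "iceberg".
SparkSession spark = SparkSession.active();
// Compact small data files.
spark.sql("CALL iceberg.system.rewrite_data_files(table => 'db.table')");
// Expire old snapshots using the table's default retention settings.
spark.sql("CALL iceberg.system.expire_snapshots(table => 'db.table')");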
Spark OpenTSDB sink to Iceberg
package com.zjyg.iceberg.spark

import java.sql.Timestamp
import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.JSON
import com.zjyg.iceberg.util.TimeUtil
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object OpentsdbLog2Iceberg {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type: String, kafka: String, nodata: String, dt: Int, hour: String, row_info: String)

  def main(args: Array[String]): Unit = {
    if (args.length < 9) {
      System.err.println(
        s"""
           |Usage: OpentsdbLog2Iceberg <brokers> <topics> <groupId> <checkpointBaseDir> ...
           |  <1. brokers> is a list of one or more Kafka brokers
           |  <2. topics> is a list of one or more Kafka topics to consume from
           |  <3. groupId> is a Kafka consumer groupId
           |  <4. checkpointBaseDir> is a checkpoint dir for the job
           |  <5. warehouseBaseDir> is a warehouse base dir for iceberg
           |  <6. catalogName> is a catalogName for iceberg
           |  <7. dbName> is a DatabaseName for iceberg
           |  <8. tbName> is a TableName for iceberg
           |  <9. kafkaServiceName> is a Kafka Service Name
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointBaseDir = args(3) // hdfs:///user/bigdata/sparkStreamingCheckpoint
    val warehouseBaseDir = args(4)  // hdfs:///warehouse/tablespace/external
    val catalogName = args(5)       // iceberg
    val dbName = args(6)            // iceberg_test
    val tbName = args(7)            // opentsdb_logs
    val kafkaServiceName = args(8)  // hdp-kafka

    val checkpointDir_spark = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/spark"
    val checkpointDir_iceberg = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/iceberg"

    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config(s"spark.sql.catalog.${catalogName}.type", "hadoop")
        .config(s"spark.sql.catalog.${catalogName}", classOf[SparkCatalog].getName)
        .config(s"spark.sql.catalog.${catalogName}.warehouse", s"${warehouseBaseDir}/${catalogName}")
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir_spark)

      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")

      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .option("kafka.security.protocol", "SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism", "GSSAPI")
        .option("kafka.sasl.kerberos.service.name", s"${kafkaServiceName}")
        .option("kafka.group.id", groupId)
        .option("subscribe", topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()

      val data = lines.map(
        row => getLogs(row.getAs[Timestamp]("timestamp"), row.getAs[String]("value").toString())
      ).toDF().repartition($"dt", $"hour")
      println("------PrintSchema data-------")
      data.printSchema()

      val tableIdentifier: String = s"${warehouseBaseDir}/${catalogName}/${dbName}/${tbName}"
      val query = data.writeStream
        .format("iceberg")
        .partitionBy("dt", "hour")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tableIdentifier)
        .option("fanout-enabled", "true")
        .option("checkpointLocation", checkpointDir_iceberg)
        .start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }

  def getTags(tags: String): Tags = {
    var result = Tags("", 0, "", "", "", "")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = tags_columns(0).split('=')(1).toString
      val entrust_status = tags_columns(1).split('=')(1).toInt
      val exchange_type = tags_columns(2).split('=')(1).toString
      val op_entrust_way = tags_columns(3).split('=')(1).toString
      val ywlx = tags_columns(4).split('=')(1).toString
      val ywzl = tags_columns(5).split('=')(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
      }
    }
    result
  }

  def getLogs(kafkaTimestamp: Timestamp, logs: String): Logs = {
    val defaultTags = Tags("", 0, "", "", "", "")
    var result = Logs("", "", new Timestamp(0L), 0, "", "", defaultTags, "", "", "", 19700101, "00", "")
    try {
      val json = JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val v_ts = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(v_ts)
      val (dt: Int, hour: String) = TimeUtil.getTsYmdHour(v_ts)
      val step = json.getInteger("step")
      val v_value = json.getString("value")
      val countertype = json.getString("counterType")
      val tags = getTags(json.getString("tags"))
      val v_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = Logs(metric, endpoint, ts, step, v_value, countertype, tags, v_type, kafka, nodata, dt, hour, "")
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
        // Fall back to the Kafka record timestamp; a negative dt marks rows that failed to parse,
        // and the raw message is kept in row_info for troubleshooting.
        val (dt, hour) = TimeUtil.tsToDateHour(kafkaTimestamp)
        result = Logs("", "", new Timestamp(0L), 0, "", "", defaultTags, "", "", "", -dt, hour, logs)
      }
    }
    result
  }
}