When I run the following code, I get this error: "javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id="
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(5000);
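// Register an Iceberg catalog of type "hadoop" whose warehouse lives on HDFS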
tableEnv.executeSql(
"CREATE CATALOG hadoop_iceberg WITH (\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hadoop',\n" +
" 'warehouse'='hdfs://hadoop102/user/',\n" +
" 'default-database'='lakehouse'\n" +
")"
);
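// Kafka source table for the ODS layer: each JSON record carries the target Iceberg ODS table name, the target DWD topic, and the raw payload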
tableEnv.executeSql(
"create table kafka_ods_tbl(\n" +
" iceberg_ods_tbl_name string,\n" +
" kafka_dwd_topic string,\n" +
" data string\n" +
") with (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'KAFKA-ODS-TOPIC',\n" +
" 'properties.bootstrap.servers'='hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
" 'scan.startup.mode'='earliest-offset',\n" +
" 'properties.group.id' = 'test',\n" +
" 'properties.enable.auto.commit' = 'false',\n" +
" 'format' = 'json'\n" +
")"
);
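// Select the three columns from the ODS table and convert the result into a DataStream<Row>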
Table odsTbl = tableEnv.sqlQuery("select iceberg_ods_tbl_name,data,kafka_dwd_topic from kafka_ods_tbl");
// odsTbl.execute().print();
DataStream<Row> odsDs = tableEnv.toDataStream(odsTbl);
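// Side-output tag for the enriched JSON records (consumed by the commented-out KafkaSink further below)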
OutputTag<JSONObject> kafkaDataTag = new OutputTag<JSONObject>("kafka_data") {
};
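// Drop rows containing null fields, then parse the JSON payload, route the enriched JSON to the side output, and emit a DwdInfoPoJo downstream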
SingleOutputStreamOperator<DwdInfoPoJo> dwdDs = odsDs.filter(new FilterFunction<Row>() {
@Override
public boolean filter(Row row) throws Exception {
return row.getField(0) != null && row.getField(1) != null && row.getField(2) != null;
}
})
.process(new ProcessFunction<Row, DwdInfoPoJo>() {
@Override
public void processElement(Row value, ProcessFunction<Row, DwdInfoPoJo>.Context ctx, Collector<DwdInfoPoJo> out) throws Exception {
String iceberg_ods_tbl_name = value.getField(0).toString();
String data = value.getField(1).toString();
String kafka_dwd_topic = value.getField(2).toString();
JSONObject jsonObj = JSON.parseObject(data);
// Normalize the time fields
jsonObj.put("logTime", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logTime")));
jsonObj.put("login_tm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("login_tm")));
jsonObj.put("logout_tm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logout_tm")));
// Parse the nested JSON fields
String browse_product_code = jsonObj.getString("browseProductCode");
String browse_product_tpCode = jsonObj.getString("browseProductTpCode");
String user_ip = jsonObj.getString("userIp");
String obtain_points = jsonObj.getString("obtainPoints");
String user_id1 = jsonObj.getString("user_id");
String user_id2 = jsonObj.getString("userId");
String front_product_url = jsonObj.getString("frontProductUrl");
String log_time = jsonObj.getString("logTime");
String browse_product_url = jsonObj.getString("browseProductUrl");
String id = jsonObj.getString("id");
String ip = jsonObj.getString("ip");
String login_tm = jsonObj.getString("login_tm");
String logout_tm = jsonObj.getString("logout_tm");
jsonObj.put("kafka_dwd_topic", kafka_dwd_topic);
ctx.output(kafkaDataTag, jsonObj);
DwdInfoPoJo dwdInfoPoJo = new DwdInfoPoJo(iceberg_ods_tbl_name, kafka_dwd_topic, browse_product_code, browse_product_tpCode, user_ip, obtain_points, user_id1, user_id2, front_product_url, log_time, browse_product_url, id, ip, login_tm, logout_tm);
if (dwdInfoPoJo.kafka_dwd_topic.equals("KAFKA-DWD-BROWSE-LOG-TOPIC"))
System.out.println("dwdInfoPoJo=======" + dwdInfoPoJo);
out.collect(dwdInfoPoJo);
}
});
/**
 * User login log data
 **/
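// Explicit schema for the DwdInfoPoJo fields; every column is mapped to STRING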
Schema schema = Schema.newBuilder()
.column("iceberg_ods_tbl_name", DataTypes.STRING())
.column("kafka_dwd_topic", DataTypes.STRING())
.column("browse_product_code", DataTypes.STRING())
.column("browse_product_tpcode", DataTypes.STRING())
.column("user_ip", DataTypes.STRING())
.column("obtain_points", DataTypes.STRING())
.column("user_id1", DataTypes.STRING())
.column("user_id2", DataTypes.STRING())
.column("front_product_url", DataTypes.STRING())
.column("log_time", DataTypes.STRING())
.column("browse_product_url", DataTypes.STRING())
.column("id", DataTypes.STRING())
.column("ip", DataTypes.STRING())
.column("login_tm", DataTypes.STRING())
.column("logout_tm", DataTypes.STRING())
.build();
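// Register the POJO stream as a Table so it can be referenced from the INSERT statements below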
Table table = tableEnv.fromDataStream(dwdDs, schema);
// Insert user login data into the Iceberg DWD layer: DWD_USER_LOGIN
sql = "insert into hadoop_iceberg.lakehouse.DWD_USER_LOGIN\n" +
"select\n" +
" id,user_id1,ip,login_tm,logout_tm\n" +
" from " + table + " where iceberg_ods_tbl_name = 'ODS_USER_LOGIN'";
tableEnv.executeSql(sql);
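// (executeSql submits this INSERT as its own Flink job immediately)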
// Business case 2:
// insert member product-browsing log data into the Iceberg DWD layer: DWD_BROWSELOG
// sql = "insert into hadoop_iceberg.lakehouse.DWD_BROWSELOG\n" +
// "select\n" +
// " log_time,\n" +
// " user_id2,\n" +
// " user_ip,\n" +
// " front_product_url,\n" +
// " browse_product_url,\n" +
// " browse_product_tpcode,\n" +
// " browse_product_code,\n" +
// " obtain_points\n" +
// " from " + table + " where iceberg_ods_tbl_name = 'ODS_BROWSELOG'";
// tableEnv.executeSql(sql);
// Write the data above into its respective DWD-layer Kafka topic. Instead of SQL, this uses the DataStream API directly to sink each record to the corresponding DWD topic.
// dwdDs.getSideOutput(kafkaDataTag).print("kafkaDataTag=====>");
// dwdDs.getSideOutput(kafkaDataTag).sinkTo(
// KafkaSink.<JSONObject>builder()
// .setBootstrapServers(kafkaBrokers)
// .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// .setProperty("transaction.timeout.ms", 1000 * 60 * 15 + "")
// .setRecordSerializer(
// KafkaRecordSerializationSchema.<JSONObject>builder()
// .setTopicSelector(jsonObject -> jsonObject.getString("kafka_dwd_topic"))
// .setValueSerializationSchema(new JsonSerializationSchema<JSONObject>())
// .build()
// ).build()
// );
env.execute("ODSToDWD Job");
}
}
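For reference, this message is produced by the Kafka client itself: when a consumer is constructed it registers a JMX MBean named kafka.consumer:type=app-info,id=<client.id>, and if a second consumer in the same JVM carries the same (here apparently empty) client.id, that registration fails with InstanceAlreadyExistsException, which the client logs as a warning. Below is a minimal standalone sketch, independent of the Flink job above, that reproduces the same message; the class name, broker address, and client.id are placeholders chosen for the example.

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DuplicateClientIdDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The constructor does not connect, so the broker does not need to be reachable.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("client.id", "demo-client"); // both consumers share this client.id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // The first consumer registers the MBean kafka.consumer:type=app-info,id=demo-client.
        KafkaConsumer<String, String> first = new KafkaConsumer<>(props);
        // The second consumer tries to register the same MBean; the Kafka client catches the
        // InstanceAlreadyExistsException and logs "Error registering AppInfo mbean" as a WARN.
        KafkaConsumer<String, String> second = new KafkaConsumer<>(props);

        second.close();
        first.close();
    }
}

Inside a Flink job the same situation arises whenever two Kafka consumer instances end up in one JVM with the same client.id (in a local/IDE run, the sources of every submitted statement share the same JVM). Since the table connector forwards every 'properties.*' option to the Kafka client, giving each source table a distinct 'properties.client.id' is one way to influence the registered id, but the exception is normally only a logged warning and does not by itself fail the job.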