
Kafka duplicate-consumption problem: how to resolve InstanceAlreadyExistsException

When I run the code below, I get the error: "javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id="

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// DateUtil and DwdInfoPoJo are project-local classes (not shown in this post);
// the class name is assumed, since the original snippet omits the declaration.
public class ODSToDWD {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

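        // Checkpoint every 5 s; with enable.auto.commit=false the Kafka connector commits offsets on completed checkpoints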
        env.enableCheckpointing(5000);

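        // Register an Iceberg catalog of type 'hadoop' whose warehouse lives on HDFS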
        tableEnv.executeSql(
                "CREATE CATALOG hadoop_iceberg WITH (\n" +
                        "  'type'='iceberg',\n" +
                        "  'catalog-type'='hadoop',\n" +
                        "  'warehouse'='hdfs://hadoop102/user/',\n" +
                        " 'default-database'='lakehouse'\n" +
                        ")"
        );

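        // Kafka source table: each message names the target Iceberg ODS table and DWD topic, and carries the raw JSON payload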
        tableEnv.executeSql(
                "create table kafka_ods_tbl(\n" +
                        "   iceberg_ods_tbl_name string,\n" +
                        "   kafka_dwd_topic string,\n" +
                        "   data string\n" +
                        ") with (\n" +
                        " 'connector' = 'kafka',\n" +
                        " 'topic' = 'KAFKA-ODS-TOPIC',\n" +
                        " 'properties.bootstrap.servers'='hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                        " 'scan.startup.mode'='earliest-offset',\n" +
                        " 'properties.group.id' = 'test',\n" +
                        " 'properties.enable.auto.commit' = 'false',\n" +
                        " 'format' = 'json'\n" +
                        ")"
        );

        Table odsTbl = tableEnv.sqlQuery("select iceberg_ods_tbl_name,data,kafka_dwd_topic from kafka_ods_tbl");

//        odsTbl.execute().print();

        DataStream<Row> odsDs = tableEnv.toDataStream(odsTbl);

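        // Side-output tag for routing the enriched JSON to the DWD Kafka topics later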
        OutputTag<JSONObject> kafkaDataTag = new OutputTag<JSONObject>("kafka_data") {
        };

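        // Keep only complete rows, then flatten each JSON payload into a DwdInfoPoJo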
        SingleOutputStreamOperator<DwdInfoPoJo> dwdDs = odsDs.filter(new FilterFunction<Row>() {
                    @Override
                    public boolean filter(Row row) throws Exception {
                        return row.getField(0) != null && row.getField(1) != null && row.getField(2) != null;
                    }
                })
                .process(new ProcessFunction<Row, DwdInfoPoJo>() {

                    @Override
                    public void processElement(Row value, ProcessFunction<Row, DwdInfoPoJo>.Context ctx, Collector<DwdInfoPoJo> out) throws Exception {

                        String iceberg_ods_tbl_name = value.getField(0).toString();
                        String data = value.getField(1).toString();
                        String kafka_dwd_topic = value.getField(2).toString();

                        JSONObject jsonObj = JSON.parseObject(data);

                        // Normalize the timestamp fields
                        jsonObj.put("logTime", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logTime")));
                        jsonObj.put("login_tm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("login_tm")));
                        jsonObj.put("logout_tm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logout_tm")));

                        // Extract fields from the nested JSON
                        String browse_product_code = jsonObj.getString("browseProductCode");
                        String browse_product_tpCode = jsonObj.getString("browseProductTpCode");
                        String user_ip = jsonObj.getString("userIp");
                        String obtain_points = jsonObj.getString("obtainPoints");
                        String user_id1 = jsonObj.getString("user_id");
                        String user_id2 = jsonObj.getString("userId");
                        String front_product_url = jsonObj.getString("frontProductUrl");
                        String log_time = jsonObj.getString("logTime");
                        String browse_product_url = jsonObj.getString("browseProductUrl");
                        String id = jsonObj.getString("id");
                        String ip = jsonObj.getString("ip");
                        String login_tm = jsonObj.getString("login_tm");
                        String logout_tm = jsonObj.getString("logout_tm");

                        jsonObj.put("kafka_dwd_topic", kafka_dwd_topic);
                        ctx.output(kafkaDataTag, jsonObj);

                        DwdInfoPoJo dwdInfoPoJo = new DwdInfoPoJo(iceberg_ods_tbl_name, kafka_dwd_topic, browse_product_code, browse_product_tpCode, user_ip, obtain_points, user_id1, user_id2, front_product_url, log_time, browse_product_url, id, ip, login_tm, logout_tm);

                        if (dwdInfoPoJo.kafka_dwd_topic.equals("KAFKA-DWD-BROWSE-LOG-TOPIC"))
                            System.out.println("dwdInfoPoJo=======" + dwdInfoPoJo);

                        out.collect(dwdInfoPoJo);
                    }
                });


        /**
         * User login log data
         **/
        Schema schema = Schema.newBuilder()
                .column("iceberg_ods_tbl_name", DataTypes.STRING())
                .column("kafka_dwd_topic", DataTypes.STRING())
                .column("browse_product_code", DataTypes.STRING())
                .column("browse_product_tpcode", DataTypes.STRING())
                .column("user_ip", DataTypes.STRING())
                .column("obtain_points", DataTypes.STRING())
                .column("user_id1", DataTypes.STRING())
                .column("user_id2", DataTypes.STRING())
                .column("front_product_url", DataTypes.STRING())
                .column("log_time", DataTypes.STRING())
                .column("browse_product_url", DataTypes.STRING())
                .column("id", DataTypes.STRING())
                .column("ip", DataTypes.STRING())
                .column("login_tm", DataTypes.STRING())
                .column("logout_tm", DataTypes.STRING())
                .build();

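        // Re-register the POJO stream as a table, with an explicit schema, for the SQL inserts below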
        Table table = tableEnv.fromDataStream(dwdDs, schema);
        // Insert user login data into the Iceberg DWD layer: DWD_USER_LOGIN
        String sql = "insert into hadoop_iceberg.lakehouse.DWD_USER_LOGIN\n" +
                "select\n" +
                " id,user_id1,ip,login_tm,logout_tm\n" +
                " from " + table + " where iceberg_ods_tbl_name = 'ODS_USER_LOGIN'";
        tableEnv.executeSql(sql);
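        // NOTE: executeSql(INSERT ...) submits its own Flink job immediately, separate from the env.execute() call below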


        // Use case 2:
        // Insert member product-browsing logs into the Iceberg DWD layer: DWD_BROWSELOG
//        sql = "insert into hadoop_iceberg.lakehouse.DWD_BROWSELOG\n" +
//                "select\n" +
//                " log_time,\n" +
//                " user_id2,\n" +
//                " user_ip,\n" +
//                " front_product_url,\n" +
//                " browse_product_url,\n" +
//                " browse_product_tpcode,\n" +
//                " browse_product_code,\n" +
//                " obtain_points\n" +
//                " from " + table + " where iceberg_ods_tbl_name = 'ODS_BROWSELOG'";
//        tableEnv.executeSql(sql);


        // Write the data above to the corresponding DWD-layer Kafka topics. Instead of SQL,
        // this sinks directly to each DWD topic with the DataStream API.
//        dwdDs.getSideOutput(kafkaDataTag).print("kafkaDataTag=====>");
//        dwdDs.getSideOutput(kafkaDataTag).sinkTo(
//                KafkaSink.<JSONObject>builder()
//                        .setBootstrapServers(kafkaBrokers)
//                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
//                        .setProperty("transaction.timeout.ms", 1000 * 60 * 15 + "")
//                        .setRecordSerializer(
//                                KafkaRecordSerializationSchema.<JSONObject>builder()
//                                        .setTopicSelector(jsonObject -> jsonObject.getString("kafka_dwd_topic"))
//                                        .setValueSerializationSchema(new JsonSerializationSchema<JSONObject>())
//                                        .build()
//                        ).build()
//        );
        env.execute("ODSToDWD Job");
    }
}
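
Would something like the following help? My understanding is that tableEnv.executeSql(INSERT ...) and env.execute() each submit a job containing the same Kafka source, which would explain both the duplicate consumption and the duplicate app-info MBean registration. Below is a minimal, untested sketch (assuming Flink 1.14+, where StreamStatementSet.attachAsDataStream() is available) that submits everything as a single job:

    import org.apache.flink.table.api.bridge.java.StreamStatementSet;

    // Untested sketch: replace the direct tableEnv.executeSql(sql) call with a
    // statement set attached to the DataStream pipeline, so env.execute() submits
    // one job containing one Kafka source instance.
    StreamStatementSet statementSet = tableEnv.createStatementSet();
    statementSet.addInsertSql(sql);     // the DWD_USER_LOGIN INSERT built above
    statementSet.attachAsDataStream();  // fold the SQL insert into env's graph
    env.execute("ODSToDWD Job");        // single submission for SQL + DataStream

The InstanceAlreadyExistsException itself is the Kafka client failing to register its JMX metrics MBean because another consumer with the same client.id already registered one in the same JVM, so it looks like a symptom of the source being created twice rather than the root problem.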

特困生lyq · 2023-04-17 08:08:57 · 844 views
0 answers