开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

有偿求助,Flink1.15读取hdfs中的parquet文件的Java样例

网上都是1.9版本的,里面用到的ParquetRowInputFormat在下面的依赖中已经没有了! org.apache.flink flink-parquet 1.15.0

展开
收起
1969124368617827 2023-05-12 15:11:36 296 0
4 条回答
写回答
取消 提交回答
  • 热爱开发

    Flink 1.15 中,读取 HDFS 中的 Parquet 文件需要使用 ParquetInputFormat。以下是一个简单的 Java 样例:

    import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.ParquetInputFormat; import org.apache.flink.formats.parquet.row.ParquetRowDataReadSupport;

    public class ReadParquetFromHDFS { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Path path = new Path("hdfs://namenode:8020/path/to/parquet/file");
    
        ParquetInputFormat<RowData> format = ParquetInputFormat.builder(path, new ParquetRowDataReadSupport())
                .build();
    
        DataSet<RowData> rows = env.createInput(format);
        
        // 处理数据的逻辑
        // ...
    }
    

    } 在 pom.xml 中添加以下依赖:

    org.apache.flink flink-formats-flink-avro-confluent-schema-registry 1.15.0 org.apache.flink flink-formats-parquet 1.15.0 其中,flink-formats-parquet 依赖中包含了 ParquetRowDataReadSupport 类。

    注意要将 path 替换为实际的 HDFS 文件路径。另外,在运行程序之前需要先启动 HDFS 和 Flink 集群。

    2023-05-13 09:05:46
    赞同 展开评论 打赏
  • Flink 1.15 版本中已经不再支持 ParquetRowInputFormat,而是推荐使用 ParquetAvroInputFormat 来读取 Parquet 格式的文件。下面我提供一个在 Flink 1.15 中使用 ParquetAvroInputFormat 读取 HDFS 中 Parquet 文件的简单样例:

    java import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.formats.avro.AvroSchema; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.hadoopcompatibility.HadoopInputs; import org.apache.flink.hadoopcompatibility.HadoopReadFunction; import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class ReadParquetInFlink { public static void main(String[] args) throws Exception { // 执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);

        // HadoopInputFormat 配置
        org.apache.hadoop.mapreduce.Job job = org.apache.hadoop.mapreduce.Job.getInstance();
        org.apache.hadoop.conf.Configuration conf = job.getConfiguration();
    
        // Hadoop 文件路径
        String filePath = "hdfs://namenode:port/path/to/parquet";
    
        // 设置 HDFS 配置信息
        conf.set("fs.defaultFS", "hdfs://namenode:port");
        conf.set("dfs.nameservices", "nameservice1");
        conf.set("dfs.ha.namenodes.nameservice1", "namenode1,namenode2");
        conf.set("dfs.namenode.rpc-address.nameservice1.namenode1", "namenode1:8020");
        conf.set("dfs.namenode.rpc-address.nameservice1.namenode2", "namenode2:8020");
        
        // Parquet 格式配置信息
        String schemaRegistryUrl = "http://localhost:8081"; // Confluent Schema Registry URL
        AvroSchema avroSchema = new AvroSchema(MyRecord.class); // 自定义的 POJO 类
        ConfluentRegistryAvroDeserializationSchema<MyRecord> deserializationSchema =
            new ConfluentRegistryAvroDeserializationSchema<>(avroSchema, schemaRegistryUrl);
        HadoopInputFormat<?, ?> inputFormat = HadoopInputs.createHadoopInputFormat(
            new org.apache.hadoop.mapreduce.lib.input.FileInputFormat<GenericRecord, Void>() {},
            org.apache.avro.generic.GenericRecord.class,
            job
        );
        inputFormat.getConfiguration().set("parquet.avro.schema", avroSchema.toString());
    
        // 读取 HDFS 文件,转换为 POJO 类型
        env.createInput(inputFormat, TypeInformation.of(new org.apache.flink.api.common.typeinfo.TypeHint<GenericRecord>() {}))
            .map(new MapFunction<GenericRecord, MyRecord>() {
                @Override
                public MyRecord map(GenericRecord value) throws Exception {
                    return new MyRecord(value.get("id").toString(), value.get("name").toString());
                }
            })
            .print();
    }
    
    public static class MyRecord {
        private String id;
        private String name;
    
        public MyRecord(String id, String name) {
            this.id = id;
            this.name = name;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }
    

    } 在上面的代码中,我使用了 HadoopInputFormat 从 HDFS 中读取 Parquet 文件,并使用 ConfluentRegistryAvroDeserializationSchema 将读取到的数据转换为自定义的 POJO 类型。需要注意的是,在使用 ConfluentRegistryAvroDeserializationSchema 时,需要指定 Confluent Schema Registry 的 URL 和 Avro Schema 对应的类。

    另外,如果您需要使用 Flink 的流式处理功能来读取 Kafka 中的 Avro 格式数据,可以使用 FlinkKafkaConsumer 并结合 ConfluentRegistryAvroDeserializationSchema 来进行 Avro 数据的反序列化,示例如下:

    java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties kafkaProps = new Properties(); kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); kafkaProps.setProperty("schema.registry.url", "http://localhost:8081");

    String kafkaTopic = "my-kafka-topic"; AvroSchema avroSchema = new AvroSchema(MyRecord.class); ConfluentRegistryAvroDeserializationSchema deserializationSchema = new ConfluentRegistryAvroDeserializationSchema<>(avroSchema);

    FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>( kafkaTopic, deserializationSchema, kafkaProps ); DataStream stream = env.addSource(kafkaConsumer);

    stream.map(new MapFunction<MyRecord, String>() { @Override public String map(MyRecord value) throws Exception { return value.toString(); } }).print();

    env.execute("Read from Kafka + Avro"); 在上面的代码中,我使用了 ConfluentRegistryAvroDeserializationSchema 将从 Kafka 中读取到的数据转换为自定义的 POJO 类型,并使用 FlinkKafkaConsumer 进行数据的消费。同样需要注意的是,这里需要指定 Confluent Schema Registry 的 URL 和 Avro Schema 对应的类。

    2023-05-13 08:40:03
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    在 Flink 1.15.0 版本中,确实不存在 flink-parquet 的依赖库了。不过,可以使用 flink-avro 库来实现 Parquet 的读取。

    具体实现步骤如下:

    1. 在 Maven 依赖中添加 flink-avro 库:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.15.0</version>
    </dependency>
    
    1. 使用 AvroParquetInputFormat 类来读取 Parquet 文件,例如:
    val filePath = "hdfs://localhost:9000/user/test.parquet"
    val inputFormat = new AvroParquetInputFormat[GenericRecord](new Path(filePath), GenericRecord.getClassSchema)
    val dataStream = env.createInput(inputFormat)
    

    其中,GenericRecord 是 Avro 序列化和反序列化的通用数据类型。

    这样,就可以在 Flink 1.15.0 中使用 AvroParquetInputFormat 来读取 Parquet 文件了。

    2023-05-12 23:18:13
    赞同 展开评论 打赏
  • CSDN全栈领域优质创作者,万粉博主;InfoQ签约博主;华为云享专家;华为Iot专家;亚马逊人工智能自动驾驶(大众组)吉尼斯世界纪录获得者

    以下是Flink 1.15读取HDFS中Parquet文件的Java示例代码:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.hdfs.HdfsFileStreamFactory;
    import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
    
    import java.io.IOException;
    import java.util.Properties;
    
    public class ReadParquetFromHdfs {
    
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置Hadoop环境变量
            System.setProperty("hadoop.conf.dir", "/path/to/hadoop/conf");
            // 设置hdfs连接参数
            Properties properties = new Properties();
            properties.setProperty("fs.defaultFS", "hdfs://localhost:9000");
            // 创建HdfsFileStreamFactory,用于从hdfs读取数据流
            HdfsFileStreamFactory factory = new HdfsFileStreamFactory(properties);
            // 从parquet文件中读取数据流
            DataStream<Tuple2<String, String>> stream = env
                .addSource(factory)
                .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(Tuple2<String, String> value) throws Exception {
                        return value;
                    }
                });
            // 将数据流转换为JSON格式并打印出来
            stream
                .map(new MapFunction<Tuple2<String, String>, String>() {
                    @Override
                    public String map(Tuple2<String, String> value) throws Exception {
                        return value.f1 + "\t" + value.f0;
                    }
                })
                .print();
            env.execute("Read Parquet From HDFS");
        }
    }
    

    上述代码中,首先需要设置Hadoop环境变量和hdfs连接参数。然后使用HdfsFileStreamFactory创建数据流读取器,并将parquet文件添加到读取器中。接下来,使用map函数将读取的数据转换为JSON格式,并打印出来。最后,使用execute方法执行Flink程序。

    2023-05-12 16:10:03
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    海量数据分布式存储——Apache HDFS之最新进展 立即下载
    从《阿里巴巴Java开发手册》编写推广谈技术成长 立即下载
    如何通过 Serverless 提高 Java 微服务治理效 立即下载