网上都是1.9版本的,里面用到的ParquetRowInputFormat在下面的依赖中已经没有了! org.apache.flink flink-parquet 1.15.0
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink 1.15 中,读取 HDFS 中的 Parquet 文件需要使用 ParquetInputFormat。以下是一个简单的 Java 样例:
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.ParquetInputFormat; import org.apache.flink.formats.parquet.row.ParquetRowDataReadSupport;
public class ReadParquetFromHDFS { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Path path = new Path("hdfs://namenode:8020/path/to/parquet/file");
    ParquetInputFormat<RowData> format = ParquetInputFormat.builder(path, new ParquetRowDataReadSupport())
            .build();
    DataSet<RowData> rows = env.createInput(format);
    
    // 处理数据的逻辑
    // ...
}
} 在 pom.xml 中添加以下依赖:
org.apache.flink flink-formats-flink-avro-confluent-schema-registry 1.15.0 org.apache.flink flink-formats-parquet 1.15.0 其中,flink-formats-parquet 依赖中包含了 ParquetRowDataReadSupport 类。
注意要将 path 替换为实际的 HDFS 文件路径。另外,在运行程序之前需要先启动 HDFS 和 Flink 集群。
Flink 1.15 版本中已经不再支持 ParquetRowInputFormat,而是推荐使用 ParquetAvroInputFormat 来读取 Parquet 格式的文件。下面我提供一个在 Flink 1.15 中使用 ParquetAvroInputFormat 读取 HDFS 中 Parquet 文件的简单样例:
java import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.formats.avro.AvroSchema; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.hadoopcompatibility.HadoopInputs; import org.apache.flink.hadoopcompatibility.HadoopReadFunction; import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class ReadParquetInFlink { public static void main(String[] args) throws Exception { // 执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
    // HadoopInputFormat 配置
    org.apache.hadoop.mapreduce.Job job = org.apache.hadoop.mapreduce.Job.getInstance();
    org.apache.hadoop.conf.Configuration conf = job.getConfiguration();
    // Hadoop 文件路径
    String filePath = "hdfs://namenode:port/path/to/parquet";
    // 设置 HDFS 配置信息
    conf.set("fs.defaultFS", "hdfs://namenode:port");
    conf.set("dfs.nameservices", "nameservice1");
    conf.set("dfs.ha.namenodes.nameservice1", "namenode1,namenode2");
    conf.set("dfs.namenode.rpc-address.nameservice1.namenode1", "namenode1:8020");
    conf.set("dfs.namenode.rpc-address.nameservice1.namenode2", "namenode2:8020");
    
    // Parquet 格式配置信息
    String schemaRegistryUrl = "http://localhost:8081"; // Confluent Schema Registry URL
    AvroSchema avroSchema = new AvroSchema(MyRecord.class); // 自定义的 POJO 类
    ConfluentRegistryAvroDeserializationSchema<MyRecord> deserializationSchema =
        new ConfluentRegistryAvroDeserializationSchema<>(avroSchema, schemaRegistryUrl);
    HadoopInputFormat<?, ?> inputFormat = HadoopInputs.createHadoopInputFormat(
        new org.apache.hadoop.mapreduce.lib.input.FileInputFormat<GenericRecord, Void>() {},
        org.apache.avro.generic.GenericRecord.class,
        job
    );
    inputFormat.getConfiguration().set("parquet.avro.schema", avroSchema.toString());
    // 读取 HDFS 文件,转换为 POJO 类型
    env.createInput(inputFormat, TypeInformation.of(new org.apache.flink.api.common.typeinfo.TypeHint<GenericRecord>() {}))
        .map(new MapFunction<GenericRecord, MyRecord>() {
            @Override
            public MyRecord map(GenericRecord value) throws Exception {
                return new MyRecord(value.get("id").toString(), value.get("name").toString());
            }
        })
        .print();
}
public static class MyRecord {
    private String id;
    private String name;
    public MyRecord(String id, String name) {
        this.id = id;
        this.name = name;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
} 在上面的代码中,我使用了 HadoopInputFormat 从 HDFS 中读取 Parquet 文件,并使用 ConfluentRegistryAvroDeserializationSchema 将读取到的数据转换为自定义的 POJO 类型。需要注意的是,在使用 ConfluentRegistryAvroDeserializationSchema 时,需要指定 Confluent Schema Registry 的 URL 和 Avro Schema 对应的类。
另外,如果您需要使用 Flink 的流式处理功能来读取 Kafka 中的 Avro 格式数据,可以使用 FlinkKafkaConsumer 并结合 ConfluentRegistryAvroDeserializationSchema 来进行 Avro 数据的反序列化,示例如下:
java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaProps = new Properties(); kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); kafkaProps.setProperty("schema.registry.url", "http://localhost:8081");
String kafkaTopic = "my-kafka-topic"; AvroSchema avroSchema = new AvroSchema(MyRecord.class); ConfluentRegistryAvroDeserializationSchema deserializationSchema = new ConfluentRegistryAvroDeserializationSchema<>(avroSchema);
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>( kafkaTopic, deserializationSchema, kafkaProps ); DataStream stream = env.addSource(kafkaConsumer);
stream.map(new MapFunction<MyRecord, String>() { @Override public String map(MyRecord value) throws Exception { return value.toString(); } }).print();
env.execute("Read from Kafka + Avro"); 在上面的代码中,我使用了 ConfluentRegistryAvroDeserializationSchema 将从 Kafka 中读取到的数据转换为自定义的 POJO 类型,并使用 FlinkKafkaConsumer 进行数据的消费。同样需要注意的是,这里需要指定 Confluent Schema Registry 的 URL 和 Avro Schema 对应的类。
在 Flink 1.15.0 版本中,确实不存在 flink-parquet 的依赖库了。不过,可以使用 flink-avro 库来实现 Parquet 的读取。
具体实现步骤如下:
flink-avro 库:<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.15.0</version>
</dependency>
AvroParquetInputFormat 类来读取 Parquet 文件,例如:val filePath = "hdfs://localhost:9000/user/test.parquet"
val inputFormat = new AvroParquetInputFormat[GenericRecord](new Path(filePath), GenericRecord.getClassSchema)
val dataStream = env.createInput(inputFormat)
其中,GenericRecord 是 Avro 序列化和反序列化的通用数据类型。
这样,就可以在 Flink 1.15.0 中使用 AvroParquetInputFormat 来读取 Parquet 文件了。
以下是Flink 1.15读取HDFS中Parquet文件的Java示例代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.hdfs.HdfsFileStreamFactory;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import java.io.IOException;
import java.util.Properties;
public class ReadParquetFromHdfs {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置Hadoop环境变量
        System.setProperty("hadoop.conf.dir", "/path/to/hadoop/conf");
        // 设置hdfs连接参数
        Properties properties = new Properties();
        properties.setProperty("fs.defaultFS", "hdfs://localhost:9000");
        // 创建HdfsFileStreamFactory,用于从hdfs读取数据流
        HdfsFileStreamFactory factory = new HdfsFileStreamFactory(properties);
        // 从parquet文件中读取数据流
        DataStream<Tuple2<String, String>> stream = env
            .addSource(factory)
            .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(Tuple2<String, String> value) throws Exception {
                    return value;
                }
            });
        // 将数据流转换为JSON格式并打印出来
        stream
            .map(new MapFunction<Tuple2<String, String>, String>() {
                @Override
                public String map(Tuple2<String, String> value) throws Exception {
                    return value.f1 + "\t" + value.f0;
                }
            })
            .print();
        env.execute("Read Parquet From HDFS");
    }
}
上述代码中,首先需要设置Hadoop环境变量和hdfs连接参数。然后使用HdfsFileStreamFactory创建数据流读取器,并将parquet文件添加到读取器中。接下来,使用map函数将读取的数据转换为JSON格式,并打印出来。最后,使用execute方法执行Flink程序。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。