网上都是1.9版本的,里面用到的ParquetRowInputFormat在下面的依赖中已经没有了! org.apache.flink flink-parquet 1.15.0
Flink 1.15 中,读取 HDFS 中的 Parquet 文件需要使用 ParquetInputFormat。以下是一个简单的 Java 样例:
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.ParquetInputFormat; import org.apache.flink.formats.parquet.row.ParquetRowDataReadSupport;
public class ReadParquetFromHDFS { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Path path = new Path("hdfs://namenode:8020/path/to/parquet/file");
ParquetInputFormat<RowData> format = ParquetInputFormat.builder(path, new ParquetRowDataReadSupport())
.build();
DataSet<RowData> rows = env.createInput(format);
// 处理数据的逻辑
// ...
}
} 在 pom.xml 中添加以下依赖:
org.apache.flink flink-formats-flink-avro-confluent-schema-registry 1.15.0 org.apache.flink flink-formats-parquet 1.15.0 其中,flink-formats-parquet 依赖中包含了 ParquetRowDataReadSupport 类。
注意要将 path 替换为实际的 HDFS 文件路径。另外,在运行程序之前需要先启动 HDFS 和 Flink 集群。
Flink 1.15 版本中已经不再支持 ParquetRowInputFormat,而是推荐使用 ParquetAvroInputFormat 来读取 Parquet 格式的文件。下面我提供一个在 Flink 1.15 中使用 ParquetAvroInputFormat 读取 HDFS 中 Parquet 文件的简单样例:
java import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.formats.avro.AvroSchema; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; import org.apache.flink.hadoopcompatibility.HadoopInputs; import org.apache.flink.hadoopcompatibility.HadoopReadFunction; import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class ReadParquetInFlink { public static void main(String[] args) throws Exception { // 执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
// HadoopInputFormat 配置
org.apache.hadoop.mapreduce.Job job = org.apache.hadoop.mapreduce.Job.getInstance();
org.apache.hadoop.conf.Configuration conf = job.getConfiguration();
// Hadoop 文件路径
String filePath = "hdfs://namenode:port/path/to/parquet";
// 设置 HDFS 配置信息
conf.set("fs.defaultFS", "hdfs://namenode:port");
conf.set("dfs.nameservices", "nameservice1");
conf.set("dfs.ha.namenodes.nameservice1", "namenode1,namenode2");
conf.set("dfs.namenode.rpc-address.nameservice1.namenode1", "namenode1:8020");
conf.set("dfs.namenode.rpc-address.nameservice1.namenode2", "namenode2:8020");
// Parquet 格式配置信息
String schemaRegistryUrl = "http://localhost:8081"; // Confluent Schema Registry URL
AvroSchema avroSchema = new AvroSchema(MyRecord.class); // 自定义的 POJO 类
ConfluentRegistryAvroDeserializationSchema<MyRecord> deserializationSchema =
new ConfluentRegistryAvroDeserializationSchema<>(avroSchema, schemaRegistryUrl);
HadoopInputFormat<?, ?> inputFormat = HadoopInputs.createHadoopInputFormat(
new org.apache.hadoop.mapreduce.lib.input.FileInputFormat<GenericRecord, Void>() {},
org.apache.avro.generic.GenericRecord.class,
job
);
inputFormat.getConfiguration().set("parquet.avro.schema", avroSchema.toString());
// 读取 HDFS 文件,转换为 POJO 类型
env.createInput(inputFormat, TypeInformation.of(new org.apache.flink.api.common.typeinfo.TypeHint<GenericRecord>() {}))
.map(new MapFunction<GenericRecord, MyRecord>() {
@Override
public MyRecord map(GenericRecord value) throws Exception {
return new MyRecord(value.get("id").toString(), value.get("name").toString());
}
})
.print();
}
public static class MyRecord {
private String id;
private String name;
public MyRecord(String id, String name) {
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
} 在上面的代码中,我使用了 HadoopInputFormat 从 HDFS 中读取 Parquet 文件,并使用 ConfluentRegistryAvroDeserializationSchema 将读取到的数据转换为自定义的 POJO 类型。需要注意的是,在使用 ConfluentRegistryAvroDeserializationSchema 时,需要指定 Confluent Schema Registry 的 URL 和 Avro Schema 对应的类。
另外,如果您需要使用 Flink 的流式处理功能来读取 Kafka 中的 Avro 格式数据,可以使用 FlinkKafkaConsumer 并结合 ConfluentRegistryAvroDeserializationSchema 来进行 Avro 数据的反序列化,示例如下:
java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaProps = new Properties(); kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); kafkaProps.setProperty("schema.registry.url", "http://localhost:8081");
String kafkaTopic = "my-kafka-topic"; AvroSchema avroSchema = new AvroSchema(MyRecord.class); ConfluentRegistryAvroDeserializationSchema deserializationSchema = new ConfluentRegistryAvroDeserializationSchema<>(avroSchema);
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>( kafkaTopic, deserializationSchema, kafkaProps ); DataStream stream = env.addSource(kafkaConsumer);
stream.map(new MapFunction<MyRecord, String>() { @Override public String map(MyRecord value) throws Exception { return value.toString(); } }).print();
env.execute("Read from Kafka + Avro"); 在上面的代码中,我使用了 ConfluentRegistryAvroDeserializationSchema 将从 Kafka 中读取到的数据转换为自定义的 POJO 类型,并使用 FlinkKafkaConsumer 进行数据的消费。同样需要注意的是,这里需要指定 Confluent Schema Registry 的 URL 和 Avro Schema 对应的类。
在 Flink 1.15.0 版本中,确实不存在 flink-parquet
的依赖库了。不过,可以使用 flink-avro
库来实现 Parquet 的读取。
具体实现步骤如下:
flink-avro
库:<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.15.0</version>
</dependency>
AvroParquetInputFormat
类来读取 Parquet 文件,例如:val filePath = "hdfs://localhost:9000/user/test.parquet"
val inputFormat = new AvroParquetInputFormat[GenericRecord](new Path(filePath), GenericRecord.getClassSchema)
val dataStream = env.createInput(inputFormat)
其中,GenericRecord
是 Avro 序列化和反序列化的通用数据类型。
这样,就可以在 Flink 1.15.0 中使用 AvroParquetInputFormat 来读取 Parquet 文件了。
以下是Flink 1.15读取HDFS中Parquet文件的Java示例代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.hdfs.HdfsFileStreamFactory;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import java.io.IOException;
import java.util.Properties;
public class ReadParquetFromHdfs {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Hadoop环境变量
System.setProperty("hadoop.conf.dir", "/path/to/hadoop/conf");
// 设置hdfs连接参数
Properties properties = new Properties();
properties.setProperty("fs.defaultFS", "hdfs://localhost:9000");
// 创建HdfsFileStreamFactory,用于从hdfs读取数据流
HdfsFileStreamFactory factory = new HdfsFileStreamFactory(properties);
// 从parquet文件中读取数据流
DataStream<Tuple2<String, String>> stream = env
.addSource(factory)
.map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(Tuple2<String, String> value) throws Exception {
return value;
}
});
// 将数据流转换为JSON格式并打印出来
stream
.map(new MapFunction<Tuple2<String, String>, String>() {
@Override
public String map(Tuple2<String, String> value) throws Exception {
return value.f1 + "\t" + value.f0;
}
})
.print();
env.execute("Read Parquet From HDFS");
}
}
上述代码中,首先需要设置Hadoop环境变量和hdfs连接参数。然后使用HdfsFileStreamFactory创建数据流读取器,并将parquet文件添加到读取器中。接下来,使用map函数将读取的数据转换为JSON格式,并打印出来。最后,使用execute方法执行Flink程序。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。