I need to use Flink 1.15 to read the Parquet files under an HDFS directory and convert them into HFiles. Is there a suitable example? Many thanks.
A runnable sample would be appreciated, not a GPT-generated one.
Below is a runnable Flink 1.15 sample for reading Parquet files under an HDFS directory and converting the data into HFiles for an HBase table. (Note that the HBase table must already exist and be pre-split, otherwise an exception is thrown; see the sketch after the imports for one way to pre-create such a table.)
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.schema.MessageType;
import javax.security.auth.Subject;
import java.io.IOException;
import java.math.BigDecimal;
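Because HFileOutputFormat2 takes its split points from the target table, the table has to be created and pre-split before the Flink job runs. The snippet below is a minimal sketch of that step, not part of the original answer: it assumes an HBase 2.x client, and the table name mytable, column family cf and the split keys are placeholders you would replace with your own.
// Minimal sketch: create a pre-split HBase table with the Admin API (HBase 2.x).
// Table name, column family and split keys are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class CreatePreSplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Split keys define the region boundaries; choose them to match your row-key distribution.
            byte[][] splitKeys = {Bytes.toBytes("2"), Bytes.toBytes("5"), Bytes.toBytes("8")};
            admin.createTable(
                    TableDescriptorBuilder.newBuilder(TableName.valueOf("mytable"))
                            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                            .build(),
                    splitKeys);
        }
    }
}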
Hi OP, to read Parquet files on HDFS with Apache Flink you just need to add the dependencies below to your pom.xml. (The versions shown target an older Flink release; for Flink 1.15 use the 1.15.x versions of the Flink artifacts, and drop the obsolete hadoop-core 1.2.1 entry, which clashes with the Hadoop 3.x artifacts. A short read-only sketch follows the dependency list.)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
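With these dependencies in place (in their Flink 1.15 versions), one common pattern is to read the files as Avro GenericRecord through parquet-avro's AvroParquetInputFormat and Flink's Hadoop compatibility layer. The sketch below is illustrative only; the HDFS path is a placeholder.
// Minimal sketch: read Parquet files from HDFS as Avro GenericRecord via flink-hadoop-compatibility + parquet-avro.
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetInputFormat;

public class ReadParquetFromHdfs {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Job job = Job.getInstance();

        // AvroParquetInputFormat yields (Void, GenericRecord) pairs; the key is always null.
        DataSet<Tuple2<Void, GenericRecord>> records = env.createInput(
                HadoopInputs.readHadoopFile(
                        new AvroParquetInputFormat<GenericRecord>(),
                        Void.class, GenericRecord.class,
                        "hdfs://localhost:9000/path/to/parquet", job));

        // Print the records as strings; print() triggers execution of the batch job.
        records.map(value -> value.f1.toString()).returns(String.class).print();
    }
}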
Below is a Flink 1.15 based sample that reads Parquet files from HDFS and converts them into HFiles:
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat;

public class HdfsParquetToHFile {
    public static void main(String[] args) throws Exception {
        // The (legacy) DataSet API is used because Flink's Hadoop OutputFormat wrapper is a batch sink.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Keep a single parallel instance so the cells reach the HFile writer in row-key order;
        // a production job would repartition and sort by region boundary instead.
        env.setParallelism(1);

        // Source path, target table and HFile output directory
        String hdfsPath = "hdfs://localhost:9000/path/to/parquet/file";
        String hbaseTableName = "mytable";
        String hbaseColumnFamily = "cf";
        String hbaseColumnQualifier = "c1";
        String hbaseOutputPath = "hdfs://localhost:9000/path/to/hfile/output";

        Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", "localhost:2181");
        hbaseConfig.set("zookeeper.znode.parent", "/hbase");

        // The Hadoop Job carries the HFileOutputFormat2 configuration (output path, table layout, compression).
        Job job = Job.getInstance(hbaseConfig);
        FileOutputFormat.setOutputPath(job, new Path(hbaseOutputPath));
        try (Connection connection = ConnectionFactory.createConnection(hbaseConfig)) {
            Table table = connection.getTable(TableName.valueOf(hbaseTableName));
            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hbaseTableName));
            // Pulls the column-family settings (compression, block encoding) and the region split
            // points from the existing, pre-split table into the job configuration.
            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
        }

        // Read the Parquet files from HDFS as Avro GenericRecord through the Hadoop compatibility layer.
        DataSet<Tuple2<Void, GenericRecord>> parquetInput = env.createInput(
                HadoopInputs.readHadoopFile(
                        new AvroParquetInputFormat<GenericRecord>(),
                        Void.class, GenericRecord.class, hdfsPath, job));

        // Convert every record into a (rowKey, KeyValue) pair.
        DataSet<Tuple2<ImmutableBytesWritable, Cell>> cells = parquetInput
                .map(new MapFunction<Tuple2<Void, GenericRecord>, Tuple2<ImmutableBytesWritable, Cell>>() {
                    @Override
                    public Tuple2<ImmutableBytesWritable, Cell> map(Tuple2<Void, GenericRecord> input) {
                        GenericRecord record = input.f1;
                        byte[] rowKey = Bytes.toBytes(record.get("id").toString());
                        byte[] value = Bytes.toBytes(record.get("value").toString());
                        Cell cell = new KeyValue(rowKey,
                                Bytes.toBytes(hbaseColumnFamily),
                                Bytes.toBytes(hbaseColumnQualifier),
                                value);
                        return Tuple2.of(new ImmutableBytesWritable(rowKey), cell);
                    }
                });

        // HFileOutputFormat2 requires the cells to arrive sorted by row key, then writes the HFiles to HDFS.
        cells.sortPartition(0, Order.ASCENDING)
                .output(new HadoopOutputFormat<>(new HFileOutputFormat2(), job));

        env.execute("HdfsParquetToHFile");
    }
}
In the code above, HadoopInputs.readHadoopFile() reads the Parquet files from HDFS as Avro GenericRecord, the map() function turns every record into a row key plus a KeyValue cell, and HFileOutputFormat2, wrapped in Flink's HadoopOutputFormat, writes the sorted cells as HFiles into the output directory on HDFS. HFileOutputFormat2 lives in the hbase-mapreduce artifact, which has to be on the classpath together with flink-hadoop-compatibility, flink-avro and parquet-avro.
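After the job finishes, the generated HFiles still have to be handed over to HBase. The snippet below is a minimal sketch of that last step, assuming an HBase 2.2+ client where the bulk-load tool is exposed as BulkLoadHFiles; the table name and output path are the same placeholders as in the sample above.
// Minimal sketch: bulk load the generated HFiles into the pre-split target table (HBase 2.2+).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class LoadHFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");
        // Moves the HFiles under the output directory into the regions of the target table.
        BulkLoadHFiles.create(conf)
                .bulkLoad(TableName.valueOf("mytable"),
                          new Path("hdfs://localhost:9000/path/to/hfile/output"));
    }
}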