
Flink 1.15: reading all Parquet files under an HDFS directory and generating HFiles

I need to use Flink 1.15 to read the Parquet files under an HDFS directory and convert them into HFiles. Is there a suitable example for this? Many thanks.

I'm hoping for a runnable example, not GPT output.

Asked by 1969124368617827 on 2023-05-13 19:21:44
3 answers
  • Below is a runnable Flink 1.15 example for reading Parquet files under an HDFS directory and converting the data into HFile files written to an HBase table. (Note that the HBase table must already exist and be pre-split, otherwise an exception will be thrown.)

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroup;
    import org.apache.parquet.hadoop.ParquetInputFormat;
    import org.apache.parquet.hadoop.api.InitContext;
    import org.apache.parquet.hadoop.api.ReadSupport;
    import org.apache.parquet.hadoop.util.ContextUtil;
    import org.apache.parquet.schema.MessageType;

    import javax.security.auth.Subject;
    import java.io.IOException;
    import java.math.BigDecimal;

    2023-05-14 08:11:07
  • Every bit of effort brings its reward!

    Hello! To read Parquet files from HDFS with Apache Flink, add the following dependencies to pom.xml (with versions matching Flink 1.15 and Hadoop 3):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_2.12</artifactId>
      <version>1.15.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.15.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.12.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.1.1</version>
    </dependency>
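    With these dependencies in place, one way to read the Parquet files is through the Hadoop compatibility layer, using parquet-avro's AvroParquetInputFormat with the DataSet API. This is a minimal sketch, not a tested job: the HDFS path is a placeholder, and it assumes the files carry their own Avro-convertible schema.

    ```java
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.parquet.avro.AvroParquetInputFormat;

    public class ReadParquetSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Parquet input formats use Void as the key; the value carries the record
            DataSet<Tuple2<Void, GenericRecord>> input = env.createInput(
                    HadoopInputs.readHadoopFile(
                            new AvroParquetInputFormat<GenericRecord>(),
                            Void.class, GenericRecord.class,
                            "hdfs://localhost:9000/path/to/parquet"));  // placeholder path

            input.print();
        }
    }
    ```

    The key class is Void because Parquet's Hadoop input formats expose only a value per record; downstream operators typically project out `t.f1`.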
    
    2023-05-14 08:05:15
  • Official account: 网络技术联盟站; InfoQ contracted author; Alibaba Cloud community contracted author; Huawei Cloud Expert; BOSS直聘 top creator; Tencent Classroom creation pilot; blog + forum: https://www.wljslmz.cn; engineer navigation: https://www.wljslmz.com

    Here is a Flink 1.15-based example that reads Parquet files from HDFS and converts them into HFiles:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class HdfsParquetToHFile {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // HFile output is finalized when the job completes, so run as a bounded batch job
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            // Avro schema describing the Parquet records (here: string fields "id" and "value")
            Schema schema = SchemaBuilder.record("Record").fields()
                    .requiredString("id")
                    .requiredString("value")
                    .endRecord();

            // Read all Parquet files under the HDFS directory as Avro GenericRecord
            String hdfsPath = "hdfs://localhost:9000/path/to/parquet/file";
            FileSource<GenericRecord> source = FileSource
                    .forRecordStreamFormat(AvroParquetReaders.forGenericRecord(schema), new Path(hdfsPath))
                    .build();
            DataStream<GenericRecord> records =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");

            // HBase / HFile output configuration
            String hbaseTableName = "mytable";
            String hbaseColumnFamily = "cf";
            String hbaseColumnQualifier = "c1";
            String hbaseOutputPath = "hdfs://localhost:9000/path/to/hfile/output";
            Configuration hbaseConfig = HBaseConfiguration.create();
            hbaseConfig.set("hbase.zookeeper.quorum", "localhost:2181");
            hbaseConfig.set("zookeeper.znode.parent", "/hbase");
            hbaseConfig.set("hbase.mapreduce.hfileoutputformat.table.name", hbaseTableName);

            Job job = Job.getInstance(hbaseConfig);
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Cell.class);
            FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(hbaseOutputPath));
            HadoopOutputFormat<ImmutableBytesWritable, Cell> hfileOutputFormat =
                    new HadoopOutputFormat<>(new HFileOutputFormat2(), job);

            // Convert each record into an HBase KeyValue. HFileOutputFormat2 requires
            // row keys in ascending order within each writer, hence parallelism 1 here;
            // for large data sets, sort and range-partition the keys before writing.
            records.map(new MapFunction<GenericRecord, Tuple2<ImmutableBytesWritable, Cell>>() {
                        @Override
                        public Tuple2<ImmutableBytesWritable, Cell> map(GenericRecord record) {
                            byte[] rowKey = Bytes.toBytes(record.get("id").toString());
                            byte[] value = Bytes.toBytes(record.get("value").toString());
                            Cell cell = new KeyValue(rowKey,
                                    Bytes.toBytes(hbaseColumnFamily),
                                    Bytes.toBytes(hbaseColumnQualifier), value);
                            return Tuple2.of(new ImmutableBytesWritable(rowKey), cell);
                        }
                    })
                    .writeUsingOutputFormat(hfileOutputFormat)
                    .setParallelism(1);

            env.execute("HdfsParquetToHFile");
        }
    }
    

    In the code above, we first read the Parquet files from HDFS as records, then use map() to convert each record into HBase key/value cells, and finally write them out in HFile format via HFileOutputFormat2. The generated HFiles can then be bulk-loaded into the table.
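    One detail worth calling out: HFileOutputFormat2 rejects row keys that arrive out of order, and HBase orders keys as unsigned byte arrays, not as strings or numbers (so "row10" sorts before "row2"). The following pure-Java sketch reproduces that comparator without any HBase dependency; `RowKeyOrder` and its helpers are hypothetical names for illustration.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.Comparator;
    import java.util.List;

    public class RowKeyOrder {
        // HBase compares row keys byte-by-byte as UNSIGNED values, left to right;
        // a shorter key that is a prefix of a longer one sorts first.
        public static final Comparator<byte[]> UNSIGNED_LEX = (a, b) -> {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int cmp = (a[i] & 0xff) - (b[i] & 0xff);
                if (cmp != 0) return cmp;
            }
            return a.length - b.length;
        };

        // Sort keys in place into the order HFileOutputFormat2 expects
        public static void sortRowKeys(List<byte[]> keys) {
            keys.sort(UNSIGNED_LEX);
        }

        public static byte[] utf8(String s) {
            return s.getBytes(StandardCharsets.UTF_8);
        }
    }
    ```

    Because byte order differs from numeric order, fixed-width or zero-padded row keys are a common HBase design choice when keys encode numbers.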

    2023-05-14 08:04:11
