
Flink batch processing: how should a custom InputFormat be written to read HBase data into a DataSet?

I've recently been working on connecting Flink SQL to HBase. For this, Flink SQL needs an HBase InputFormat implementing the InputFormat interface, which we have to write ourselves. I wrote an HBaseInputFormat class that extends RichInputFormat, modeled on JDBCInputFormat, but I honestly don't know how to proceed from here. Could someone take a look?

public class HBaseInputFormat extends RichInputFormat<ResultScanner , InputSplit>  {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseInputFormat.class);

    private org.apache.hadoop.conf.Configuration conf = null;
    private Connection connection = null;
    private Admin admin = null;
    private String tableName ;
    private String quorum ;
    private String clientPort ;
    private ResultScanner scanner ;

    @Override
    public void configure(Configuration parameters) {
        //do nothing here
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (minNumSplits < 1 ){
            throw new IllegalArgumentIOException("Number of input splits must be at least 1.");
        }
        minNumSplits = (this instanceof NonParallelInput ) ? 1 : minNumSplits ;
        GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
        for (int i = 0 ; i < splits.length ; i ++ ){
            splits[i] = new GenericInputSplit(i , minNumSplits) ;
        }
        return splits;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }
    // Open the connection to HBase
    @Override
    public void openInputFormat()  {
        System.out.println("openInputFormat...");
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum" , quorum);
        conf.set("hbase.zookeeper.property.clientPort" , clientPort);
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
           throw new IllegalArgumentException("connection failed " + e.getMessage() , e) ;
        }
        try {
            admin = connection.getAdmin() ;
        } catch (IOException e) {
            throw new IllegalArgumentException("admin failed " + e.getMessage() , e) ;
        }
    }

    @Override
    public void closeInputFormat()  {
        if (connection != null ){
            try {
                connection.close();
            } catch (IOException e) {
                throw new IllegalArgumentException("connection closed  failed " + e.getMessage() , e) ;
            }
        }
        if (admin != null ){
            try {
                admin.close();
            } catch (IOException e) {
                throw new IllegalArgumentException("admin closed  failed " + e.getMessage() , e) ;
            }
        }

    }

    @Override
    public void open(InputSplit split) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scanner = table.getScanner(scan);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        if (scanner.next() != null) {return true;}
        return false ;
    }

    @Override
    public ResultScanner nextRecord(ResultScanner reuse) throws IOException {
        return reuse;
    }

    @Override
    public void close() throws IOException {
        if (scanner == null ){
            return ;
        }
        try {
            scanner.close();
        }catch (Exception e){
            LOG.info("Inputformat ResultScanner couldn't be closed - " + e.getMessage());
        }
    }
    @VisibleForTesting
    Admin getAdmin(){ return admin ;}
    @VisibleForTesting
    Connection getConnection(){return connection ;}
    public static HBaseInputFormatBuilder buildHBaseInputFormat() {
        return new HBaseInputFormatBuilder();
    }

    public static class HBaseInputFormatBuilder {
        private final HBaseInputFormat format ;

        public HBaseInputFormatBuilder(){
            this.format = new HBaseInputFormat();
        }

        public HBaseInputFormatBuilder setQuorum(String quorum) {
            format.quorum = quorum ;
            return this;
        }
        public HBaseInputFormatBuilder setClientPort(String clientPort){
            format.clientPort = clientPort ;
            return this ;
        }
        public HBaseInputFormatBuilder setTableName(String tableName){
            format.tableName = tableName ;
            return this ;
        }
        public HBaseInputFormatBuilder open(){
            format.openInputFormat();
            return this ;
        }

        public HBaseInputFormat finish(){
            if (format.quorum == null ){
                LOG.info("quorum was not supplied separately.");
            }
            if (format.clientPort == null ){
                LOG.info("clientPort was not supplied separately.");
            }
            if (format.tableName == null ){
                LOG.info("tableName was not supplied separately.");
            }
            if (format.connection == null ){
                throw new IllegalArgumentException("No connection supplied");
            }
            if (format.admin == null ){
                throw new IllegalArgumentException("No admin supplied");
            }
            return format ;
        }

    }
}

IT喵看视界 2019-09-16 15:45:04
Answer from the Alibaba Cloud AI Assistant

To build a custom InputFormat for Flink SQL against HBase, you have made a correct start by extending RichInputFormat and implementing some of its methods. A few key steps remain to get data flowing from HBase into Flink. Here are the key points, with example code to help you finish the task:

  1. Initialize the HBase connection: create the HBase configuration, Connection, and Admin objects in openInputFormat().

  2. Create input splits: derive InputSplits from the table's partitioning (for example, its regions). This may require querying HBase metadata to decide how to divide the data.

  3. Implement createInputSplits(): produce multiple InputSplits, each representing one portion of the data to read.

  4. Implement open(InputSplit): initialize the Scanner (or another reader) for the given split.

  5. Implement reachedEnd() and nextRecord(): these decide whether the input is exhausted and fetch the next record from HBase.

  6. Serialization: convert the HBase data into a type Flink can process, such as Row.

Below is a simplified example framework for reference:

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class HBaseInputFormat extends RichInputFormat<Row, InputSplit> {
    // ...your member variables (conf, connection, admin, tableName, quorum, clientPort, scanner)...

    private Table table;
    private Result nextResult; // look-ahead record fetched by reachedEnd()

    @Override
    public void openInputFormat() throws IOException {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", quorum);
        conf.set("hbase.zookeeper.property.clientPort", clientPort);
        connection = ConnectionFactory.createConnection(conf);
        admin = connection.getAdmin();
    }

    @Override
    public void closeInputFormat() throws IOException {
        if (admin != null) {
            admin.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        // Naive placeholder: in practice, derive the splits from the HBase
        // regions (see the region-based sketch after this example).
        GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
        for (int i = 0; i < minNumSplits; i++) {
            splits[i] = new GenericInputSplit(i, minNumSplits);
        }
        return splits;
    }

    @Override
    public void open(InputSplit split) throws IOException {
        // Initialize the scanner for this split, e.g. restricted to a row-key range.
        table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        // Configure the scan (start/stop row, column families, caching) as needed...
        scanner = table.getScanner(scan);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        // Look one record ahead; scanner.next() returns null once the scan is done.
        // (Do not use scanner.iterator().hasNext() here: each call creates a new
        // iterator, and the record its hasNext() peeks at would be lost.)
        if (nextResult == null) {
            nextResult = scanner.next();
        }
        return nextResult == null;
    }

    @Override
    public Row nextRecord(Row reuse) throws IOException {
        if (reachedEnd()) {
            return null;
        }
        Result result = nextResult;
        nextResult = null; // consume the look-ahead record
        // Map the HBase Result to a Flink Row; adapt this to your schema.
        // Example: a single value in column family "cf", qualifier "qualifier".
        byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"));
        reuse.setField(0, value == null ? null : new String(value, StandardCharsets.UTF_8));
        return reuse;
    }

    @Override
    public void close() throws IOException {
        if (scanner != null) {
            scanner.close();
        }
        if (table != null) {
            table.close();
        }
    }
}
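
For step 2 above, here is a minimal sketch of deriving one split per region from the table's region boundaries. It is an illustration under assumptions, not a definitive implementation: the RegionInputSplit class and the createRegionSplits helper are hypothetical names, and the code assumes the connection and tableName fields from the format above.

import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Pair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical split type carrying a row-key range. */
class RegionInputSplit extends LocatableInputSplit {
    final byte[] startRow;
    final byte[] stopRow;

    RegionInputSplit(int splitNumber, String hostname, byte[] startRow, byte[] stopRow) {
        super(splitNumber, hostname);
        this.startRow = startRow;
        this.stopRow = stopRow;
    }
}

class RegionSplitHelper {
    /** One split per region, placed on the region server hosting it. */
    static RegionInputSplit[] createRegionSplits(Connection connection, String tableName)
            throws IOException {
        List<RegionInputSplit> splits = new ArrayList<>();
        try (RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
            Pair<byte[][], byte[][]> keys = locator.getStartEndKeys();
            byte[][] startKeys = keys.getFirst();
            byte[][] endKeys = keys.getSecond();
            for (int i = 0; i < startKeys.length; i++) {
                // Record the hostname so Flink can schedule the split locally.
                HRegionLocation location = locator.getRegionLocation(startKeys[i], false);
                splits.add(new RegionInputSplit(i, location.getHostname(),
                        startKeys[i], endKeys[i]));
            }
        }
        return splits.toArray(new RegionInputSplit[0]);
    }
}

In open(InputSplit) you would then cast the split to RegionInputSplit and restrict the Scan with setStartRow/setStopRow (withStartRow/withStopRow on newer HBase clients), and getInputSplitAssigner() should return a LocatableInputSplitAssigner so splits are assigned host-locally.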

Adjust the code above to your specific needs, for example the Scan settings and how the Row is constructed. Remember to handle exceptions and to close resources once they are no longer needed. Hopefully this helps you complete the Flink SQL and HBase integration!
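
Finally, to obtain the DataSet that the question title asks about, the format can be handed to the batch ExecutionEnvironment with type information matching what nextRecord() produces. A minimal sketch, assuming the builder from your post applied to the Row-based format above, a one-column STRING schema, and placeholder connection values:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class HBaseReadJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection values; Flink itself calls openInputFormat()
        // on each parallel task, so do not open the connection here.
        HBaseInputFormat format = HBaseInputFormat.buildHBaseInputFormat()
                .setQuorum("zk-host")
                .setClientPort("2181")
                .setTableName("my_table")
                .finish();

        // The type information must match what nextRecord() writes into the Row.
        DataSet<Row> rows = env.createInput(format, new RowTypeInfo(Types.STRING));
        rows.print();
    }
}

One caveat about the builder in your post: finish() currently throws unless open() was called first, but an HBase Connection is not serializable and Flink invokes openInputFormat() on the task managers at runtime, so the connection/admin checks (and the builder's open() method) should be dropped; the sketch assumes finish() only validates quorum, clientPort, and tableName.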
