
HBase batch reads and batch writes fail with org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException

hbase小能手 2018-11-07 15:59:18

org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException

at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:228)
at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:208)
at org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1700)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:208)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:183)
at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1449)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1052)
at com.pdd.service.medoc.dao.hbase.dao.impl.BaseDAO.lambda$batchInsertV3$15(BaseDAO.java:353)
at com.pdd.service.medoc.dao.hbase.config.HbaseTemplateV2.execute(HbaseTemplateV2.java:27)
at com.pdd.service.medoc.dao.hbase.dao.impl.BaseDAO.batchInsertV3(BaseDAO.java:339)
at com.pdd.service.medoc.dao.hbase.dao.impl.GoodsExtensionDAOV3Impl.batchInsert(GoodsExtensionDAOV3Impl.java:83)
at com.pdd.service.medoc.dao.hbase.dao.impl.GoodsExtensionDAOV3Impl$$FastClassBySpringCGLIB$$f025277.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:136)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
at com.pdd.service.medoc.dao.hbase.dao.impl.GoodsExtensionDAOV3Impl$$EnhancerBySpringCGLIB$$499b33c1.batchInsert()
at com.pdd.service.medoc.build.sharding.ShardingGoodsHbaseLogic.processGoodsById(ShardingGoodsHbaseLogic.java:758)
at com.pdd.service.medoc.build.sharding.ShardingGoodsHbaseLogic.insertGoodsInfoIntoHbase(ShardingGoodsHbaseLogic.java:152)
at com.pdd.service.medoc.build.sync.SyncGoodsInfo.lambda$syncAll$0(SyncGoodsInfo.java:85)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The stack trace above is the error we get during batch reads or batch writes.
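`RetriesExhaustedWithDetailsException` is an aggregate: it collects one cause per failed row. In the HBase 1.x client it exposes `getNumExceptions()`, `getRow(int)`, `getCause(int)`, and `getHostnamePort(int)`, so the first diagnostic step is to log which rows failed, on which region servers, and why. A minimal sketch (the method name and the surrounding `table`/`puts` variables are illustrative, not from the original code):

```java
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;

class BatchPutDiagnostics {
    // Sketch: surface the per-row causes hidden inside the aggregate exception.
    static void putWithDiagnostics(Table table, List<Put> puts) throws Exception {
        try {
            table.put(puts);
        } catch (RetriesExhaustedWithDetailsException e) {
            for (int i = 0; i < e.getNumExceptions(); i++) {
                // Which row failed, on which server, and with what root cause
                // (e.g. a SocketTimeoutException during a region server GC pause).
                System.err.printf("row=%s server=%s cause=%s%n",
                        e.getRow(i), e.getHostnamePort(i), e.getCause(i));
            }
            throw e;
        }
    }
}
```

If the per-row causes cluster on one host, the problem is that server (hot region, GC pauses); if they are spread out, look at client-side timeouts and retry settings.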
public HbaseTemplateV2 hbaseTemplateV2() {

    HbaseTemplateV2 hbaseTemplateV2 = new HbaseTemplateV2();

    HbaseTemplateConfig config = new HbaseTemplateConfig();
    Configuration hBaseConfiguration = HBaseConfiguration.create();
    hBaseConfiguration.set("hbase.zookeeper.quorum", config.getZookeeperQuorumV2());
    hBaseConfiguration.set("hbase.zookeeper.property.clientPort", config.getZookeeperPortV2());
    hBaseConfiguration.set("zookeeper.znode.parent", config.getZookeeperParentV2());

    hBaseConfiguration.setInt("hbase.client.retries.number", 3);
    hBaseConfiguration.setInt("ipc.socket.timeout", 3000);                  // socket connect timeout
    hBaseConfiguration.setInt("hbase.rpc.timeout", 3000);                   // per-RPC timeout
    hBaseConfiguration.setInt("hbase.client.operation.timeout", 18000);     // total time per data operation, excluding scans
    hBaseConfiguration.setInt("hbase.client.scanner.timeout.period", 3000); // per-RPC timeout for scan operations
    hbaseTemplateV2.setConf(hBaseConfiguration);

    return hbaseTemplateV2;
}

The code above is the parameter configuration used when initializing the HBase client.
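One thing worth noticing in that configuration: with `hbase.client.retries.number` set to 3, the whole retry window is very short. The HBase 1.x client spaces retries using a fixed backoff table (`HConstants.RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, ...}`) multiplied by `hbase.client.pause` (default 100 ms). A small standalone calculation, assuming that backoff table, shows why a single sub-second GC pause can exhaust all retries:

```java
// Sketch of the HBase 1.x client retry schedule: each retry sleeps
// pause * RETRY_BACKOFF[tries] milliseconds before the next attempt.
public class RetryWindow {
    // Backoff multipliers as published in HBase 1.x HConstants.RETRY_BACKOFF.
    static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200};

    // Total sleep time (ms) across `retries` attempts with the given base pause.
    static long totalBackoffMs(int retries, long pauseMs) {
        long total = 0;
        for (int tries = 0; tries < retries; tries++) {
            int idx = Math.min(tries, RETRY_BACKOFF.length - 1);
            total += pauseMs * RETRY_BACKOFF[idx];
        }
        return total;
    }

    public static void main(String[] args) {
        // With retries.number=3 and the default 100 ms pause, the client only
        // waits 100 + 200 + 300 = 600 ms in total before giving up.
        System.out.println(totalBackoffMs(3, 100)); // 600
    }
}
```

So any stall longer than roughly 600 ms plus the 3 s per-RPC timeouts (a region server full GC easily qualifies) surfaces as `RetriesExhaustedWithDetailsException`.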
protected List<T> batchGetV2(String tableName, List<String> rowNames, String familyName) {

    return hbaseTemplateV2.execute(conn -> {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Transaction t1 = Cat.newTransaction("transaction_hbase_batch_get", tableName);
        try {
            List<Get> getList = Lists.newArrayList();
            rowNames.forEach(rowName -> {
                Get get = new Get(Bytes.toBytes(rowName));
                get.addFamily(Bytes.toBytes(familyName));
                getList.add(get);
            });
            // Table.get(List<Get>) returns a Result[] array, not a single Result.
            Result[] results = table.get(getList);
            List<T> fetchResult = Lists.newArrayList();
            for (Result result : results) {
                T t = convertResult2DTO(result);
                if (t != null) {
                    fetchResult.add(t);
                }
            }
            // Mark success on the happy path only, so the failure status set
            // in the catch block is not overwritten afterwards.
            t1.setStatus(Transaction.SUCCESS);
            return fetchResult;
        } catch (Exception e) {
            Cat.logError("#medoc-hbase# failed to batch get tablename." + tableName + ".rowNames." + JsonUtils.toJson(rowNames), e);
            log.error("#medoc-hbase# failed to batch get tablename." + tableName + ".rowNames." + JsonUtils.toJson(rowNames), e);
            t1.setStatus(e);
            return null;
        } finally {
            if (Objects.nonNull(table)) {
                table.close();
            }
            t1.complete();
        }
    });
}

The code above is my batch-read code.
Background: when the job starts, the program reads a batch of records from the DB, then reads the corresponding rows from HBase, compares and filters them, and writes the result back to HBase. Sometimes the batch read fails and sometimes the batch write fails, always with the error type shown above.
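Given that workload (bulk read, compare, bulk write under timeouts of 3 s per RPC), one option is to widen the client's retry budget so that a transient stall on one region server does not fail the whole batch. The keys below are standard HBase 1.x client settings; the values are illustrative examples, not tuned recommendations:

```java
// Illustrative alternative settings to widen the retry window.
// Keys are standard HBase 1.x client properties; values are examples only.
hBaseConfiguration.setInt("hbase.client.retries.number", 10);       // default is 31 in HBase 1.x
hBaseConfiguration.setLong("hbase.client.pause", 200);              // base retry pause, ms
hBaseConfiguration.setInt("hbase.rpc.timeout", 10000);              // per-RPC timeout, ms
hBaseConfiguration.setInt("hbase.client.operation.timeout", 60000); // whole-operation budget, ms
```

The trade-off is latency: a larger retry budget means a genuinely failing batch takes longer to surface as an error, so the operation timeout should be sized to what the calling job can tolerate.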

HBase client version:
"org.apache.hbase:hbase-client:$hbaseVersion"
hbaseVersion = '1.1.2.2.4.2.0-258'

Class containing the called method:
org.apache.hadoop.hbase.client.Table

Tags: distributed database, HBase
All answers (1)
  • 马铭芳
    2019-07-17 23:12:41

    Later we lowered the frequency of the write job and reduced the amount of data written. It then ran stably for somewhat longer, but the monitoring shows the error comes back as soon as a GC is triggered. Can I conclude that any GC will cause this error? If so, how do I keep the server's load capacity from dropping so much during GC?
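    A GC by itself does not cause the error; a GC pause long enough to blow the 3 s RPC timeout (times the 3 configured retries) does. Besides widening the client retry window, the usual lever is reducing region server pause times. For the HBase 1.x era on Java 7/8, commonly cited region server JVM options look like the following (a sketch for `hbase-env.sh`; the flags are standard HotSpot options, but concrete values depend on heap size and must be validated against GC logs):

    ```shell
    # Illustrative region server GC settings for hbase-env.sh (HBase 1.x / CMS era).
    # Enable CMS with an early initiating threshold to avoid long full GCs,
    # and turn on GC logging so pause times can be correlated with the errors.
    export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS \
      -XX:+UseConcMarkSweepGC \
      -XX:CMSInitiatingOccupancyFraction=70 \
      -XX:+UseCMSInitiatingOccupancyOnly \
      -XX:+PrintGCDetails -XX:+PrintGCDateStamps \
      -Xloggc:/var/log/hbase/gc-regionserver.log"
    ```

    Correlating the GC log timestamps with the client-side `RetriesExhaustedWithDetailsException` occurrences will confirm or rule out the GC hypothesis.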
