Hbase java client 压测报错-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Hbase java client 压测报错

hbase小能手 2018-11-06 17:39:41 2539

使用apache 压测工具,2000并发时出现如下错误:
2018-10-12 16:09:35.527 WARN org.apache.hadoop.hbase.client.ScannerCallable Line:367 - Ignore, probably already closed
org.apache.hadoop.hbase.ipc.RemoteWithExtrasException: Call queue is full on /10.x'x.xxxx.xx:16020, too many items queued ?

    at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1234)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:223)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:328)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:32831)
    at org.apache.hadoop.hbase.client.ScannerCallable.close(ScannerCallable.java:362)
    at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:197)
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:145)
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:60)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:210)
    at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:327)
    at org.apache.hadoop.hbase.client.ClientScanner.close(ClientScanner.java:748)
    at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:610)
    at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366)
    at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:403)
    at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:413)
    at com.asiainfo.tag.utils.HBaseUtils.tableExists(HBaseUtils.java:65)
    at com.asiainfo.tag.service.impl.CustGroupServiceImpl.checkTagInGroup(CustGroupServiceImpl.java:46)
    at com.asiainfo.tag.service.impl.ServiceCustGroupProxy.checkTagInGroup(ServiceCustGroupProxy.java:30)
    at com.asiainfo.service.CustGroupCheckImpl.checkExistsNo(CustGroupCheckImpl.java:30)
    at sun.reflect.GeneratedMethodAccessor53.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.cxf.service.invoker.AbstractInvoker.performInvocation(AbstractInvoker.java:179)
    at org.apache.cxf.jaxws.JAXWSMethodInvoker.performInvocation(JAXWSMethodInvoker.java:66)
    at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:96)
    at org.apache.cxf.jaxws.AbstractJAXWSMethodInvoker.invoke(AbstractJAXWSMethodInvoker.java:232)
    at org.apache.cxf.jaxws.JAXWSMethodInvoker.invoke(JAXWSMethodInvoker.java:85)
    at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:74)
    at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:59)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.cxf.interceptor.ServiceInvokerInterceptor$2.run(ServiceInvokerInterceptor.java:126)
    at org.apache.cxf.workqueue.SynchronousExecutor.execute(SynchronousExecutor.java:37)
    at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:131)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
    at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121)
    at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:267)
    at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:234)
    at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:208)
    at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:160)
    at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:216)
    at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:301)
    at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doPost(AbstractHTTPServlet.java:220)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:661)
    at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:276)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:108)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)

===2018-10-12 16:09:35.528 WARN com.asiainfo.tag.service.impl.ServiceCustGroupProxy Line:32 - ================checkTagInGroup 自动集群切换

Java 测试技术 分布式数据库 Apache Hbase
分享到
取消 提交回答
全部回答(2)
  • hiekay
    2019-07-17 23:12:31

    队列 满了, 你可以设置 大一些

    0 0
  • 我是管理员
    2019-07-17 23:12:31

    根据异常日志推测是消费队列(默认长度为10倍的Handler数)打满导致,查看源码ServerRpcConnection类处理请求processRequest()方法:
    /**

    • @param buf
    • Has the request header and the request param and optionally
    • encoded data buffer all in this one array.
    • @throws IOException
    • @throws InterruptedException
      */

    protected void processRequest(ByteBuff buf) throws IOException,

      InterruptedException {
    ...
    
    if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
      this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
      call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
        "Call queue is full on " + this.rpcServer.server.getServerName() +
            ", too many items queued ?");
      call.sendResponseIfReady();
    }

    }
    得知Server端通过executeRpcCall()方法执行RPC远程调用CallRunner消费次数超过"hbase.ipc.server.max.callqueue.length"配置值就会引起"too many items queued"异常:
    protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,

      final CallRunner task) {
    // Executors provide no offer, so make our own.
    int queued = queueSize.getAndIncrement();
    if (maxQueueLength > 0 && queued >= maxQueueLength) {
      queueSize.decrementAndGet();
      return false;
    }
    
    executor.execute(new FifoCallRunner(task){
      @Override
      public void run() {
        task.setStatus(RpcServer.getStatus());
        task.run();
        queueSize.decrementAndGet();
      }
    });
    
    return true;

    }
    public FifoRpcScheduler(Configuration conf, int handlerCount) {

    this.handlerCount = handlerCount;
    this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);

    }

    public static final String IPC_SERVER_MAX_CALLQUEUE_LENGTH =

      "hbase.ipc.server.max.callqueue.length";

    9

    0 0
添加回答
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

推荐文章
相似问题
推荐课程