Hbase java client 压测报错

使用apache 压测工具,2000并发时出现如下错误:
 
2018-10-12 16:09:35.527 WARN  org.apache.hadoop.hbase.client.ScannerCallable Line:367 - Ignore, probably already closed
org.apache.hadoop.hbase.ipc.RemoteWithExtrasException: Call queue is full on /10.x'x.xxxx.xx:16020, too many items queued ?
at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1234)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:223)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:328)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:32831)
at org.apache.hadoop.hbase.client.ScannerCallable.close(ScannerCallable.java:362)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:197)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:145)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:60)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:210)
at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:327)
at org.apache.hadoop.hbase.client.ClientScanner.close(ClientScanner.java:748)
at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:610)
at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366)
at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:403)
at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:413)
at com.asiainfo.tag.utils.HBaseUtils.tableExists(HBaseUtils.java:65)
at com.asiainfo.tag.service.impl.CustGroupServiceImpl.checkTagInGroup(CustGroupServiceImpl.java:46)
at com.asiainfo.tag.service.impl.ServiceCustGroupProxy.checkTagInGroup(ServiceCustGroupProxy.java:30)
at com.asiainfo.service.CustGroupCheckImpl.checkExistsNo(CustGroupCheckImpl.java:30)
at sun.reflect.GeneratedMethodAccessor53.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.cxf.service.invoker.AbstractInvoker.performInvocation(AbstractInvoker.java:179)
at org.apache.cxf.jaxws.JAXWSMethodInvoker.performInvocation(JAXWSMethodInvoker.java:66)
at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:96)
at org.apache.cxf.jaxws.AbstractJAXWSMethodInvoker.invoke(AbstractJAXWSMethodInvoker.java:232)
at org.apache.cxf.jaxws.JAXWSMethodInvoker.invoke(JAXWSMethodInvoker.java:85)
at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:74)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor$2.run(ServiceInvokerInterceptor.java:126)
at org.apache.cxf.workqueue.SynchronousExecutor.execute(SynchronousExecutor.java:37)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:131)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121)
at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:267)
at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:234)
at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:208)
at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:160)
at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:216)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:301)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doPost(AbstractHTTPServlet.java:220)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:661)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:276)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:108)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
===2018-10-12 16:09:35.528 WARN com.asiainfo.tag.service.impl.ServiceCustGroupProxy Line:32 - ================checkTagInGroup 自动集群切换
已邀请:

beyond

赞同来自:

报错不是已经很明显了嘛,消息对列打满了

beyond

赞同来自:

看看监控页面,另外压测看看GC 请求数 是否有分区,从这几个方面去排查

ProgramGeek

赞同来自:

根据异常日志推测是消费队列(默认长度为10倍的Handler数)打满导致,查看源码ServerRpcConnection类处理请求processRequest()方法:
 /**
* @param buf
* Has the request header and the request param and optionally
* encoded data buffer all in this one array.
* @throws IOException
* @throws InterruptedException
*/
protected void processRequest(ByteBuff buf) throws IOException,
InterruptedException {
...

if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
", too many items queued ?");
call.sendResponseIfReady();
}
}
得知Server端通过executeRpcCall()方法执行RPC远程调用CallRunner消费次数超过"hbase.ipc.server.max.callqueue.length"配置值就会引起"too many items queued"异常:
protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,
final CallRunner task) {
// Executors provide no offer, so make our own.
int queued = queueSize.getAndIncrement();
if (maxQueueLength > 0 && queued >= maxQueueLength) {
queueSize.decrementAndGet();
return false;
}

executor.execute(new FifoCallRunner(task){
@Override
public void run() {
task.setStatus(RpcServer.getStatus());
task.run();
queueSize.decrementAndGet();
}
});

return true;
}
public FifoRpcScheduler(Configuration conf, int handlerCount) {
this.handlerCount = handlerCount;
this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
}

public static final String IPC_SERVER_MAX_CALLQUEUE_LENGTH =
"hbase.ipc.server.max.callqueue.length";
RpcExecutor.png

要回复问题请先登录注册


中国HBase技术社区微信公众号:
hbasegroup

欢迎加入HBase生态+Spark社区钉钉大群