连接HBase的正确姿势

在云HBase值班的时候,经常会遇见有用户咨询诸如“HBase是否支持连接池?”这样的问题,也有用户因为应用中创建的Connection对象过多,触发Zookeeper的连接数限制,导致客户端连不上的。究其原因,都是因为对HBase客户端的原理不了解造成的。本文介绍HBase客户端的Connection对象与Socket连接的关系并且给出Connection的正确用法。
Connection是什么?
在云HBase用户中,常见的使用Connection的错误方法有:
  1. 自己实现一个Connection对象的资源池,每次使用都从资源池中取出一个Connection对象;
  2. 每个线程一个Connection对象。
  3. 每次访问HBase的时候临时创建一个Connection对象,使用完之后调用close关闭连接。

从这些做法来看,这些用户显然是把Connection对象当成了单机数据库里面的连接对象来用了。然而作为分布式数据库,HBase客户端需要和多个服务器中的不同服务角色建立连接,所以HBase客户端中的Connection对象并不是简单对应一个socket连接。HBase的API文档当中对Connection的定义是:
A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper.
我们知道,HBase客户端要连接三个不同的服务角色:
  1. Zookeeper:主要用于获得meta-region位置,集群Id、master等信息。
  2. HBase Master:主要用于执行HBaseAdmin接口的一些操作,例如建表等。
  3. HBase RegionServer:用于读、写数据。

下图简单示意了客户端与服务器交互的步骤:
客户端与服务器交互的步骤.png
HBase客户端的Connection包含了对以上三种Socket连接的封装。Connection对象和实际的Socket连接之间的对应关系如下图:
Connection对象与Socket连接之间的对应关系.png
HBase客户端代码真正对应Socket连接的是RpcConnection对象。HBase使用PoolMap这种数据结构来存储客户端到HBase服务器之间的连接。PoolMap封装ConcurrentHashMap的结构,key是ConnectionId(封装服务器地址和用户ticket),value是一个RpcConnection对象的资源池。当HBase需要连接一个服务器时,首先会根据ConnectionId找到对应的连接池,然后从连接池中取出一个连接对象。
HBase提供三种资源池的实现,分别是Reusable,RoundRobin和ThreadLocal。具体实现可以通过hbase.client.ipc.pool.type配置项指定,默认为Reusable。连接池的大小也可以通过hbase.client.ipc.pool.size配置项指定,默认为1。
连接HBase的正确姿势
从以上分析不难得出,在HBase中Connection类已经实现对连接的管理功能,所以不需要在Connection之上再做额外的管理。另外,Connection是线程安全的,然而Table和Admin则不是线程安全的,因此正确的做法是一个进程共用一个Connection对象,而在不同的线程中使用单独的Table和Admin对象。
//所有进程共用一个Connection对象
connection=ConnectionFactory.createConnection(config);
...
//每个线程使用单独的Table对象
Table table = connection.getTable(TableName.valueOf("test"));
try {
...
} finally {
table.close();
}
HBase客户端默认的是连接池大小是1,也就是每个RegionServer 1个连接。如果应用需要使用更大的连接池或指定其他的资源池类型,也可以通过修改配置实现: 
Connection源码解析
Connection创建RpcClient的核心入口:
/**
* constructor
* @param conf Configuration object
*/
ConnectionImplementation(Configuration conf,
ExecutorService pool, User user) throws IOException {
...
try {
...
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
...
} catch (Throwable e) {
// avoid leaks: registry, rpcClient, ...
LOG.debug("connection construction failed", e);
close();
throw e;
}
}
RpcClient使用PoolMap数据结构存储客户端到HBase服务器之间的连接映射,PoolMap封装ConcurrentHashMap结构,其中key是ConnectionId[new ConnectionId(ticket, md.getService().getName(), addr)],value是RpcConnection对象的资源池。
protected final PoolMap<ConnectionId, T> connections;

/**
* Construct an IPC client for the cluster <code>clusterId</code>
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
* @param metrics the connection metrics
*/
public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
MetricsConnection metrics) {
...
this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
...
}
当HBase需要连接一个服务器时,首先会根据ConnectionId找到对应的连接池,然后从连接池中取出一个连接对象,获取连接的核心实现:
/**
* Get a connection from the pool, or create a new one and add it to the pool. Connections to a
* given host/port are reused.
*/
private T getConnection(ConnectionId remoteId) throws IOException {
if (failedServers.isFailedServer(remoteId.getAddress())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not trying to connect to " + remoteId.address
+ " this server is in the failed servers list");
}
throw new FailedServerException(
"This server is in the failed servers list: " + remoteId.address);
}
T conn;
synchronized (connections) {
if (!running) {
throw new StoppedRpcClientException();
}
conn = connections.get(remoteId);
if (conn == null) {
conn = createConnection(remoteId);
connections.put(remoteId, conn);
}
conn.setLastTouched(EnvironmentEdgeManager.currentTime());
}
return conn;
}
连接池根据ConnectionId获取不到连接则创建RpcConnection的具体实现:
protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOException {
return new NettyRpcConnection(this, remoteId);
}

NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
this.rpcClient = rpcClient;
byte connectionHeaderPreamble = getConnectionHeaderPreamble();
this.connectionHeaderPreamble =
Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
ConnectionHeader header = getConnectionHeader();
this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
}

protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId,
String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor)
throws IOException {
if (remoteId.getAddress().isUnresolved()) {
throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
}
this.timeoutTimer = timeoutTimer;
this.codec = codec;
this.compressor = compressor;
this.conf = conf;

UserGroupInformation ticket = remoteId.getTicket().getUGI();
SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
this.useSasl = isSecurityEnabled;
Token<? extends TokenIdentifier> token = null;
String serverPrincipal = null;
if (useSasl && securityInfo != null) {
AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
if (tokenKind != null) {
TokenSelector<? extends TokenIdentifier> tokenSelector = AbstractRpcClient.TOKEN_HANDLERS
.get(tokenKind);
if (tokenSelector != null) {
token = tokenSelector.selectToken(new Text(clusterId), ticket.getTokens());
} else if (LOG.isDebugEnabled()) {
LOG.debug("No token selector found for type " + tokenKind);
}
}
String serverKey = securityInfo.getServerPrincipal();
if (serverKey == null) {
throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
}
serverPrincipal = SecurityUtil.getServerPrincipal(conf.get(serverKey),
remoteId.address.getAddress().getCanonicalHostName().toLowerCase());
if (LOG.isDebugEnabled()) {
LOG.debug("RPC Server Kerberos principal name for service=" + remoteId.getServiceName()
+ " is " + serverPrincipal);
}
}
this.token = token;
this.serverPrincipal = serverPrincipal;
if (!useSasl) {
authMethod = AuthMethod.SIMPLE;
} else if (token != null) {
authMethod = AuthMethod.DIGEST;
} else {
authMethod = AuthMethod.KERBEROS;
}

// Log if debug AND non-default auth, else if trace enabled.
// No point logging obvious.
if ((LOG.isDebugEnabled() && !authMethod.equals(AuthMethod.SIMPLE)) ||
LOG.isTraceEnabled()) {
// Only log if not default auth.
LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName
+ ", sasl=" + useSasl);
}
reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
this.remoteId = remoteId;
}

1 个评论

此篇文章按照毕杰山提供的建议补充说明:
1.源码解析基于HBase 2.1版本分支的;
2. 一个Connection是关于一个集群连接资源的抽象,主要是Socket资源与线程资源,这部分资源在一个Connection级别是共享的,这使得创建Admin/Table的操作是一个非常轻量级的操作。
3. ConnectionId直接关联Socket资源。
4. 关于Admin实例不建议常驻,按需创建,用完即刻关闭。在老版本中,这一点建议是在HBaseAdmin的类文件注释中显式说明了,2.0版本没有。但这里依然是建议这样做的,因为Admin操作不是一个很高频的操作。

要回复文章请先登录注册


中国HBase技术社区微信公众号:
hbasegroup

欢迎加入HBase生态+Spark社区钉钉大群