云HBase Thrift使用最佳实践

简介:   HBase原生只提供了JAVA API客户端,针对诸如python、php、c++等非java语言一般都是通过Thrift代理的方式访问HBase服务,本文从thrift架构、hbase thrift api使用以及如何监控thrift等几个方面详细介绍云HBase Thrift使用最佳实践;  Thrift是一种接口描述语言和二进制通讯协议,它被用来定义和创建跨语言的服务。

  HBase原生只提供了JAVA API客户端,针对诸如python、php、c++等非java语言一般都是通过Thrift代理的方式访问HBase服务,本文从thrift架构、hbase thrift api使用以及如何监控thrift等几个方面详细介绍云HBase Thrift使用最佳实践;
  Thrift是一种接口描述语言和二进制通讯协议,它被用来定义和创建跨语言的服务。通常被当作RPC框架来使用,最初是Facebook为了解决大规模的跨语言服务调用而设计开发出来的,Thrift架构图如下:
             1
  从上图可以看出Thrift采用了分层架构,概念非常清晰,便于大家理解使用,从下至上依次为传输层、协议层和处理层;

  • 传输层:对底层IO进行了简单的抽象,解决数据在网络中的传输问题,通过字节流的方式发送和接受Message,支持TSocket、TFileTransport、TZlibTransport、TBufferedTranspor、TFramedTransport等多种传输层协议。
  • 协议层:将底层的字节流转换成数据流,对字节流数据进行序列化和反序列化,协议层决定了到底被传输的数据是什么,支持包括TBinaryProtocol、TCompactProtocal、TCompactProtocal在内的多种协议;
  • 处理层:包含用户的完整逻辑,决定数据应该被如何进行处理;
  • TServer:Thrift同时还提供了多种服务端模型,TSimpleServer(单线程服务模型,常用于测试)、TThreadPoolServer(多线程服务模型,使用标准的阻塞式IO)、TNonblockingServer(多线程服务模型,使用非阻塞式IO,需同TFramedTransport配合使用)、THttpServer(支持HTTP协议)。

  了解完Thrift架构,我们通过一个简单的例子来了解Thrift是如何工作的。Thrift定义了一种接口描述语言IDL,IDL文件可以被Thrift代码生成期处理生成目标语言的代码,我们定义了一个简单的IDL文件tutorial.thrift文件,内容如下:

#cat tutorial.thrift
server Calculator{
  void ping(),
  i32 add(i:i32 num1, 2:i32 num2)
}

  文件中我们定义了一个名为calculator的service,包含两个接口ping和add,围绕这个文件我们分别实现客户端和服务端代码,在编写代码之前需要使用如下命令生成目标语言代码:

thrift -gen py(替换py为其他语言可以生成对应语言的) tutorial.thrift

  以python为例实现服务端代码:

import sys, glob
sys.path.append('gen-py')
sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])

from tutorial import Calculator
from tutorial.ttypes import *

from shared.ttypes import SharedStruct

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

class CalculatorHandler:
  def __init__(self):
    self.log = {}

  def ping(self):
    print 'ping()'

  def add(self, n1, n2):
    print 'add(%d,%d)' % (n1, n2)
    return n1+n2

handler = CalculatorHandler()
processor = Calculator.Processor(handler)
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)

# You could do one of these for a multithreaded server
#server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
#server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)

print 'Starting the server...'
server.serve()
print 'done.'

  整个服务端需要我们实现的就是处理层,决定如果处理客户群传输过来的数据,对应CalculatorHandler实现了接口中定义的ping和add方法,分别输出"ping()"和实现整数加法,将handler作为参数生成Processor,传输层使用TServerSocket监听9090端口,传输层使用TBinaryProtocol同时使用Buffer进行读写(用TBufferedTransport进行装饰),可以选用不同的Server模型,测试使用TSimpleServer,将传输层、协议层、处理层以及Server组合到一起就完成全部服务端开发工作;在看下客户端实现:

import sys, glob
sys.path.append('gen-py')
sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])

from tutorial import Calculator
from tutorial.ttypes import *

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

try:

  # Make socket
  transport = TSocket.TSocket('localhost', 9090)

  # Buffering is critical. Raw sockets are very slow
  transport = TTransport.TBufferedTransport(transport)

  # Wrap in a protocol
  protocol = TBinaryProtocol.TBinaryProtocol(transport)

  # Create a client to use the protocol encoder
  client = Calculator.Client(protocol)

  # Connect!
  transport.open()

  client.ping()
  print 'ping()'
  
  sum = client.add(1,1)
  print '1+1=%d' % (sum)

  # Close!
  transport.close()

except Thrift.TException, tx:
  print '%s' % (tx.message)

  客户端的实现也很简单传输层使用TSocket访问本地9090端口同时使用buffer进行读写(TBufferedTransport进行装饰),协议层使用TBinaryProtocol,初始化Calculator.Client(根据IDL自动生成代码),我就可以使用client进行方法的调用;
  上面这个例子完整的说明了如何通过Thrift实现自己的业务逻辑,通过Thrift访问HBase同上面本质上是完全一样的,只不过HBase提供了更多的接口而已,HBase提供了相应的IDL文件Hbase.thrift,通过该文件可以生成对应语言的客户端代码,还是已python为例实现基本的CURD操作,代码如下:

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# hbase 客户端代码是由 thrift -gen py hbase.thrift 生成,拷贝到工程目录下
from hbase.THBaseService import Client
from hbase.ttypes import TScan, TColumn, TGet, TPut, TColumnValue, TableDescriptor, ColumnDescriptor, TDelete

class HbaseClient(object):
    def __init__(self, host='hbasetest02.et2sqa.tbsite.net', port=9090):
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Client(protocol)

    def list_tables(self):
        self.transport.open()
        tables = self.client.listTables()
        self.transport.close()
        return tables

    def scan_data(self, table, startRow, stopRow, limit, columns=None):
        self.transport.open()
        if columns:
            columns = [TColumn(*i.split(':')) for i in columns]
        scan = TScan(startRow=startRow, stopRow=stopRow, columns=columns)
        rows = self.client.scan(table, scan, limit)
        self.transport.close()
        return rows

    def get(self, table, row=None, columns=None):
        self.transport.open()
        if columns:
            columns = [TColumn(*i.split(':')) for i in columns]
        get = TGet(row=row, columns=columns)
        res = self.client.get(table, get)
        self.transport.close()
        return res

    def put(self, table, row, columnValues):
        self.transport.open()
        put = TPut(row, columnValues)
        self.client.put(table, put)
        self.transport.close()

    def table_exists(self, table):
        self.transport.open()
        res = self.client.tableExists(table)
        self.transport.close()
        return res

    def delete_single(self, table, delete):
        self.transport.open()
        self.client.deleteSingle(table, delete)
        self.transport.close()

    def create_table(self, tableDescriptor, splitKeys):
        self.transport.open()
        self.client.createTable(tableDescriptor, splitKeys)
        self.transport.close()

    def enable_table(self, table):
        self.transport.open()
        self.client.enableTable(table)
        self.transport.close()

    def disable_table(self, table):
        self.transport.open()
        self.client.disableTable(table)
        self.transport.close()


if __name__ == '__main__':
    client = HbaseClient()
    print client.list_tables()

    print client.scan_data('bear_test', startRow='row1', stopRow='row5', limit=10, columns=['f1:name'])
    print client.get('bear_test', 'row1', ['f1:age'])

    columnValues = [TColumnValue('f1', 'name', 'bear3')]
    client.put('bear_test', 'row4', columnValues)

    print client.table_exists('bear_test')

    delete = TDelete('row4')
    client.delete_single('bear_test', delete)
    print client.scan_data('bear_test', startRow='row1', stopRow='row5', limit=10, columns=['f1:name'])

    tableDescriptor = TableDescriptor('bear_test1', families=[ColumnDescriptor(name='f1', maxVersions=1)])
    client.create_table(tableDescriptor, None)
    print client.table_exists('bear_test1')

  对于上述例子介绍下几个关键的点,HBase Thrift Server默认使用TThreadPoolServer服务模型(在并发数可控的情况下使用该服务模型性能最好),相应的客户端传输层使用TTransport.TBufferedTransport(TSocket.TSocket(host, port))以及协议层TBinaryProtocol同服务端进行通信;同时另外一个需要注意的点就是Client本身并不是线程安全的,每一个线程要使用单独的Client;
  阿里云HBase本身提供了高可用版本的ThriftServer,通过负载均衡将流量分散到不同的Thrift Server上,最大程度上保证服务的高可用和高性能,使用户可以不必关系资源、部署以及运维,将精力都放在业务逻辑的实现上,云HBase的连接地址以及使用可以参考(https://help.aliyun.com/document_detail/87068.html);学会了如何使用客户端,同时云HBase Thrift Server免运维,那么唯一需要关心的就是监控了,每一个云HBase实例多部署了ganglia监控,可以在云HBase实例-》数据库连接-》UI访问中找到:
2
  点击Ganglia进入到主页,在主页最下面可以找到hbase_cluster。
3
  点击hbase_cluster进入到hbase监控详情:
  详情页的最上面为时间设置,可以选择hour、2hr、4hr等不同的时间纬度,也可以在from、to中设置自定义时间
4
  详情页的下面为指标选择:
5
  thrift相关的指标都是以thrift-one和thrift-two开头的,分别对应thrift1和thrift2,thrift默认使用TThreadPoolServer服务模型,使用thrift主要关注3种监控指标:

  • thrift-one.Thrift.numActiveWorkers:表示当前活跃的工作线程,默认的最大线程数为1000,通过该指标可以判断当前thrift server的负载情况
  • thrift-one.Thrift.callQueueLen:表示请求的队列大小,默认队列长度为1000,该指标同thrift-one.Thrift.numActiveWorkers一同反应thrift server的负载情况
  • thrift-one.Thrift.方法名_开头的各种指标:表示对应方法的thrift server耗时,以batchGet方法为例thrift-one.Thrift.batchGet_max、thrift-one.Thrift.batchGet_mean以及thrift-one.Thrift.batchGet_min分别表示batchGet的最大、平均以及最小耗时。
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
SQL 分布式计算 Java
Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)
Hbase的客户端有原生java客户端,Hbase Shell,Thrift,Rest,Mapreduce,WebUI等等。 下面是这几种客户端的常见用法。
1632 0
Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)
|
SQL 搜索推荐 Java
「从零单排HBase 12」HBase二级索引Phoenix使用与最佳实践
「从零单排HBase 12」HBase二级索引Phoenix使用与最佳实践
552 0
「从零单排HBase 12」HBase二级索引Phoenix使用与最佳实践
|
存储 缓存 算法
生产环境使用HBase,你必须知道的最佳实践
生产环境使用HBase,你必须知道的最佳实践
285 0
|
存储 缓存 算法
生产环境使用HBase,你必须知道的最佳实践
前面,我们已经打下了很多关于HBase的理论基础,今天,我们主要聊聊在实际开发使用HBase中,需要关注的一些最佳实践经验。 1.Schema设计七大原则 1)每个region的大小应该控制在10G到50G之间; 2)一个表最好保持在 50到100个 region的规模; 3)每个cell最大不应该超过10MB,如果超过,应该有些考虑业务拆分,如果实在无法拆分,那就只能使用mob; 4)跟传统的关系型数据库不同,一个HBase的表中列族最多不超过3个,列族中的列可以动态添加的,不要设计过多列族; 5)列族名必须尽量短,因为我们知道在存储的时候,每个keyvalue都会包含列族名;
217 0
|
监控 分布式数据库 API
HBase TB级数据规模不停机迁移最佳实践
有关HBase集群如何做不停服的数据迁移一直都是云HBase被问的比较多的一个问题,目前有许多开源的工具或者HBase本身集成的方案在性能、稳定性、使用体验上都不是很好,因此阿里云提供了BDS迁移服务,可以帮助云上客户实现TB级数据规模不停机迁移
6017 0
|
存储 NoSQL 分布式数据库
案例篇-HBase 在滴滴出行的应用场景和最佳实践
李扬 滴滴出行 资深软件开发工程师
2628 0
|
存储 缓存 Java
HBase最佳实践-读性能优化策略
任何系统都会有各种各样的问题,有些是系统本身设计问题,有些却是使用姿势问题。HBase也一样,在真实生产线上大家或多或少都会遇到很多问题,有些是HBase还需要完善的,有些是我们确实对它了解太少。总结起来,大家遇到的主要问题无非是Full GC异常导致宕机问题、RIT问题、写吞吐量太低以及读延迟较大。
2149 0
|
机器学习/深度学习 SQL 分布式计算
云HBase Phoenix索引构建最佳实践
介绍三种的不同的索引构建方法及其适用场景
2668 0
|
存储 缓存 Java
技术篇-HBase 最佳实践-读性能优化策略
任何系统都会有各种各样的问题,有些是系统本身设计问题,有些却是使用姿势问题。HBase也一样,在真实生产线上大家或多或少都会遇到很多问题,有些是 HBase 还需要完善的,有些是我们确实对它了解太少。总结起来,大家遇到的主要问题无非是 Full GC 异常导致宕机问题、RIT 问题、写吞吐量太低以及读延迟较大。
2482 0