go-hbase的Scan模型源码分析

简介:

git地址在这里:
https://github.com/Lazyshot/go-hbase

这是一个使用go操作hbase的行为。

分析scan行为

如何使用scan看下面这个例子,伪代码如下:

func scan(phone string, start time.Time, end time.Time) ([]Loc, error) {
     ...

     client := hbase.NewClient(zks, "/hbase")
     client.SetLogLevel("DEBUG")

     scan := client.Scan(table)

     scan.StartRow = []byte(phone + strconv.Itoa(int(end.Unix())))
     scan.StopRow = []byte(phone + strconv.Itoa(int(start.Unix())))

     var locs []Loc
     scan.Map(func(ret *hbase.ResultRow) {
          var loc Loc
          for _, v := range ret.Columns {
               switch v.ColumnName {
               case "lbs:phone":
                    loc.Phone = v.Value.String()
               case "lbs:lat":
                    loc.Lat = v.Value.String()
               ...
               }
          }
          locs = append(locs, loc)
     })
     return locs, nil
}

首先是NewClient, 返回的结构是hbase.Client, 这个结构代表的是与hbase服务端交互的客户端实体。

这里没有什么好看的,倒是有一点要注意,在NewClient的时候,里面的zkRootReginPath是写死的,就是说hbase在zk中的地址是固定的。当然这个也是默认的。

func NewClient(zkHosts []string, zkRoot string) *Client {
     cl := &Client{
          zkHosts:          zkHosts,
          zkRoot:           zkRoot,
          zkRootRegionPath: "/meta-region-server",

          servers:               make(map[string]*connection),
          cachedRegionLocations: make(map[string]map[string]*regionInfo),
          prefetched:            make(map[string]bool),
          maxRetries:            max_action_retries,
     }

     cl.initZk()

     return cl
}

下面是client.Scan

client.Scan

返回的是

func newScan(table []byte, client *Client) *Scan {
     return &Scan{
          client:       client,
          table:        table,
          nextStartRow: nil,

          families:   make([][]byte, 0),
          qualifiers: make([][][]byte, 0),

          numCached: 100,
          closed:    false,

          timeRange: nil,
     }
}

scan结构:

type Scan struct {
     client *Client

     id    uint64
     table []byte

     StartRow []byte
     StopRow  []byte

     families   [][]byte
     qualifiers [][][]byte

     nextStartRow []byte

     numCached int
     closed    bool

     //for filters
     timeRange *TimeRange

     location *regionInfo
     server   *connection
}

设置了开始位置,结束位置,就可以进行Map操作了。

func (s *Scan) Map(f func(*ResultRow)) {
     for {
          results := s.next()

          if results == nil {
               break
          }

          for _, v := range results {
               f(v)

               if s.closed {
                    return
               }
          }
     }
}

这个map的参数是一个函数f,没有返回值。框架的行为就是一个大循环,不断调用s.next(),注意,这里s.next返回回来的result可能是由多条,然后把这个多条数据每条进行一次实际的函数调用。结束循环有两个方法,一个是next中再也取不到数据(数据已经取完了)。还有一个是s.closed呗设置为true。

s.next()

func (s *Scan) next() []*ResultRow {
     startRow := s.nextStartRow
     if startRow == nil {
          startRow = s.StartRow
     }

     return s.getData(startRow)
}

这里其实是把startRow不断往前推进,但是每次从startRow获取多少数据呢?需要看getData

getData

最核心的流程如下:

func (s *Scan) getData(nextStart []byte) []*ResultRow {
     ...

     server, location := s.getServerAndLocation(s.table, nextStart)

     req := &proto.ScanRequest{
          Region: &proto.RegionSpecifier{
               Type:  proto.RegionSpecifier_REGION_NAME.Enum(),
               Value: []byte(location.name),
          },
          NumberOfRows: pb.Uint32(uint32(s.numCached)),
          Scan:         &proto.Scan{},
     }

    ...

     cl := newCall(req)
     server.call(cl)

    ...

     select {
     case msg := <-cl.responseCh:
          return s.processResponse(msg)
     }
}

这里看到有一个s.numCached, 我们猜测这个是用来指定一次call请求调用回多少条数据的。

看call函数

func newCall(request pb.Message) *call {
     var responseBuffer pb.Message
     var methodName string

     switch request.(type) {
     ...
     case *proto.ScanRequest:
          responseBuffer = &proto.ScanResponse{}
          methodName = "Scan"
     ...
     }

     return &call{
          methodName:     methodName,
          request:        request,
          responseBuffer: responseBuffer,
          responseCh:     make(chan pb.Message, 1),
     }
}
type call struct {
     id             uint32
     methodName     string
     request        pb.Message
     responseBuffer pb.Message
     responseCh     chan pb.Message
}

可以看出,这个call是一个有responseBuffer的实际调用者。

下面看server.Call

至于这里的server, 我们不看代码流程了,只需要知道最后他返回的是connection这么个结构

type connection struct {
     connstr string

     id   int
     name string

     socket net.Conn
     in     *inputStream

     calls  map[int]*call
     callId *atomicCounter

     isMaster bool
}

创建是使用函数newConnection调用

func newConnection(connstr string, isMaster bool) (*connection, error) {
     id := connectionIds.IncrAndGet()

     log.Debug("Connecting to server[id=%d] [%s]", id, connstr)

     socket, err := net.Dial("tcp", connstr)

     if err != nil {
          return nil, err
     }

     c := &connection{
          connstr: connstr,

          id:   id,
          name: fmt.Sprintf("connection(%s) id: %d", connstr, id),

          socket: socket,
          in:     newInputStream(socket),

          calls:  make(map[int]*call),
          callId: newAtomicCounter(),

          isMaster: isMaster,
     }

     err = c.init()
     if err != nil {
          return nil, err
     }

     log.Debug("Initiated connection [id=%d] [%s]", id, connstr)

     return c, nil
}

好,那么实际上就是调用connection.call(request *call)

func (c *connection) call(request *call) error {
     id := c.callId.IncrAndGet()
     rh := &proto.RequestHeader{
          CallId:       pb.Uint32(uint32(id)),
          MethodName:   pb.String(request.methodName),
          RequestParam: pb.Bool(true),
     }

     request.setid(uint32(id))

     bfrh := newOutputBuffer()
     err := bfrh.WritePBMessage(rh)
     ...

     bfr := newOutputBuffer()
     err = bfr.WritePBMessage(request.request)
    ...

     buf := newOutputBuffer()
     buf.writeDelimitedBuffers(bfrh, bfr)

     c.calls[id] = request
     n, err := c.socket.Write(buf.Bytes())

    ...
}

逻辑就是先把requestHeader压入,再压入request.request

call只是完成了请求转换成byte传输到hbase服务端,在什么地方进行消息回收呢?

回到NewConnection的方法,里面有个connection.init()

func (c *connection) init() error {

     err := c.writeHead()
     if err != nil {
          return err
     }

     err = c.writeConnectionHeader()
     if err != nil {
          return err
     }

     go c.processMessages()

     return nil
}

这里go c.processMessage()

func (c *connection) processMessages() {
     for {
          msgs := c.in.processData()
          if msgs == nil || len(msgs) == 0 || len(msgs[0]) == 0 {
               continue
          }

          var rh proto.ResponseHeader
          err := pb.Unmarshal(msgs[0], &rh)
          if err != nil {
               panic(err)
          }

          callId := rh.GetCallId()
          call, ok := c.calls[int(callId)]

          delete(c.calls, int(callId))

          exception := rh.GetException()
          if exception != nil {
               call.complete(fmt.Errorf("Exception returned: %s\n%s", exception.GetExceptionClassName(), exception.GetStackTrace()), nil)
          } else if len(msgs) == 2 {
               call.complete(nil, msgs[1])
          }
     }
}

这里将它简化下:

func (c *connection) processMessages() {
     for {
          msgs := c.in.processData()

        call.complete(nil, msgs[1])
     }
}

c.in.processData

是在input_stream.go中

func (in *inputStream) processData() [][]byte {

    nBytesExpecting, err := in.readInt32()
    
    ...

     if nBytesExpecting > 0 {
          buf, err := in.readN(nBytesExpecting)

          if err != nil && err == io.EOF {
               panic("Unexpected closed socket")
          }

          payloads := in.processMessage(buf)

          if len(payloads) > 0 {
               return payloads
          }
     }

     return nil
}

先读取出一个int值,这个int值判断后面还有多少个bytes,再将后面的bytes读取进入到buf中,进行input_stream的processMessage处理。

我们这里还看到并没有执行我们map中定义的匿名方法。只是把消息解析出来了而已。

call.complete

func (c *call) complete(err error, response []byte) {
     ...

     err2 := pb.Unmarshal(response, c.responseBuffer)
     ...

     c.responseCh <- c.responseBuffer
}

这个函数有用的也就这两句话把responseBuffer里面的内容通过管道传递给responseCh

这里就看到getData的时候,被堵塞的地方

     select {
     case msg := <-cl.responseCh:
          return s.processResponse(msg)
     }

那么这里就有把获取到的responseCh的消息进行processResponse处理。

func (s *Scan) processResponse(response pb.Message) []*ResultRow {
     ...
     results := res.GetResults()
     n := len(results)

    ...
     s.closeScan(s.server, s.location, s.id)

    ...

     tbr := make([]*ResultRow, n)
     for i, v := range results {
          tbr[i] = newResultRow(v)
     }

     return tbr
}

这个函数并没有什么特别的行为,只是进行ResultRow的组装。

好吧,这个包有个地方可以优化,这个go-hbase的scan的时候,numCached默认是100,这个对于hbase来说太小了,完全可以调整大点,到2000~10000之间,你会发现scan的性能提升杠杠的。


本文转自轩脉刃博客园博客,原文链接:http://www.cnblogs.com/yjf512/p/6076690.html,如需转载请自行联系原作者

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
7月前
|
Go 调度 开发者
CSP模型与Goroutine调度的协同作用:构建高效并发的Go语言世界
【2月更文挑战第17天】在Go语言的并发编程中,CSP模型与Goroutine调度机制相互协同,共同构建了高效并发的运行环境。CSP模型通过通道(channel)实现了进程间的通信与同步,而Goroutine调度机制则确保了并发任务的合理调度与执行。本文将深入探讨CSP模型与Goroutine调度的协同作用,分析它们如何共同促进Go语言并发性能的提升。
|
7月前
|
Go 开发者
Go语言并发模型概览:CSP模型解析
【2月更文挑战第17天】Go语言以其强大的并发处理能力在编程领域崭露头角。其中,CSP(Communicating Sequential Processes)模型作为Go语言并发模型的核心之一,在并发编程中发挥着至关重要的作用。本文将深入解析CSP模型的基本原理及其在Go语言中的应用,帮助读者更好地理解Go语言的并发编程特性。
|
21天前
|
Go 开发工具
百炼-千问模型通过openai接口构建assistant 等 go语言
由于阿里百炼平台通义千问大模型没有完善的go语言兼容openapi示例,并且官方答复assistant是不兼容openapi sdk的。 实际使用中发现是能够支持的,所以自己写了一个demo test示例,给大家做一个参考。
|
1月前
|
Go 调度 开发者
Go语言的并发编程模型
【10月更文挑战第26天】Go语言的并发编程模型
18 1
|
1月前
|
安全 测试技术 Go
Go语言中的并发编程模型解析####
在当今的软件开发领域,高效的并发处理能力是提升系统性能的关键。本文深入探讨了Go语言独特的并发编程模型——goroutines和channels,通过实例解析其工作原理、优势及最佳实践,旨在为开发者提供实用的Go语言并发编程指南。 ####
|
2月前
|
负载均衡 安全 物联网
探索Go语言的并发编程模型及其在现代应用中的优势
【10月更文挑战第10天】探索Go语言的并发编程模型及其在现代应用中的优势
|
5月前
|
缓存 编译器 Go
开发与运维线程问题之Go语言的goroutine基于线程模型实现如何解决
开发与运维线程问题之Go语言的goroutine基于线程模型实现如何解决
60 3
|
6月前
|
Go 开发者
探索Go语言的并发编程模型
通过实例详细介绍了Go语言中的并发编程模型,包括goroutine、channel的基本使用和最佳实践。深入剖析如何利用Go的并发特性提高程序性能和效率,适用于初学者和有一定经验的开发者。
|
7月前
|
安全 Go 开发者
Golang深入浅出之-Go语言中的CSP模型:深入理解并发哲学
【5月更文挑战第2天】Go语言的并发编程基于CSP模型,强调通过通信共享内存。核心概念是goroutines(轻量级线程)和channels(用于goroutines间安全数据传输)。常见问题包括数据竞争、死锁和goroutine管理。避免策略包括使用同步原语、复用channel和控制并发。示例展示了如何使用channel和`sync.WaitGroup`避免死锁。理解并发原则和正确应用CSP模型是编写高效安全并发程序的关键。
181 7
|
7月前
|
安全 Go 开发者
Golang深入浅出之-Go语言中的CSP模型:深入理解并发哲学
【5月更文挑战第1天】Go语言基于CSP理论,借助goroutines和channels实现独特的并发模型。Goroutine是轻量级线程,通过`go`关键字启动,而channels提供安全的通信机制。文章讨论了数据竞争、死锁和goroutine泄漏等问题及其避免方法,并提供了一个生产者消费者模型的代码示例。理解CSP和妥善处理并发问题对于编写高效、可靠的Go程序至关重要。
170 2