Table Store实时数据通道服务Go SDK快速入门

简介: # Tunnel Service Go SDK ## 安装 * 下载源码包 ```bash go get github.com/aliyun/aliyun-tablestore-go-sdk/tunnel ``` * 安装依赖 * 可以在tunnel目录下使用dep安装依赖 * 安装[dep](https://github.

文档

安装

  • 下载源码包
go get github.com/aliyun/aliyun-tablestore-go-sdk/tunnel
  • 安装依赖

    1. 可以在tunnel目录下使用dep安装依赖
dep ensure -v
  1. 也可以直接使用go get安装依赖包:
go get -u go.uber.org/zap
go get github.com/cenkalti/backoff
go get github.com/golang/protobuf/proto
go get github.com/satori/go.uuid
go get github.com/stretchr/testify/assert
go get github.com/smartystreets/goconvey/convey
go get github.com/golang/mock/gomock
go get gopkg.in/natefinch/lumberjack.v2

快速开始

  • 初始化Tunnel client:
// endpoint是表格存储实例endpoint,如https://instance.cn-hangzhou.ots.aliyun.com
// instance为实例名称
// accessKeyId和accessKeySecret分别为访问表格存储服务的AccessKey的Id和Secret
tunnelClient := tunnel.NewTunnelClient(endpoint, instance,
   accessKeyId, accessKeySecret)
  • 创建新Tunnel:
req := &tunnel.CreateTunnelRequest{
   TableName:  "testTable",
   TunnelName: "testTunnel",
   Type:       tunnel.TunnelTypeBaseStream, //全量加增量类型Tunnel
}
resp, err := tunnelClient.CreateTunnel(req)
if err != nil {
   log.Fatal("create test tunnel failed", err)
}
log.Println("tunnel id is", resp.TunnelId)
  • 获取已有Tunnel信息:
req := &tunnel.DescribeTunnelRequest{
   TableName:  "testTable",
   TunnelName: "testTunnel",
}
resp, err := tunnelClient.DescribeTunnel(req)
if err != nil {
   log.Fatal("create test tunnel failed", err)
}
log.Println("tunnel id is", resp.Tunnel.TunnelId)
  • 注册callback,开始数据消费:
//用户定义消费callback函数
func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
    fmt.Println("user-defined information", ctx.CustomValue)
    for _, rec := range records {
        fmt.Println("tunnel record detail:", rec.String())
    }
    fmt.Println("a round of records consumption finished")
    return nil
}
//配置callback到SimpleProcessFactory,配置消费端TunnelWorkerConfig
workConfig := &tunnel.TunnelWorkerConfig{
   ProcessorFactory: &tunnel.SimpleProcessFactory{
      CustomValue: "user custom interface{} value",
      ProcessFunc: exampleConsumeFunction,
   },
}
//使用TunnelDaemon持续消费指定tunnel
daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
log.Fatal(daemon.Run())
  • 删除Tunnel
req := &tunnel.DeleteTunnelRequest {
   TableName: "testTable",
   TunnelName: "testTunnel",
}
_, err := tunnelClient.DeleteTunnel(req)
if err != nil {
   log.Fatal("delete test tunnel failed", err)
}

配置项

  • tunnel client配置
    初始化tunnel client时可以通过NewTunnelClientWithConfig接口自定义客户端配置,使用不指定config初始化接口或者config为nil时会使用DefaultTunnelConfig:
var DefaultTunnelConfig = &TunnelConfig{
      //最大指数退避重试时间
      MaxRetryElapsedTime: 45 * time.Second,
      //HTTP请求超时时间
      RequestTimeout:      30 * time.Second,
      //http.DefaultTransport
      Transport:           http.DefaultTransport,
}
  • 数据消费worker配置
    TunnelWorkerConfig中包含了数据消费worker需要的配置,其中ProcessorFactory为必填项,其余字段不填将使用默认值,通常使用默认值即可:
type TunnelWorkerConfig struct {
   //worker同Tunnel服务的心跳超时时间,通常使用默认值即可
   HeartbeatTimeout  time.Duration
   //worker发送心跳的频率,通常使用默认值即可
   HeartbeatInterval time.Duration
   //tunnel下消费连接建立接口,通常使用默认值即可
   ChannelDialer     ChannelDialer

   //消费连接上具体处理器产生接口,通常使用callback函数初始化SimpleProcessFactory即可
   ProcessorFactory ChannelProcessorFactory

   //zap日志配置,默认值为DefaultLogConfig
   LogConfig      *zap.Config
   //zap日志轮转配置,默认值为DefaultSyncer
   LogWriteSyncer zapcore.WriteSyncer
}

其中的ProcessorFactory为用户注册消费callback函数以及其他信息的接口,建议使用SDK中自带SimpleProcessorFactory实现:

type SimpleProcessFactory struct {
   //用户自定义信息,会传递到ProcessFunc和ShutdownFunc中的ChannelContext参数中
   CustomValue interface{}

   //Worker记录checkpoint的间隔,CpInterval<=0时会使用DefaultCheckpointInterval
   CpInterval time.Duration

   //worker数据处理的同步调用callback,ProcessFunc返回error时worker会用本批数据退避重试ProcessFunc
   ProcessFunc  func(channelCtx *ChannelContext, records []*Record) error
   //worker退出时的同步调用callback
   ShutdownFunc func(channelCtx *ChannelContext)

   //日志配置,Logger为nil时会使用DefaultLogConfig初始化logger
   Logger *zap.Logger
}
  • 日志配置
    默认日志配置:
//DefaultLogConfig是TunnelWorkerConfig和SimpleProcessFactory使用的默认日志配置
var DefaultLogConfig = zap.Config{
   Level:       zap.NewAtomicLevelAt(zap.InfoLevel),
   Development: false,
   Sampling: &zap.SamplingConfig{
      Initial:    100,
      Thereafter: 100,
   },
   Encoding: "json",
   EncoderConfig: zapcore.EncoderConfig{
      TimeKey:        "ts",
      LevelKey:       "level",
      NameKey:        "logger",
      CallerKey:      "caller",
      MessageKey:     "msg",
      StacktraceKey:  "stacktrace",
      LineEnding:     zapcore.DefaultLineEnding,
      EncodeLevel:    zapcore.LowercaseLevelEncoder,
      EncodeTime:     zapcore.ISO8601TimeEncoder,
      EncodeDuration: zapcore.SecondsDurationEncoder,
      EncodeCaller:   zapcore.ShortCallerEncoder,
   },
}

日志轮转配置:

//DefaultSyncer是TunnelWorkerConfig和SimpleProcessFactory使用的默认日志轮转配置
var DefaultSyncer = zapcore.AddSync(&lumberjack.Logger{
   //日志文件路径
   Filename:   "tunnelClient.log",
   //最大日志文件大小
   MaxSize:    512, //MB
   //压缩轮转的日志文件数
   MaxBackups: 5,
   //轮转日志文件保留的最大天数
   MaxAge:     30, //days
   //是否压缩轮转日志文件
   Compress:   true,
})
相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
7月前
|
JSON 中间件 Go
Go 网络编程:HTTP服务与客户端开发
Go 语言的 `net/http` 包功能强大,可快速构建高并发 HTTP 服务。本文从创建简单 HTTP 服务入手,逐步讲解请求与响应对象、URL 参数处理、自定义路由、JSON 接口、静态文件服务、中间件编写及 HTTPS 配置等内容。通过示例代码展示如何使用 `http.HandleFunc`、`http.ServeMux`、`http.Client` 等工具实现常见功能,帮助开发者掌握构建高效 Web 应用的核心技能。
410 61
|
7月前
|
开发框架 安全 前端开发
Go Web开发框架实践:模板渲染与静态资源服务
Gin 是一个功能强大的 Go Web 框架,不仅适用于构建 API 服务,还支持 HTML 模板渲染和静态资源托管。它可以帮助开发者快速搭建中小型网站,并提供灵活的模板语法、自定义函数、静态文件映射等功能,同时兼容 Go 的 html/template 引擎,具备高效且安全的页面渲染能力。
|
7月前
|
开发框架 JSON 中间件
Go语言Web开发框架实践:使用 Gin 快速构建 Web 服务
Gin 是一个高效、轻量级的 Go 语言 Web 框架,支持中间件机制,非常适合开发 RESTful API。本文从安装到进阶技巧全面解析 Gin 的使用:快速入门示例(Hello Gin)、定义 RESTful 用户服务(增删改查接口实现),以及推荐实践如参数校验、中间件和路由分组等。通过对比标准库 `net/http`,Gin 提供更简洁灵活的开发体验。此外,还推荐了 GORM、Viper、Zap 等配合使用的工具库,助力高效开发。
|
10月前
|
存储 JSON Go
PHP 日志系统的最佳搭档:一个 Go 写的远程日志收集服务
为了不再 SSH 上去翻日志,我写了个 Go 小脚本,用来接收远程日志。PHP 负责记录日志,Go 负责存储和展示,按天存储、支持 API 访问、可远程管理,终于能第一时间知道项目炸了。
229 10
|
Go UED
Go Web服务中如何优雅平滑重启?
在生产环境中,服务升级时如何确保不中断当前请求并应用新代码是一个挑战。本文介绍了如何使用 Go 语言的 `endless` 包实现服务的优雅重启,确保在不停止服务的情况下完成无缝升级。通过示例代码和测试步骤,详细展示了 `endless` 包的工作原理和实际应用。
338 3
|
JSON Go UED
Go Web服务中如何优雅关机?
在构建 Web 服务时,优雅关机是一个关键的技术点,它确保服务关闭时所有正在处理的请求都能顺利完成。本文通过一个简单的 Go 语言示例,展示了如何使用 Gin 框架实现优雅关机。通过捕获系统信号和使用 `http.Server` 的 `Shutdown` 方法,我们可以在服务关闭前等待所有请求处理完毕,从而提升用户体验,避免数据丢失或不一致。
243 1
|
Kubernetes API 开发工具
【Azure Developer】通过SDK(for python)获取Azure服务生命周期信息
需要通过Python SDK获取Azure服务的一些通知信息,如:K8S版本需要更新到指定的版本,Azure服务的维护通知,服务处于不健康状态时的通知,及相关的操作建议等内容。
211 18
|
Go API 开发者
深入探讨:使用Go语言构建高性能RESTful API服务
在本文中,我们将探索Go语言在构建高效、可靠的RESTful API服务中的独特优势。通过实际案例分析,我们将展示Go如何通过其并发模型、简洁的语法和内置的http包,成为现代后端服务开发的有力工具。
|
算法 数据挖掘 数据库
表格存储低成本向量检索服务助力 AI 检索
本文阐述了阿里云表格存储(Tablestore)如何通过其向量检索服务应对大规模数据检索的需求,尤其是在成本、规模和召回率这三个关键挑战方面。
476 13
|
运维 监控 程序员
Go 服务自动收集线上问题现场
Go 服务自动收集线上问题现场