使用GO语言通过Stream Load实现Doris数据导入

简介: 使用GO语言通过Stream Load实现Doris数据导入。

本文使用的GO是1.17.2


Doris 0.15.0 release版


Doris的数据导入有各种语言的版本,但是GO语言版本的基本见不到,简单学了一下,写了一个简单的Stream Load入库的示例,仅供参考


示例中使用的表结构:

CREATE TABLE IF NOT EXISTS user_info
(
    user_id LARGEINT NOT NULL COMMENT "用户id",
    username varchar(50) NOT NULL COMMENT "用户名",
    city VARCHAR(20) COMMENT "用户所在城市",
    age SMALLINT COMMENT "用户年龄",
    sex TINYINT COMMENT "用户性别",
    phone LARGEINT COMMENT "电话",
    address VARCHAR(500) COMMENT "地址",
    register_time datetime COMMENT "用户注册时间"
)
Unique KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "3"
);

下面是GO的示例代码,其中支持从文件导入,从内存数据导入,同时提供了获取BE节点列表的方法,你在导入的时候可以从这里随机获取一个BE节点IP及端口,直连BE进行导入


package main
import (
   "container/list"
   "encoding/base64"
   "encoding/json"
   "fmt"
   "github.com/gofrs/uuid"
   "io/ioutil"
   "log"
   "net/http"
   "strconv"
   "strings"
)
type StreamLoad struct {
   url       string
   dbName    string
   tableName string
   data      string
   userName  string
   password  string
}
//实现Doris用户认证信息
func auth(load StreamLoad) string {
   s := load.userName + ":" + load.password
   b := []byte(s)
   sEnc := base64.StdEncoding.EncodeToString(b)
   fmt.Printf("enc=[%s]\n", sEnc)
   sDec, err := base64.StdEncoding.DecodeString(sEnc)
   if err != nil {
      fmt.Printf("base64 decode failure, error=[%v]\n", err)
   } else {
      fmt.Printf("dec=[%s]\n", sDec)
   }
   return sEnc
}
//使用Stream load将文件数据导入到Doris对应的数据表中
func batch_load_file(load StreamLoad, file string) {
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
   //fmt.Formatter(.Format(url,load.dbName,l))
   fileContext, err := ioutil.ReadFile(file)
   if err != nil {
      log.Println("Failed to Read the File", file, err)
   }
   record := strings.NewReader(string(fileContext))
   //提交请求
   reqest, err := http.NewRequest(http.MethodPut, url, record)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   reqest.Header.Add("EXPECT", "100-continue")
   var u1 = uuid.Must(uuid.NewV4())
   reqest.Header.Add("label", u1.String())
   reqest.Header.Add("column_separator", ",")
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      responseBody := ResponseBody{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &responseBody)
      if err != nil {
         fmt.Println(err.Error())
      }
      if responseBody.Status == "Success" {
         //如果有被过滤的数据,打印错误的URL
         if responseBody.NumberFilteredRows > 0 {
            fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
         } else {
            fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
         }
      }
      fmt.Println(string(body))
   }
   defer response.Body.Close()
}
//内存流数据,通过Stream Load导入Doris表中
func batch_load_data(load StreamLoad, data string) {
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
   //fmt.Formatter(.Format(url,load.dbName,l))
   record := strings.NewReader(data)
   //提交请求
   reqest, err := http.NewRequest(http.MethodPut, url, record)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   reqest.Header.Add("EXPECT", "100-continue")
   var u1 = uuid.Must(uuid.NewV4())
   reqest.Header.Add("label", u1.String())
   reqest.Header.Add("column_separator", ",")
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      responseBody := ResponseBody{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &responseBody)
      if err != nil {
         fmt.Println(err.Error())
      }
      if responseBody.Status == "Success" {
         //如果有被过滤的数据,打印错误的URL
         if responseBody.NumberFilteredRows > 0 {
            fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
         } else {
            fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
         }
      } else {
         fmt.Printf("Error Message : %s \n", responseBody.Message)
         fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
      }
      //fmt.Println(jsonStr)
   }
   defer response.Body.Close()
}
//获取BE列表
func get_doris_be_list() *list.List {
   var load StreamLoad
   load.userName = "root"
   load.password = ""
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/backends?is_alive=true"
   //提交请求
   reqest, err := http.NewRequest("GET", url, nil)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   bes := list.New()
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      backends := Backend{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &backends)
      if err != nil {
         fmt.Println(err.Error())
      }
      for _, beinfo := range backends.Data.Backends {
         be := beinfo.IP + ":" + strconv.Itoa(beinfo.HTTPPort)
         bes.PushBack(be)
      }
   }
   defer response.Body.Close()
   return bes
}
//Stream load返回消息结构体
type ResponseBody struct {
   TxnID                  int    `json:"TxnId"`
   Label                  string `json:"Label"`
   Status                 string `json:"Status"`
   Message                string `json:"Message"`
   NumberTotalRows        int    `json:"NumberTotalRows"`
   NumberLoadedRows       int    `json:"NumberLoadedRows"`
   NumberFilteredRows     int    `json:"NumberFilteredRows"`
   NumberUnselectedRows   int    `json:"NumberUnselectedRows"`
   LoadBytes              int    `json:"LoadBytes"`
   LoadTimeMs             int    `json:"LoadTimeMs"`
   BeginTxnTimeMs         int    `json:"BeginTxnTimeMs"`
   StreamLoadPutTimeMs    int    `json:"StreamLoadPutTimeMs"`
   ReadDataTimeMs         int    `json:"ReadDataTimeMs"`
   WriteDataTimeMs        int    `json:"WriteDataTimeMs"`
   CommitAndPublishTimeMs int    `json:"CommitAndPublishTimeMs"`
   ErrorURL               string `json:"ErrorURL"`
}
//获取BE列表返回结构体
type Backend struct {
   Msg  string `json:"msg"`
   Code int    `json:"code"`
   Data struct {
      Backends []struct {
         IP       string `json:"ip"`
         HTTPPort int    `json:"http_port"`
         IsAlive  bool   `json:"is_alive"`
      } `json:"backends"`
   } `json:"data"`
   Count int `json:"count"`
}
func main() {
   var load StreamLoad
   load.userName = "root"
   load.password = ""
   //auth_info := auth(load)
   //fmt.Println(auth_info)
   //backends := get_doris_be_list()
   //for e := backends.Front(); e != nil; e = e.Next() {
   // fmt.Println(e.Value)
   //}
   data := "10001,张***,西安,30,1,133****760,陕西省**********,2021-03-12 12:34:12"
   batch_load_data(load, data)
   //batch_load_file(/load, "/Users/zhangfeng/Downloads/test.csv")
}




目录
相关文章
|
6天前
|
监控 算法 Go
Golang深入浅出之-Go语言中的服务熔断、降级与限流策略
【5月更文挑战第4天】本文探讨了分布式系统中保障稳定性的重要策略:服务熔断、降级和限流。服务熔断通过快速失败和暂停故障服务调用来保护系统;服务降级在压力大时提供有限功能以保持整体可用性;限流控制访问频率,防止过载。文中列举了常见问题、解决方案,并提供了Go语言实现示例。合理应用这些策略能增强系统韧性和可用性。
30 0
|
22小时前
|
存储 缓存 Java
来聊聊go语言的hashMap
本文介绍了Go语言中的`map`与Java的不同设计思想。作者`sharkChili`是一名Java和Go开发者,同时也是CSDN博客专家及JavaGuide项目的维护者。文章探讨了Go语言`map`的数据结构,包括`count`、`buckets指针`和`bmap`,解释了键值对的存储方式,如何利用内存对齐优化空间使用,并展示了`map`的初始化、插入键值对以及查找数据的源码过程。此外,作者还分享了如何通过汇编查看`map`操作,并鼓励读者深入研究Go的哈希冲突解决和源码。最后,作者提供了一个交流群,供读者讨论相关话题。
9 0
|
1天前
|
Java Go
Go语言学习11-数据初始化
【5月更文挑战第3天】本篇带大家通过内建函数 new 和 make 了解Go语言的数据初始化过程
15 1
Go语言学习11-数据初始化
|
2天前
|
自然语言处理 安全 Java
速通Go语言编译过程
Go语言编译过程详解:从词法分析(生成token)到句法分析(构建语法树),再到语义分析(类型检查、推断、匹配及函数内联)、生成中间码(SSA)和汇编码。最后,通过链接生成可执行文件。作者sharkchili,CSDN Java博客专家,分享技术细节,邀请读者加入交流群。
20 2
|
2天前
|
Java Linux Go
一文带你速通Go语言基础语法
本文是关于Go语言的入门介绍,作者因其简洁高效的特性对Go语言情有独钟。文章首先概述了Go语言的优势,包括快速上手、并发编程简单、设计简洁且功能强大,以及丰富的标准库。接着,文章通过示例展示了如何编写和运行Go代码,包括声明包、导入包和输出语句。此外,还介绍了Go的语法基础,如变量类型(数字、字符串、布尔和复数)、变量赋值、类型转换和默认值。文章还涉及条件分支(if和switch)和循环结构(for)。最后,简要提到了Go函数的定义和多返回值特性,以及一些常见的Go命令。作者计划在后续文章中进一步探讨Go语言的其他方面。
9 0
|
3天前
|
JavaScript 前端开发 Go
Go语言的入门学习
【4月更文挑战第7天】Go语言,通常称为Golang,是由Google设计并开发的一种编程语言,它于2009年公开发布。Go的设计团队主要包括Robert Griesemer、Rob Pike和Ken Thompson,这三位都是计算机科学和软件工程领域的杰出人物。
10 1
|
3天前
|
Go
|
4天前
|
分布式计算 Java Go
Golang深入浅出之-Go语言中的分布式计算框架Apache Beam
【5月更文挑战第6天】Apache Beam是一个统一的编程模型,适用于批处理和流处理,主要支持Java和Python,但也提供实验性的Go SDK。Go SDK的基本概念包括`PTransform`、`PCollection`和`Pipeline`。在使用中,需注意类型转换、窗口和触发器配置、资源管理和错误处理。尽管Go SDK文档有限,生态系统尚不成熟,且性能可能不高,但它仍为分布式计算提供了可移植的解决方案。通过理解和掌握Beam模型,开发者能编写高效的数据处理程序。
133 1
|
4天前
|
算法 关系型数据库 MySQL
Go语言中的分布式ID生成器设计与实现
【5月更文挑战第6天】本文探讨了Go语言在分布式系统中生成全局唯一ID的策略,包括Twitter的Snowflake算法、UUID和MySQL自增ID。Snowflake算法通过时间戳、节点ID和序列号生成ID,Go实现中需处理时间回拨问题。UUID保证全局唯一,但长度较长。MySQL自增ID依赖数据库,可能造成性能瓶颈。选择策略时需考虑业务需求和并发、时间同步等挑战,以确保系统稳定可靠。
111 0
|
4天前
|
缓存 NoSQL Go
Go语言中的分布式锁实现与选型
【5月更文挑战第6天】本文探讨了Go语言中分布式锁的实现,包括Redis、ZooKeeper和Etcd三种方式,强调了选型时的性能、可靠性和复杂度考量。通过代码示例展示了Redis分布式锁的使用,并提出了避免死锁、公平性等问题的策略。结论指出,开发者应根据业务需求选择合适实现并理解底层原理,以确保系统稳定和高效。
131 0