使用GO语言通过Stream Load实现Doris数据导入

简介: 使用GO语言通过Stream Load实现Doris数据导入。

本文使用的GO是1.17.2


Doris 0.15.0 release版


Doris的数据导入有各种语言的版本,但是GO语言版本的基本见不到,简单学了一下,写了一个简单的Stream Load入库的示例,仅供参考


示例中使用的表结构:

CREATE TABLE IF NOT EXISTS user_info
(
    user_id LARGEINT NOT NULL COMMENT "用户id",
    username varchar(50) NOT NULL COMMENT "用户名",
    city VARCHAR(20) COMMENT "用户所在城市",
    age SMALLINT COMMENT "用户年龄",
    sex TINYINT COMMENT "用户性别",
    phone LARGEINT COMMENT "电话",
    address VARCHAR(500) COMMENT "地址",
    register_time datetime COMMENT "用户注册时间"
)
Unique KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "3"
);

下面是GO的示例代码,其中支持从文件导入,从内存数据导入,同时提供了获取BE节点列表的方法,你在导入的时候可以从这里随机获取一个BE节点IP及端口,直连BE进行导入


package main
import (
   "container/list"
   "encoding/base64"
   "encoding/json"
   "fmt"
   "github.com/gofrs/uuid"
   "io/ioutil"
   "log"
   "net/http"
   "strconv"
   "strings"
)
type StreamLoad struct {
   url       string
   dbName    string
   tableName string
   data      string
   userName  string
   password  string
}
//实现Doris用户认证信息
func auth(load StreamLoad) string {
   s := load.userName + ":" + load.password
   b := []byte(s)
   sEnc := base64.StdEncoding.EncodeToString(b)
   fmt.Printf("enc=[%s]\n", sEnc)
   sDec, err := base64.StdEncoding.DecodeString(sEnc)
   if err != nil {
      fmt.Printf("base64 decode failure, error=[%v]\n", err)
   } else {
      fmt.Printf("dec=[%s]\n", sDec)
   }
   return sEnc
}
//使用Stream load将文件数据导入到Doris对应的数据表中
func batch_load_file(load StreamLoad, file string) {
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
   //fmt.Formatter(.Format(url,load.dbName,l))
   fileContext, err := ioutil.ReadFile(file)
   if err != nil {
      log.Println("Failed to Read the File", file, err)
   }
   record := strings.NewReader(string(fileContext))
   //提交请求
   reqest, err := http.NewRequest(http.MethodPut, url, record)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   reqest.Header.Add("EXPECT", "100-continue")
   var u1 = uuid.Must(uuid.NewV4())
   reqest.Header.Add("label", u1.String())
   reqest.Header.Add("column_separator", ",")
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      responseBody := ResponseBody{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &responseBody)
      if err != nil {
         fmt.Println(err.Error())
      }
      if responseBody.Status == "Success" {
         //如果有被过滤的数据,打印错误的URL
         if responseBody.NumberFilteredRows > 0 {
            fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
         } else {
            fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
         }
      }
      fmt.Println(string(body))
   }
   defer response.Body.Close()
}
//内存流数据,通过Stream Load导入Doris表中
func batch_load_data(load StreamLoad, data string) {
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
   //fmt.Formatter(.Format(url,load.dbName,l))
   record := strings.NewReader(data)
   //提交请求
   reqest, err := http.NewRequest(http.MethodPut, url, record)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   reqest.Header.Add("EXPECT", "100-continue")
   var u1 = uuid.Must(uuid.NewV4())
   reqest.Header.Add("label", u1.String())
   reqest.Header.Add("column_separator", ",")
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      responseBody := ResponseBody{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &responseBody)
      if err != nil {
         fmt.Println(err.Error())
      }
      if responseBody.Status == "Success" {
         //如果有被过滤的数据,打印错误的URL
         if responseBody.NumberFilteredRows > 0 {
            fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
         } else {
            fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
         }
      } else {
         fmt.Printf("Error Message : %s \n", responseBody.Message)
         fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
      }
      //fmt.Println(jsonStr)
   }
   defer response.Body.Close()
}
//获取BE列表
func get_doris_be_list() *list.List {
   var load StreamLoad
   load.userName = "root"
   load.password = ""
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/backends?is_alive=true"
   //提交请求
   reqest, err := http.NewRequest("GET", url, nil)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   bes := list.New()
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      backends := Backend{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &backends)
      if err != nil {
         fmt.Println(err.Error())
      }
      for _, beinfo := range backends.Data.Backends {
         be := beinfo.IP + ":" + strconv.Itoa(beinfo.HTTPPort)
         bes.PushBack(be)
      }
   }
   defer response.Body.Close()
   return bes
}
//Stream load返回消息结构体
type ResponseBody struct {
   TxnID                  int    `json:"TxnId"`
   Label                  string `json:"Label"`
   Status                 string `json:"Status"`
   Message                string `json:"Message"`
   NumberTotalRows        int    `json:"NumberTotalRows"`
   NumberLoadedRows       int    `json:"NumberLoadedRows"`
   NumberFilteredRows     int    `json:"NumberFilteredRows"`
   NumberUnselectedRows   int    `json:"NumberUnselectedRows"`
   LoadBytes              int    `json:"LoadBytes"`
   LoadTimeMs             int    `json:"LoadTimeMs"`
   BeginTxnTimeMs         int    `json:"BeginTxnTimeMs"`
   StreamLoadPutTimeMs    int    `json:"StreamLoadPutTimeMs"`
   ReadDataTimeMs         int    `json:"ReadDataTimeMs"`
   WriteDataTimeMs        int    `json:"WriteDataTimeMs"`
   CommitAndPublishTimeMs int    `json:"CommitAndPublishTimeMs"`
   ErrorURL               string `json:"ErrorURL"`
}
//获取BE列表返回结构体
type Backend struct {
   Msg  string `json:"msg"`
   Code int    `json:"code"`
   Data struct {
      Backends []struct {
         IP       string `json:"ip"`
         HTTPPort int    `json:"http_port"`
         IsAlive  bool   `json:"is_alive"`
      } `json:"backends"`
   } `json:"data"`
   Count int `json:"count"`
}
func main() {
   var load StreamLoad
   load.userName = "root"
   load.password = ""
   //auth_info := auth(load)
   //fmt.Println(auth_info)
   //backends := get_doris_be_list()
   //for e := backends.Front(); e != nil; e = e.Next() {
   // fmt.Println(e.Value)
   //}
   data := "10001,张***,西安,30,1,133****760,陕西省**********,2021-03-12 12:34:12"
   batch_load_data(load, data)
   //batch_load_file(/load, "/Users/zhangfeng/Downloads/test.csv")
}




目录
相关文章
|
2天前
|
测试技术 Go
go语言中测试工具
【10月更文挑战第22天】
9 4
|
2天前
|
SQL 关系型数据库 MySQL
go语言中数据库操作
【10月更文挑战第22天】
11 4
|
2天前
|
缓存 前端开发 中间件
go语言中Web框架
【10月更文挑战第22天】
12 4
|
5天前
|
Go
go语言的复数常量
【10月更文挑战第21天】
18 6
|
5天前
|
Go
go语言的浮点型常量
【10月更文挑战第21天】
13 4
|
5天前
|
Serverless Go
Go语言中的并发编程:从入门到精通
本文将深入探讨Go语言中并发编程的核心概念和实践,包括goroutine、channel以及sync包等。通过实例演示如何利用这些工具实现高效的并发处理,同时避免常见的陷阱和错误。
|
2天前
|
安全 测试技术 Go
Go语言中的并发编程模型解析####
在当今的软件开发领域,高效的并发处理能力是提升系统性能的关键。本文深入探讨了Go语言独特的并发编程模型——goroutines和channels,通过实例解析其工作原理、优势及最佳实践,旨在为开发者提供实用的Go语言并发编程指南。 ####
|
7天前
|
Go 数据安全/隐私保护 开发者
Go语言开发
【10月更文挑战第26天】Go语言开发
21 3
|
9天前
|
Java 程序员 Go
Go语言的开发
【10月更文挑战第25天】Go语言的开发
19 3
|
3月前
|
JSON 中间件 Go
go语言后端开发学习(四) —— 在go项目中使用Zap日志库
本文详细介绍了如何在Go项目中集成并配置Zap日志库。首先通过`go get -u go.uber.org/zap`命令安装Zap,接着展示了`Logger`与`Sugared Logger`两种日志记录器的基本用法。随后深入探讨了Zap的高级配置,包括如何将日志输出至文件、调整时间格式、记录调用者信息以及日志分割等。最后,文章演示了如何在gin框架中集成Zap,通过自定义中间件实现了日志记录和异常恢复功能。通过这些步骤,读者可以掌握Zap在实际项目中的应用与定制方法
125 1
go语言后端开发学习(四) —— 在go项目中使用Zap日志库