使用GO语言通过Stream Load实现Doris数据导入

简介: 使用GO语言通过Stream Load实现Doris数据导入。

本文使用的GO是1.17.2


Doris 0.15.0 release版


Doris的数据导入有各种语言的版本,但是GO语言版本的基本见不到,简单学了一下,写了一个简单的Stream Load入库的示例,仅供参考


示例中使用的表结构:

CREATE TABLE IF NOT EXISTS user_info
(
    user_id LARGEINT NOT NULL COMMENT "用户id",
    username varchar(50) NOT NULL COMMENT "用户名",
    city VARCHAR(20) COMMENT "用户所在城市",
    age SMALLINT COMMENT "用户年龄",
    sex TINYINT COMMENT "用户性别",
    phone LARGEINT COMMENT "电话",
    address VARCHAR(500) COMMENT "地址",
    register_time datetime COMMENT "用户注册时间"
)
Unique KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "3"
);

下面是GO的示例代码,其中支持从文件导入,从内存数据导入,同时提供了获取BE节点列表的方法,你在导入的时候可以从这里随机获取一个BE节点IP及端口,直连BE进行导入


package main
import (
   "container/list"
   "encoding/base64"
   "encoding/json"
   "fmt"
   "github.com/gofrs/uuid"
   "io/ioutil"
   "log"
   "net/http"
   "strconv"
   "strings"
)
type StreamLoad struct {
   url       string
   dbName    string
   tableName string
   data      string
   userName  string
   password  string
}
//实现Doris用户认证信息
func auth(load StreamLoad) string {
   s := load.userName + ":" + load.password
   b := []byte(s)
   sEnc := base64.StdEncoding.EncodeToString(b)
   fmt.Printf("enc=[%s]\n", sEnc)
   sDec, err := base64.StdEncoding.DecodeString(sEnc)
   if err != nil {
      fmt.Printf("base64 decode failure, error=[%v]\n", err)
   } else {
      fmt.Printf("dec=[%s]\n", sDec)
   }
   return sEnc
}
//使用Stream load将文件数据导入到Doris对应的数据表中
func batch_load_file(load StreamLoad, file string) {
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
   //fmt.Formatter(.Format(url,load.dbName,l))
   fileContext, err := ioutil.ReadFile(file)
   if err != nil {
      log.Println("Failed to Read the File", file, err)
   }
   record := strings.NewReader(string(fileContext))
   //提交请求
   reqest, err := http.NewRequest(http.MethodPut, url, record)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   reqest.Header.Add("EXPECT", "100-continue")
   var u1 = uuid.Must(uuid.NewV4())
   reqest.Header.Add("label", u1.String())
   reqest.Header.Add("column_separator", ",")
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      responseBody := ResponseBody{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &responseBody)
      if err != nil {
         fmt.Println(err.Error())
      }
      if responseBody.Status == "Success" {
         //如果有被过滤的数据,打印错误的URL
         if responseBody.NumberFilteredRows > 0 {
            fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
         } else {
            fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
         }
      }
      fmt.Println(string(body))
   }
   defer response.Body.Close()
}
//内存流数据,通过Stream Load导入Doris表中
func batch_load_data(load StreamLoad, data string) {
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
   //fmt.Formatter(.Format(url,load.dbName,l))
   record := strings.NewReader(data)
   //提交请求
   reqest, err := http.NewRequest(http.MethodPut, url, record)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   reqest.Header.Add("EXPECT", "100-continue")
   var u1 = uuid.Must(uuid.NewV4())
   reqest.Header.Add("label", u1.String())
   reqest.Header.Add("column_separator", ",")
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      responseBody := ResponseBody{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &responseBody)
      if err != nil {
         fmt.Println(err.Error())
      }
      if responseBody.Status == "Success" {
         //如果有被过滤的数据,打印错误的URL
         if responseBody.NumberFilteredRows > 0 {
            fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
         } else {
            fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
         }
      } else {
         fmt.Printf("Error Message : %s \n", responseBody.Message)
         fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
      }
      //fmt.Println(jsonStr)
   }
   defer response.Body.Close()
}
//获取BE列表
func get_doris_be_list() *list.List {
   var load StreamLoad
   load.userName = "root"
   load.password = ""
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/backends?is_alive=true"
   //提交请求
   reqest, err := http.NewRequest("GET", url, nil)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   bes := list.New()
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      backends := Backend{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &backends)
      if err != nil {
         fmt.Println(err.Error())
      }
      for _, beinfo := range backends.Data.Backends {
         be := beinfo.IP + ":" + strconv.Itoa(beinfo.HTTPPort)
         bes.PushBack(be)
      }
   }
   defer response.Body.Close()
   return bes
}
//Stream load返回消息结构体
type ResponseBody struct {
   TxnID                  int    `json:"TxnId"`
   Label                  string `json:"Label"`
   Status                 string `json:"Status"`
   Message                string `json:"Message"`
   NumberTotalRows        int    `json:"NumberTotalRows"`
   NumberLoadedRows       int    `json:"NumberLoadedRows"`
   NumberFilteredRows     int    `json:"NumberFilteredRows"`
   NumberUnselectedRows   int    `json:"NumberUnselectedRows"`
   LoadBytes              int    `json:"LoadBytes"`
   LoadTimeMs             int    `json:"LoadTimeMs"`
   BeginTxnTimeMs         int    `json:"BeginTxnTimeMs"`
   StreamLoadPutTimeMs    int    `json:"StreamLoadPutTimeMs"`
   ReadDataTimeMs         int    `json:"ReadDataTimeMs"`
   WriteDataTimeMs        int    `json:"WriteDataTimeMs"`
   CommitAndPublishTimeMs int    `json:"CommitAndPublishTimeMs"`
   ErrorURL               string `json:"ErrorURL"`
}
//获取BE列表返回结构体
type Backend struct {
   Msg  string `json:"msg"`
   Code int    `json:"code"`
   Data struct {
      Backends []struct {
         IP       string `json:"ip"`
         HTTPPort int    `json:"http_port"`
         IsAlive  bool   `json:"is_alive"`
      } `json:"backends"`
   } `json:"data"`
   Count int `json:"count"`
}
func main() {
   var load StreamLoad
   load.userName = "root"
   load.password = ""
   //auth_info := auth(load)
   //fmt.Println(auth_info)
   //backends := get_doris_be_list()
   //for e := backends.Front(); e != nil; e = e.Next() {
   // fmt.Println(e.Value)
   //}
   data := "10001,张***,西安,30,1,133****760,陕西省**********,2021-03-12 12:34:12"
   batch_load_data(load, data)
   //batch_load_file(/load, "/Users/zhangfeng/Downloads/test.csv")
}




目录
相关文章
|
9天前
|
存储 Go 索引
go语言使用for循环遍历
go语言使用for循环遍历
24 7
|
12天前
|
存储 Go
go语言 遍历映射(map)
go语言 遍历映射(map)
25 2
|
13天前
|
Go 调度 开发者
Go语言中的并发编程:深入理解goroutines和channels####
本文旨在探讨Go语言中并发编程的核心概念——goroutines和channels。通过分析它们的工作原理、使用场景以及最佳实践,帮助开发者更好地理解和运用这两种强大的工具来构建高效、可扩展的应用程序。文章还将涵盖一些常见的陷阱和解决方案,以确保在实际应用中能够避免潜在的问题。 ####
|
13天前
|
测试技术 Go 索引
go语言使用 range 关键字遍历
go语言使用 range 关键字遍历
17 3
|
13天前
|
测试技术 Go 索引
go语言通过 for 循环遍历
go语言通过 for 循环遍历
23 3
|
15天前
|
安全 Go 数据处理
Go语言中的并发编程:掌握goroutine和channel的艺术####
本文深入探讨了Go语言在并发编程领域的核心概念——goroutine与channel。不同于传统的单线程执行模式,Go通过轻量级的goroutine实现了高效的并发处理,而channel作为goroutines之间通信的桥梁,确保了数据传递的安全性与高效性。文章首先简述了goroutine的基本特性及其创建方法,随后详细解析了channel的类型、操作以及它们如何协同工作以构建健壮的并发应用。此外,还介绍了select语句在多路复用中的应用,以及如何利用WaitGroup等待一组goroutine完成。最后,通过一个实际案例展示了如何在Go中设计并实现一个简单的并发程序,旨在帮助读者理解并掌
|
14天前
|
Go 索引
go语言按字符(Rune)遍历
go语言按字符(Rune)遍历
24 3
|
25天前
|
存储 JSON 监控
Viper,一个Go语言配置管理神器!
Viper 是一个功能强大的 Go 语言配置管理库,支持从多种来源读取配置,包括文件、环境变量、远程配置中心等。本文详细介绍了 Viper 的核心特性和使用方法,包括从本地 YAML 文件和 Consul 远程配置中心读取配置的示例。Viper 的多来源配置、动态配置和轻松集成特性使其成为管理复杂应用配置的理想选择。
38 2
|
23天前
|
Go 索引
go语言中的循环语句
【11月更文挑战第4天】
26 2
|
23天前
|
Go C++
go语言中的条件语句
【11月更文挑战第4天】
33 2