使用GO语言通过Stream Load实现Doris数据导入

简介: 使用GO语言通过Stream Load实现Doris数据导入。

本文使用的GO是1.17.2


Doris 0.15.0 release版


Doris的数据导入有各种语言的版本,但是GO语言版本的基本见不到,简单学了一下,写了一个简单的Stream Load入库的示例,仅供参考


示例中使用的表结构:

CREATE TABLE IF NOT EXISTS user_info
(
    user_id LARGEINT NOT NULL COMMENT "用户id",
    username varchar(50) NOT NULL COMMENT "用户名",
    city VARCHAR(20) COMMENT "用户所在城市",
    age SMALLINT COMMENT "用户年龄",
    sex TINYINT COMMENT "用户性别",
    phone LARGEINT COMMENT "电话",
    address VARCHAR(500) COMMENT "地址",
    register_time datetime COMMENT "用户注册时间"
)
Unique KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "3"
);

下面是GO的示例代码,其中支持从文件导入,从内存数据导入,同时提供了获取BE节点列表的方法,你在导入的时候可以从这里随机获取一个BE节点IP及端口,直连BE进行导入


package main
import (
   "container/list"
   "encoding/base64"
   "encoding/json"
   "fmt"
   "github.com/gofrs/uuid"
   "io/ioutil"
   "log"
   "net/http"
   "strconv"
   "strings"
)
type StreamLoad struct {
   url       string
   dbName    string
   tableName string
   data      string
   userName  string
   password  string
}
//实现Doris用户认证信息
func auth(load StreamLoad) string {
   s := load.userName + ":" + load.password
   b := []byte(s)
   sEnc := base64.StdEncoding.EncodeToString(b)
   fmt.Printf("enc=[%s]\n", sEnc)
   sDec, err := base64.StdEncoding.DecodeString(sEnc)
   if err != nil {
      fmt.Printf("base64 decode failure, error=[%v]\n", err)
   } else {
      fmt.Printf("dec=[%s]\n", sDec)
   }
   return sEnc
}
//使用Stream load将文件数据导入到Doris对应的数据表中
func batch_load_file(load StreamLoad, file string) {
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
   //fmt.Formatter(.Format(url,load.dbName,l))
   fileContext, err := ioutil.ReadFile(file)
   if err != nil {
      log.Println("Failed to Read the File", file, err)
   }
   record := strings.NewReader(string(fileContext))
   //提交请求
   reqest, err := http.NewRequest(http.MethodPut, url, record)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   reqest.Header.Add("EXPECT", "100-continue")
   var u1 = uuid.Must(uuid.NewV4())
   reqest.Header.Add("label", u1.String())
   reqest.Header.Add("column_separator", ",")
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      responseBody := ResponseBody{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &responseBody)
      if err != nil {
         fmt.Println(err.Error())
      }
      if responseBody.Status == "Success" {
         //如果有被过滤的数据,打印错误的URL
         if responseBody.NumberFilteredRows > 0 {
            fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
         } else {
            fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
         }
      }
      fmt.Println(string(body))
   }
   defer response.Body.Close()
}
//内存流数据,通过Stream Load导入Doris表中
func batch_load_data(load StreamLoad, data string) {
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
   //fmt.Formatter(.Format(url,load.dbName,l))
   record := strings.NewReader(data)
   //提交请求
   reqest, err := http.NewRequest(http.MethodPut, url, record)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   reqest.Header.Add("EXPECT", "100-continue")
   var u1 = uuid.Must(uuid.NewV4())
   reqest.Header.Add("label", u1.String())
   reqest.Header.Add("column_separator", ",")
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      responseBody := ResponseBody{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &responseBody)
      if err != nil {
         fmt.Println(err.Error())
      }
      if responseBody.Status == "Success" {
         //如果有被过滤的数据,打印错误的URL
         if responseBody.NumberFilteredRows > 0 {
            fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
         } else {
            fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
         }
      } else {
         fmt.Printf("Error Message : %s \n", responseBody.Message)
         fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
      }
      //fmt.Println(jsonStr)
   }
   defer response.Body.Close()
}
//获取BE列表
func get_doris_be_list() *list.List {
   var load StreamLoad
   load.userName = "root"
   load.password = ""
   client := &http.Client{}
   //生成要访问的url
   url := "http://10.220.146.10:8030/api/backends?is_alive=true"
   //提交请求
   reqest, err := http.NewRequest("GET", url, nil)
   //增加header选项
   reqest.Header.Add("Authorization", "basic "+auth(load))
   if err != nil {
      panic(err)
   }
   //处理返回结果
   response, _ := client.Do(reqest)
   bes := list.New()
   if response.StatusCode == 200 {
      body, _ := ioutil.ReadAll(response.Body)
      backends := Backend{}
      jsonStr := string(body)
      err := json.Unmarshal([]byte(jsonStr), &backends)
      if err != nil {
         fmt.Println(err.Error())
      }
      for _, beinfo := range backends.Data.Backends {
         be := beinfo.IP + ":" + strconv.Itoa(beinfo.HTTPPort)
         bes.PushBack(be)
      }
   }
   defer response.Body.Close()
   return bes
}
//Stream load返回消息结构体
type ResponseBody struct {
   TxnID                  int    `json:"TxnId"`
   Label                  string `json:"Label"`
   Status                 string `json:"Status"`
   Message                string `json:"Message"`
   NumberTotalRows        int    `json:"NumberTotalRows"`
   NumberLoadedRows       int    `json:"NumberLoadedRows"`
   NumberFilteredRows     int    `json:"NumberFilteredRows"`
   NumberUnselectedRows   int    `json:"NumberUnselectedRows"`
   LoadBytes              int    `json:"LoadBytes"`
   LoadTimeMs             int    `json:"LoadTimeMs"`
   BeginTxnTimeMs         int    `json:"BeginTxnTimeMs"`
   StreamLoadPutTimeMs    int    `json:"StreamLoadPutTimeMs"`
   ReadDataTimeMs         int    `json:"ReadDataTimeMs"`
   WriteDataTimeMs        int    `json:"WriteDataTimeMs"`
   CommitAndPublishTimeMs int    `json:"CommitAndPublishTimeMs"`
   ErrorURL               string `json:"ErrorURL"`
}
//获取BE列表返回结构体
type Backend struct {
   Msg  string `json:"msg"`
   Code int    `json:"code"`
   Data struct {
      Backends []struct {
         IP       string `json:"ip"`
         HTTPPort int    `json:"http_port"`
         IsAlive  bool   `json:"is_alive"`
      } `json:"backends"`
   } `json:"data"`
   Count int `json:"count"`
}
func main() {
   var load StreamLoad
   load.userName = "root"
   load.password = ""
   //auth_info := auth(load)
   //fmt.Println(auth_info)
   //backends := get_doris_be_list()
   //for e := backends.Front(); e != nil; e = e.Next() {
   // fmt.Println(e.Value)
   //}
   data := "10001,张***,西安,30,1,133****760,陕西省**********,2021-03-12 12:34:12"
   batch_load_data(load, data)
   //batch_load_file(/load, "/Users/zhangfeng/Downloads/test.csv")
}




目录
相关文章
|
7天前
|
Go
Go 语言循环语句
在不少实际问题中有许多具有规律性的重复操作,因此在程序中就需要重复执行某些语句。
17 1
|
6天前
|
Go 开发者
探索Go语言的并发之美
在Go语言的世界里,"并发"不仅仅是一个特性,它是一种哲学。本文将带你领略Go语言中goroutine和channel的魔力,揭示如何通过Go的并发机制来构建高效、可靠的系统。我们将通过一个简单的示例,展示如何利用Go的并发特性来解决实际问题,让你的程序像Go一样,轻盈而强大。
|
7天前
|
JSON Go API
使用Go语言和Gin框架构建RESTful API:GET与POST请求示例
使用Go语言和Gin框架构建RESTful API:GET与POST请求示例
|
7天前
|
Go
go语言创建字典
go语言创建字典
|
8天前
|
NoSQL Go API
go语言操作Redis
go语言操作Redis
|
7天前
|
Go
Go 语言接口
Go 语言提供了另外一种数据类型即接口,它把所有的具有共性的方法定义在一起,任何其他类型只要实现了这些方法就是实现了这个接口。 接口可以让我们将不同的类型绑定到一组公共的方法上,从而实现多态和灵活的设计。
|
8天前
|
存储 Go
go语言字符串变小写
go语言字符串变小写
|
Go 调度 负载均衡
如何用GO每秒处理100万条数据请求
最近看了一篇文章,用go处理每分钟达百万条的数据请求原文地址:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/翻译地址:https://www.jianshu.com/p/21de03ac682c 这里作者为处理高峰期高并发的数据请求,用了3个版本的处理方式,下面是自己的一些理解: 第一种方式很简单,就是用go的协程处理请求,来一条请求开一个协程处理,由于每个请求是一个数据上传任务,有一定的耗时和资源消耗,当高峰期请求突然增多达到每分钟百万条的时候,不可避免的造成了携程爆炸,系统崩溃。
2023 0
|
8天前
|
安全 Go 数据处理
探索Go语言的并发之美:Goroutines与Channels
在Go语言的世界里,"并发"不仅仅是一个概念,它是一种生活的方式。本文将带你领略Go语言中Goroutines和Channels的魔力,它们是如何让并发编程变得既简单又高效。我们将通过一个简单的示例,展示如何使用这些工具来构建一个高性能的网络服务。
|
8天前
|
关系型数据库 Go 数据处理
高效数据迁移:使用Go语言优化ETL流程
在本文中,我们将探索Go语言在处理大规模数据迁移任务中的独特优势,以及如何通过Go语言的并发特性来优化数据提取、转换和加载(ETL)流程。不同于其他摘要,本文不仅展示了Go语言在ETL过程中的应用,还提供了实用的代码示例和性能对比分析。
下一篇
无影云桌面