Apache Doris 是一个基于 MPP 架构的高性能、实时的分析型数据库,以极速易用的特点被人们所熟知,仅需亚秒级响应时间即可返回海量数据下的查询结果,不仅可以支持高并发的点查询场景,也能支持高吞吐的复杂分析场景。基于此,Apache Doris 能够较好的满足报表分析、即席查询、统一数仓构建、数据湖联邦查询加速等使用场景,用户可以在此之上构建用户行为分析、AB 实验平台、日志检索分析、用户画像分析、订单分析等应用。
使用 Doris 的用户都知道 Doris 是完全兼容 MySQL 协议的,我们可以使用任意 MySQL 客户端或者 Connector 去连接 Doris,用 SQL 操作 Doris,这样你可以使用任意语言来操作 Doris。今天我们演示使用 Go 语言来访问 Doris ,完成查询和 插入操作。
Go 与 MySQL 的结合还是比较容易的,像是连接,增、删、改这些操作都比较简单。
Go 语言的安装配置还是很简单的,这里我们就不做介绍了,直接开始
安装驱动
安装 Go 连接 MySQL的驱动
go get github.com/go-sql-driver/mysql
在我们程序里导入依赖库
import ( "database/sql" "fmt" _ "github.com/go-sql-driver/mysql" )
连接Doris数据库
var ( // 定义一个全局对象db db *sql.DB //连接Doris的用户名 userName string = "root" //连接Doris的密码 password string = "" //连接Doris的地址 ipAddress string = "127.0.0.1" //连接Doris的端口号,默认是9030 port int = 9030 //连接Doris的具体数据库名称 dbName string = "test" ) func initDB() (err error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", userName, password, ipAddress, port, dbName) //Open打开一个driverName指定的数据库,dataSourceName指定数据源 //不会校验用户名和密码是否正确,只会对dsn的格式进行检测 db, err = sql.Open("mysql", dsn) //dsn格式不正确的时候会报错 if err != nil { return err } //尝试与数据库连接,校验dsn是否正确 err = db.Ping() if err != nil { fmt.Println("校验失败,err", err) return err } // 设置最大连接数 db.SetMaxOpenConns(50) // 设置最大的空闲连接数 // db.SetMaxIdleConns(20) fmt.Println("连接数据库成功!") return nil }
验证连接
func main() { err := initDB() if err != nil { fmt.Println("初始化数据库失败,err", err) return } }
查询数据表
这里我们简单做一个查询表里的所有数据
我的表结构如下:
CREATE TABLE `t_cn_search` ( `md5` varchar(100) NULL, `book_line` text NULL, INDEX idx_line (`book_line`) USING INVERTED PROPERTIES("parser" = "chinese", "support_phrase" = "true") COMMENT '' ) ENGINE=OLAP DUPLICATE KEY(`md5`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`md5`) BUCKETS 2 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "is_being_synced" = "false", "storage_format" = "V2", "light_schema_change" = "true", "disable_auto_compaction" = "false", "enable_single_replica_compaction" = "false" );
我这个表是一个日志检索用的表,使用了 Doris 的倒排索引,我们匹配任意关键字粒子
查询程序:
// 查询数据 func QueryRow() { rows, _ := db.Query("select * from t_cn_search where book_line MATCH_ANY '粒子'") //获取所有数据 var md5 int var book_line string for rows.Next() { //循环显示所有的数据 rows.Scan(&md5, &book_line) fmt.Println(md5, "--", book_line) } }
我们执行程序可以看到查询结果:
插入数据
我们现在来演示怎么插入数据,这个演示的是我们通过 Doris 提供的 TVF(Table Value Function)将 HDFS 上文件数据直接导入到 Doris 的表里。
我的 hdfs 上的文件格式是 Parquet,我们可以通过 TVF 来看一下这个表的数据结构
mysql> desc function hdfs( -> "uri" = "hdfs://localhost:9000/tmp/test.parquet", -> "fs.defaultFS" = "hdfs://localhost:9000", -> "hadoop.username" = "doris", -> "format" = "parquet"); +----------------+------+------+-------+---------+-------+ | Field | Type | Null | Key | Default | Extra | +----------------+------+------+-------+---------+-------+ | date | TEXT | Yes | false | NULL | NONE | | user_src | TEXT | Yes | false | NULL | NONE | | order_src | TEXT | Yes | false | NULL | NONE | | order_location | TEXT | Yes | false | NULL | NONE | | new_order | INT | Yes | false | NULL | NONE | | payed_order | INT | Yes | false | NULL | NONE | | pending_order | INT | Yes | false | NULL | NONE | | cancel_order | INT | Yes | false | NULL | NONE | | reject_order | INT | Yes | false | NULL | NONE | | good_order | INT | Yes | false | NULL | NONE | | report_order | INT | Yes | false | NULL | NONE | +----------------+------+------+-------+---------+-------+ 11 rows in set (0.16 sec)
Doris 的表结构如下:
CREATE TABLE `order_analysis` ( `date` varchar(57) NULL, `user_src` varchar(27) NULL, `order_src` varchar(33) NULL, `order_location` varchar(6) NULL, `new_order` int(11) NULL, `payed_order` int(11) NULL, `pending_order` int(11) NULL, `cancel_order` int(11) NULL, `reject_order` int(11) NULL, `good_order` int(11) NULL, `report_order` int(11) NULL ) ENGINE=OLAP DUPLICATE KEY(`date`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`date`) BUCKETS 2 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "is_being_synced" = "false", "storage_format" = "V2", "light_schema_change" = "true", "disable_auto_compaction" = "false", "enable_single_replica_compaction" = "false" );
将 hdfs 文件数据导入到 Doris 表里,这里我们使用的是 insert into tbl select
这个操作
func insert() { result, err := db.Exec("insert into order_analysis select * from hdfs(" + "\"uri\" = \"hdfs://localhost:9000/tmp/test.parquet\"," + "\"fs.defaultFS\" = \"hdfs://localhost:9000\"," + "\"hadoop.username\" = \"doris\"," + "\"format\" = \"parquet\")") if err != nil { fmt.Println("预处理失败:", err) return } if err != nil { fmt.Println("执行预处理失败:", err) return } else { rows, _ := result.RowsAffected() fmt.Println("执行成功,影响行数", rows, "行") } }
执行完成之后我们可以看到返回的结果
连接数据库成功! 执行成功,影响行数 5061 行
总结
是不是很简单,你可以使用任意语言通过 MySQL 协议来操作 Doris ,后面我们会在讲解通过 Go 语言使用 Doris 提供的 Stream load(http协议)来完成数据导入的操作。