接着上一篇的继续,
使用go实现一个适用于嵌入式上的存储模块。简单易用,使用简单方便。
由于在终端上,不需要执行复杂的sql查询,多表级联查询等。就是用来存储记录的,因此设计为存储到表里的都为二进制的字节流。
还有一个原因是终端上记录字段变动频繁,不适合动不动就更改数据库的表结构吧。如果想要方便记录的解析,可以结合protobuf把数据序列化为字节流存储进去。
以下为按照这个思路的实现的存储方案:
首先记录分为若干区,底层实现对应若干个表,表里存储记录。
然后在单独建一个表,作为目录表。目录表里只有若干条记录。注意这个表不是用来插入数据的。是用来记录记录表里的记录写到什么位置了,上传到什么位置了。
表的结构如下:
如图所示,tb_dir目录表里只有10条数据。不会增也不会减,只会更新。分别对应tb_rec01---tb_rec10这十个记录表。
记录了当前的记录流水号,当前记录写的位置,当前记录读的位置等信息。
再看下记录表里有哪些内容。
分别有id,recNo(记录流水),
recType(记录类型),
recTime(记录时间),
data(记录二进制数据内容,比byte字节流,长度不限),
ext(预留扩展),res(预留)
操作有哪些接口?
都在recapi.go文件中,
package sqllite // 配置项 const ( // MAXRECDIRS 最大记录目录数量 //(一个记录目录对应控制一个记录表,它记录了记录表中的数据存储和读取的位置) MAXRECDIRS = 10 // MAXRECAREAS 最大记录区数量 10个(即记录表的个数,必须跟记录目录数量保持一致) MAXRECAREAS = MAXRECDIRS // MAXRECCOUNTS 最大记录条数(即一个表中允许存储的最大记录数,例100000条 // 记录存满后且上传标记已清除后,则从头开始存储覆盖,存储一条,覆盖一条) MAXRECCOUNTS = 100000 ) //枚举,记录区定义(一个记录区对应一个表) const ( //RecArea01 记录区1 RecArea01 = iota + 1 //RecArea02 记录区2 RecArea02 //RecArea03 记录区3 RecArea03 //RecArea04 记录区4 RecArea04 //RecArea05 记录区5 RecArea05 //RecArea06 记录区6 RecArea06 //RecArea07 记录区7 RecArea07 //RecArea08 记录区8 RecArea08 //RecArea09 记录区9 RecArea09 //RecArea10 记录区10 RecArea10 ) // Recorder 操作记录的接口声明 type Recorder interface { // 初始化记录区(会清空所有数据!) InitRecAreas() error // 打开记录区(开机必须先打开一次) OpenRecAreas() (err error) // 保存记录 SaveRec(areaID int, buf []byte, recType int) (id int64, err error) // 删除记录 DeleteRec(areaID int, num int64) (err error) // 获取未上传记录数量 GetNoUploadNum(areaID int) int // 按数据库ID读取一条记录 ReadRecByID(areaID int, id int) (p *Records, err error) // 顺序读取未上传的记录 ReadRecNotServer(areaID int, sn int) (p *Records, err error) // 倒数读取记录(如sn=1代表最后一次写入的记录) ReadRecWriteNot(areaID int, sn int) (p *Records, err error) // 最后一条记录流水 GetLastRecNO(areaID int) int } // RecAPI 操作接口的类 type RecAPI struct { Recorder } // NewRecAPI 初始化操作接口 func NewRecAPI(debug bool) RecAPI { return RecAPI{Recorder: NewRecords(debug)} }
如何使用?代码存放在我的github,地址:https://github.com/yongzhena/go-sqllite
有个demo, main.go
package main import ( "log" rec "github.com/yongzhena/go-sqllite" ) func checkErr(err error) { if err != nil { panic(err) } } func main() { log.Println("test sqllite...") log.Println("InitRecAreas...") opt := rec.NewRecAPI(true) err := opt.InitRecAreas() if err != nil { log.Fatal(err.Error()) } log.Println("InitRecAreas ok!") err = opt.OpenRecAreas() if err != nil { log.Fatal(err.Error()) } log.Println("OpenRecAreas ok!") id, err := opt.SaveRec(1, []byte("123456789011"), 0) if err != nil { log.Println(err.Error()) } log.Printf("over,SaveRec ok!,area=%d,id=%d\n", 1, id) id, err = opt.SaveRec(1, []byte("1234567890221111"), 1) if err != nil { log.Println(err.Error()) } id, err = opt.SaveRec(2, []byte("123456789022"), 1) if err != nil { log.Println(err.Error()) } log.Printf("over,SaveRec ok!,area=%d,id=%d\n", 2, id) id, err = opt.SaveRec(2, []byte("123456789022"), 3) if err != nil { log.Println(err.Error()) } log.Printf("over,SaveRec ok!,area=%d,id=%d\n", 2, id) num := opt.GetNoUploadNum(1) log.Printf("area=%d,NoUploadNum=%d\n", 1, num) recp, err := opt.ReadRecNotServer(1, 1) if err != nil { log.Fatal(err.Error()) } log.Println(recp) err = opt.DeleteRec(1, 1) if err != nil { log.Fatal(err.Error()) } num = opt.GetNoUploadNum(1) log.Printf("area=%d,NoUploadNum=%d\n", 1, num) num = opt.GetNoUploadNum(2) log.Printf("area=%d,NoUploadNum=%d\n", 2, num) }
是不是很简单?完成了记录存储和记录获取。完全看不到任何sql的影子。
记录里有日期和流水和记录类型等简单信息供查询。
记录的内容为二进制byte流,想存什么就存什么,存多长也无所谓。至于解析记录嘛,建议结合protobuf来用。
把记录序列化后存储进去。
这几个接口,在嵌入式终端上足够用了。可以写入记录,按顺序读取未上传记录。删除记录(并非真正的删除记录,若直接删记录在终端上是不安全的。而是改了目录表里的readID。并且记录存储满后会从头覆盖。前提是该记录已上传完毕。是不是很安全?这在终端上操作是必须要考虑的。不能让表里记录一直存下去,得指定大小。存满了也不能删,得从头一条条覆盖。)
这种思路是否成熟?
我们原来的c代码,单片机的应用,就是这么做的。只不过底层操作的是Flash。
以下为内部实现:
recdir的实现:
package sqllite import ( "errors" "fmt" "log" db "sqllite/database" ) // RecDir ... type RecDir struct { ID int `json:"id"` RecNo int `json:"recno" ` WriteID int64 `json:"writeid" ` ReadID1 int64 `json:"readid1" ` ReadID2 int64 `json:"readid2" ` ReadID3 int64 `json:"readid3" ` Rp int `json:"rp" ` Res int `json:"res" ` Flag bool `json:"flag" ` } // InitRecDir ... func InitRecDir() (err error) { //创建表 sqlTable := ` DROP TABLE IF EXISTS tb_dir; CREATE TABLE IF NOT EXISTS tb_dir( id INTEGER PRIMARY KEY AUTOINCREMENT, recNo INTEGER NOT NULL, writeID INTEGER NOT NULL, readID1 INTEGER NOT NULL, readID2 INTEGER , readID3 INTEGER , rp INTEGER , res INTEGER ); ` if db.SQLDB == nil { err = errors.New("db.SQLDB is null") log.Fatal(err.Error()) return } log.Println("begin create dir table...") if IsDebug { log.Println("sql:" + sqlTable) } _, err = db.SQLDB.Exec(sqlTable) if err != nil { log.Fatal(err.Error()) } log.Println("create dir table ok!") //清空数据 // log.Println("begin truncate dir table...") // _, err = db.SQLDB.Exec(`UPDATE sqlite_sequence SET seq = 0 WHERE name = 'tb_dir' `) // if err != nil { // log.Fatal(err.Error()) // } // log.Println("truncate dir table ok!") log.Println("begin init dir table...") for i := 0; i < MAXRECDIRS; i++ { _, err = db.SQLDB.Exec("INSERT INTO tb_dir(recNo, writeID,readID1,readID2,readID3,rp,res) VALUES (?, ?,?,?,?,?,?)", 0, 0, 0, 0, 0, 0, 0) if err != nil { log.Fatal(err.Error()) } } log.Println("init dir table ok!") return err } // UpdateDirs 更新目录 func (rd *RecDir) UpdateDirs(areaID int) error { strSQL := fmt.Sprintf("UPDATE tb_dir SET recNo=%d, writeID=%d, readID1=%d, readID2=%d, readID3=%d, rp=%d,res=%d WHERE id=%d", rd.RecNo, rd.WriteID, rd.ReadID1, rd.ReadID2, rd.ReadID3, rd.Rp, rd.Res, areaID) if IsDebug { log.Println(strSQL) } res, err := db.SQLDB.Exec(strSQL) if err != nil { log.Fatal(err.Error()) } affect, err := res.RowsAffected() fmt.Println(affect) if IsDebug { log.Println(rd) } return err } // LoadDirs 加载(读取)目录 func (rd *RecDir) LoadDirs(areaID int) error { strSQL := fmt.Sprintf("SELECT * FROM tb_dir WHERE id=%d", areaID) if IsDebug { log.Println(strSQL) } rows, err := db.SQLDB.Query(strSQL) if err != nil { log.Fatal(err.Error()) return err } if rows.Next() { err = rows.Scan(&rd.ID, &rd.RecNo, &rd.WriteID, &rd.ReadID1, &rd.ReadID2, &rd.ReadID3, &rd.Rp, &rd.Res) if err != nil { log.Fatal(err.Error()) return err } } else { log.Fatal("no dir records") } rows.Close() if IsDebug { log.Println(rd) } return err }
records.go实现:
package sqllite import ( "errors" "fmt" "log" db "sqllite/database" "strings" "time" ) var ( //IsDebug 是否调试 IsDebug = true recDir [MAXRECAREAS]RecDir ) // Records ... type Records struct { ID int `json:"id"` RecNo int `json:"recno" ` RecType int `json:"rectype" ` RecTime string `json:"rectime" ` Data []byte `json:"data" ` Ext string `json:"ext" ` Res string `json:"res" ` } // InitRecAreas 初始化记录存储区 func (rec Records) InitRecAreas() error { //初始化目录表 err := InitRecDir() if err != nil { log.Fatal(err.Error()) return err } //创建记录表 sqlTable := ` DROP TABLE IF EXISTS TB_NAME; CREATE TABLE IF NOT EXISTS TB_NAME ( id INTEGER PRIMARY KEY AUTOINCREMENT, recNo INTEGER NOT NULL, recType INTEGER NOT NULL, recTime INTEGER NOT NULL, data BLOB , ext TEXT , res TEXT ); ` for i := 0; i < MAXRECAREAS; i++ { tbName := fmt.Sprintf("tb_rec%02d", i+1) log.Println("begin create rec table " + tbName) sqls := strings.Replace(sqlTable, "TB_NAME", tbName, -1) if IsDebug { log.Println("sql:" + sqls) } _, err = db.SQLDB.Exec(sqls) if err != nil { log.Fatal(err.Error()) return err } log.Println("create rec table " + tbName + " ok!") } return err } // OpenRecAreas 打开记录存储区,每次开机,需要先打开一下 func (rec Records) OpenRecAreas() (err error) { //加载RecDir for i := 0; i < MAXRECAREAS; i++ { log.Printf("LoadDirs %02d \n", i+1) err = recDir[i].LoadDirs(i + 1) if err != nil { log.Println(err.Error()) return } log.Printf("LoadDirs %02d ok!\n", i+1) } //log.Println(recDir) return err } // SaveRec 保存记录 func (rec *Records) SaveRec(areaID int, buf []byte, recType int) (id int64, err error) { log.Printf("SaveRec,area=%02d \n", areaID) if (areaID <= 0) || (areaID > MAXRECAREAS) { err = fmt.Errorf("area id %02d is not right,mast between 1 and %02d", areaID, MAXRECAREAS) log.Println(err.Error()) return } rec.RecNo = recDir[areaID-1].RecNo t := time.Now() rec.RecTime = t.Format("20060102150405") rec.Data = buf rec.RecType = recType //记录是否存储满,判断 if (recDir[areaID-1].WriteID + 1) > (int64)(MAXRECCOUNTS) { if recDir[areaID-1].ReadID1 == 0 { err = fmt.Errorf("rec area %02d is full", areaID) log.Println(err.Error()) return } if (recDir[areaID-1].WriteID + 1 - int64(MAXRECCOUNTS)) == recDir[areaID-1].ReadID1 { err = fmt.Errorf("rec area %02d is full", areaID) log.Println(err.Error()) return } //保存记录 strSQL := fmt.Sprintf(`UPDATE tb_rec%02x SET recNo=%d, recType=%d,recTime=%s,data=?,ext="%s",res="%s" WHERE id = 1`, areaID, rec.RecNo+1, rec.RecType, rec.RecTime, rec.Ext, rec.Res) if IsDebug { log.Println(strSQL) } _, err = db.SQLDB.Exec(strSQL, rec.Data) if err != nil { log.Fatal(err.Error()) return } recDir[areaID-1].RecNo++ recDir[areaID-1].WriteID = 1 recDir[areaID-1].Flag = true id = 1 err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return } log.Printf("SaveRec,area=%02d ok!\n", areaID) return id, err } if recDir[areaID-1].Flag { //记录是否满判断 if (recDir[areaID-1].WriteID + 1) == recDir[areaID-1].ReadID1 { err = fmt.Errorf("rec area %02d is full", areaID) log.Println(err.Error()) return } id = recDir[areaID-1].WriteID + 1 strSQL := fmt.Sprintf(`UPDATE tb_rec%02x SET recNo=%d, recType=%d,recTime=%s,data=?,ext="%s",res="%s" WHERE id = %d`, areaID, rec.RecNo+1, rec.RecType, rec.RecTime, rec.Ext, rec.Res, id) if IsDebug { log.Println(strSQL) } _, err = db.SQLDB.Exec(strSQL, rec.Data) if err != nil { log.Fatal(err.Error()) return } recDir[areaID-1].RecNo++ recDir[areaID-1].WriteID = id err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return 0, err } log.Printf("SaveRec,area=%02d ok!\n", areaID) return id, err } strSQL := fmt.Sprintf(`INSERT INTO tb_rec%02x(recNo, recType,recTime,data,ext,res) VALUES (%d,%d,%s,?,"%s","%s")`, areaID, rec.RecNo+1, rec.RecType, rec.RecTime, rec.Ext, rec.Res) if IsDebug { log.Println(strSQL) } rs, err := db.SQLDB.Exec(strSQL, rec.Data) if err != nil { log.Fatal(err.Error()) return 0, err } id, err = rs.LastInsertId() if err != nil { log.Fatal(err.Error()) return 0, err } recDir[areaID-1].RecNo++ recDir[areaID-1].WriteID = id err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return 0, err } log.Printf("SaveRec,area=%02d ok!\n", areaID) return id, err } // DeleteRec 删除记录(并不是真正删除表里记录,而是清除该记录的上传标记) // areaID:记录区 num:删除的数量 func (rec Records) DeleteRec(areaID int, num int64) (err error) { if (areaID <= 0) || (areaID > MAXRECAREAS) { err = errors.New("area id is not right") log.Fatal(err.Error()) return } id := recDir[areaID-1].ReadID1 //如果写的位置等于读的位置,说明记录已上传完,没有要删除的了 if recDir[areaID-1].WriteID == recDir[areaID-1].ReadID1 { return } //如果要删除的数量大于了最大的记录数 if (id + num) > MAXRECCOUNTS { if (id + num - MAXRECCOUNTS) > recDir[areaID-1].WriteID { recDir[areaID-1].ReadID1 = recDir[areaID-1].WriteID err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return err } return } //更新读指针(读的位置) recDir[areaID-1].ReadID1 = id + num - MAXRECCOUNTS err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return err } return } //如果当前写的位置大于读的位置 if recDir[areaID-1].WriteID > recDir[areaID-1].ReadID1 { if id+num > recDir[areaID-1].WriteID { //更新读指针(读的位置) recDir[areaID-1].ReadID1 = recDir[areaID-1].WriteID err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return err } return } } //更新读指针(读的位置) recDir[areaID-1].ReadID1 = id + num err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return err } return } //GetNoUploadNum 获取未上传记录数量 func (rec Records) GetNoUploadNum(areaID int) int { num := 0 if recDir[areaID-1].WriteID == recDir[areaID-1].ReadID1 { num = 0 return num } if recDir[areaID-1].Flag == false { num = int(recDir[areaID-1].WriteID - recDir[areaID-1].ReadID1) } else { if recDir[areaID-1].WriteID > recDir[areaID-1].ReadID1 { num = int(recDir[areaID-1].WriteID - recDir[areaID-1].ReadID1) } else { num = int(MAXRECCOUNTS - recDir[areaID-1].ReadID1 + recDir[areaID-1].WriteID) } } return num } // ReadRecByID 按数据库ID读取记录 func (rec Records) ReadRecByID(areaID int, id int) (p *Records, err error) { var rec1 Records if (areaID <= 0) || (areaID > MAXRECAREAS) { err = errors.New("area id is not right") log.Fatal(err.Error()) return } strSQL := fmt.Sprintf("SELECT * FROM tb_rec%02d WHERE id=%d", areaID, id) if IsDebug { log.Println(strSQL) } rows, err := db.SQLDB.Query(strSQL) if err != nil { log.Fatal(err.Error()) return nil, err } if rows.Next() { err = rows.Scan(&rec1.ID, &rec1.RecNo, &rec1.RecType, &rec1.RecTime, &rec1.Data, &rec1.Ext, &rec1.Res) if err != nil { log.Fatal(err.Error()) return nil, err } } else { log.Println("no records") return nil, err } rows.Close() return &rec1, nil } //ReadRecNotServer 读取未上传的记录数据,顺序读取第SN条未上传的记录 //sn取值 1-到-->未上传记录数目 func (rec Records) ReadRecNotServer(areaID int, sn int) (p *Records, err error) { if (areaID <= 0) || (areaID > MAXRECAREAS) { err = errors.New("area id is not right") log.Fatal(err.Error()) return } id := recDir[areaID-1].ReadID1 if (int(id) + sn) > MAXRECCOUNTS { if int(id)+sn-MAXRECCOUNTS > int(recDir[areaID-1].WriteID) { return nil, errors.New("no records") } p, err = rec.ReadRecByID(areaID, int(id)+sn-MAXRECCOUNTS) } else { if recDir[areaID-1].ReadID1 < recDir[areaID-1].WriteID { if (int(id) + sn) > int(recDir[areaID-1].WriteID) { return nil, errors.New("no records") } p, err = rec.ReadRecByID(areaID, int(recDir[areaID-1].ReadID1)+sn) } } return p, err } // ReadRecWriteNot 倒数读取第SN条写入的记录 //读取一条记录 倒数读取第SN条写入的记录 func (rec Records) ReadRecWriteNot(areaID int, sn int) (p *Records, err error) { id := int(recDir[areaID-1].WriteID) if (id - sn) < 0 { if recDir[areaID-1].Flag { p, err = rec.ReadRecByID(areaID, MAXRECCOUNTS-(sn-id-1)) } else { return nil, errors.New("no records") } } else { p, err = rec.ReadRecByID(areaID, (id - sn + 1)) } return } // GetLastRecNO 获取最后一条记录流水号 func (rec Records) GetLastRecNO(areaID int) int { if (areaID <= 0) || (areaID > MAXRECAREAS) { log.Println("area id is not right") return 0 } id := recDir[areaID-1].RecNo return id } // NewRecords ... func NewRecords(debug bool) *Records { IsDebug = debug records := new(Records) return records }