安装go-elasticsearch,gorm
go get -u github.com/jinzhu/gorm
go get github.com/elastic/go-elasticsearch/v7
我使用的 Elasticsearch 是 v7 版本,所以 go-elasticsearch 也使用 v7;如果你的 Elasticsearch 是 v8,请把导入路径中的 v7 改成 v8。
go-elasticsearch和gorm操作
func EsClient() *elasticsearch.Client { cfg := elasticsearch.Config{ Addresses: \[\]string{ "http://127.0.0.1:9200", }, } es, err := elasticsearch.NewClient(cfg) if err != nil { log.Fatalf("Error creating the client: %s", err) } // Get cluster info // var r map\[string\]interface{} res, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } // Check response status if res.IsError() { log.Fatalf("Error: %s", res.String()) } // Deserialize the response into a map. if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Fatalf("Error parsing the response body: %s", err) } // Print client and server version numbers. log.Printf("Client: %s", elasticsearch.Version) log.Printf("Server: %s", r\["version"\].(map\[string\]interface{})\["number"\]) log.Println(strings.Repeat("~", 37)) return es } func Db() *gorm.DB { db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local") if err != nil { panic(err) } return db }
elasticsearch导入
在 Elasticsearch 7.0 之后的版本中,一个 index 只允许有一个 type,所以索引文档时不需要额外指定 type。
func AddEsData(es *elasticsearch.Client, info LogInfo) { id := info.Id // Build the request body. data, err := json.Marshal(info) if err != nil { log.Fatalf("Error marshaling document: %s", err) } // Set up the request object. req := esapi.IndexRequest{ Index: "test", DocumentID: strconv.Itoa(id), Body: bytes.NewReader(data), Refresh: "true", } // Perform the request with the client. res, err := req.Do(context.Background(), es) if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() if res.IsError() { fmt.Println(res) log.Printf("\[%s\] Error indexing document ID=%d", res.Status(), info.Id) } else { // Deserialize the response into a map. var r map\[string\]interface{} if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Printf("Error parsing the response body: %s", err) } else { // Print the response status and indexed document version. //log.Printf("\[%s\] %s; version=%d", res.Status(), r\["result"\], int(r\["_version"\].(float64))) } } }
总的代码:
package main import ( "bytes" "context" "encoding/json" "fmt" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/jinzhu/gorm" _ "github.com/jinzhu/gorm/dialects/mysql" "log" "strconv" "strings" "sync" ) type LogInfo struct { Id int \`gorm:"primary_key"\` Type_name string \`gorm:""\` Type string \`gorm:""\` Sub_type string \`gorm:""\` Sub\_type\_name string \`gorm:""\` Time string \`gorm:""\` Login_qq string \`gorm:""\` Send_qq string \`gorm:""\` Group string \`gorm:""\` Content string \`gorm:""\` Font_id string \`gorm:""\` File string \`gorm:""\` Being\_operate\_qq string \`gorm:""\` Add_time string \`gorm:""\` } func EsClient() *elasticsearch.Client { cfg := elasticsearch.Config{ Addresses: \[\]string{ "http://127.0.0.1:9200", }, } es, err := elasticsearch.NewClient(cfg) if err != nil { log.Fatalf("Error creating the client: %s", err) } // Get cluster info // var r map\[string\]interface{} res, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } // Check response status if res.IsError() { log.Fatalf("Error: %s", res.String()) } // Deserialize the response into a map. if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Fatalf("Error parsing the response body: %s", err) } // Print client and server version numbers. 
log.Printf("Client: %s", elasticsearch.Version) log.Printf("Server: %s", r\["version"\].(map\[string\]interface{})\["number"\]) log.Println(strings.Repeat("~", 37)) return es } func Db() *gorm.DB { db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local") if err != nil { panic(err) } return db } func main() { es := EsClient() res, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() db := Db() defer db.Close() db.AutoMigrate(&LogInfo{}) var list = &\[\]LogInfo{} lastId := 30308 start: db.Table("log").Limit(100).Order("id ASC").Where("id > ?", lastId).Find(list) var wg sync.WaitGroup for _, logInfo := range *list { wg.Add(1) go func(info LogInfo) { defer wg.Done() AddEsData(es, info) }(logInfo) lastId = logInfo.Id } wg.Wait() goto start //fmt.Println(list) //log.Println(res) } func AddEsData(es *elasticsearch.Client, info LogInfo) { id := info.Id // Build the request body. data, err := json.Marshal(info) if err != nil { log.Fatalf("Error marshaling document: %s", err) } // Set up the request object. req := esapi.IndexRequest{ Index: "test", DocumentID: strconv.Itoa(id), Body: bytes.NewReader(data), Refresh: "true", } // Perform the request with the client. res, err := req.Do(context.Background(), es) if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() if res.IsError() { fmt.Println(res) log.Printf("\[%s\] Error indexing document ID=%d", res.Status(), info.Id) } else { // Deserialize the response into a map. var r map\[string\]interface{} if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Printf("Error parsing the response body: %s", err) } else { // Print the response status and indexed document version. //log.Printf("\[%s\] %s; version=%d", res.Status(), r\["result"\], int(r\["_version"\].(float64))) } } }