需求
开发语言:golang
目的:并发10000个mqtt连接,循环发送publish信息,当时间戳小于某个值的时候,中止循环,退出连接
publish内容是json格式的,未设置时,有默认值,可以通过golang代码修改json内容
登录信息存取在csv文件中,csv文件有多少列,就并发多少个设备连接
话不多说,直接上代码
main.go
package main import ( "encoding/csv" "encoding/json" "fmt" "os" "strconv" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) var total int // 读取csv文件里的第3列数据,存入一个string数组里 func readcsv(filename string) []string { var userNameList []string f, _ := os.Open(filename) defer f.Close() w := csv.NewReader(f) data, err := w.ReadAll() if err != nil { fmt.Println(err) } for i := range data { userNameList = append(userNameList, data[i][2]) } return userNameList } func mqttDevice(username string, end_number chan int) { // mqtt设备连接,设置IP地址 opts := mqtt.NewClientOptions().AddBroker("localhost:1883") // 设置连接的用户名密码 opts.SetUsername(username) // 使用连接信息进行连接 client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } time.Sleep(1 * time.Second) fmt.Println("connect success:" + username) // 读取json文件,json文件里的是默认参数 fileReader, _ := os.Open("test.json") var eiopJsonMap map[string]interface{} json.NewDecoder(fileReader).Decode(&eiopJsonMap) // 设置一个开始时间戳和结束时间戳 startTime := 1598167852000 endTime := 1598167852000 // 循环发送遥测,每次遥测间隔时间戳为15分钟 for ; startTime < endTime; startTime = startTime + 900000 { // 定义一个ep的初始值为1,每循环一次就+1 var ep int ep++ eiopJsonMap["ts"] = startTime eiopJsonMap["values"].(map[string]interface{})["ep"] = ep // 把修改过的json内容从map转换为json格式 eiopJsonText, _ := json.Marshal(eiopJsonMap) fmt.Println(string(eiopJsonText)) // 发送遥测,发完之后休眠1秒 result := client.Publish("topic", 0, true, eiopJsonText) result.Wait() time.Sleep(1 * time.Second) } // 发送完信息之后,退出连接 fmt.Println("disconnect:" + username) client.Disconnect(250) total++ end_number <- total } func main() { userNameList := readcsv("connect_info.csv") endNumber := make(chan int, len(userNameList)) // 变量所有的username,通过go关键字并发多个设备 for _, userName := range userNameList { go mqttDevice(userName, endNumber) } // 当所有的设备都发送完毕后,关闭程序 for i := range endNumber { fmt.Println("已经有" + strconv.Itoa(i) + "个设备发送完毕") if i == len(userNameList) { return } } }
test.json
{ "ts": 1603088274000, "values": { "ep": 12 } }
connect_info.csv
localhost,1883,XecUwSmMGiYJp2BspMK2 localhost,1883,GqVqoPP2wblDjS2P9pQ9