Golang使用协程进行mqtt的publish信息性能测试

简介: 开发语言:golang目的:并发10000个mqtt连接,循环发送publish信息,当时间戳小于某个值的时候,中止循环,退出连接publish内容是json格式的,未设置时,有默认值,可以通过golang代码修改json内容登录信息存取在csv文件中,csv文件有多少列,就并发多少个设备连接

需求

开发语言:golang

目的:并发10000个mqtt连接,循环发送publish信息,当时间戳小于某个值的时候,中止循环,退出连接

publish内容是json格式的,未设置时,有默认值,可以通过golang代码修改json内容

登录信息存取在csv文件中,csv文件有多少列,就并发多少个设备连接

话不多说,直接上代码

main.go

package main
import (
  "encoding/csv"
  "encoding/json"
  "fmt"
  "os"
  "strconv"
  "time"
  mqtt "github.com/eclipse/paho.mqtt.golang"
)
var total int
// 读取csv文件里的第3列数据,存入一个string数组里
func readcsv(filename string) []string {
  var userNameList []string
  f, _ := os.Open(filename)
  defer f.Close()
  w := csv.NewReader(f)
  data, err := w.ReadAll()
  if err != nil {
    fmt.Println(err)
  }
  for i := range data {
    userNameList = append(userNameList, data[i][2])
  }
  return userNameList
}
func mqttDevice(username string, end_number chan int) {
  // mqtt设备连接,设置IP地址
  opts := mqtt.NewClientOptions().AddBroker("localhost:1883")
  // 设置连接的用户名密码
  opts.SetUsername(username)
  // 使用连接信息进行连接
  client := mqtt.NewClient(opts)
  if token := client.Connect(); token.Wait() && token.Error() != nil {
    panic(token.Error())
  }
  time.Sleep(1 * time.Second)
  fmt.Println("connect success:" + username)
  // 读取json文件,json文件里的是默认参数
  fileReader, _ := os.Open("test.json")
  var eiopJsonMap map[string]interface{}
  json.NewDecoder(fileReader).Decode(&eiopJsonMap)
  // 设置一个开始时间戳和结束时间戳
  startTime := 1598167852000
  endTime := 1598167852000
  // 循环发送遥测,每次遥测间隔时间戳为15分钟
  for ; startTime < endTime; startTime = startTime + 900000 {
    // 定义一个ep的初始值为1,每循环一次就+1
    var ep int
    ep++
    eiopJsonMap["ts"] = startTime
    eiopJsonMap["values"].(map[string]interface{})["ep"] = ep
    // 把修改过的json内容从map转换为json格式
    eiopJsonText, _ := json.Marshal(eiopJsonMap)
    fmt.Println(string(eiopJsonText))
    // 发送遥测,发完之后休眠1秒
    result := client.Publish("topic", 0, true, eiopJsonText)
    result.Wait()
    time.Sleep(1 * time.Second)
  }
  // 发送完信息之后,退出连接
  fmt.Println("disconnect:" + username)
  client.Disconnect(250)
  total++
  end_number <- total
}
func main() {
  userNameList := readcsv("connect_info.csv")
  endNumber := make(chan int, len(userNameList))
  // 变量所有的username,通过go关键字并发多个设备
  for _, userName := range userNameList {
    go mqttDevice(userName, endNumber)
  }
  // 当所有的设备都发送完毕后,关闭程序
  for i := range endNumber {
    fmt.Println("已经有" + strconv.Itoa(i) + "个设备发送完毕")
    if i == len(userNameList) {
      return
    }
  }
}

test.json

{
  "ts": 1603088274000,
  "values": {
    "ep": 12
  }
}

connect_info.csv

localhost,1883,XecUwSmMGiYJp2BspMK2
localhost,1883,GqVqoPP2wblDjS2P9pQ9
相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
706
分享
相关文章
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
552 3
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
769 1
【专栏】编写网络设备割接方案的七个步骤,包括明确割接目标、收集信息、制定计划、设计流程、风险评估、准备测试环境和编写文档。
【4月更文挑战第28天】本文介绍了编写网络设备割接方案的七个步骤,包括明确割接目标、收集信息、制定计划、设计流程、风险评估、准备测试环境和编写文档。通过实际案例分析,展示了如何成功完成割接,确保业务连续性和稳定性。遵循这些步骤,可提高割接成功率,为公司的网络性能和安全提供保障。
1174 0
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
157 1
Python 渗透测试:黑客内外网信息收集.(帮助 得到信息攻击计算机内外网.)
Python 渗透测试:黑客内外网信息收集.(帮助 得到信息攻击计算机内外网.)
127 0
消息队列 MQ产品使用合集之在测试环境中拥有大量的topic会有什么影响
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
203 1

推荐镜像

更多
登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问