Golang使用协程进行mqtt的publish信息性能测试

本文涉及的产品
性能测试 PTS,5000VUM额度
简介: 开发语言:golang目的:并发10000个mqtt连接,循环发送publish信息,当时间戳小于某个值的时候,中止循环,退出连接publish内容是json格式的,未设置时,有默认值,可以通过golang代码修改json内容登录信息存取在csv文件中,csv文件有多少列,就并发多少个设备连接

需求

开发语言:golang

目的:并发10000个mqtt连接,循环发送publish信息,当时间戳小于某个值的时候,中止循环,退出连接

publish内容是json格式的,未设置时,有默认值,可以通过golang代码修改json内容

登录信息存取在csv文件中,csv文件有多少列,就并发多少个设备连接

话不多说,直接上代码

main.go

package main
import (
  "encoding/csv"
  "encoding/json"
  "fmt"
  "os"
  "strconv"
  "time"
  mqtt "github.com/eclipse/paho.mqtt.golang"
)
var total int
// 读取csv文件里的第3列数据,存入一个string数组里
func readcsv(filename string) []string {
  var userNameList []string
  f, _ := os.Open(filename)
  defer f.Close()
  w := csv.NewReader(f)
  data, err := w.ReadAll()
  if err != nil {
    fmt.Println(err)
  }
  for i := range data {
    userNameList = append(userNameList, data[i][2])
  }
  return userNameList
}
func mqttDevice(username string, end_number chan int) {
  // mqtt设备连接,设置IP地址
  opts := mqtt.NewClientOptions().AddBroker("localhost:1883")
  // 设置连接的用户名密码
  opts.SetUsername(username)
  // 使用连接信息进行连接
  client := mqtt.NewClient(opts)
  if token := client.Connect(); token.Wait() && token.Error() != nil {
    panic(token.Error())
  }
  time.Sleep(1 * time.Second)
  fmt.Println("connect success:" + username)
  // 读取json文件,json文件里的是默认参数
  fileReader, _ := os.Open("test.json")
  var eiopJsonMap map[string]interface{}
  json.NewDecoder(fileReader).Decode(&eiopJsonMap)
  // 设置一个开始时间戳和结束时间戳
  startTime := 1598167852000
  endTime := 1598167852000
  // 循环发送遥测,每次遥测间隔时间戳为15分钟
  for ; startTime < endTime; startTime = startTime + 900000 {
    // 定义一个ep的初始值为1,每循环一次就+1
    var ep int
    ep++
    eiopJsonMap["ts"] = startTime
    eiopJsonMap["values"].(map[string]interface{})["ep"] = ep
    // 把修改过的json内容从map转换为json格式
    eiopJsonText, _ := json.Marshal(eiopJsonMap)
    fmt.Println(string(eiopJsonText))
    // 发送遥测,发完之后休眠1秒
    result := client.Publish("topic", 0, true, eiopJsonText)
    result.Wait()
    time.Sleep(1 * time.Second)
  }
  // 发送完信息之后,退出连接
  fmt.Println("disconnect:" + username)
  client.Disconnect(250)
  total++
  end_number <- total
}
func main() {
  userNameList := readcsv("connect_info.csv")
  endNumber := make(chan int, len(userNameList))
  // 变量所有的username,通过go关键字并发多个设备
  for _, userName := range userNameList {
    go mqttDevice(userName, endNumber)
  }
  // 当所有的设备都发送完毕后,关闭程序
  for i := range endNumber {
    fmt.Println("已经有" + strconv.Itoa(i) + "个设备发送完毕")
    if i == len(userNameList) {
      return
    }
  }
}

test.json

{
  "ts": 1603088274000,
  "values": {
    "ep": 12
  }
}

connect_info.csv

localhost,1883,XecUwSmMGiYJp2BspMK2
localhost,1883,GqVqoPP2wblDjS2P9pQ9
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
安全 Go
Golang语言goroutine协程并发安全及锁机制
这篇文章是关于Go语言中多协程操作同一数据问题、互斥锁Mutex和读写互斥锁RWMutex的详细介绍及使用案例,涵盖了如何使用这些同步原语来解决并发访问共享资源时的数据安全问题。
86 4
|
3月前
|
Go 调度 开发者
[go 面试] 深入理解进程、线程和协程的概念及区别
[go 面试] 深入理解进程、线程和协程的概念及区别
|
12天前
|
存储 安全 测试技术
GoLang协程Goroutiney原理与GMP模型详解
本文详细介绍了Go语言中的Goroutine及其背后的GMP模型。Goroutine是Go语言中的一种轻量级线程,由Go运行时管理,支持高效的并发编程。文章讲解了Goroutine的创建、调度、上下文切换和栈管理等核心机制,并通过示例代码展示了如何使用Goroutine。GMP模型(Goroutine、Processor、Machine)是Go运行时调度Goroutine的基础,通过合理的调度策略,实现了高并发和高性能的程序执行。
74 29
|
10天前
|
Go 计算机视觉
在Golang高并发环境中如何进行协程同步?
在此示例中,使用互斥锁来保护对共享计数器变量 c 的访问,确保并发的 HTTP 请求不会产生数据竞争。
29 3
|
10天前
|
负载均衡 算法 Go
GoLang协程Goroutiney原理与GMP模型详解
【11月更文挑战第4天】Goroutine 是 Go 语言中的轻量级线程,由 Go 运行时管理,创建和销毁开销小,适合高并发场景。其调度采用非抢占式和协作式多任务处理结合的方式。GMP 模型包括 G(Goroutine)、M(系统线程)和 P(逻辑处理器),通过工作窃取算法实现负载均衡,确保高效利用系统资源。
|
4月前
|
Java Go 调度
GO 协程
GO 协程
36 0
|
26天前
|
安全 Go 调度
探索Go语言的并发模式:协程与通道的协同作用
Go语言以其并发能力闻名于世,而协程(goroutine)和通道(channel)是实现并发的两大利器。本文将深入了解Go语言中协程的轻量级特性,探讨如何利用通道进行协程间的安全通信,并通过实际案例演示如何将这两者结合起来,构建高效且可靠的并发系统。
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
44 1
|
2月前
|
Go 调度
Golang语言goroutine协程篇
这篇文章是关于Go语言goroutine协程的详细教程,涵盖了并发编程的常见术语、goroutine的创建和调度、使用sync.WaitGroup控制协程退出以及如何通过GOMAXPROCS设置程序并发时占用的CPU逻辑核心数。
50 4
Golang语言goroutine协程篇
|
6月前
|
NoSQL 关系型数据库 MySQL
涉及rocketMQ,jemeter等性能测试服务器的安装记录
涉及rocketMQ,jemeter等性能测试服务器的安装记录
71 1