二、测试方案:
1) 单机方案+多数据中心:
测试模块 测试所在环节 测试对象 依赖 测试方法 预期结果 备注
数据采集展现 Agent上报-正常 Cmdb-agent 无 制造正常模拟数据上报 上报到redis
Agent上报-异常 Cmdb-agent 无 制造异常情况,查看是否上报以及是否异常
Proxy-转发 Proxy Cmdb-agent 1.查看是否选择一致的redis
2. 查询是否正常
Redis写入 Redis Agent、proxy 1. 节点写入是否一致
2. 批量并发
命令下发 客户端异常
正常命令
危险命令
采集器升级
采集器卸载
KV 存储 扩容-自动发现
故障
2) 分布式集群方案:
三、More
1)异常告警
利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本) - 坚固66 - 博客园 (cnblogs.com)
go实现的一个监控日志报警程序_嘿,逮到一只大码农-CSDN博客_go 监控程序
go实现进程监控并启动【附源码】_luoguo_51CTO博客
Python监控进程状态并实现告警_russellye_51CTO博客
用Python实现自动化监控远程服务器_代码帮-CSDN博客_python 监控服务器
Python自动化获取Linux服务状态与端口_博客网友陈浩的博客-CSDN博客
2)指标完善
3)限流熔断-(cmdb-agent)
4) 可配置
- 命令可配置
- 采集时长可配置
- 告警地址可配置
具体测试点
前置条件:
- 测试数据10万+(带redis-proxy节点信息)
- redis-cli客户端3个节点
- harbor镜像&对应网络权限
case 1-多节点上报
golang 模拟并发-数据库中随机选择数据进行上报
cpu监测
case 2-proxy转发
golang-编写脚本模拟proxy转发,校验数据是否写入redis正常
随机请求cmdb查询数据,检验是否正常
case3-正常命令下发
redis中写入是否正常
是否正确的agent节点
是否执行命令
case4-高危命令下发
redis中是否写入
是否命中正常的agent节点
是否拒绝命令
case5-正常命令下发,agent异常
是否进行自愈和自升级
是否有熔断节流
case6-模拟客户端升级(至少双节点)
升级后是否可以正常上报数据
agent版本是否有变更
case7-模拟客户端自卸载
卸载后是否数据清除(redis && cmdb)
case8-redis故障(单机)
是否进行了转移
是否有监控预警
测试数据生成脚本
from faker import Faker
import unittest
from faker.providers import BaseProvider
import faker_microservice
import pymysql
# 自定义项目名(项目名格式x-y)
class MyProvider(BaseProvider):
def app_name(self):
return f"{fake.pystr(6)}-{fake.pystr(5)}"
fake = Faker()
# then add new provider to faker instance
fake.add_provider(MyProvider)
fake.add_provider(faker_microservice.Provider)
class Create_data(object):
def make_sigle_data(self):
"""
cpu,app,pid,path,port,name(主机名),ip
:return:
"""
app_name = fake.microservice()
data_dict = {
"name": fake.pystr(),
"ip": fake.ipv4(),
"cpu": fake.random_int(min=1,max=32),
"app": app_name,
"pid": fake.random_int(max=1000),
"port": fake.random_int(min=1,max=65535),
"path": f"/web/apps/{app_name}",
}
return data_dict
def get_dataN(self, n):
data = [self.make_sigle_data() for i in range(n)]
return data
def convert_sql(self, table_name, data_dict=dict()):
field_columns = ",".join('`{}`'.format(k) for k in data_dict.keys())
val_columns = ",".join('%({})s'.format(k) for k in data_dict.keys())
coverted_sql = f"insert into {table_name}({field_columns}) values({val_columns})"
return coverted_sql
def write_batch(self, db_dict, n):
db = pymysql.connect(**db_dict)
cursor = db.cursor()
data_lst = self.get_dataN(n)
# 每次2000条写入:
if n<= 2000:
cursor.executemany(self.convert_sql(data_lst[0]), data_lst)
else:
tmp = n/2000
for i in range(0, tmp):
start = i*2000
end = (i+1)*2000 if (i+1)*2000<n else n
cursor.executemany(self.convert_sql(data_lst[0]), data_lst[start:end])
db.commit()
cursor.close()
db.close()
class Test_faker(unittest.TestCase):
def test_make(self):
self.assertTrue(Create_data().make_sigle_data())
if __name__ == '__main__':
# unittest.main
db_config = {"host": "127.0.0.1", "port": 3306, "user": "root", "password": "case123","database":"test"}
Create_data().write_batch(db_config, 10000)