基于Kafka的nginx日志收集分析与监控平台(2)+https://developer.aliyun.com/article/1557848
过半机制
在领导者选举的过程中,如果某台zkServer获得了超过半数的选票,则以zkServer就可以成为Lerder了。
过半机制的源码实现是通过下面操作:
public class QuorumMaj implements QuorumVerifier { private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class); int half; // n表示集群中zkServer的个数(准确的说是参与者的个数,参与者不包括观察者节点) public QuorumMaj(int n){ this.half = n/2; } // 验证是否符合过半机制 public boolean containsQuorum(Set<Long> set){ // half是在构造方法里赋值的 // set.size()表示某台zkServer获得的票数 return (set.size() > half); } }
核心代码部分为:
this.half = n/2;
return (set.size() > half);
就比如:如果现在集群中有5台zkServer,那么half=5/2=2,也就是说,领导者选举的过程中至少有三台zkServer投了同一个zkServer,才会符号过半机制,才会选举出一个Leader。
那为什么选举的过程中为什么一定要有一个过半机制验证呢?
因为这样不需要等待所有的zkServer都投了同一个zkServer就可以选举出来一个Leader了,这样比较快,所以叫快速领导者选举算法。
那么过半机制中为什么是大于,而不是大于等于?
这就跟脑裂问题有关系了,就比如回到上面出现脑裂问题的场景:
当机房中间的网络断掉之后,机房1台内的三台服务器会进行领导者选举,但是此时过半机制的条件是set.size()>3,也就是说至少要4台zkServer才能选举出来一个Leader,所以对于机房1来说它不能选出一个Leader,同样机房2也不能选出一个Leader,这种情况下整个集群当机房间的网络断掉之后,整个集群都将没有leader。
而如果过半机制的条件是set.size()>=3,那么机房1和机房2都会选出一个leader,这样就出现了脑裂。所以我们就知道了。为什么过半机制是大于,而不是大于等于。就是为了防止脑裂。
如果假设我们现在只有5台机器,也部署在两个机房:
此时过半机制的条件是set.size()>2,也就是至少要3台服务器才能选出一个leader,此时机房之间的网络断开了,对于机房1来说是没有影响的,Leader依然还是Leader,对于机房2来说是选不出来Leader的,此时整个集群中只有一个Leader。
所以,我们可以总结得出,有了过半机制,对于一个Zookeeper集群,那么没有Leader,要么只有一个leader,这样就避免了脑裂问题。
写的话是通过leader。读的话leader和follower都可以。
可以直接连接follower进行查询操作。
follower:查询、选举。选举是少数服从多数。
5.5.3 zookeeper与kakfa的联系
zookeeper是用来管理kafka的,用来保存kafka的元信息、topic、partition、副本信息,通过抢占的方式选举出kafka的controller,这个controller用来管理kakfa副本partition中的leader和follower的选举。
而zookeeper本身也是有leader和follower的,它的选举方式是采用一致性算法(zab),根据少数服从多数,票数过半的原则来选举leader。
所以在zk集群中,机器存活数必须过半,集群才能正常使用,所以我们通常也会将zk集群的节点数设为奇数个,这是为了方便选举。
跟filebeat给Kafka传递数据一样,Kafka连接任意一台zookeeper都可以操作,但是数据新增修改等事务操作必须在leader上运行。
客户端连接任意一台zk都可以操作:但是数据新增修改等事物操作必须在leader上运行。客户端如果连接到follower上进行事务操作,follower会返回给leader的IP,最终客户端还是在leader上操作,但是可以直接连接follower进行查询操作。
follower:查询、选举。选举是少数服从多数。
zookeeper中数据的同步,只要过半节点同步完成,就表示数据已经提交。(commit)
zookeeper不是强一致性的。它属于最终一致性。
zookeeper里面存放的数据很少,
每个节点存储的数据量默认不会超过1M。存储速度快。
zookeeper相当于一个文件树。
5.7 redis(扩展):数据库结构服务器
redis通常被称为数据库结构服务器,Redis不仅仅支持简单的key-value类型的数据,同时还提供list、set、zset、hash等数据结构的存储。
有丰富的数据类型—Redis支持二进制案例的Strings, Lists, Hashes, Sets 及 Ordered Sets 数据类型操作。
不同的类型执行不同的指令:list:lpush。
memcached vs redis:缓存
redis:常用于缓存里面。
redis是一种键值存储的数据库。
redis开启持久化的两种模式
AOF 全持久化模式 每一次操作都会同步到磁盘(同步的是每次操作日志到磁盘),安全,但是效率低。
RDB 半持久化模式 定时的将内存内容快照写入磁盘(将内容备份写入磁盘,定时写一次)
为什么用kafka做日志收集而不用redis?
1、kafka适用于做日志收集平台。
而redis大多数使用在做kv数据存储上。
2、redis也有一个queue的数据类型,用来做发布订阅系统的。redis只有单一的消费者,不能分成多个消费组。它里面的消息都只能消费一次。
kafka可以通过offset的设置来重复消费的。
3、redis数据是存储在内存里面的,kafka是存储在磁盘里面的。
redis的安装配置
1、先安装redis
yum install epel-release -y yum install redis -y
2、修改监听的ip:
vim /etc/redis.conf bind 0.0.0.0
3、启动redis
[root@nginx-kafka01 ~]# systemctl start redis
默认端口:6379
查看端口:lsof -i:6379
或者:netstat -anplut|grep redis
redis数据库 0-15号库:
4、进入redis:
[root@nginx-kafka01 ~]# redis-cli 127.0.0.1:6379> select 0 选择0 1 2 3号库 OK 127.0.0.1:6379> select 1 OK 127.0.0.1:6379[1]> select 2 OK 127.0.0.1:6379[2]> select 3 OK 127.0.0.1:6379[3]> set a 123 OK 127.0.0.1:6379[3]> get a 设置key "123" 127.0.0.1:6379[3]> keys a* 获取以a开头的key 1) "a" 127.0.0.1:6379[3]> set abc 444 设置key OK 127.0.0.1:6379[3]> set aaa 555 OK 127.0.0.1:6379[3]> set ddd 666 OK 127.0.0.1:6379[3]> keys a* 1) "abc" 2) "a" 3) "aaa"
列表:使用lpush:
127.0.0.1:6379[3]> lpush sanchuang call (integer) 1 127.0.0.1:6379[3]> lpush sanchuang 1 (integer) 2 127.0.0.1:6379[3]> lpush sanchuang nnn (integer) 3 127.0.0.1:6379[3]> lpush sanchuang yyy (integer) 4 127.0.0.1:6379[3]> lrange sanchuang 0 10 1) "yyy" 2) "nnn" 3) "1" 4) "call" 127.0.0.1:6379[3]> rpop sanchuang #移除并获取列表的第一个元素。 "call" 127.0.0.1:6379[3]> lrange sanchuang 0 10 #获取列表指定范围内的元素:获取列表0到10内的元素。 1) "yyy" 2) "nnn" 3) "1"
Python里面连接redis的模块库,python-pymysql- mysql python-pymysql -
5.8 python消费者程序
编写的python程序代码存放在gitee中:
5.9 mysql 数据库
数据存入mysql中的,需要先创建好mysql数据库:
1、需求分析
需要nginx日志的ip,时间,带宽字段
将ip字段解析成相应的省份、运营商
存入数据库的字段: id, 时间, 省份, 运营商, 带宽
带宽:通过nginx.log去获取。
通过淘宝接口:
使用pykafka模块。
#步骤
1、创建数据表
2、编写python脚本, 从kafka获取nginx日志
3、获取好的nginx日志,提取出ip,时间,带宽字段
4、提取出的ip字段通过淘宝的一个接口解析出省份和运营商。
url = “https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=114.114.114.114”
5、格式化时间字段 “2021-10-12 12:00:00”
6、存入数据库
#创建表 create table nginxlog ( id int primary key auto_increment, dt datetime not null, prov varchar(256) , isp varchar(256), bd float ) CHARSET=utf8; create table prov_index( id int primary key auto_increment, prov_name varchar(256) ) charset=utf8; create table isp_index( id int primary key auto_increment, isp_name varchar(256) ) charset=utf8;
python程序: import json import requests import time import pymysql taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=" db = pymysql.connect(host="192.168.2.152", user="sc", password="123456", port=3306, db="sanchuang", charset="utf8") cursor = db.cursor() # 查询ip地址的信息(省份和运营商isp),通过taobao网的接口 def resolv_ip(ip): response = requests.get(taobao_url + ip) if response.status_code == 200: tmp_dict = json.loads(response.text) prov = tmp_dict["data"]["region"] isp = tmp_dict["data"]["isp"] return prov, isp return None, None # 将日志里读取的格式转换为我们指定的格式 def trans_time(dt): # 把字符串转成时间格式 timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S") # timeStamp = int(time.mktime(timeArray)) # 把时间格式转成字符串 new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray) return new_time # 从kafka里获取数据,清洗为我们需要的ip,时间,带宽 from pykafka import KafkaClient client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092") topic = client.topics['nginxlog'] balanced_consumer = topic.get_balanced_consumer( consumer_group='testgroup', auto_commit_enable=True, zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181' ) # consumer = topic.get_simple_consumer() for message in balanced_consumer: if message is not None: try: line = json.loads(message.value.decode("utf-8")) log = line["message"] tmp_lst = log.split() ip = tmp_lst[0] dt = tmp_lst[3].replace("[", "") bt = tmp_lst[9] dt = trans_time(dt) prov, isp = resolv_ip(ip) if prov and isp: print(ip, prov, isp, dt,bt) cursor.execute('insert into nginxlog(dt,prov,isp,bd) values("%s", "%s", "%s", "%s")' % (dt, prov, isp, bt)) db.commit() print("保存成功") except ValueError as e: print("修改失败", e) db.rollback() db.close()
6.0 celery:分布式任务处理模块
celery异步任务:要将监控的任务写入celery_tasks中的scheduled_task任务。指令下发
celery分布式任务处理模块:
任务处理:
1、获取监控项
2、获取数据库日志
3、对比数据库日志和监控项阈值,如果超出范围就发邮件告警。
1、安装celery:
Python里面连接redis的模块库,python-pymysql- mysql python-pymysql -
pip install celery pip install redis
2、生产者生产任务:celery_app:核心对象。执行的是定时任务。
#编辑celery 参照flask_log/celery_app
3、config.py:celery 的配置文件
from celery.schedules import crontab
4、配置消息中间件的地址 #启动worker的地址
BROKER_URL = "redis://192.168.77.152:6379/1"
5、启动celery时,导入任务,只要导入任务才能执行
CELERY_IMPORTS = { 'celery_tasks' #最开始时候是没有任务的,存放celery要执行的任务 }
6、时区
CELERY_TIMEZONE = "Asia/Shanghai"
7、设置了一个定时任务
CELERYBEAT_SCHEDULE = { 'log-every-minute': { 'task' : 'celery_tasks.scheduled_task', #是celery_tasks中的scheduled_task任务,千万不能打错 'schedule': crontab(minute='*/1') } }
8、celery_tasks.py:存放任务的文件
`from app import celery_app
@celery_app.task def scheduled_task(*args,**kwargs): print("this is scheduled_task")
加入装饰器就是一个任务,不加装饰器就是一个普通对象。
9、app.py:核心对象,生成celery的核心对象;存放celery核心对象的文件
from celery import Celery
实例化celery对象,传入一个名字即可
celery_app = Celery('celery_app')
从config中获取东西
celery_app.config_from_object('config')
启动worker:启动消费者,消费任务。
[root@nginx-kafka01 celery_task]# celery -A app.celery_app worker --loglevel=INFO -n node1 py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0. [2022-07-25 16:45:51,567: INFO/MainProcess] Connected to redis://192.168.2.152:6379/1 [2022-07-25 16:45:51,574: INFO/MainProcess] mingle: searching for neighbors [2022-07-25 16:45:52,610: INFO/MainProcess] mingle: all alone [2022-07-25 16:45:52,645: INFO/MainProcess] celery@nodel ready.
生成的任务每分钟会打印出来。
先启动beat:
root@nginx-kafka01 celery_task]# celery -A app.celery_app beat --loglevel=INFO celery beat v5.1.2 (sun-harmonics) is starting. __ - ... __ - _ LocalTime -> 2022-07-25 17:08:53
worker和beat都要执行核心对象文件,将不同的work就有不同的消费者执行任务,消费不过来的时候可以再多加几个worker。
写监控接口:增删改查
10、celery程序,api接口授权。
访问接口实现对数据库的增删改查。
数据库:id ,prov ,lsp、bd、action
接口:/monitor ---- restful的接口规范
状态:100000 —成功。
到这里项目的详解就结束了,如有理解错误的地方,请大家进行批评指正。