基于Kafka的nginx日志收集分析与监控平台(3)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云原生内存数据库 Tair,内存型 2GB
简介: 基于Kafka的nginx日志收集分析与监控平台(3)

基于Kafka的nginx日志收集分析与监控平台(2)+https://developer.aliyun.com/article/1557848

过半机制

在领导者选举的过程中,如果某台zkServer获得了超过半数的选票,则以zkServer就可以成为Lerder了。

过半机制的源码实现是通过下面操作:

public class QuorumMaj implements QuorumVerifier {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);
    
    int half;   
    // n表示集群中zkServer的个数(准确的说是参与者的个数,参与者不包括观察者节点)
    public QuorumMaj(int n){
        this.half = n/2;
    }
    // 验证是否符合过半机制
    public boolean containsQuorum(Set<Long> set){
        // half是在构造方法里赋值的
        // set.size()表示某台zkServer获得的票数
        return (set.size() > half);
    }
}

核心代码部分为:

this.half = n/2;

return (set.size() > half);

就比如:如果现在集群中有5台zkServer,那么half=5/2=2,也就是说,领导者选举的过程中至少有三台zkServer投了同一个zkServer,才会符号过半机制,才会选举出一个Leader。

那为什么选举的过程中为什么一定要有一个过半机制验证呢?

因为这样不需要等待所有的zkServer都投了同一个zkServer就可以选举出来一个Leader了,这样比较快,所以叫快速领导者选举算法。

那么过半机制中为什么是大于,而不是大于等于?

这就跟脑裂问题有关系了,就比如回到上面出现脑裂问题的场景:

当机房中间的网络断掉之后,机房1台内的三台服务器会进行领导者选举,但是此时过半机制的条件是set.size()>3,也就是说至少要4台zkServer才能选举出来一个Leader,所以对于机房1来说它不能选出一个Leader,同样机房2也不能选出一个Leader,这种情况下整个集群当机房间的网络断掉之后,整个集群都将没有leader。

而如果过半机制的条件是set.size()>=3,那么机房1和机房2都会选出一个leader,这样就出现了脑裂。所以我们就知道了。为什么过半机制是大于,而不是大于等于。就是为了防止脑裂。

如果假设我们现在只有5台机器,也部署在两个机房:

此时过半机制的条件是set.size()>2,也就是至少要3台服务器才能选出一个leader,此时机房之间的网络断开了,对于机房1来说是没有影响的,Leader依然还是Leader,对于机房2来说是选不出来Leader的,此时整个集群中只有一个Leader。

所以,我们可以总结得出,有了过半机制,对于一个Zookeeper集群,那么没有Leader,要么只有一个leader,这样就避免了脑裂问题。

写的话是通过leader。读的话leader和follower都可以。

可以直接连接follower进行查询操作。

follower:查询、选举。选举是少数服从多数。

5.5.3 zookeeper与kakfa的联系

zookeeper是用来管理kafka的,用来保存kafka的元信息、topic、partition、副本信息,通过抢占的方式选举出kafka的controller,这个controller用来管理kakfa副本partition中的leader和follower的选举。

而zookeeper本身也是有leader和follower的,它的选举方式是采用一致性算法(zab),根据少数服从多数,票数过半的原则来选举leader。

所以在zk集群中,机器存活数必须过半,集群才能正常使用,所以我们通常也会将zk集群的节点数设为奇数个,这是为了方便选举。

跟filebeat给Kafka传递数据一样,Kafka连接任意一台zookeeper都可以操作,但是数据新增修改等事务操作必须在leader上运行。

客户端连接任意一台zk都可以操作:但是数据新增修改等事物操作必须在leader上运行。客户端如果连接到follower上进行事务操作,follower会返回给leader的IP,最终客户端还是在leader上操作,但是可以直接连接follower进行查询操作。

follower:查询、选举。选举是少数服从多数。

zookeeper中数据的同步,只要过半节点同步完成,就表示数据已经提交。(commit)

zookeeper不是强一致性的。它属于最终一致性。

zookeeper里面存放的数据很少,

每个节点存储的数据量默认不会超过1M。存储速度快。

zookeeper相当于一个文件树。

5.7 redis(扩展):数据库结构服务器

redis通常被称为数据库结构服务器,Redis不仅仅支持简单的key-value类型的数据,同时还提供list、set、zset、hash等数据结构的存储。

有丰富的数据类型—Redis支持二进制案例的Strings, Lists, Hashes, Sets 及 Ordered Sets 数据类型操作。

不同的类型执行不同的指令:list:lpush。

memcached vs redis:缓存

redis:常用于缓存里面。

redis是一种键值存储的数据库。

redis开启持久化的两种模式

AOF 全持久化模式 每一次操作都会同步到磁盘(同步的是每次操作日志到磁盘),安全,但是效率低。

RDB 半持久化模式 定时的将内存内容快照写入磁盘(将内容备份写入磁盘,定时写一次)

为什么用kafka做日志收集而不用redis?

1、kafka适用于做日志收集平台。

而redis大多数使用在做kv数据存储上。

2、redis也有一个queue的数据类型,用来做发布订阅系统的。redis只有单一的消费者,不能分成多个消费组。它里面的消息都只能消费一次。

kafka可以通过offset的设置来重复消费的。

3、redis数据是存储在内存里面的,kafka是存储在磁盘里面的。

redis的安装配置

1、先安装redis

yum  install epel-release -y
yum install redis -y

2、修改监听的ip:

vim /etc/redis.conf
bind 0.0.0.0

3、启动redis

[root@nginx-kafka01 ~]# systemctl start redis

默认端口:6379

查看端口:lsof -i:6379

或者:netstat -anplut|grep redis

redis数据库 0-15号库:

4、进入redis:

[root@nginx-kafka01 ~]# redis-cli
127.0.0.1:6379> select 0  选择0 1 2 3号库
OK
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> select 2
OK
127.0.0.1:6379[2]> select 3
OK
127.0.0.1:6379[3]> set a 123
OK
127.0.0.1:6379[3]> get a  设置key
"123"
 127.0.0.1:6379[3]> keys a*  获取以a开头的key
1) "a"
127.0.0.1:6379[3]> set abc 444 设置key
OK
127.0.0.1:6379[3]> set aaa 555
OK
127.0.0.1:6379[3]> set ddd 666
OK
127.0.0.1:6379[3]> keys a* 
1) "abc"
2) "a"
3) "aaa"

列表:使用lpush:

127.0.0.1:6379[3]> lpush sanchuang call
(integer) 1
127.0.0.1:6379[3]> lpush sanchuang 1
(integer) 2
127.0.0.1:6379[3]> lpush sanchuang nnn
(integer) 3
127.0.0.1:6379[3]> lpush sanchuang yyy
(integer) 4
127.0.0.1:6379[3]> lrange sanchuang 0 10
1) "yyy"
2) "nnn"
3) "1"
4) "call"
127.0.0.1:6379[3]> rpop  sanchuang   #移除并获取列表的第一个元素。
"call"
127.0.0.1:6379[3]> lrange sanchuang 0 10  #获取列表指定范围内的元素:获取列表0到10内的元素。
1) "yyy"
2) "nnn"
3) "1"

Python里面连接redis的模块库,python-pymysql- mysql python-pymysql -

5.8 python消费者程序

编写的python程序代码存放在gitee中:

python_consumer.py

5.9 mysql 数据库

数据存入mysql中的,需要先创建好mysql数据库:

1、需求分析

需要nginx日志的ip,时间,带宽字段

将ip字段解析成相应的省份、运营商

存入数据库的字段: id, 时间, 省份, 运营商, 带宽

带宽:通过nginx.log去获取。

通过淘宝接口:

使用pykafka模块。

#步骤

1、创建数据表

2、编写python脚本, 从kafka获取nginx日志

3、获取好的nginx日志,提取出ip,时间,带宽字段

4、提取出的ip字段通过淘宝的一个接口解析出省份和运营商。

url = “https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=114.114.114.114

5、格式化时间字段 “2021-10-12 12:00:00”

6、存入数据库

#创建表
create table nginxlog (
id  int primary key auto_increment,
dt  datetime not null,
prov varchar(256) ,
isp  varchar(256),
bd  float
) CHARSET=utf8;
create table prov_index(
id  int primary key auto_increment,
prov_name  varchar(256)
) charset=utf8;
create table isp_index(
id int primary key auto_increment,
isp_name varchar(256)
) charset=utf8;
python程序:
import json
import requests
import time
import pymysql
taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
db = pymysql.connect(host="192.168.2.152", user="sc", password="123456", port=3306, db="sanchuang", charset="utf8")
cursor = db.cursor()
# 查询ip地址的信息(省份和运营商isp),通过taobao网的接口
def resolv_ip(ip):
    response = requests.get(taobao_url + ip)
    if response.status_code == 200:
        tmp_dict = json.loads(response.text)
        prov = tmp_dict["data"]["region"]
        isp = tmp_dict["data"]["isp"]
        return prov, isp
    return None, None
# 将日志里读取的格式转换为我们指定的格式
def trans_time(dt):
    # 把字符串转成时间格式
    timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    # timeStamp = int(time.mktime(timeArray))
    # 把时间格式转成字符串
    new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    return new_time
# 从kafka里获取数据,清洗为我们需要的ip,时间,带宽
from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
topic = client.topics['nginxlog']
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True,
    zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
)
# consumer = topic.get_simple_consumer()
for message in balanced_consumer:
    if message is not None:
        try:
            line = json.loads(message.value.decode("utf-8"))
            log = line["message"]
            tmp_lst = log.split()
            ip = tmp_lst[0]
            dt = tmp_lst[3].replace("[", "")
            bt = tmp_lst[9]
            dt = trans_time(dt)
            prov, isp = resolv_ip(ip)
            if prov and isp:
                print(ip, prov, isp, dt,bt)
                cursor.execute('insert into nginxlog(dt,prov,isp,bd) values("%s", "%s", "%s", "%s")' % (dt, prov, isp, bt))
                db.commit()
                print("保存成功")
        except ValueError as e:
            print("修改失败", e)
            db.rollback()
db.close()

6.0 celery:分布式任务处理模块

celery异步任务:要将监控的任务写入celery_tasks中的scheduled_task任务。指令下发

celery分布式任务处理模块:

任务处理:

1、获取监控项

2、获取数据库日志

3、对比数据库日志和监控项阈值,如果超出范围就发邮件告警。

1、安装celery:

Python里面连接redis的模块库,python-pymysql- mysql python-pymysql -

pip install  celery  
pip install  redis

2、生产者生产任务:celery_app:核心对象。执行的是定时任务。

#编辑celery 参照flask_log/celery_app

3、config.py:celery 的配置文件

from celery.schedules import crontab

4、配置消息中间件的地址 #启动worker的地址

BROKER_URL = "redis://192.168.77.152:6379/1"

5、启动celery时,导入任务,只要导入任务才能执行

CELERY_IMPORTS = {
      'celery_tasks'   #最开始时候是没有任务的,存放celery要执行的任务
}

6、时区

CELERY_TIMEZONE = "Asia/Shanghai"

7、设置了一个定时任务

CELERYBEAT_SCHEDULE = {
   'log-every-minute': {
     'task' : 'celery_tasks.scheduled_task',    #是celery_tasks中的scheduled_task任务,千万不能打错
     'schedule': crontab(minute='*/1')   
  }
}

8、celery_tasks.py:存放任务的文件

`from app import celery_app
@celery_app.task
 def scheduled_task(*args,**kwargs):  
    print("this is scheduled_task")

加入装饰器就是一个任务,不加装饰器就是一个普通对象。

9、app.py:核心对象,生成celery的核心对象;存放celery核心对象的文件

from celery import Celery

实例化celery对象,传入一个名字即可

celery_app = Celery('celery_app')

从config中获取东西

celery_app.config_from_object('config')

启动worker:启动消费者,消费任务。

[root@nginx-kafka01 celery_task]# celery -A app.celery_app worker --loglevel=INFO -n node1
py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.
[2022-07-25 16:45:51,567: INFO/MainProcess] Connected to redis://192.168.2.152:6379/1
[2022-07-25 16:45:51,574: INFO/MainProcess] mingle: searching for neighbors
[2022-07-25 16:45:52,610: INFO/MainProcess] mingle: all alone
[2022-07-25 16:45:52,645: INFO/MainProcess] celery@nodel ready.

生成的任务每分钟会打印出来。

先启动beat:

root@nginx-kafka01 celery_task]# celery -A app.celery_app beat --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
__    -    ... __   -        _
LocalTime -> 2022-07-25 17:08:53

worker和beat都要执行核心对象文件,将不同的work就有不同的消费者执行任务,消费不过来的时候可以再多加几个worker。

写监控接口:增删改查

10、celery程序,api接口授权。

访问接口实现对数据库的增删改查。

数据库:id ,prov ,lsp、bd、action

接口:/monitor ---- restful的接口规范

状态:100000 —成功。

到这里项目的详解就结束了,如有理解错误的地方,请大家进行批评指正。



相关文章
|
2天前
|
消息中间件 监控 Kafka
基于Kafka的nginx日志收集分析与监控平台(2)
基于Kafka的nginx日志收集分析与监控平台(2)
|
19天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
19天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
18天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
778 0
|
2月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
9天前
|
Java
使用kafka-clients操作数据(java)
使用kafka-clients操作数据(java)
14 6
|
5天前
|
消息中间件 Java Kafka
kafka 磁盘扩容与数据均衡操作代码
Kafka 的磁盘扩容和数据均衡是与保证Kafka集群可用性和性能相关的两个重要方面。在 Kafka 中,分区数据的存储和平衡对集群的运行至关重要。以下是有关Kafka磁盘扩容和数据均衡的一些建议
12 1
|
19天前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何实现OSS数据到Kafka的实时同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 数据采集 分布式计算
【数据采集与预处理】数据接入工具Kafka
【数据采集与预处理】数据接入工具Kafka
46 1
【数据采集与预处理】数据接入工具Kafka
|
25天前
|
消息中间件 DataWorks 安全
DataWorks产品使用合集之如何处理Kafka数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
DataWorks产品使用合集之如何处理Kafka数据