基于Kafka的nginx日志收集分析与监控平台(3)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 基于Kafka的nginx日志收集分析与监控平台(3)

基于Kafka的nginx日志收集分析与监控平台(2)+https://developer.aliyun.com/article/1557848

过半机制

在领导者选举的过程中,如果某台zkServer获得了超过半数的选票,则以zkServer就可以成为Lerder了。

过半机制的源码实现是通过下面操作:

public class QuorumMaj implements QuorumVerifier {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);
    
    int half;   
    // n表示集群中zkServer的个数(准确的说是参与者的个数,参与者不包括观察者节点)
    public QuorumMaj(int n){
        this.half = n/2;
    }
    // 验证是否符合过半机制
    public boolean containsQuorum(Set<Long> set){
        // half是在构造方法里赋值的
        // set.size()表示某台zkServer获得的票数
        return (set.size() > half);
    }
}

核心代码部分为:

this.half = n/2;

return (set.size() > half);

就比如:如果现在集群中有5台zkServer,那么half=5/2=2,也就是说,领导者选举的过程中至少有三台zkServer投了同一个zkServer,才会符号过半机制,才会选举出一个Leader。

那为什么选举的过程中为什么一定要有一个过半机制验证呢?

因为这样不需要等待所有的zkServer都投了同一个zkServer就可以选举出来一个Leader了,这样比较快,所以叫快速领导者选举算法。

那么过半机制中为什么是大于,而不是大于等于?

这就跟脑裂问题有关系了,就比如回到上面出现脑裂问题的场景:

当机房中间的网络断掉之后,机房1台内的三台服务器会进行领导者选举,但是此时过半机制的条件是set.size()>3,也就是说至少要4台zkServer才能选举出来一个Leader,所以对于机房1来说它不能选出一个Leader,同样机房2也不能选出一个Leader,这种情况下整个集群当机房间的网络断掉之后,整个集群都将没有leader。

而如果过半机制的条件是set.size()>=3,那么机房1和机房2都会选出一个leader,这样就出现了脑裂。所以我们就知道了。为什么过半机制是大于,而不是大于等于。就是为了防止脑裂。

如果假设我们现在只有5台机器,也部署在两个机房:

此时过半机制的条件是set.size()>2,也就是至少要3台服务器才能选出一个leader,此时机房之间的网络断开了,对于机房1来说是没有影响的,Leader依然还是Leader,对于机房2来说是选不出来Leader的,此时整个集群中只有一个Leader。

所以,我们可以总结得出,有了过半机制,对于一个Zookeeper集群,那么没有Leader,要么只有一个leader,这样就避免了脑裂问题。

写的话是通过leader。读的话leader和follower都可以。

可以直接连接follower进行查询操作。

follower:查询、选举。选举是少数服从多数。

5.5.3 zookeeper与kakfa的联系

zookeeper是用来管理kafka的,用来保存kafka的元信息、topic、partition、副本信息,通过抢占的方式选举出kafka的controller,这个controller用来管理kakfa副本partition中的leader和follower的选举。

而zookeeper本身也是有leader和follower的,它的选举方式是采用一致性算法(zab),根据少数服从多数,票数过半的原则来选举leader。

所以在zk集群中,机器存活数必须过半,集群才能正常使用,所以我们通常也会将zk集群的节点数设为奇数个,这是为了方便选举。

跟filebeat给Kafka传递数据一样,Kafka连接任意一台zookeeper都可以操作,但是数据新增修改等事务操作必须在leader上运行。

客户端连接任意一台zk都可以操作:但是数据新增修改等事物操作必须在leader上运行。客户端如果连接到follower上进行事务操作,follower会返回给leader的IP,最终客户端还是在leader上操作,但是可以直接连接follower进行查询操作。

follower:查询、选举。选举是少数服从多数。

zookeeper中数据的同步,只要过半节点同步完成,就表示数据已经提交。(commit)

zookeeper不是强一致性的。它属于最终一致性。

zookeeper里面存放的数据很少,

每个节点存储的数据量默认不会超过1M。存储速度快。

zookeeper相当于一个文件树。

5.7 redis(扩展):数据库结构服务器

redis通常被称为数据库结构服务器,Redis不仅仅支持简单的key-value类型的数据,同时还提供list、set、zset、hash等数据结构的存储。

有丰富的数据类型—Redis支持二进制案例的Strings, Lists, Hashes, Sets 及 Ordered Sets 数据类型操作。

不同的类型执行不同的指令:list:lpush。

memcached vs redis:缓存

redis:常用于缓存里面。

redis是一种键值存储的数据库。

redis开启持久化的两种模式

AOF 全持久化模式 每一次操作都会同步到磁盘(同步的是每次操作日志到磁盘),安全,但是效率低。

RDB 半持久化模式 定时的将内存内容快照写入磁盘(将内容备份写入磁盘,定时写一次)

为什么用kafka做日志收集而不用redis?

1、kafka适用于做日志收集平台。

而redis大多数使用在做kv数据存储上。

2、redis也有一个queue的数据类型,用来做发布订阅系统的。redis只有单一的消费者,不能分成多个消费组。它里面的消息都只能消费一次。

kafka可以通过offset的设置来重复消费的。

3、redis数据是存储在内存里面的,kafka是存储在磁盘里面的。

redis的安装配置

1、先安装redis

yum  install epel-release -y
yum install redis -y

2、修改监听的ip:

vim /etc/redis.conf
bind 0.0.0.0

3、启动redis

[root@nginx-kafka01 ~]# systemctl start redis

默认端口:6379

查看端口:lsof -i:6379

或者:netstat -anplut|grep redis

redis数据库 0-15号库:

4、进入redis:

[root@nginx-kafka01 ~]# redis-cli
127.0.0.1:6379> select 0  选择0 1 2 3号库
OK
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> select 2
OK
127.0.0.1:6379[2]> select 3
OK
127.0.0.1:6379[3]> set a 123
OK
127.0.0.1:6379[3]> get a  设置key
"123"
 127.0.0.1:6379[3]> keys a*  获取以a开头的key
1) "a"
127.0.0.1:6379[3]> set abc 444 设置key
OK
127.0.0.1:6379[3]> set aaa 555
OK
127.0.0.1:6379[3]> set ddd 666
OK
127.0.0.1:6379[3]> keys a* 
1) "abc"
2) "a"
3) "aaa"

列表:使用lpush:

127.0.0.1:6379[3]> lpush sanchuang call
(integer) 1
127.0.0.1:6379[3]> lpush sanchuang 1
(integer) 2
127.0.0.1:6379[3]> lpush sanchuang nnn
(integer) 3
127.0.0.1:6379[3]> lpush sanchuang yyy
(integer) 4
127.0.0.1:6379[3]> lrange sanchuang 0 10
1) "yyy"
2) "nnn"
3) "1"
4) "call"
127.0.0.1:6379[3]> rpop  sanchuang   #移除并获取列表的第一个元素。
"call"
127.0.0.1:6379[3]> lrange sanchuang 0 10  #获取列表指定范围内的元素:获取列表0到10内的元素。
1) "yyy"
2) "nnn"
3) "1"

Python里面连接redis的模块库,python-pymysql- mysql python-pymysql -

5.8 python消费者程序

编写的python程序代码存放在gitee中:

python_consumer.py

5.9 mysql 数据库

数据存入mysql中的,需要先创建好mysql数据库:

1、需求分析

需要nginx日志的ip,时间,带宽字段

将ip字段解析成相应的省份、运营商

存入数据库的字段: id, 时间, 省份, 运营商, 带宽

带宽:通过nginx.log去获取。

通过淘宝接口:

使用pykafka模块。

#步骤

1、创建数据表

2、编写python脚本, 从kafka获取nginx日志

3、获取好的nginx日志,提取出ip,时间,带宽字段

4、提取出的ip字段通过淘宝的一个接口解析出省份和运营商。

url = “https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=114.114.114.114

5、格式化时间字段 “2021-10-12 12:00:00”

6、存入数据库

#创建表
create table nginxlog (
id  int primary key auto_increment,
dt  datetime not null,
prov varchar(256) ,
isp  varchar(256),
bd  float
) CHARSET=utf8;
create table prov_index(
id  int primary key auto_increment,
prov_name  varchar(256)
) charset=utf8;
create table isp_index(
id int primary key auto_increment,
isp_name varchar(256)
) charset=utf8;
python程序:
import json
import requests
import time
import pymysql
taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
db = pymysql.connect(host="192.168.2.152", user="sc", password="123456", port=3306, db="sanchuang", charset="utf8")
cursor = db.cursor()
# 查询ip地址的信息(省份和运营商isp),通过taobao网的接口
def resolv_ip(ip):
    response = requests.get(taobao_url + ip)
    if response.status_code == 200:
        tmp_dict = json.loads(response.text)
        prov = tmp_dict["data"]["region"]
        isp = tmp_dict["data"]["isp"]
        return prov, isp
    return None, None
# 将日志里读取的格式转换为我们指定的格式
def trans_time(dt):
    # 把字符串转成时间格式
    timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    # timeStamp = int(time.mktime(timeArray))
    # 把时间格式转成字符串
    new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    return new_time
# 从kafka里获取数据,清洗为我们需要的ip,时间,带宽
from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
topic = client.topics['nginxlog']
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True,
    zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
)
# consumer = topic.get_simple_consumer()
for message in balanced_consumer:
    if message is not None:
        try:
            line = json.loads(message.value.decode("utf-8"))
            log = line["message"]
            tmp_lst = log.split()
            ip = tmp_lst[0]
            dt = tmp_lst[3].replace("[", "")
            bt = tmp_lst[9]
            dt = trans_time(dt)
            prov, isp = resolv_ip(ip)
            if prov and isp:
                print(ip, prov, isp, dt,bt)
                cursor.execute('insert into nginxlog(dt,prov,isp,bd) values("%s", "%s", "%s", "%s")' % (dt, prov, isp, bt))
                db.commit()
                print("保存成功")
        except ValueError as e:
            print("修改失败", e)
            db.rollback()
db.close()

6.0 celery:分布式任务处理模块

celery异步任务:要将监控的任务写入celery_tasks中的scheduled_task任务。指令下发

celery分布式任务处理模块:

任务处理:

1、获取监控项

2、获取数据库日志

3、对比数据库日志和监控项阈值,如果超出范围就发邮件告警。

1、安装celery:

Python里面连接redis的模块库,python-pymysql- mysql python-pymysql -

pip install  celery  
pip install  redis

2、生产者生产任务:celery_app:核心对象。执行的是定时任务。

#编辑celery 参照flask_log/celery_app

3、config.py:celery 的配置文件

from celery.schedules import crontab

4、配置消息中间件的地址 #启动worker的地址

BROKER_URL = "redis://192.168.77.152:6379/1"

5、启动celery时,导入任务,只要导入任务才能执行

CELERY_IMPORTS = {
      'celery_tasks'   #最开始时候是没有任务的,存放celery要执行的任务
}

6、时区

CELERY_TIMEZONE = "Asia/Shanghai"

7、设置了一个定时任务

CELERYBEAT_SCHEDULE = {
   'log-every-minute': {
     'task' : 'celery_tasks.scheduled_task',    #是celery_tasks中的scheduled_task任务,千万不能打错
     'schedule': crontab(minute='*/1')   
  }
}

8、celery_tasks.py:存放任务的文件

`from app import celery_app
@celery_app.task
 def scheduled_task(*args,**kwargs):  
    print("this is scheduled_task")

加入装饰器就是一个任务,不加装饰器就是一个普通对象。

9、app.py:核心对象,生成celery的核心对象;存放celery核心对象的文件

from celery import Celery

实例化celery对象,传入一个名字即可

celery_app = Celery('celery_app')

从config中获取东西

celery_app.config_from_object('config')

启动worker:启动消费者,消费任务。

[root@nginx-kafka01 celery_task]# celery -A app.celery_app worker --loglevel=INFO -n node1
py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.
[2022-07-25 16:45:51,567: INFO/MainProcess] Connected to redis://192.168.2.152:6379/1
[2022-07-25 16:45:51,574: INFO/MainProcess] mingle: searching for neighbors
[2022-07-25 16:45:52,610: INFO/MainProcess] mingle: all alone
[2022-07-25 16:45:52,645: INFO/MainProcess] celery@nodel ready.

生成的任务每分钟会打印出来。

先启动beat:

root@nginx-kafka01 celery_task]# celery -A app.celery_app beat --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
__    -    ... __   -        _
LocalTime -> 2022-07-25 17:08:53

worker和beat都要执行核心对象文件,将不同的work就有不同的消费者执行任务,消费不过来的时候可以再多加几个worker。

写监控接口:增删改查

10、celery程序,api接口授权。

访问接口实现对数据库的增删改查。

数据库:id ,prov ,lsp、bd、action

接口:/monitor ---- restful的接口规范

状态:100000 —成功。

到这里项目的详解就结束了,如有理解错误的地方,请大家进行批评指正。



相关文章
|
1月前
|
监控 应用服务中间件 定位技术
要统计Nginx的客户端IP,可以通过分析Nginx的访问日志文件来实现
要统计Nginx的客户端IP,可以通过分析Nginx的访问日志文件来实现
114 3
|
3月前
|
自然语言处理 应用服务中间件 程序员
Nginx UI:全新的 Nginx 在线管理平台
Nginx UI:全新的 Nginx 在线管理平台
138 1
|
3月前
|
存储 监控 固态存储
如何监控和优化 WAL 日志文件的存储空间使用?
如何监控和优化 WAL 日志文件的存储空间使用?
|
3月前
|
监控 网络协议 CDN
阿里云国际监控查询流量、用量查询流量与日志统计流量有差异?
阿里云国际监控查询流量、用量查询流量与日志统计流量有差异?
|
5月前
|
应用服务中间件 nginx
nginx error日志 client intended to send too large body: 1434541 bytes 如何处理?
【8月更文挑战第27天】nginx error日志 client intended to send too large body: 1434541 bytes 如何处理?
442 6
|
5月前
|
应用服务中间件 Linux nginx
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
|
4月前
|
运维 Kubernetes 监控
Loki+Promtail+Grafana监控K8s日志
综上,Loki+Promtail+Grafana 监控组合对于在 K8s 环境中优化日志管理至关重要,它不仅提供了强大且易于扩展的日志收集与汇总工具,还有可视化这些日志的能力。通过有效地使用这套工具,可以显著地提高对应用的运维监控能力和故障诊断效率。
460 0
|
5月前
|
SQL 数据库 Java
Hibernate 日志记录竟藏着这些秘密?快来一探究竟,解锁调试与监控最佳实践
【8月更文挑战第31天】在软件开发中,日志记录对调试和监控至关重要。使用持久化框架 Hibernate 时,合理配置日志可帮助理解其内部机制并优化性能。首先,需选择合适的日志框架,如 Log4j 或 Logback,并配置日志级别;理解 Hibernate 的多级日志,如 DEBUG 和 ERROR,以适应不同开发阶段需求;利用 Hibernate 统计功能监测数据库交互情况;记录自定义日志以跟踪业务逻辑;定期审查和清理日志避免占用过多磁盘空间。综上,有效日志记录能显著提升 Hibernate 应用的性能和稳定性。
59 0
|
5月前
|
开发者 前端开发 编解码
Vaadin解锁移动适配新境界:一招制胜,让你的应用征服所有屏幕!
【8月更文挑战第31天】在移动互联网时代,跨平台应用开发备受青睐。作为一款基于Java的Web应用框架,Vaadin凭借其组件化设计和强大的服务器端渲染能力,助力开发者轻松构建多设备适应的Web应用。本文探讨Vaadin与移动设备的适配策略,包括响应式布局、CSS媒体查询、TouchKit插件及服务器端优化,帮助开发者打造美观且实用的移动端体验。通过这些工具和策略的应用,可有效应对屏幕尺寸、分辨率及操作系统的多样性挑战,满足广大移动用户的使用需求。
76 0
|
5月前
|
存储 运维 监控
Entity Framework Core 实现审计日志记录超棒!多种方法助你跟踪数据变化、监控操作,超实用!
【8月更文挑战第31天】在软件开发中,审计日志记录对于跟踪数据变化、监控用户操作及故障排查至关重要。Entity Framework Core (EF Core) 作为强大的对象关系映射框架,提供了多种实现审计日志记录的方法。例如,可以使用 EF Core 的拦截器在数据库操作前后执行自定义逻辑,记录操作类型、时间和执行用户等信息。此外,也可通过在实体类中添加审计属性(如 `CreatedBy`、`CreatedDate` 等),并在保存实体时更新这些属性来记录审计信息。这两种方法都能有效帮助我们追踪数据变更并满足合规性和安全性需求。
142 0