filebeat+kafka+logstash+elasticsearch+kibana实现日志收集解决方案

简介: filebeat+kafka+logstash+elasticsearch+kibana实现日志收集解决方案

一,环境


服务器环境:

192.168.2.1:elasticsearch192.168.2.2:filebeat+nginx192.168.2.3:kafka192.168.2.4:logstash

二,服务的安装

elasticseatch+filebeat+kafka+logsstash(6.60)清华源下载: https://mirrors.tuna.tsinghua.edu.cn/elasticstack/6.x/yum/6.6.0/

zookeeper官网下载: https://zookeeper.apache.org/releases.html

kafka官网下载: https://kafka.apache.org/downloads

一,配置elasticsearch

1.验证java环境(存在无需安装)
java -version#验证java环境
安装JDK1.8:yum -y install java-1.8.0-openjdk.x86_64
2.安装elasticsearch
rpm -ivh /mnt/elk-6.6/elasticsearch-6.6.0.rpm
3.修改配置文件
vi /etc/elasticsearch/elasticsearch.yml
修改一下内容:
node.name: node-1                       #群集中本机节点名
network.host: 192.168.2.1,127.0.0.1 #监听的ip地址
http.port: 9200         
4.开启elasticsearch
systemctl start elasticsearch
5.查看启动情况
[root@localhost ~]# netstat -anpt | grep java
tcp6       0      0192.168.2.1:9200        :::*                    LISTEN      12564/java
tcp6       0      0127.0.0.1:9200          :::*                    LISTEN      12564/java
tcp6       0      0192.168.2.1:9300        :::*                    LISTEN      12564/java
tcp6       0      0127.0.0.1:9300          :::*                    LISTEN      12564/java
tcp6       0      0192.168.2.1:9200        192.168.2.4:34428       ESTABLISHED 12564/java
tcp6       0      0192.168.2.1:9200        192.168.2.4:34436       ESTABLISHED 12564/java

二,配置filebeat+nginx

1.安装nginx
yum -y install nginx
2.安装filebeat
rpm -ivh /mnt/elk-6.6/filebeat-6.6.0-x86_64.rpm
3.修改filebeat配置文件
[root@localhost ~]# vi /etc/filebeat/filebeat.yml
添加一下内容:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
output.kafka:
  enabled: true
  hosts: ["192.168.2.3:9092"]   #kafka的IP地址和端口
  topic: test1                  #kafka的topic
4.开启服务
1. systemctl start nginx
2. systemctl start filebeat

三,配置kafka环境

1.安装java环境(存在无需安装)
yum -y install java-1.8.0-openjdk.x86_64
2.安装zookeeper
tar xf /mnt/zookeeper-3.4.9.tar.gz -C /usr/local/
mv /usr/local/zookeeper-3.4.9/ /usr/local/zookeeper
cd /usr/local/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
mkdir data logs
echo 1 > data/myid
/usr/local/zookeeper/bin/zkServer.sh start
/usr/local/zookeeper/bin/zkServer.sh status
1.修改zookeeper配置文件
vi zoo.cfg
添加一下内容:
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
server.1=192.168.2.3:3188:3288
保存退出
mkdir data logs
echo1 > data/myid
2.启动zookeeper文件
1. /usr/local/zookeeper/bin/zkServer.sh start
2. /usr/local/zookeeper/bin/zkServer.sh status
3.安装kafka
1. tar xf /mnt/kafka_2.11-2.2.1.tgz -C /usr/local/
2. mv /usr/local/kafka_2.11-2.2.1/ /usr/local/kafka
4.配置kafka
cd /usr/local/kafka/config/
cp server.properties server.properties.bak
vi server.properties
修改一下内容:
broker.id=1
listeners=PLAINTEXT://192.168.2.3:9092
zookeeper.connect=192.168.2.3:2181
5.启动kafka
1. cd /usr/local/kafka/
2. ./bin/kafka-server-start.sh  ./config/server.properties   &
6.kafka创建topic
./bin/kafka-topics.sh --create--zookeeper localhost:2181 --replication-factor1--partitions1--topic test1   #创建名为test1的topic
./bin/kafka-topics.sh --list--zookeeper localhost:2181     #查看当前有哪些topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning    #查看test1中有那些信息

四,配置logstash

1.安装java环境(存在可省略)
yum -y install java-1.8.0-openjdk.x86_64
2.安装logstash
rpm -ivh /mnt/elk-6.6/logstash-6.6.0.rpm
3.编写配置文件
vi /etc/logstash/conf.d/kafka.conf
input {
  kafka {
    bootstrap_servers => ["192.168.2.3:9092"]
    group_id => "es-test"
    topics => ["test1"]                         #与filebeat使用的topic一致
    codec => json
 }
}
output {
  kafka{
       codec => json {
            charset => "UTF-8"
        }
    topic_id => "test1"
    bootstrap_servers => "192.168.2.3:9092"
  }
  elasticsearch {
    hosts => "http://192.168.2.1:9200"
    index => "kafka‐%{+YYYY.MM.dd}"
 }
}
4.启动服务
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf

五,配置kibana

1.安装kiana
rpm -ihv /mnt/elk-6.6/kibana-6.6.0-x86_64.rpm
2.配置kiana
vim /etc/kibana/kibana.yml
修改:
server.port: 5601
server.host: "192.168.2.5"
server.name: "db01"
elasticsearch.hosts: ["http://192.168.2.1:9200"]   #es服务器的ip,便于接收日志数据
3.开启kiana服务
systemctl start kibana

三,收集日志

一,kibana收集日志

  1. 添加日志信息

  1. 选择日志格式

3.查看日志信息

目录
相关文章
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
858 3
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
存储 弹性计算 运维
海量日志接入 Elasticsearch Serverless 应用降本70%以上
本文将探讨在日志场景下,使用阿里云Elasticsearch Serverless相较于基于ECS自建Elasticsearch集群的成本与性能优势,展示如何通过Serverless架构实现高达 70%以上的成本节约。
711 0
|
存储 SQL 监控
|
自然语言处理 监控 数据可视化
|
存储 监控 安全
|
运维 监控 安全
|
存储 数据采集 监控
开源日志分析Elasticsearch
【10月更文挑战第22天】
250 5
|
存储 JSON 监控
开源日志分析Logstash
【10月更文挑战第22天】
356 1
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
553 1

热门文章

最新文章