日志采集/分析

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
注册配置 MSE Nacos/ZooKeeper,118元/月
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 日志采集/分析

目录

EFK

这是一个日志收集系统,日志收集属于可观测性体系

可观测性体系

  • 监控
  • 基础设施的维度
  • USE方法
  • CPU:
  • 利用率
  • 负载
  • 内存:
  • 利用率
  • 饱和度
  • 错误率
  • 网卡:
  • 利用率
  • 饱和度
  • 错误率
  • 应用程序的维度
  • RED方法
  • 日志
  • 操作系统维度
  • 应用维度
  • 通过日志的错误进一步完善监控
  • 通过日志排查问题
  • 行为分析
  • 链路追踪

1. 日志系统

  1. ELK
  • ElasticSearch :日志存储系统
  • LogStash:日志采集器
  • Kibana:日志查询分析系统

ELK现在用的少,原因是

  1. jruby(java+ruby)
  2. 语法复杂:重量级日志采集
  3. 性能差
  1. EFK
  • ElasticSearch
  • Fluneted:日志采集器
  • Kibana
  1. PLG
  • Promtail :日志采集器
  • Loki:日志存储系统
  • Grafana:日志查询分析系统

我们这里部署的架构是

ilogtail ---> kafka ---> logstash ---> elasticsearch ---> kibana

使用ilogtail采集日志写入到kafka消息队列里,再由logstash从消息队列里读取日志写入到 es,最后再由kibana做展示

至于第三个环节为什么是logstash而不是ilogtail是因为,ilogtail要往es里面写日志会需要配置es的认证密码,但我们是没有给es配置用户名和密码的,所以采用logstash

2. 部署ElasticSearch

2.1 创建handless服务

[root@master EFK]# vim es-svc.yaml 
kind: Service
apiVersion: v1
metadata:
name: elasticsearch
namespace: logging
labels:
app: elasticsearch
spec:
selector:
app: elasticsearch
clusterIP: None
ports:
- port: 9200
name: rest
- port: 9300
name: inter-node

2.2 创建sts

[root@master EFK]# vim es-sts.yaml 
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: es
namespace: logging
spec:
serviceName: elasticsearch
replicas: 1
selector:
matchLabels:
app: elasticsearch
template:
metadata:
labels:
app: elasticsearch
spec:
initContainers:
- name: initc1
image: busybox
command: ["sysctl","-w","vm.max_map_count=262144"]
securityContext:
privileged: true
- name: initc2
image: busybox
command: ["sh","-c","ulimit -n 65536"]
securityContext:
privileged: true
- name: initc3
image: busybox
command: ["sh","-c","chmod 777 /data"]
volumeMounts:
- name: data
mountPath: /data
containers:
- name: elasticsearch
image: swr.cn-east-3.myhuaweicloud.com/hcie_openeuler/elasticsearch:7.17.1
resources:
limits:
cpu: 1000m
requests:
cpu: 100m
ports:
- containerPort: 9200
name: rest
protocol: TCP
- containerPort: 9300
name: inter-node
protocol: TCP
volumeMounts:
- name: data
mountPath: /usr/share/elasticsearch/data
env:
- name: cluster.name
value: k8s-logs
- name: node.name
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: cluster.initial_master_nodes
value: "es-0"
- name: discovery.zen.minimum_master_nodes
value: "2"
- name: discovery.seed_hosts
value: "elasticsearch"
- name: ES_JAVA_OPTS
value: "-Xms512m -Xmx512m"
- name: network.host
value: "0.0.0.0"
volumeClaimTemplates:
- metadata:
name: data
labels:
app: elasticsearch
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 10Gi

应用yaml文件

[root@master EFK]# kubectl create ns logging
[root@master EFK]# kubectl apply -f .
service/elasticsearch create
statefulset.apps/es create
[root@master EFK]# kubectl get pods -n logging 
NAME   READY   STATUS    RESTARTS   AGE
es-0   1/1     Running   0          46s

pod显示running就是部署好了

3. 部署kibana

我直接将所有需要的资源放在一个yaml文件里面

apiVersion: v1
kind: ConfigMap
metadata:
namespace: logging
name: kibana-config
labels:
app: kibana
data:
kibana.yml: |
    server.name: kibana
    server.host: "0.0.0.0"
    i18n.locale: zh-CN
    elasticsearch:
      hosts: ${ELASTICSEARCH_HOSTS}
 ---
apiVersion: v1
kind: Service
metadata:
name: kibana
namespace: logging
labels:
app: kibana
spec:
ports:
- port: 5601
type: NodePort
selector:
app: kibana
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kibana
namespace: logging
labels:
app: kibana
spec:
selector:
matchLabels:
app: kibana
template:
metadata:
labels:
app: kibana
spec:
containers:
- name: kibana
image: swr.cn-east-3.myhuaweicloud.com/hcie_openeuler/kibana:7.17.1
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 1
requests:
cpu: 1
env:
- name: ELASTICSEARCH_URL
value: http://elasticsearch:9200    # 写handless的名字
- name: ELASTICSEARCH_HOSTS
value: http://elasticsearch:9200    # 写handless的名字
ports:
- containerPort: 5601
volumeMounts:
- name: config
mountPath: /usr/share/kibana/config/kibana.yml
readOnly: true
subPath: kibana.yml
volumes:
- name: config
configMap:
name: kibana-config

查看端口并访问

[root@master EFK]# kubectl get svc
NAME            TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
elasticsearch   ClusterIP   None            <none>        9200/TCP,9300/TCP   17m
kibana          NodePort    10.104.94.122   <none>        5601:30980/TCP      4m30s

kibana的nodeport端口是30980,我们来访问

这样就算部署好了,接下来需要部署日志采集工具

4. 部署ilogtail(docker-compose)

因为Fluentd配置复杂,所以我这里采用ilogtail来采集日志

  • ilogtail配置简单
  • 阿里开源,界面中文

我们先使用docker-compose的方式部署,最后整个平台搭建起来之后我们再将ilogtail部署到k8s集群里

4.1 编写docker-compose

[root@master ilogtail]# vim docker-compose.yaml
version: "3"
services:
ilogtail:
container_name: ilogtail
image: sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail:2.0.4
network_mode: host
volumes:
- /:/logtail_host:ro
- /var/run:/var/run
- ./checkpoing:/usr/local/ilogtail/checkpoint
- ./config:/usr/local/ilogtail/config/local
  • /:我们将宿主机的整个 / ,目录挂载到容器里面,方便容器读取日志
  • checkpoint:这个相当于一个指针,指向当前读取到哪一行日志了,如果ilogtail被重启了它可以根据这个checkpoint来回到上一次读取的地方
  • config:这个就是放采集的配置文件的

启动容器

[root@master ilogtail]# docker-compose up -d
[root@master ilogtail]# docker ps |grep ilogtail
eac545d4da87        sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail:2.0.4   "/usr/local/ilogtail…"   10 seconds ago      Up 9 seconds                            ilogtail

这样容器就启动了

4.2 配置ilogtail采集

[root@master ilogtail]# cd config/
[root@master config]# vim sample-stdout.yaml
enable: true
inputs:
  - Type: input_file          # 文件输入类型
    FilePaths: 
      - /logtail_host/var/log/messages
flushers:
  - Type: flusher_stdout    # 标准输出流输出类型
    OnlyStdout: true
[root@master config]# docker restart ilogtail
  • /logtail_host/var/log/messages:这里是这个地址的原因是我们将宿主机的/,挂载到了容器内的logtail_host,所以我们宿主机产生的日志会在容器的/logtail_host/var/log/messages这个目录下
  • 配置文件写好之后我们还需要重启容器让他读取配置,所以有一个restart

4.3 查看容器采集的日志

[root@master config]# docker logs ilogtail
2024-06-30 11:16:25 {"content":"Jun 30 19:16:22 master dockerd[1467]: time=\"2024-06-30T19:16:22.251108165+08:00\" level=info msg=\"handled exit event processID=9a8df40981b3609897794e50aeb2bde805eab8a75334266d7b5c2899f61d486e containerID=61770e8f88e3c6a63e88f2a09d2683c6ccce1e13f6d4a5b6f79cc4d49094bab4 pid=125402\" module=libcontainerd namespace=moby","__time__":"1719746182"}
2024-06-30 11:16:25 {"content":"Jun 30 19:16:23 master kubelet[1468]: E0630 19:16:23.594557    1468 kubelet_volumes.go:245] \"There were many similar errors. Turn up verbosity to see them.\" err=\"orphaned pod \\\"9d5ae64f-1341-4c15-b70f-1c8f71efc20e\\\" found, but error not a directory occurred when trying to remove the volumes dir\" numErrs=2","__time__":"1719746184"}

可以看到,宿主机的日志已经被成功采集了,宿主机的日志会被封装到content里,如果没有看到输出的日志的话,需要进入到容器内部查看一个叫做ilogtail.LOG的文件,而不能使用docker logs ilogtail

4.4 采集容器标准输出日志(可选)

[root@master config]# cp sample-stdout.yaml docker-stdout.yaml
# 为了避免同时输出到标准输出而导致的日志杂乱,我们临时将sample-stdout关掉
[root@master config]# cat sample-stdout.yaml 
enable: false                 # 将这里改为false
inputs:
  - Type: input_file          # 文件输入类型
    FilePaths: 
      - /logtail_host/var/log/messages
flushers:
  - Type: flusher_stdout    # 标准输出流输出类型
    OnlyStdout: true
[root@master config]# cat docker-stdout.yaml 
enable: true
inputs:
  - Type: service_docker_stdout        
    Stdout: true                 # 采集标准输出
    Stderr: false                # 不采集错误输出
flushers:
  - Type: flusher_stdout    
    OnlyStdout: true
[root@master config]# docker restart ilogtail 
ilogtail

4.5 查看采集的容器日志

2024-06-30 11:24:13 {"content":"2024-06-30 11:24:10 {\"content\":\"2024-06-30 11:24:07 {\\\"content\\\":\\\"2024-06-30 11:24:04.965 [INFO][66] felix/summary.go 100: Summarising 12 dataplane reconciliation loops over 1m3.4s: avg=3ms longest=12ms ()\\\",\\\"_time_\\\":\\\"2024-06-30T11:24:04.965893702Z\\\",\\\"_source_\\\":\\\"stdout\\\",\\\"_container_ip_\\\":\\\"192.168.200.200\\\",\\\"_image_name_\\\":\\\"calico/node:v3.23.5\\\",\\\"_container_name_\\\":\\\"calico-node\\\",\\\"_pod_name_\\\":\\\"calico-node-hgqzr\\\",\\\"_namespace_\\\":\\\"kube-system\\\",\\\"_pod_uid_\\\":\\\"4d0d950c-346a-4f81-817c-c19526700542\\\",\\\"__time__\\\":\\\"1719746645\\\"}\",\"_time_\":\"2024-06-30T11:24:07.968118197Z\",\"_source_\":\"stdout\",\"_container_ip_\":\"192.168.200.200\",\"_image_name_\":\"sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail:2.0.4\",\"_container_name_\":\"ilogtail\",\"__time__\":\"1719746647\"}","_time_":"2024-06-30T11:24:10.971474647Z","_source_":"stdout","_container_ip_":"192.168.200.200","_image_name_":"sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail:2.0.4","_container_name_":"ilogtail","__time__":"1719746650"}

能够正常看见日志就说明日志采集没有问题,接下来我们部署kafka,用来接收ilogtail的日志,注意将日志采集关掉,不然你的虚拟机磁盘很快就会满

5. 部署kafka

kafka作为消息队列,会有消费者和生产者,生产者在这里就是ilogtail,也就是将日志写入到kafka,消费者就是logstash,从kafka里面读取日志写入到es

5.1 kafka介绍

Apache kafka是分布式的,基于发布/订阅的容错消息系统,主要特性如下

  • 高吞吐,低延迟:可以做到每秒百万级的吞吐量,并且延迟低(其他的消息队列基本也都可以)
  • 持久性,可靠性:消息会被持久化到本地磁盘,支持数据备份防止数据丢失,并且可以配置消息有效期,以便消费者可以多次消费
  • kafka官方不支持docker部署,我们可以使用第三方的镜像

5.2 部署kafka(docker-compose)

version: '3'
services:
zookeeper:
image: quay.io/3330878296/zookeeper:3.8
network_mode: host
container_name: zookeeper-test
volumes:
- zookeeper_vol:/data
- zookeeper_vol:/datalog
- zookeeper_vol:/logs
kafka:
image: quay.io/3330878296/kafka:2.13-2.8.1
network_mode: host
container_name: kafka
environment:
KAFKA_ADVERTISED_HOST_NAME: "192.168.200.200"
KAFKA_ZOOKEEPER_CONNECT: "192.168.200.200:2181"
KAFKA_LOG_DIRS: "/kafka/logs"
volumes:
- kafka_vol:/kafka
depends_on:
- zookeeper
volumes:
zookeeper_vol: {}
kafka_vol: {}
  • KAFKA_LOG_DIRS: "/kafka/logs":这个地方需要注意,在kafka的名词里面,他把数据叫做日志,这个地方看似是定义的日志目录,其实是kafka的数据目录

5.3 部署kafdrop(kafka的web界面)

[root@master kafka]# docker run -d --rm -p 9000:9000 \
    -e KAFKA_BROKERCONNECT=192.168.200.200:9092 \
    -e SERVER_SERVLET_CONTEXTPATH="/" \
    quay.io/3330878296/kafdrop

部署好之后就可以使用web界面查看了,部署web界面的原因是我们将日志写入到kafka之后可以直接使用web界面查看也没有写入进去,比kafka命令行更加的直观

在浏览器输入ip:9000

5.4 ilogtail将日志写入到kafka

[root@master config]# cd /root/ilogtail/config
[root@master config]# cp sample-stdout.yaml kafka.yaml
[root@master config]# vim kafka.yaml
enable: true
inputs:
  - Type: input_file         
    FilePaths:
      - /logtail_host/var/log/messages
flushers:
  - Type: flusher_kafka_v2  
    Brokers:
      - 192.168.200.200:9092
    Topic: KafkaTopic
[root@master config]# docker restart ilogtail
ilogtail

这个时候我们再回到web界面就会出现一个topic

点进去可以查看有哪些日志被写入进去了

能看见日志就没问题了,接下来部署logstash

6. 部署logstash

logstash会从kafka读取消息然后写入到es里面去

6.1 部署logstash(docker-compose)

[root@master ~]# mkdir logstash
[root@master ~]# cd logstash
[root@master logstash]# vim docker-compose.yaml
version: '3'
services:
  logstash:
    image: quay.io/3330878296/logstash:8.10.1
    container_name: logstash
    network_mode: host
    environment:
      LS_JAVA_OPTS: "-Xmx1g -Xms1g"
    volumes:
      - /etc/localtime:/etc/localtime:ro
      - /apps/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - /apps/logstash/pipeline:/usr/share/logstash/pipeline
      - /var/log:/var/log
  • config里面放的是logstash本身的配置文件
  • pipeline里面放的是采集/输出日志的规则

docker-compose写好之后先不要着急启动,因为我们给他挂载的配置文件还没有启动

现在编写配置文件

[root@master logstash]# mkdir /apps/logstash/{config,pipeline}
[root@master logstash]# cd /apps/logstash/config/
[root@master config]# vim logstash.yml 
pipeline.workers: 2
pipeline.batch.size: 10
pipeline.batch.delay: 5
config.reload.automatic: true
config.reload.interval: 60s

写好这个文件之后我们启动这个logstash容器

[root@master logstash]# /root/logstash
[root@master logstash]# docker-compose up -d
[root@master logstash]# docker ps |grep logstash
60dfde4df40d        quay.io/3330878296/logstash:8.10.1                                                              "/usr/local/bin/dock…"   2 minutes ago       Up 2 minutes                                 logstash

启动之后就没问题了

6.2 输出日志到es

Logstash官方文档地址

我们要使用logstash输出日志到es的话就需要到pipeline里面去写一些规则

[root@master EFK]# cd /apps/logstash/pipeline/
[root@master pipeline]# vim logstash.conf
input {
  kafka {
# 指定kafka地址
    bootstrap_servers => "192.168.200.200:9092"
# 从哪些topic获取数据,要写已经存在topic
    topics => ["KafkaTopic"]
# 从哪个地方开始读取,earliest是从头开始读取
    auto_offset_reset => "earliest"
    codec => "json"
# 当一个logstash中有多个input插件时,建议每个插件定义一个id
# id => "kubernetes"
# group_id => "kubernetes"
  }
}
filter {
  json {
source => "event.original"
  }
  mutate {
    remove_field => ["event.original","event"]
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.200.200:9200"]
    index => "kubernetes-logs-%{+YYYY.MM.dd}"
  }
}
  • hosts => ["http://192.168.200.200:9200"]:这个地方的9200,因为我们的logstash是用docker部署的,但是es是部署在k8s集群内部的,所以这个地方9200端口是通不了的,所以我们需要给k8s的es创建一个nodeport类型的svc,来让docker可以访问到
[root@master EFK]# kubectl expose pod es-0 --type NodePort --port 9200 --target-port 9200
service/es-0 exposed
[root@master EFK]# kubectl get svc
NAME            TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
elasticsearch   ClusterIP   None            <none>        9200/TCP,9300/TCP   3h38m
es-0            NodePort    10.97.238.173   <none>        9200:32615/TCP      2s
kibana          NodePort    10.106.1.52     <none>        5601:30396/TCP      3h38m

这里他将9200映射到了本地的32615端口,所以我们将logstash的地址改到32615

output {
  elasticsearch {
    hosts => ["http://192.168.200.200:32615"]
    index => "kubernetes-logs-%{+YYYY.MM.dd}"
  }
}

然后重启logstash

[root@master pipeline]# docker restart logstash

6.3 到kibana查看

6.3.1 查看索引

  1. 点击stack management
  2. 点击索引管理,会看到有索引存在就是正常

  3. 点击索引模式,创建索引

  1. 进入discover

7. 将ilogtail部署到k8s

我们刚刚是将ilogtail使用的单机docker部署,无法采集到其他节点上的容器,主机日志,所以我们可以将ilogtail采用DaemonSet的方式部署到k8s集群内的每个节点上,这样每个节点的日志都可以被采集到

如果已经使用docker部署,就先将docker部署的ilogtail停掉

[root@master ilogtail]# ls
checkpoing  config  docker-compose.yaml
[root@master ilogtail]# docker-compose down

7.1 下载配置示例

ilogtail文档

7.1.1 创建命名空间

[root@master EFK]# kubectl create ns ilogtail
namespace/ilogtail created

7.1.2 创建ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
name: ilogtail-user-cm
namespace: ilogtail
data:
nginx_stdout.yaml: |
    enable: true
    inputs:
      - Type: service_docker_stdout
        Stderr: false
        Stdout: true                # only collect stdout
        IncludeK8sLabel:
          app: nginx                # choose containers with this label
          #processors:
          #  - Type: processor_regex       # structure log
          #    SourceKey: content
          #    Regex: '([\d\.:]+) - (\S+) \[(\S+) \S+\] \"(\S+) (\S+) ([^\\"]+)\" (\d+) (\d+) \"([^\\"]*)\" \"([^\\"]*)\" \"([^\\"]*)\"'
          #    Keys:
          #      - remote_addr
          #      - remote_user
          #      - time_local
          #      - method
          #      - url
          #      - protocol
          #      - status
          #      - body_bytes_sent
          #      - http_referer
          #      - http_user_agent
          #      - http_x_forwarded_for
    flushers:
      - Type: flusher_kafka_v2
        Brokers:
          - 192.168.200.200:9092
        Topic: nginx
        OnlyStdout: true

这个configmap是从官方下载了,我进行了一些修改

[root@master EFK]# kubectl apply -f ilogtail-user-configmap.yaml 
configmap/ilogtail-user-cm created

7.1.2 创建DaemonSet

apiVersion: apps/v1
kind: DaemonSet
metadata:
name: ilogtail-ds
namespace: ilogtail
labels:
k8s-app: logtail-ds
spec:
selector:
matchLabels:
k8s-app: logtail-ds
template:
metadata:
labels:
k8s-app: logtail-ds
spec:
tolerations:
- operator: Exists                    # deploy on all nodes
containers:
- name: logtail
env:
- name: ALIYUN_LOG_ENV_TAGS       # add log tags from env
value: _node_name_|_node_ip_
- name: _node_name_
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
- name: _node_ip_
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.hostIP
- name: cpu_usage_limit           # iLogtail's self monitor cpu limit
value: "1"
- name: mem_usage_limit           # iLogtail's self monitor mem limit
value: "512"
- name: default_access_key_id     # accesskey id if you want to flush to SLS
valueFrom:
secretKeyRef:
name: ilogtail-secret
key: access_key_id
optional: true
- name: default_access_key        # accesskey secret if you want to flush to SLS
valueFrom:
secretKeyRef:
name: ilogtail-secret
key: access_key
optional: true
image: >-
            sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/ilogtail:2.0.4
           imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 1000m
memory: 1Gi
requests:
cpu: 400m
memory: 384Mi
volumeMounts:
- mountPath: /var/run                       # for container runtime socket
name: run
- mountPath: /logtail_host                  # for log access on the node
mountPropagation: HostToContainer
name: root
readOnly: true
- mountPath: /usr/local/ilogtail/checkpoint # for checkpoint between container restart
name: checkpoint
- mountPath: /usr/local/ilogtail/config/local # mount config dir
name: user-config
readOnly: true
dnsPolicy: ClusterFirstWithHostNet
hostNetwork: true
volumes:
- hostPath:
path: /var/run
type: Directory
name: run
- hostPath:
path: /
type: Directory
name: root
- hostPath:
path: /etc/ilogtail-ilogtail-ds/checkpoint
type: DirectoryOrCreate
name: checkpoint
- configMap:
defaultMode: 420
name: ilogtail-user-cm
name: user-config

这个里面基本不需要改什么

[root@master EFK]# kubectl apply -f ilogtail-ds.yaml 
daemonset.apps/ilogtail-ds created
[root@master EFK]# kubectl get pods -n ilogtail 
NAME                READY   STATUS    RESTARTS   AGE
ilogtail-ds-bd6ll   1/1     Running   0          83s

因为我的k8s就是一个单节点,所以只有一个pod被启动

7.2 采集数据

在configmap里面我们定义的是采集带有app: nginx这个标签的pod,所以我们现在来创建这个一个nginx的pod

[root@master tmp]# kubectl create deployment nginx --image quay.io/3330878296/nginx
[root@master tmp]# kubectl get pods -o wide
NAME                      READY   STATUS    RESTARTS      AGE   IP               NODE     NOMINATED NODE   READINESS GATES
es-0                      1/1     Running   1 (28m ago)   15h   10.251.205.147   master   <none>           <none>
kibana-5bbb9b5d5f-jvghq   1/1     Running   1 (28m ago)   15h   10.251.205.165   master   <none>           <none>
nginx-cb7755f57-7q7xn     1/1     Running   0             13s   10.251.205.152   master   <none>           <none>

这个nginx的ip是10.251.205.152,我们来访问一下

[root@master tmp]# curl -I  10.251.205.152
HTTP/1.1 200 OK
Server: nginx/1.27.0
Date: Mon, 01 Jul 2024 01:48:09 GMT
Content-Type: text/html
Content-Length: 615
Last-Modified: Tue, 28 May 2024 13:22:30 GMT
Connection: keep-alive
ETag: "6655da96-267"
Accept-Ranges: bytes

7.3 回到kafdrop查看topic

我们访问nginx之后按照正常情况下来说的话,ilogtail会采集日志写入到kafka并创建一个nginx的topic,我们现在来看看也没有

有这个nginx就没有问题了,说明日志可以被正常采集到并写入,要将nginx写入到es的话可以在/apps/logstash/pipeline/这个目录下面写一个新的配置文件

[root@master pipeline]# ls
logstash.conf  nginx.conf
[root@master pipeline]# cat nginx.conf 
input {
  kafka {
# 指定kafka地址
    bootstrap_servers => "192.168.200.200:9092"
# 从哪些topic获取数据,要写已经存在topic
    topics => ["nginx"]
# 从哪个地方开始读取,earliest是从头开始读取
    auto_offset_reset => "earliest"
    codec => "json"
# 当一个logstash中有多个input插件时,建议每个插件定义一个id
# id => "kubernetes"
# group_id => "kubernetes"
  }
}
filter {
  json {
source => "event.original"
  }
  mutate {
    remove_field => ["event.original","event"]
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.200.200:32615"]
    index => "kubernetes-nginx-logs-%{+YYYY.MM.dd}"
  }
}
[root@master pipeline]# docker restart logstash
logstash

由于当时写的时候没有发现索引的名字不是以年月日为标识的,是后来看的时候才发现这个问题的,文中的错误已改正,图片没有换,这里再补一张正确的图

本文来自博客园,作者:FuShudi,转载请注明原文链接:https://www.cnblogs.com/fsdstudy/p/18277099

分类: Euler , Euler / HCIE / 日志采集

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1538 5
|
3天前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
15 4
|
5天前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
17 2
|
5天前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
14 1
|
20天前
|
Kubernetes API Docker
跟着iLogtail学习容器运行时与K8s下日志采集方案
iLogtail 作为开源可观测数据采集器,对 Kubernetes 环境下日志采集有着非常好的支持,本文跟随 iLogtail 的脚步,了解容器运行时与 K8s 下日志数据采集原理。
|
27天前
|
缓存 监控 算法
分析慢日志文件来优化 PHP 脚本的性能
分析慢日志文件来优化 PHP 脚本的性能
|
20天前
08-06-06>pe_xscan 精简log分析代码 速度提升一倍
08-06-06>pe_xscan 精简log分析代码 速度提升一倍
|
2月前
|
存储 分布式计算 大数据
【Flume的大数据之旅】探索Flume如何成为大数据分析的得力助手,从日志收集到实时处理一网打尽!
【8月更文挑战第24天】Apache Flume是一款高效可靠的数据收集系统,专为Hadoop环境设计。它能在数据产生端与分析/存储端间搭建桥梁,适用于日志收集、数据集成、实时处理及数据备份等多种场景。通过监控不同来源的日志文件并将数据标准化后传输至Hadoop等平台,Flume支持了性能监控、数据分析等多种需求。此外,它还能与Apache Storm或Flink等实时处理框架集成,实现数据的即时分析。下面展示了一个简单的Flume配置示例,说明如何将日志数据导入HDFS进行存储。总之,Flume凭借其灵活性和强大的集成能力,在大数据处理流程中占据了重要地位。
58 3
|
2月前
|
应用服务中间件 Linux nginx
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
|
2月前
|
存储 消息中间件 监控
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统ELK、日志收集分析
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统、日志收集分析。日志级别从小到大的关系(优先级从低到高): ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF 低级别的会输出高级别的信息,高级别的不会输出低级别的信息