Kafka、Logstash、Nginx日志收集入门

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Kafka、Logstash、Nginx日志收集入门

Kafka、Logstash、Nginx日志收集入门


Nginx作为网站的第一入口,其日志记录了除用户相关的信息之外,还记录了整个网站系统的性能,对其进行性能排查是优化网站性能的一大关键。

Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。一般情景下,Logstash用来和ElasticSearch和Kibana搭配使用,简称ELK。

kafka是一个分布式的基于push-subscribe的消息系统,它具备快速、可扩展、可持久化的特点。它现在是Apache旗下的一个开源系统,作为hadoop生态系统的一部分,被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/spark流式处理引擎。

下面是日志系统的搭建


一、Nginx日志


为了配合ELK的使用,把日志变成json的格式,方便ElasticSearch对其检索。


log_format main ''{"@timestamp":"$time_iso8601",''
      ''"host": "$server_addr",''
      ''"clientip": "$remote_addr",''
      ''"size": $body_bytes_sent,''
      ''"responsetime": $request_time,''
      ''"upstreamtime": "$upstream_response_time",''
      ''"upstreamhost": "$upstream_addr",''
      ''"http_host": "$host",''
      ''"url": "$uri",''
      ''"xff": "$http_x_forwarded_for",''
      ''"referer": "$http_referer",''
      ''"agent": "$http_user_agent",''
      ''"status": "$status"}'';
    access_log  logs/access.log  main;

然后执行nginx -t检验配置,nginx -s reload重启nginx即可。

注意:

1.这里的单引号用来标识不换行使用的,如果没有的话,Logstash会每一行都发送一次。

2.格式一定一定要规范。


二、Logstash


下载安装的具体请看Logstash官网,这里只讲讲如何配置,

输入

input {
    file {
        type => "nginx_access"
        path => "/usr/share/nginx/logs/access.log"
        codec => "json"
    }
}


过滤


filter,由于没有涉及到很复杂的手机,所以不填


输出

output {

stdout{

codec => rubydebug

}

kafka {

bootstrap_servers => “119.29.188.224:9092” #生产者

topic_id => “nginx-access-log” #设置写入kafka的topic


# compression_type => "snappy"    #消息压缩模式,默认是none,可选gzip、snappy。
    codec => json       #一定要加上这段,不然传输错误,${message}
}
elasticsearch {
    hosts => "119.29.188.224:9200"    #Elasticsearch 地址,多个地址以逗号分隔。
    index => "logstash-%!{(MISSING)type}-%!{(MISSING)+YYYY.MM.dd}"    #索引命名方式,不支持大写字母(Logstash除外)
    document_type => "%!{(MISSING)type}"    #文档类型
  }
}

具体字段:

stdout:控制台输出,方便tail -f查看,可不要

kafka:输出到kafka,bootstrap_servers指的是kafka的地址和端口,topic_id是每条发布到kafka集群的消息属于的类别,其中codec一定要设置为json,要不然生产者出错,导致消费者是看到${message}。

elasticsearch:输出到elasticsearch,hosts指的是elasticsearch的地址和端口,index指的命名方式

然后启动Logstash:

nohup bin/logstash -f config/nginxlog2es.conf –path.data=tmp &

tail -f 查看nohup


三、kafka

目前的云服务器都用了NAT转换公网,如果不开启外网,kafka会默认使用内网私有地址访问,所以要开启外网访问

只需要在config/server.properties里加入:


advertised.host.name=119.29.188.224

改变默认端口:


advertised.host.port=9200

启动步骤:

(1)ZooKeeper启动

bin/zookeeper-server-start.sh config/zookeeper.properties

(2)启动Kafka

nohup bin/kafka-server-start.sh config/server.properties &

(3)创建一个topic

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

查看topic数量

bin/kafka-topics.sh –list –zookeeper localhost:2181

(4)生产者发送消息

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

(5)消费者接收消息

bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning


删除


删除kafka存储的日志,在kafka的config/server.properties的log.dirs=/tmp/kafka-logs查看


四、Spring Boot与Kafka多模块的Spring Boot与Kafka


(1)在pom.xml中添加:


<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-releasetrain</artifactId>
            <version>Fowler-SR2</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>1.5.9.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

(2)在消费者模块中添加:


<parent>
        <artifactId>micro-service</artifactId>
        <groupId>micro-service</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

配置文件:


# 本地运行端口
server.port=8082
# kafka地址和端口
spring.kafka.bootstrap-servers=119.29.188.224:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=myGroup
# 指定默认topic id
spring.kafka.template.default-topic=nginx-access-log
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
# 偏移量,最好使用latest,earily会从kafka运行起开始一直发送
spring.kafka.consumer.auto-offset-reset=latest
# 心跳检测
spring.kafka.consumer.heartbeat-interval=100

(5)接收消息


@Component
public class MsgConsumer {
    @KafkaListener(topics = {"nginx-access-log"})
    public void processMessage(String content) {
        System.out.println(content);
    }
}

(6)测试


错误记录


(1)与Spring的包冲突:


Error starting ApplicationContext. To display the auto-configuration report re-run your application with ''debug'' enabled.
2018-01-05 11:10:47.947 ERROR 251848 --- [           main] o.s.boot.SpringApplication               : Application startup failed
org.springframework.context.ApplicationContextException: Unable to start embedded container; nested exception is org.springframework.boot.context.embedded.EmbeddedServletContainerException: Unable to start embedded Tomcat
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.onRefresh(EmbeddedWebApplicationContext.java:137) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:537) ~[spring-context-4.3.11.RELEASE.jar:4.3.11.RELEASE]
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.9.RELEASE.jar:1.5.9.RELEASE]

解决办法:去掉父pom.xml文件里所有关于spring的包,只保留spring boot的即可

(2)消费者只接受到${message}消息

解决办法:

一定要在output的kafka中添加


codec => json


目录
相关文章
|
18天前
|
消息中间件 存储 Kafka
Kafka日志处理:深入了解偏移量查找与切分文件
**摘要:** 本文介绍了如何在Kafka中查找偏移量为23的消息,涉及ConcurrentSkipListMap的查询、索引文件的二分查找及日志分段的物理位置搜索。还探讨了Kafka日志分段的切分策略,包括大小、时间、索引大小和偏移量达到特定阈值时的切分条件。理解这些对于优化Kafka的性能和管理日志至关重要。
24 2
|
21天前
|
应用服务中间件 Linux 开发工具
Nginx14---目录结构分析,查看Ngnix访问日志命令的写法​
Nginx14---目录结构分析,查看Ngnix访问日志命令的写法​
|
24天前
|
消息中间件 NoSQL Kafka
基于Kafka的nginx日志收集分析与监控平台(3)
基于Kafka的nginx日志收集分析与监控平台(3)
|
24天前
|
消息中间件 监控 Kafka
基于Kafka的nginx日志收集分析与监控平台(2)
基于Kafka的nginx日志收集分析与监控平台(2)
|
24天前
|
消息中间件 负载均衡 应用服务中间件
基于Kafka的nginx日志收集分析与监控平台(1)
基于Kafka的nginx日志收集分析与监控平台(1)
|
1月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
955 0
|
8天前
|
消息中间件 存储 负载均衡
Kafka中producer生产的数据是如何存储在partition中的
Kafka中producer生产的数据是如何存储在partition中的
|
9天前
|
消息中间件 SQL 分布式计算
DataWorks产品使用合集之如何离线增量同步Kafka数据,并指定时间范围进行同步
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。