Background: Why Distributed Log Collection
In a traditional setup, the production environment often consists of many different server nodes. To track down a bug from the logs, you have to log in to each node and grep through its log files with shell commands one machine at a time, which is very inefficient.
We therefore need centralized log management, and this is exactly the problem ELK (Elasticsearch, Logstash, Kibana) was created to solve.
Why ELK Should Be Combined with Kafka
If you deploy ELK alone without Kafka, every server node has to run its own Logstash instance to read and ship its local log files. That log I/O on every node hurts performance and duplicates a lot of work. With Kafka in between, each application simply publishes its log events to a Kafka topic, and a single Logstash instance consumes that topic and writes the events to Elasticsearch.
Building the ELK + Kafka Environment
Build the ELK + Kafka environment with docker-compose.
Note: do not use localhost or 127.0.0.1 for any of the IP addresses below. Use the host machine's LAN IP (192.168.137.1 in this example) so that the containers, and applications on other machines, can reach Kafka through its advertised listener.
version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.0
    container_name: kibana
    depends_on:
      - elasticsearch
    ports:
      - "5601:5601"
  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.0
    container_name: logstash
    # command: --config /etc/logstash/conf.d/*.conf
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    ports:
      - "5044:5044"
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.137.1:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092
  kafka-manager:
    image: sheepkiller/kafka-manager   # open-source web UI for managing Kafka clusters
    environment:
      ZK_HOSTS: 192.168.137.1:2181     # change to the host machine's IP
    ports:
      - "9001:9000"
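The compose file mounts ./logstash/logstash.conf into the Logstash container. A minimal pipeline sketch for that file, assuming Logstash consumes the tiger-log topic used later in the Spring Boot section; the group_id and the tiger-log-%{+YYYY.MM.dd} index name are choices made here for illustration, not part of the original setup:

input {
  kafka {
    # Broker address: the host machine's IP, matching KAFKA_ADVERTISED_LISTENERS
    bootstrap_servers => "192.168.137.1:9092"
    topics            => ["tiger-log"]
    group_id          => "logstash"
    # The logback appender below ships JSON, so decode it here
    codec             => "json"
  }
}

output {
  elasticsearch {
    # Service name resolves inside the compose network
    hosts => ["elasticsearch:9200"]
    index => "tiger-log-%{+YYYY.MM.dd}"
  }
}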
Verifying the ELK + Kafka Environment
Once the containers are up, verify the services:
Elasticsearch: http://127.0.0.1:9200/
Kibana: http://127.0.0.1:5601/app/kibana#/dev_tools/console?_g=()
Kafka Manager: http://127.0.0.1:9001/
ZooKeeper listens on port 2181, but it is not an HTTP service; check it with a TCP client rather than a browser.
Integrating ELK + Kafka into a Spring Boot Project
Add the Maven dependencies
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- logstash integration with logback -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.4</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- logback integration with kafka -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC2</version>
    <scope>runtime</scope>
</dependency>
Application configuration
spring:
  kafka:
    listener:
      # Whether to consume in batches: single (one record at a time, the default) or batch
      type: single
    # Broker address; must not be localhost/127.0.0.1
    bootstrap-servers: 192.168.137.1:9092
    # Producer configuration
    producer:
      # Number of retries
      retries: 3
      # Acknowledgement level
      # acks=0   the send is considered successful as soon as the message leaves the client
      # acks=1   the send is considered successful once the leader partition has written the message to disk
      # acks=all the send is considered successful only after the leader and its follower replicas have replicated the message
      acks: all
      # Maximum batch size in bytes
      batch-size: 4096
      # Total memory (bytes) the producer may use to buffer records waiting to be sent
      buffer-memory: 33554432
      # Client ID
      client-id: logstash
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Message compression: none, lz4, gzip, snappy; default is none
      compression-type: gzip
      properties:
        linger:
          # Send delay: the producer flushes a batch once it reaches batch-size or after linger.ms has elapsed
          ms: 1000
        max:
          block:
            # Maximum time KafkaProducer.send() and partitionsFor() may block, in ms
            ms: 6000
    # Consumer configuration
    consumer:
      # Default consumer group
      group-id: logstash
      # Auto-commit offsets (default true)
      enable-auto-commit: false
      # Auto-commit interval in ms
      auto-commit-interval: 1000
      # Maximum number of records returned per poll
      max-poll-records: 100
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # What to do when Kafka has no initial offset or the offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest:   reset to the latest offset (only consume records produced after the reset)
      # none:     throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      properties:
        session:
          timeout:
            # Session timeout: if the consumer sends no heartbeat within this time, a rebalance is triggered
            ms: 120000
        request:
          timeout:
            # Request timeout
            ms: 120000
    template:
      default-topic: tiger-log

# Point Spring Boot at the custom logback file explicitly, so that configuration
# pulled in by other dependencies cannot take precedence over it
logging:
  config: classpath:logback-spring.xml
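With spring-kafka configured as above, a quick way to confirm the application can actually reach the broker is to send one message through the KafkaTemplate to the default topic tiger-log. A minimal sketch under those assumptions; the class name is hypothetical and not part of the original project:

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical startup check: sends a single message to verify connectivity
// to the broker at 192.168.137.1:9092.
@Configuration
public class KafkaSmokeTest {

    @Bean
    CommandLineRunner kafkaPing(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            // sendDefault() publishes to spring.kafka.template.default-topic (tiger-log);
            // get() blocks until the broker acknowledges the send or it fails
            kafkaTemplate.sendDefault("kafka connectivity check").get();
            System.out.println("test message sent to tiger-log");
        };
    }
}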
Configure logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

    <!-- The whole dev-profile block is shown here, including the appender that ships logs to Kafka -->
    <springProfile name="dev">
        <!-- Log output patterns -->
        <property name="console.log.pattern"
                  value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) --- [%green(%thread)] %cyan(%-40.40(%logger{40})) : %msg%n"/>
        <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"/>
        <property name="log_name_prefix" value="tiger-ucenter"/>
        <property name="log.path" value="logs/tiger-ucenter"/>

        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${console.log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>INFO</level>
            </filter>
        </appender>

        <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <File>${log.path}/${log_name_prefix}-info.log</File>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>INFO</level>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <FileNamePattern>${log.path}/backup/${log_name_prefix}-info.%d{yyyy-MM-dd}.log</FileNamePattern>
            </rollingPolicy>
        </appender>

        <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <File>${log.path}/${log_name_prefix}-error.log</File>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>ERROR</level>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <FileNamePattern>${log.path}/backup/${log_name_prefix}-error.%d{yyyy-MM-dd}.log</FileNamePattern>
            </rollingPolicy>
        </appender>

        <!-- Kafka appender: ships JSON-encoded log events to the tiger-log topic -->
        <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                <providers class="net.logstash.logback.composite.loggingevent.LoggingEventJsonProviders">
                    <pattern>
                        <pattern>
                            {
                            "app":"${APP}",
                            "profile":"${PROFILES_ACTIVE}",
                            "thread": "%thread",
                            "logger": "%logger{5}",
                            "message":"%msg",
                            "app_name":"${APP_NAME}",
                            "env_name":"${ENV_NAME}",
                            "hostname":"${HOSTNAME}",
                            "captain_seq":"${CAPTAIN_SEQ}",
                            "captain_gen":"${CAPTAIN_GEN}",
                            "build_name":"${BUILD_NAME}",
                            "build_git_version":"${BUILD_GIT_VERSION}",
                            "build_git_hash":"${BUILD_GIT_HASH}",
                            "build_timestamp":"${BUILD_TIMESTAMP}",
                            "date":"%d{yyyy-MM-dd HH:mm:ss.SSS}",
                            "level":"%level",
                            "stack_trace":"%exception"
                            }
                        </pattern>
                    </pattern>
                </providers>
            </encoder>
            <topic>tiger-log</topic>
            <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
            <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
            <!-- producerConfig entries are raw Kafka producer properties -->
            <producerConfig>bootstrap.servers=192.168.137.1:9092</producerConfig>
            <producerConfig>retries=1</producerConfig>
            <producerConfig>batch.size=16384</producerConfig>
            <producerConfig>buffer.memory=33554432</producerConfig>
            <producerConfig>max.request.size=2097152</producerConfig>
            <!-- Fallback appender used when the Kafka broker is unreachable -->
            <appender-ref ref="console"/>
        </appender>

        <root level="INFO">
            <appender-ref ref="console"/>
            <appender-ref ref="file_error"/>
            <appender-ref ref="file_info"/>
            <appender-ref ref="kafkaAppender"/>
        </root>
    </springProfile>
</configuration>
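With the dev profile active, anything logged through SLF4J at INFO level or above now goes to the console, the rolling files, and the tiger-log Kafka topic, from which Logstash forwards it to Elasticsearch. A minimal sketch for generating a few test entries; the controller and endpoint names are hypothetical:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical endpoint that produces log events for the ELK + Kafka pipeline.
@RestController
public class LogDemoController {

    private static final Logger log = LoggerFactory.getLogger(LogDemoController.class);

    @GetMapping("/log/demo")
    public String demo() {
        log.info("info log for the ELK + Kafka pipeline");
        try {
            throw new IllegalStateException("simulated failure");
        } catch (IllegalStateException e) {
            // The %exception conversion word in the kafkaAppender pattern captures this stack trace
            log.error("error log with stack trace", e);
        }
        return "logged";
    }
}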
Open Kibana to view the collected logs. In Kibana 7.15, first create an index pattern (Stack Management > Index Patterns) that matches the index Logstash writes to, then browse the documents in Discover.