filebeat+kafka+ELK分布式日志组件有哪些注意点?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
#filebeat - 打包说明: 镜像在打包时,要添加上filebeat可执行文件(可在官网下载),可以使用supervisor管理服务。 - filebeat配置文件可参考以下例子:
filebeat.prospectors:
- input_type: log
  paths: /var/log/nginx.log
  document_type: nginx_log
  fields:
    cluster_name: ${CLUSTER_NAME}
    host: ${HOST}
    log_topics: app1_nginx  # nginx日志
- input_type: log
  paths: /var/applog/*.log
  document_type: applog
  fields:
    cluster_name: ${CLUSTER_NAME}
    host: ${HOST}
    log_topics: app1_log  # 应用日志
output.kafka:
  hosts: ["kafka1:9092","kafka2:9092",...,"kafkaN:9092"] # 取决你的集群节点数
  topic: '%{[fields][log_topics]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
 
#logstash - 机器数量: 可以找几台虚机(4c+8G)启动,尽量个数和kafka的节点数一致。 - input和output: kafka(filebeat日志流向的kafka)和es集群 - 配置注意 pipeline.batch.size: 2000 # 达到多少个events后向目标地址输送数据 pipeline.batch.delay: 10 # 等待多少秒向目标地址输送数据
两个配置不冲突,哪个满足了就触发向目标输送数据,我们的目标地就是es集群。
至于批量和延迟向目标输送数据应该好理解,避免频繁请求目标地址,导致目标地址高负载。
 
一般会用到的有grok(正则切割日志)、json(json解析)、mutate(组合命令remove_field(去除无用字段)等等)
很多这里不一一介绍了,推荐一个可以在线测试grok语法是否正确的工具:http://grokdebug.herokuapp.com/
 
input {
    kafka{
        bootstrap_servers => "kafka1:9092,kafka2:9092,...,kafkaN:9092" # 前边的kafka
        auto_offset_reset => "latest"
        group_id => "app1" 
        consumer_threads => 1
        decorate_events => true
        codec => "json"
        topics => ["app1_log"]
    }
}
filter {
    if [fields][log_topics] == "app1_log" {
        grok {
            match => {"message" => '(?<time_local>[^\|]*)\|(?<code_line>[^\|]*)\|(?<level>[^\|]*)\|(?<log_json>.*)'}
        }
        mutate {
            gsub => ["log_json", "[\|]", "_"]  # 替换|为_
        }
        json {
            source => "log_json"
            remove_field=>["log_json"]
        }
    }
    # 可以有多个if
    # remove not care field
    mutate
    {
        remove_field => ["field1", "field2"]
    }
}
output {
    if [fields][log_topics] == "app1_log" {
        elasticsearch {
             hosts => ["es1:9200", "es2:9200",..,"esN:9200"]
            index => "app1_log-%{+YYYY.MM.dd}"
        }
    }
    # 可以有多个if
}
 
注意点: 注意修改number_of_shards数量等于节点数,es的number_of_shards默认为5
 跳过一次坑,没有修改number_of_shards,虽然机器多,但是日志散落不均匀导致总有es的某几个
节点负载比较高,其他的却很清闲。