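Main text
Logstash is a real-time data collection engine that can collect many types of data and parse, filter, and aggregate them. You define your own conditions to filter out the matching data and feed it into a visualization interface. It supports full or incremental transfer from a variety of data sources, normalization of data into a standard format, and formatted output, and it is commonly used for log processing. The workflow has three stages:
(1) input: the data input stage, which can read from Oracle, MySQL, PostgreSQL, files, and many other data sources;
(2) filter: the data normalization stage, which can filter and format data, for example formatting timestamps and strings;
(3) output: the data output stage, which can write to receivers such as Elasticsearch, MongoDB, and Kafka.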
Traditional installation
1. Download the package
https://artifacts.elastic.co/downloads/logstash/logstash-7.15.2-linux-x86_64.tar.gz
2. Extract the archive, then move and rename the directory
[root@localhost ~]# tar -zxvf logstash-7.15.2-linux-x86_64.tar.gz
[root@localhost ~]# mv logstash-7.15.2 /usr/local/logstash
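Before extracting, it can be worth checking the download for corruption. The sketch below is not part of the original steps: verify_sha512 is a hypothetical helper, and it assumes you have obtained a checksum file in the "<digest>  <filename>" format that sha512sum writes (Elastic usually publishes a .sha512 file next to each artifact, but treat that as an assumption and confirm it on the download page).

```shell
# verify_sha512 FILE SUMFILE  (hypothetical helper, not shipped with Logstash)
# Succeeds only if FILE matches the SHA-512 digest recorded in SUMFILE.
# SUMFILE is assumed to use the "<digest>  <filename>" format written by
# sha512sum; run this from the directory that holds FILE.
verify_sha512() {
  # Return 2 if either file is missing, so that case is distinguishable
  # from a digest mismatch (exit status 1).
  [ -f "$1" ] && [ -f "$2" ] || return 2
  sha512sum -c "$2" >/dev/null 2>&1
}
```

Typical use would be `verify_sha512 logstash-7.15.2-linux-x86_64.tar.gz logstash-7.15.2-linux-x86_64.tar.gz.sha512 && tar -zxvf logstash-7.15.2-linux-x86_64.tar.gz`, so that extraction only happens when the digest matches.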
3. Configure
Note: save every configuration file as UTF-8, otherwise Logstash may report errors at startup!
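One way to check the encoding before starting Logstash is to let iconv try to decode each file. This is a minimal sketch: is_utf8 is a hypothetical helper name, and it assumes a glibc-style iconv that exits non-zero on an invalid byte sequence.

```shell
# is_utf8 FILE  (hypothetical helper)
# Succeeds only if FILE decodes cleanly as UTF-8.
# Assumes iconv exits with a non-zero status on invalid input.
is_utf8() {
  iconv -f UTF-8 -t UTF-8 "$1" >/dev/null 2>&1
}
```

For example, `for f in /usr/local/logstash/config/*.conf; do is_utf8 "$f" || echo "not UTF-8: $f"; done` would list the files that need to be re-saved.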
pipelines.yml
There are three ways to configure a pipeline.
1. Write the input, filter, and output inline with the config.string field:
- pipeline.id: test
  pipeline.workers: 1
  pipeline.batch.size: 1
  config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
2. Point to a configuration file with the path.config field:
- pipeline.id: another_test
  queue.type: persisted
  path.config: "/tmp/logstash/a.config"
3. Use a wildcard, path.config=/tmp/logstash/conf.d/*.conf:
- pipeline.id: another_test
  queue.type: persisted
  path.config: "/tmp/logstash/conf.d/*.conf"
# Multiple paths are written as separate entries
- pipeline.id: kafka
  pipeline.workers: 2       # number of worker threads; defaults to the number of CPU cores
  pipeline.batch.size: 1    # number of events per batch; defaults to 125
  path.config: "/usr/local/logstash/config/logstash-kafka.conf"
- pipeline.id: es
  queue.type: persisted     # persist the queue to disk to avoid losing data; off by default
  path.config: "/usr/local/logstash/config/logstash-es.conf"
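Each entry in pipelines.yml needs its own pipeline.id, so when several of the variants shown here are combined in one file, a repeated id such as another_test has to be renamed. The following sketch lists ids that occur more than once. duplicate_pipeline_ids is a hypothetical helper; it is a plain text scan rather than a YAML parser, and it assumes unquoted ids written as flat "pipeline.id: name" keys.

```shell
# duplicate_pipeline_ids FILE  (hypothetical helper)
# Prints every pipeline.id value that appears more than once in FILE.
# Text-based scan only: assumes flat "pipeline.id: name" keys with
# unquoted values; commented-out entries are ignored.
duplicate_pipeline_ids() {
  sed -n 's/^[[:space:]]*-\{0,1\}[[:space:]]*pipeline\.id:[[:space:]]*\([^[:space:]#]*\).*/\1/p' "$1" |
    sort | uniq -d
}
```

Running `duplicate_pipeline_ids /usr/local/logstash/config/pipelines.yml` prints nothing when all ids are unique.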
logstash-es.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  file {
    # Path of the log file
    path => "/usr/local/es/logs/my-es.log"
    type => "elasticsearch"
    start_position => "beginning"   # read from the beginning of the file
  }
}
# Filters, based on regular expressions
filter {
  # Define the format of the data
  grok {
    match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|"}
  }
  # Define the format of the timestamp
  date {
    match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
    locale => "cn"
  }
  # Which field (from the data format defined above) holds the client IP
  geoip {
    source => "clientIp"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.139.160:9200","192.168.139.161:9200","192.168.139.162:9200"]   # Elasticsearch addresses
    index => "es-message-%{+YYYY.MM.dd}"
    # user and password are not needed if Elasticsearch has no password set
    user => "elastic"
    password => "cGKuMaWGZLBaSSDW7qKX"
  }
  stdout { codec => rubydebug }
}
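To see which part of a log line ends up in which field, it can help to split a sample line on the same "|" delimiter that the grok pattern uses. The sketch below uses awk as a stand-in for grok, so it only approximates the pattern (it does not validate the IP fields). The sample line and the helper name parse_access_line are made up for illustration; only the field order comes from the pattern.

```shell
# Hypothetical sample line with the 11 fields named in the grok pattern,
# followed by the trailing "||".
sample_line='2021-11-20-10:15:30|192.168.139.160|10.0.0.8|web|u1001|http://example.com/a|/a|-|pc|cost|35||'

# parse_access_line LINE  (hypothetical helper; awk stands in for grok)
# Field positions: 1 timestamp, 2 serverIp, 3 clientIp, 4 logSource,
# 5 userId, 6 reqUrl, 7 reqUri, 8 refer, 9 device, 10 textDuring,
# 11 duringTime (converted to an integer, like the ":int" suffix in grok).
parse_access_line() {
  printf '%s\n' "$1" |
    awk -F'|' '{ printf "timestamp=%s clientIp=%s duringTime=%d\n", $1, $3, $11 }'
}
```

Calling `parse_access_line "$sample_line"` prints `timestamp=2021-11-20-10:15:30 clientIp=10.0.0.8 duringTime=35`. The timestamp is the value the date filter then parses with yyyy-MM-dd-HH:mm:ss, and clientIp is the field handed to geoip.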
logstash-kafka.conf
input {
  kafka {
    bootstrap_servers => "192.168.139.162:9092"
    topics => "my-topic-partition"
  }
}
filter {
  # Only matched data are sent to output
}
output {
  elasticsearch {
    hosts => ["192.168.139.160:9200","192.168.139.161:9200","192.168.139.162:9200"]   # Elasticsearch addresses
    index => "kafka-log-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "cGKuMaWGZLBaSSDW7qKX"
  }
  stdout { codec => rubydebug }
}
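The %{+YYYY.MM.dd} suffix in the index setting produces one index per day, based on the event's @timestamp in UTC. As a rough illustration, the name Logstash would use for an event stamped right now can be reproduced in the shell. daily_index is a hypothetical helper, and it uses the current time rather than a real event timestamp.

```shell
# daily_index PREFIX  (hypothetical helper)
# Prints PREFIX followed by today's UTC date in YYYY.MM.dd form, which is
# the shape produced by an index setting such as "kafka-log-%{+YYYY.MM.dd}".
daily_index() {
  printf '%s-%s\n' "$1" "$(date -u +%Y.%m.%d)"
}
```

`daily_index kafka-log` gives today's index name, which is handy when querying Elasticsearch to confirm that events are arriving.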
Annotated logstash.yml
# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------ Node identity ------------
#
# Use a descriptive name for the node:
# (defaults to the machine's host name)
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
# (data directory for Logstash and its plugins)
# path.data:
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
# (number of worker threads that run the filter and output stages in parallel; defaults to the CPU core count)
# pipeline.workers: 2
#
# How many events to retrieve from inputs before sending to filters+workers
# (maximum number of events a single worker thread collects from the inputs)
pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
# (time to wait, in milliseconds, while polling for the next event)
pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
# (when set to true, Logstash exits during shutdown even if events are still in flight in memory;
# by default it refuses to exit until every received event has been pushed to the outputs)
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" will automatically enable ordering if the 'pipeline.workers' setting
# is also set to '1'.
# "true" will enforce ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" will disable any extra processing necessary for preserving ordering.
# (event ordering)
# pipeline.ordered: auto
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
# (check whether the configuration is valid; off by default)
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
# (periodically check for configuration changes and reload when they occur; off by default)
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60)
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
# (how often Logstash checks the configuration files for changes, in seconds)
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
# (whether escape sequences in quoted strings are processed)
# config.support_escapes: false
#
# ------------ HTTP API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
# http.enabled: true
#
# By default, the HTTP API is bound to only the host's local loopback interface,
# ensuring that it is not accessible to the rest of the network. Because the API
# includes neither authentication nor authorization and has not been hardened or
# tested for use as a publicly-reachable API, binding to publicly accessible IPs
# should be avoided where possible.
#
# http.host: 127.0.0.1
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
# (defaults to 9600)
# http.port: 9600-9700
#
# ------------ Module Settings ---------------
# Define modules here. Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
# (persisted: disk-based ACKed queue; events not yet consumed are persisted to disk)
# (memory: in-memory queue; data may be lost after a crash; this is the default)
# queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
# (directory where the data files are stored when the persistent queue is enabled)
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
# (size of the page data files used when the persistent queue is enabled; defaults to 64 MB)
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
# (maximum number of unread events in the queue when the persistent queue is enabled; 0 means unlimited)
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
# (total capacity of the queue)
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
# (maximum number of ACKed events before a checkpoint is forced when the persistent queue is enabled)
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
# (maximum number of written events before a checkpoint is forced when the persistent queue is enabled)
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
# (dead-letter queue)
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
# http.host: "127.0.0.1"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
# path.logs:
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
# (write the log lines of each pipeline to a separate log file)
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.proxy: ["http://proxy:port"]
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.verification_mode: certificate
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

# X-Pack GeoIP plugin
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update
#xpack.geoip.download.endpoint: "https://geoip.elastic.co/v1/database"
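The queue.max_bytes comment asks you to make sure the disk has more free space than the configured queue capacity. A quick pre-flight check along those lines could look like the sketch below. has_free_mb is a hypothetical helper, and the queue directory in the usage note assumes the default path.data/queue under the /usr/local/logstash install used in this article.

```shell
# has_free_mb DIR MB  (hypothetical helper)
# Succeeds if the filesystem holding DIR has at least MB mebibytes available.
# Uses POSIX "df -Pk", whose second output line carries the available
# space in 1 KiB blocks in column 4.
has_free_mb() {
  avail_kb=$(df -Pk "$1" | awk 'NR==2 { print $4 }')
  [ "$avail_kb" -ge $(( $2 * 1024 )) ]
}
```

For the default queue.max_bytes of 1024mb, something like `has_free_mb /usr/local/logstash/data 1024 || echo "not enough disk space for the persistent queue"` can be run before switching queue.type to persisted.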
4. Start
[root@localhost logstash]# ./bin/logstash
# Start in the background
[root@localhost logstash]# nohup ./bin/logstash &
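With plain nohup the output lands in nohup.out and the process ID is easy to lose. The sketch below wraps the same idea so that the log file and a PID file are explicit. start_bg is a hypothetical helper, and the file names in the usage note are placeholders rather than anything Logstash requires.

```shell
# start_bg LOGFILE PIDFILE COMMAND [ARGS...]  (hypothetical helper)
# Runs COMMAND in the background under nohup, sends stdout and stderr to
# LOGFILE, and records the background process ID in PIDFILE.
start_bg() {
  log=$1
  pid=$2
  shift 2
  nohup "$@" >"$log" 2>&1 &
  echo $! >"$pid"
}
```

From /usr/local/logstash this could be used as `start_bg logstash.out logstash.pid ./bin/logstash`, and the instance can later be stopped with `kill "$(cat logstash.pid)"`.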
Docker installation
1. Pull the image
docker pull docker.elastic.co/logstash/logstash:7.15.2
2. Create the mount directories
mkdir -p /data/logstash/{pipeline,config}
logstash.yml
# Listen on all interfaces
http.host: 0.0.0.0
pipelines.yml
# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
- pipeline.id: kafka
  pipeline.workers: 2       # number of worker threads; defaults to the number of CPU cores
  pipeline.batch.size: 1    # number of events per batch; defaults to 125
  path.config: "/usr/share/logstash/pipeline"
Put the pipeline configuration file (logstash-kafka.conf) in the pipeline directory.
3. Create the container
docker run -it --name logstash --net=host \
  -v /data/logstash/pipeline/:/usr/share/logstash/pipeline/ \
  -v /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml \
  -v /data/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml \
  docker.elastic.co/logstash/logstash:7.15.2
Note: if Elasticsearch and Logstash are not on the same server, be sure to add --net=host to the startup parameters, otherwise the connection to the other Elasticsearch nodes fails!
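The docker run command bind-mounts two individual files. If a host path given to -v does not exist yet, Docker creates it as a directory, and Logstash then fails to read its settings, so it is worth confirming the files are in place before creating the container. A minimal sketch follows; check_logstash_mounts is a hypothetical helper, and its default base directory is the /data/logstash layout used in this article.

```shell
# check_logstash_mounts [BASE]  (hypothetical helper)
# Succeeds only if BASE (default: /data/logstash) contains the pipeline
# directory and the two settings files that the docker run command mounts.
check_logstash_mounts() {
  base=${1:-/data/logstash}
  [ -d "$base/pipeline" ] &&
    [ -f "$base/config/logstash.yml" ] &&
    [ -f "$base/config/pipelines.yml" ]
}
```

Running `check_logstash_mounts || echo "create the config files first"` before docker run catches a missing logstash.yml or pipelines.yml early.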