LogStash的安装(传统方式&Docker)与使用

简介: LogStash的安装(传统方式&Docker)与使用

正文


ogstash 是一个实时数据收集引擎,可收集各类型数据并对其进行分析,过滤和归纳。按照自己条件分析过滤出符合数据导入到可视化界面。它可以实现多样化的数据源数据全量或增量传输,数据标准格式处理,数据格式化输出等的功能,常用于日志处理。工作流程分为三个阶段:


 (1)input数据输入阶段,可接收oracle、mysql、postgresql、file等多种数据源;

 (2)filter数据标准格式化阶段,可过滤、格式化数据,如格式化时间、字符串等;

 (3)output数据输出阶段,可输出到elasticsearch、mongodb、kafka等接收终端。


传统方式


1、下载安装包


https://artifacts.elastic.co/downloads/logstash/logstash-7.15.2-linux-x86_64.tar.gz


2、解压缩,移动重命名


[root@localhost ~]# tar -zxvf logstash-7.15.2-linux-x86_64.tar.gz 
[root@localhost ~]# mv logstash-7.15.2 /usr/local/logstash


3、配置


注意一下所有的配置文件请设置成utf-8格式,不然启动可能会报错!!!


pipelines.yml  


配置方式有三种
1、直接写input,output这样,使用config.string字段
 - pipeline.id: test
   pipeline.workers: 1
   pipeline.batch.size: 1
   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
2、使用配置文件的路径 使用path.config字段
 - pipeline.id: another_test
   queue.type: persisted
   path.config: "/tmp/logstash/a.config"
3、使用通配符格式 path.config=/tmp/logstash/conf.d/*.conf
 - pipeline.id: another_test
   queue.type: persisted
   path.config: "/tmp/logstash/conf.d/*.conf"
#多个路径分开写
  - pipeline.id: kafka
    pipeline.workers: 2 #线程数默认与cpu核数一致
    pipeline.batch.size: 1 #批量处理的条数默认125
    path.config: "/usr/local/logstash/config/logstash-kafka.conf"
  - pipeline.id: es
    queue.type: persisted #队列持久化,防止丢失数据,默认不开启
    path.config: "/usr/local/logstash/config/logstash-es.conf"


log-es.conf


# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input{
  file{
    # 日志文件路径
    path => "/usr/local/es/logs/my-es.log"
    type => "elasticsearch"
    start_position => "beginning" #从文件开始处读写
  }
}
 #过滤器,正则表达式
filter {
  #定义数据的格式
  grok {
    match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|"}
  }
 #定义时间戳的格式
  date {
    match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
    locale => "cn"
  }
  #定义客户端的IP是哪个字段(上面定义的数据格式)
  geoip {
    source => "clientIp"
  }
}
output{
  elasticsearch{
    hosts => ["192.168.139.160:9200","192.168.139.161:9200","192.168.139.162:9200"] # es地址
    index => "es-message-%{+YYYY.MM.dd}" 
    #如果es没有设置密码则不需要设置密码
    user => "elastic"
    password => "cGKuMaWGZLBaSSDW7qKX"
  }
  stdout{
    codec => rubydebug
  }
}


logstash-kafka.conf


input {
  kafka {
    bootstrap_servers => "192.168.139.162:9092"
    topics => "my-topic-partition"
 }
}
filter {
  #Only matched data are send to output
}
output  {
  elasticsearch{
    hosts => ["192.168.139.160:9200","192.168.139.161:9200","192.168.139.162:9200"] # es地址
    index => "kafka-log-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "cGKuMaWGZLBaSSDW7qKX"
  }
  stdout{
    codec => rubydebug
  }
}


logstash.yml文件注释说明


# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#默认机器主机名称
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#logstash及其插件目录
# path.data:
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#将并行执行管道的过滤器和输出阶段的工作线程数,默认是cpu核数
# pipeline.workers: 2
#
# How many events to retrieve from inputs before sending to filters+workers
#单个工作线程将从输入中收集的最大事件数
pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#轮询下一个事件时等待的时间(毫秒)
pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
#设置为 时true,强制 Logstash 在关闭期间退出,即使内存中仍有进行中的事件。
#默认情况下,Logstash 将拒绝退出,直到所有接收到的事件都已推送到输出
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" will  automatically enable ordering if the 'pipeline.workers' setting
# is also set to '1'.
# "true" will enforce ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" will disable any extra processing necessary for preserving ordering.
#排序
# pipeline.ordered: auto
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#检查配置是否正确,默认不检查
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#会定期检查配置是否已更改,并在更改时重新加载配置。默认不检查
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60) 
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
#Logstash 检查配置文件的更改频率(以秒为单位)
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#带引号的字符串是否转义
# config.support_escapes: false
#
# ------------ HTTP API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
# http.enabled: true
#
# By default, the HTTP API is bound to only the host's local loopback interface,
# ensuring that it is not accessible to the rest of the network. Because the API
# includes neither authentication nor authorization and has not been hardened or
# tested for use as a publicly-reachable API, binding to publicly accessible IPs
# should be avoided where possible.
#
# http.host: 127.0.0.1
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
#默认9600
# http.port: 9600-9700
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
# persisted基于磁盘的 ACKed 队列,会将未消费的消息持久化到磁盘
#memory基于内存,宕机之后,有可能丢失数据,默认是memory
# queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
#启用持久队列时将存储数据文件的目录路径
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#启用持久队列时使用的页面数据文件的大小,默认64M
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#启用持久队列时队列中未读事件的最大数量,0表示没有限制
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#队列的总容量
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#启用持久队列时强制检查点之前 ACKed 事件的最大数量
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#启用持久队列时强制检查点之前的最大写入事件数
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#死信队列
# dead_letter_queue.enable: false
# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb
# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
# http.host: "127.0.0.1"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
# path.logs:
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
#用于启用不同日志文件中每个管道的日志分离
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.proxy: ["http://proxy:port"]
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.verification_mode: certificate
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s
# X-Pack GeoIP plugin
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update
#xpack.geoip.download.endpoint: "https://geoip.elastic.co/v1/database"


4、启动


1. [root@localhost logstash]# ./bin/logstash
2. #后台启动
3. [root@localhost logstash]# nohup ./bin/logstash &


Docker方式


1、拉取镜像


docker pull docker.elastic.co/logstash/logstash:7.15.2


2、创建挂载


mkdir -p /data/logstash/{pipeline,config}
logstash.yml


logstash.yml

#开启
http.host: 0.0.0.0


pipelines.yml


# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
  - pipeline.id: kafka
    pipeline.workers: 2 #线程数默认与cpu核数一致
    pipeline.batch.size: 1 #批量处理的条数默认125
    path.config: "/usr/share/logstash/pipeline"


降配置文件放到pipeline目录(logstash-kafka.conf)


3、创建容器


docker run -it --name logstash --net=host \
-v /data/logstash/pipeline/:/usr/share/logstash/pipeline/ \
-v /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml \
-v /data/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml \
docker.elastic.co/logstash/logstash:7.15.2


注意若是es与logstash不在同一台服务器上启动参数一定要加上--net=host,不然其他es节点连接不上!


相关文章
|
9月前
|
关系型数据库 应用服务中间件 nginx
Docker一键安装中间件(RocketMq、Nginx、MySql、Minio、Jenkins、Redis)
本系列脚本提供RocketMQ、Nginx、MySQL、MinIO、Jenkins和Redis的Docker一键安装与配置方案,适用于快速部署微服务基础环境。
|
7月前
|
关系型数据库 数据库 PostgreSQL
docker 安装 Postgres 17.6
本文介绍如何通过Docker安装和配置PostgreSQL 17.6。内容包括拉取镜像、导出配置文件、运行容器并挂载数据与配置文件目录,以及进入容器使用psql操作数据库的完整步骤,便于持久化管理和自定义配置。
1172 3
docker 安装 Postgres 17.6
|
6月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
743 6
|
6月前
|
Java Linux 虚拟化
【Docker】(1)Docker的概述与架构,手把手带你安装Docker,云原生路上不可缺少的一门技术!
1. Docker简介 1.1 Docker是什么 为什么docker会出现? 假定您在开发一款平台项目,您的开发环境具有特定的配置。其他开发人员身处的环境配置也各有不同。 您正在开发的应用依赖于您当前的配置且还要依赖于某些配置文件。 您的企业还拥有标准化的测试和生产环境,且具有自身的配置和一系列支持文件。 **要求:**希望尽可能多在本地模拟这些环境而不产生重新创建服务器环境的开销 问题: 要如何确保应用能够在这些环境中运行和通过质量检测? 在部署过程中不出现令人头疼的版本、配置问题 无需重新编写代码和进行故障修复
567 2
|
10月前
|
存储 NoSQL MongoDB
Docker中安装MongoDB并配置数据、日志、配置文件持久化。
现在,你有了一个运行在Docker中的MongoDB,它拥有自己的小空间,对高楼大厦的崩塌视而不见(会话丢失和数据不持久化的问题)。这个MongoDB的数据、日志、配置文件都会妥妥地保存在你为它精心准备的地方,天旋地转,它也不会失去一丁点儿宝贵的记忆(即使在容器重启后)。
1178 4
|
9月前
|
Linux Docker Windows
windows docker安装报错适用于 Linux 的 Windows 子系统必须更新到最新版本才能继续。可通过运行 “wsl.exe --update” 进行更新。
适用于 Linux 的 Windows 子系统需更新至最新版本(如 wsl.2.4.11.0.x64.msi)以解决 2025 年 Windows 更新后可能出现的兼容性问题。用户可通过运行 “wsl.exe --update” 或访问提供的链接下载升级包进行更新。
3654 0
|
10月前
|
Linux iOS开发 Docker
MyEMS开源系统安装之Linux/macOS上的DOcker
本指南详细介绍了如何在Linux/macOS上使用Docker部署MyEMS系统。主要内容包括:前置条件(如安装Docker、npm和MySQL),以及分步骤部署各个组件(如myems-api、myems-admin、myems-modbus-tcp等)。每个步骤涵盖源代码复制、环境配置、镜像构建、容器运行及日志管理等操作,并提供了多平台构建的支持。最后,指南还说明了默认端口和登录凭据,帮助用户快速启动并访问MyEMS的管理界面和Web界面。
358 1

热门文章

最新文章

下一篇
开通oss服务