Logstash同步MySql数据到Elasticsearch

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Logstash同步MySql数据到Elasticsearch

1 MySql数据到Elasticsearch

1.1 下载logstash

官网

https://www.elastic.co/cn/logstash/

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz

1.2 解压logstash

tar -zxvf logstash-6.6.0.tar.gz

1.3 在logstash 目录创建 mysql 文件夹

[root@bigdata01 logstash-6.6.0]# mkdir mysql

1.4 将 mysql 驱动文件和数据库查询文件 放进mysql中

1.5 在config 目录下创建 mysqltoes.conf 文件

1.6 mysqltoes.conf 配置

input {
  # 多张表的同步只需要设置多个jdbc的模块就行了
  jdbc {
      # mysql 数据库链接,shop为数据库名
      jdbc_connection_string => "jdbc:mysql://ip:3306/mall?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "root"
      # 驱动
      jdbc_driver_library => "/usr/local/logstash-6.6.0/mysql/mysql-connector-java-8.0.16.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      #是否分页
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      #直接执行sql语句
      # statement =>"select * from t_item"
      # 执行的sql 文件路径+名称
      statement_filepath => "/usr/local/logstash-6.6.0/mysql/item.sql"
      # 默认列名转换为小写
      lowercase_column_names => "false"
      #设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "* * * * *"
      # 索引类型
      #type => "jdbc"
    }
}
output {
  elasticsearch {
        #es的ip和端口
        hosts => ["http://ip:9200"]
        #ES索引名称(自己定义的)
            index => "mall"
        #文档类型
        document_type => "mall_item"
        #设置数据的id为数据库中的字段
        document_id => "%{iteId}"
    }
    stdout {
        codec => json_lines
    }
}

1.7 启动 logstash

前台启动:

[root@bigdata01 bin]# ./logstash -f ../config/mysqltoes.conf

后台启动:

[root@bigdata01 bin]# nohup  ./logstash -f ../config/mysqltoes.conf >logstash.log &

2 配置语法讲解

logstash使用{ }来定义配置区域,区域内又可以包含其插件的区域配置。

# 最基本的配置文件定义,必须包含input 和 output。
input{
    stdin{ }
}
output{
    stdout{
        codec=>rubydebug
    }
}
# 如果需要对数据进操作,则需要加上filter段
input{
    stdin{ }
}
filter{
  # 里面可以包含各种数据处理的插件,如文本格式处理 grok、键值定义 kv、字段添加、
  # geoip 获取地理位置信息等等... 
}
output{
    stdout{
        codec=>rubydebug
    }
}
# 可以定义多个输入源与多个输出位置
input{
    stdin{ }
    file{
        path => ["/var/log/message"]
        type => "system"
        start_position => "beginning"
    }
}
output{
    stdout{
        codec=>rubydebug
    }
    file {
        path => "/var/datalog/mysystem.log.gz"
        gzip => true
    }
}

3 启动方式

# 通过手动指定配置文件启动
/bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf
# 以daemon方式运行,则在指令后面加一个 & 符号
/bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf &
# 如果是通过rpm包安装的logstash则可以使用自带的脚本启动
/etc/init.d/logstash start 
# 通过这种方式启动,logstash会自动加载 /etc/logstash/conf.d/ 下的配置文件

4 filebeat基本讲解

filebeat是基于原先 logstash-forwarder 的源码开发而来,无需JAVA环境,运行起来更轻便,无疑是业务服务器端的日志收集工具。

配 置

# 配置文件路径 "/etc/filebeat/filebeat.yml"
# 一个配置文件可以包含多个prospectors,一个prospectors可以包含多个path。
filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    -
      paths:
        - /var/log/messages
      input_type: log
      document_type: messages
    -
      paths:
        - /alidata/log/nginx/access/access.log.json
      input_type: log
      document_type: nginxacclog
    -
      paths:
        - /alidata/www/storage/logs/laravel.log
      input_type: log
      document_type: larlog
    -
      paths:
        - /alidata/www/500_error/500_error.log
      input_type: log
      document_type: error500
    -
      paths:
        - /alidata/www/deposit/deposit.log
      input_type: log
      document_type: deposit
    -
      paths:
        - /alidata/www/call_error.log
      input_type: log
      document_type: call_error
    -
      paths:
        - /alidata/www/weixin_deposit.log
      input_type: log
      document_type: weixin_deposit
    -
      paths:
        - /alidata/log/php/php-fpm.log.slow
      input_type: log
      document_type: phpslowlog
      # 多行处理
      multiline:
          pattern: '^[[:space:]]'
          negate: true
          match: after
    # Additional prospector
registry_file: /var/lib/filebeat/registry
############################# Libbeat Config ##################################
# Base config file used by all other beats for using libbeat features
############################# Output ##########################################
# 输出数据到 redis 
output:
  redis:
    host: "10.122.52.129"
    port: 6379
    password: "123456"
# 输出数据到 logstash ,一般两者选用其一
  logstash:
    hosts: ["10.160.8.221:5044"]
############################# Shipper #########################################
shipper:
# 打上服务器tag
  name: "host_2"
############################# Logging #########################################  
logging:  
  files:
    rotateeverybytes: 10485760 # = 10MB

filebeat主要配置就是这个配置文件了,设定好之后启动服务就会自动从源拉取数据发送到指定位置,当数据为普通行数据时,filebeat会自动为其添加字段信息,其中一项字段 @timestamp 为filebeat读取到这条数据的时间,默认格式为UTC时间,比中国大陆时间早8小时。


如果数据为json格式,而数据中已包含@timestamp字段,filebeat处理时会把@timestamp字段值替换为filebeat读取到该行数据的当前UTC时间。

5 实战运用

5.1 业务到redis到es之间迁移

nginx 日志格式配置

       log_format json '{"@timestamp":"$time_iso8601",'
                 '"slbip":"$remote_addr",'
                 '"clientip":"$http_x_forwarded_for",'
                 '"serverip":"$server_addr",'
                 '"size":$body_bytes_sent,'
                 '"responsetime":$request_time,'
                 '"domain":"$host",'
                 '"method":"$request_method",'
                 '"requesturi":"$request_uri",'
                 '"url":"$uri",'
                 '"appversion":"$HTTP_APP_VERSION",'
                 '"referer":"$http_referer",'
                 '"agent":"$http_user_agent",'
                 '"status":"$status"}';

filebeat 配置

filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    -
      paths:
        - /alidata/log/nginx/access/access.log.json
      input_type: log
      document_type: nginxacclog
############################# Output ##########################################
output:            
  logstash:
    hosts: ["10.160.8.221:5044"]
# 其他部分配置省略。

logstash 配置 (此处logstash用于接收filebeat的数据,然后转存redis)

input {
    beats {
    port => 5044
    codec => "json"
}
}
filter {
    if [type] == "nginxacclog" {
    geoip {
        source => "clientip"
        target => "geoip"
        database => "/u01/elk/logstash/GeoLiteCity.dat"
        add_field => [ "[geoip][coordinates]","%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]","%{[geoip][latitude]}" ]
}
    mutate {
        convert => [ "[geoip][coordinates]","float" ]
}
}
}
output{
    if [type] == "nginxacclog" {
    redis {
        data_type => "list"
        key => "nginxacclog"
        host => "127.0.0.1"
        port => "26379"
        password => "123456"
        db => "0"
}
}
    if [type] == "messages" {
    redis {
        data_type => "list"
        key => "messages"
        host => "127.0.0.1"
        port => "26379"
        password => "123456"
        db => "0"
}
}
}

logstash 配置 (此处logstash用于读取redis list中的数据,然后转存elasticsearch)

input{
    redis {
        host => "10.10.1.2"
        port => "26379"
        db => "0"
        key => "nginxacclog"
        threads => 300
        password => "123456"
        data_type => "list"
        codec => "json"
}
    redis {
        host => "10.10.1.2"
        port => "26379"
        db => "0"
        key => "messages"
        password => "123456"
        threads => 50
        data_type => "list"
        codec => "json"
}
}
output {
    if [type] == "nginxacclog" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-nginxacclog-%{+YYYY.MM.dd}"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 10
            workers => 2
}
}
    if [type] == "messages" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-messages-%{+YYYY.MM.dd}"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 30
            workers => 1
}
}
}

关键指令解释:


threads 开启多少个线程读取redis数据,也就是从redis输入到logstash的速度,线程越多读取速度越快,但是根据接收节点的接收速度来设置,如果输入过快,接收速度不够,则会出现丢数据的情况,设置一个最佳的threads值需要和接收节点做反复测试才能得出。


flush_size 控制logstash向Elasticsearch批量发送数据,上面的配置表示,logstash会努力赞到50000条数据一次发送给Elasticsearch。


idle_flush_time 控制logstash多长时间向Elasticsearch发送一次数据,默认为1秒,根据以上配置,logstash积攒数据未到flush_size 10秒后也会向Elasticsearch发送一次数据。


workers 建议设置为1或2,如果机器性能不错可以设置为2. 不建议设置的更高。

5.2 业务到redis到mongo

filebeat 配置(从日志文件读取到的数据直接缓存至redis队列)

filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    -
      paths:
        - /alidata/log/nginx/access/access.log.json
      input_type: log
      document_type: nginxacclog
############################# Output ##########################################
output:
  redis:
    host: "10.160.8.221"
    port: 26379
    password: "123456"

document_type 自定义日志类型,在logstash中可通过type判断做不同的处理。

logstash 配置 (此处logstash用于读取redis list中的数据,然后转存mongodb)

input {
        redis {
            host => "10.160.8.221"
            port => "26379"
            key => "filebeat"
            data_type => "list"
            password => "123456"
            threads => 50
         }
        redis {
            host => "10.160.8.221"
            port => "26379"
            key => "mycat"
            data_type => "list"
            password => "123456"
            threads => 50
            type => "mycat"
        }
}
output {
if [type] == "mycat" {
        mongodb{
            collection => "mycat%{+yyyyMMdd}"
            isodate => true
            database => "logdb"
            uri => "mongodb://log_user:123456@10.10.1.102:27017/logdb"
        }
}
if [type_xi09wnk] == "nginxacclog" {
        mongodb{
            collection => "nginx_accress%{years_dik3k}%{months_dik3k}%{days_dik3k}"
            isodate => true
            database => "logdb"
            uri => "mongodb://log_user:123456@10.10.1.102:27017/logdb"
        }
}
}


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
25天前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
144 0
|
1月前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
174 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
1月前
|
关系型数据库 MySQL API
MySQL 历史数据迁移到 Elasticsearch
MySQL 历史数据迁移到 Elasticsearch
71 4
|
1月前
|
SQL 存储 关系型数据库
Mysql主从同步 清理二进制日志的技巧
Mysql主从同步 清理二进制日志的技巧
27 1
|
1月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
84 0
|
9天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
23 4
|
7天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
20 1
|
1月前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
61 3
Mysql(4)—数据库索引
|
15天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
82 1
|
18天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
59 2