logstash同步mysql至es数据库脚本

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: logstash同步mysql至es数据库脚本

相关mysql数据库脚本文件

mysql.conf配置文件

input {
#  jdbc {
#    jdbc_driver_library => "F:/Install/repository/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar"
#    jdbc_driver_class => "com.mysql.jdbc.Driver"    # 8.0以上版本:一定要把serverTimezone=UTC天加上  &useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
#    jdbc_connection_string => "jdbc:mysql://10.134.22.232:3307/cpmcbiddingx_report?characterEncoding=utf8"
#    jdbc_user => "root"
#    jdbc_password => "password"
#    schedule => "* * * * *"
# #数据库的查询语句,两种方法:1、statement_filepath指定sql路径;2、sql语句
#    # statement_filepath => "filename.sql"
#    statement => "SELECT * FROM cwdz_bsfbzjbank_flow WHERE acct_date >= :sql_last_value"
#    #id,project_id,project_code,project_name,package_id,package_code,deptid,org_name,order_no,supplier_code,company_name,amount,DATE_FORMAT(acct_date,'%Y-%m-%d %T') as acct_date,payee_name,payee_acctno,payee_bankno,payee_bankname,pay_type,tenant_id
# #是否分页
#    jdbc_paging_enabled => "true"     
#    jdbc_page_size => "5000"
# # 是否需要记录某列的值,用于实时同步更新。
# use_column_value => true
# # 需要记录的字段,一般用于记录主键id,或者更新时间,用于sql查询最新。
# tracking_column => "acct_date"
# tracking_column_type => "timestamp"
# # 是否清除记录的字段。
# clean_run => false
# #记录字段保存的位置。
# last_run_metadata_path => "D:/software/elasticsearch/03.elasticsearch/logstash-7.13.1/config/logstash_metadata2"
# #写入es数据的key,默认会被转成小写,该字段用于控制是否小写。
# lowercase_column_names => true
#
# jdbc_default_timezone => "Asia/Shanghai"
# plugin_timezone => "local"
# type => "pfflow"
# 
#  }
  jdbc {
    jdbc_driver_library => "F:/Install/repository/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"    # 8.0以上版本:一定要把serverTimezone=UTC天加上
    jdbc_connection_string => "jdbc:mysql://10.134.22.232:3307/cpmcbiddingx_report?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    # statement_filepath => "filename.sql"
  #解决相差8小时 时差
    statement => "SELECT * FROM cwdz_bsfbzjbank_flow WHERE acct_date  > convert_tz(:sql_last_value, '+00:00','-08:00') order by acct_date asc"
    jdbc_paging_enabled => "true"     
    jdbc_page_size => "5000"
  use_column_value => true
  tracking_column => "acct_date"
  tracking_column_type => "timestamp"
  clean_run => false
  last_run_metadata_path => "D:/software/elasticsearch/03.elasticsearch/logstash-7.13.1/config/logstash_metadata15"
  lowercase_column_names => true
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "pufa"
  }
#  jdbc {
#    jdbc_driver_library => "F:/Install/repository/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar"
#    jdbc_driver_class => "com.mysql.jdbc.Driver"    # 8.0以上版本:一定要把serverTimezone=UTC天加上
#    jdbc_connection_string => "jdbc:mysql://10.134.22.232:3307/cpmcbiddingx_report?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
#    jdbc_user => "root"
#    jdbc_password => "password"
#    schedule => "* * * * *"
#    # statement_filepath => "filename.sql"
#    statement => "SELECT * FROM cpmc_dz_report_kpinfo WHERE created_time >= :sql_last_value"
#    
#    jdbc_paging_enabled => "true"     
#    jdbc_page_size => "5000"
# use_column_value => true
# tracking_column => "created_time"
# tracking_column_type => "timestamp"
# clean_run => false
# last_run_metadata_path => "D:/software/elasticsearch/03.elasticsearch/logstash-7.13.1/config/logstash_metadata2"
# lowercase_column_names => true
# jdbc_default_timezone => "Asia/Shanghai"
# plugin_timezone => "local"
# type => "kpinfo"
#  }
}
#ElasticSearch中默认使用UTC时间,和中国时间相差8小时,加入以下配置
#filter {
#    ruby {
#        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
#    }
#    ruby {
#        code => "event.set('@timestamp',event.get('timestamp'))"
#    }
#    mutate {
#        remove_field => ["timestamp"]
#    }
#    ruby {
#        code => "event.set('acct_date', event.get('acct_date').time.localtime + 8*60*60)"
#    }
#}
output {
  if[type] == "pfflow" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "pfflow_0611"
            document_id => "%{id}"
        }
    }
    if[type] == "kpinfo" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "kpinfo_0615"
            #document_id => "%{id}"
        }
    }
  if[type] == "pufa" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "pufa_0615"
            document_id => "%{id}"
        }
    }
    #elasticsearch {
        # ES的IP地址及端口
    #    hosts => ["127.0.0.1:9200"]
        # 索引名称 可自定义
    #    index => "0603logstash"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        #document_id => "%{id}"
        #document_type => "cpmc_dz_project"
    #template =>"D:/software/elasticsearch/03.elasticsearch/logstash-7.13.1/config/nm_course_template.json"
    #template_name =>"nm_course"
    #template_overwrite =>"true"
    #}
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

启动:进入logstash bin目录文件 cmd命令执行logstash -f “mysql.conf”

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
3月前
|
人工智能 运维 关系型数据库
数据库运维:mysql 数据库迁移方法-mysqldump
本文介绍了MySQL数据库迁移的方法与技巧,重点探讨了数据量大小对迁移方式的影响。对于10GB以下的小型数据库,推荐使用mysqldump进行逻辑导出和source导入;10GB以上可考虑mydumper与myloader工具;100GB以上则建议物理迁移。文中还提供了统计数据库及表空间大小的SQL语句,并讲解了如何使用mysqldump导出存储过程、函数和数据结构。通过结合实际应用场景选择合适的工具与方法,可实现高效的数据迁移。
665 1
|
3月前
|
SQL 关系型数据库 MySQL
Go语言数据库编程:使用 `database/sql` 与 MySQL/PostgreSQL
Go语言通过`database/sql`标准库提供统一数据库操作接口,支持MySQL、PostgreSQL等多种数据库。本文介绍了驱动安装、连接数据库、基本增删改查操作、预处理语句、事务处理及错误管理等内容,涵盖实际开发中常用的技巧与注意事项,适合快速掌握Go语言数据库编程基础。
257 62
|
2月前
|
SQL 存储 关系型数据库
MySQL功能模块探秘:数据库世界的奇妙之旅
]带你轻松愉快地探索MySQL 8.4.5的核心功能模块,从SQL引擎到存储引擎,从复制机制到插件系统,让你在欢声笑语中掌握数据库的精髓!
|
2月前
|
SQL Oracle 关系型数据库
比较MySQL和Oracle数据库系统,特别是在进行分页查询的方法上的不同
两者的性能差异将取决于数据量大小、索引优化、查询设计以及具体版本的数据库服务器。考虑硬件资源、数据库设计和具体需求对于实现优化的分页查询至关重要。开发者和数据库管理员需要根据自身使用的具体数据库系统版本和环境,选择最合适的分页机制,并进行必要的性能调优来满足应用需求。
105 11
|
2月前
|
SQL 关系型数据库 MySQL
【赵渝强老师】MySQL中的数据库对象
本教程详细介绍了MySQL数据库中的常见对象,包括表、索引、视图、事件、存储过程和存储函数的创建与管理。内容涵盖表的基本操作、索引的使用、视图简化查询、事件调度功能等,并通过具体示例演示相关SQL语句的使用方法。
|
4月前
|
关系型数据库 MySQL Linux
实现MySQL数据库的定时自动备份脚本。
拿走,不谢,这个脚本配方(指引)保证你的数据库数据像蛋糕店一样地天天更新,还能确保老旧的蛋糕(数据)不会堆积满仓库。这下可好,数据安全有保障,数据库管理员也能轻松一点,偶尔闲下来的时候,煮杯咖啡,看个剧岂不美哉?别忘了偶尔检查一下你的自动备份是否正常工作,以防万一蛋糕机器出了点小差错。
228 20
|
4月前
|
关系型数据库 MySQL 定位技术
MySQL与Clickhouse数据库:探讨日期和时间的加法运算。
这一次的冒险就到这儿,期待你的再次加入,我们一起在数据库的世界中找寻下一个宝藏。
196 9
|
4月前
|
存储 关系型数据库 MySQL
【赵渝强老师】OceanBase数据库从零开始:MySQL模式
《OceanBase数据库从零开始:MySQL模式》是一门包含11章的课程,涵盖OceanBase分布式数据库的核心内容。从体系架构、安装部署到租户管理、用户安全,再到数据库对象操作、事务与锁机制,以及应用程序开发、备份恢复、数据迁移等方面进行详细讲解。此外,还涉及连接路由管理和监控诊断等高级主题,帮助学员全面掌握OceanBase数据库的使用与管理。
264 5
|
4月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
6月前
|
关系型数据库 MySQL Java
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库

推荐镜像

更多