JAVA通过Gearman实现MySQL到Redis的数据同步(异步复制)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 无论MySQL还是Redis,自身都带有数据同步的机制,像比较常用的 MySQL的Master/Slave模式 ,就是由Slave端分析Master的binlog来实现的,这样的数据复制其实还是一个异步过程,只不过当服务器都在同一内网时,异步的延迟几乎可以忽略。

MySQL到Redis数据复制方案


无论MySQL还是Redis,自身都带有数据同步的机制,像比较常用的 MySQL的Master/Slave模式 ,就是由Slave端分析Master的binlog来实现的,这样的数据复制其实还是一个异步过程,只不过当服务器都在同一内网时,异步的延迟几乎可以忽略。


那么理论上我们也可以用同样方式,分析MySQL的binlog文件并将数据插入Redis。但是这需要对binlog文件以及MySQL有非常深入的理解,同时由于 binlog存在Statement/Row/Mixedlevel多种形式 ,分析binlog实现同步的工作量是非常大的。


因此这里选择了一种开发成本更加低廉的方式,借用已经比较成熟的MySQL UDF,将MySQL数据首先放入Gearman中,然后通过一个自己编写的PHP Gearman Worker,将数据同步到Redis。比分析binlog的方式增加了不少流程,但是实现成本更低,更容易操作。


Gearman的安装与使用


Gearman 是一个支持分布式的任务分发框架。设计简洁,获得了非常广泛的支持。一个典型的Gearman应用包括以下这些部分:


 

QQ图片20220427164352.jpg


  • Gearman Job Server:Gearman核心程序,需要编译安装并以守护进程形式运行在后台


  • Gearman Client:可以理解为任务的收件员,比如我要在后台执行一个发送邮件的任务,可以在程序中调用一个Gearman Client并传入邮件的信息,然后就可以将执行结果立即展示给用户,而任务本身会慢慢在后台运行。


  • Gearman Worker:任务的真正执行者,一般需要自己编写具体逻辑并通过守护进程方式运行,Gearman Worker接收到Gearman Client传递的任务内容后,会按顺序处理。


以前曾经介绍过类似的 后台任务处理项目Resque 。两者的设计其实非常接近,简单可以类比为:


  • Gearman Job Server:对应Resque的Redis部分


  • Gearman Client:对应Resque的Queue操作


  • Gearman Worker:对应Resque的Worker和Job


这里之所以选择Gearman而不是Resque是因为Gearman提供了比较好用的MySQL UDF,工作量更小。

 

1、安装依赖


yum install -y boost-devel gperf libevent-devel libuuid-devel
 yum install mysql-devel -y


2、下载gearman


wget https://launchpad.net/gearmand/1.2/1.1.12/+download/gearmand-1.1.12.tar.gz


3、编译安装,指定mysqlclient的链接路径


tar -zxvf gearmand-1.1.12.tar.gz 
  cd gearmand-1.1.12
   ./configure  
make && make install

 

4、启动gearmand服务端 (启动之时,在/var/log/下创建gearmand.log日志文件。-l 指定日志文件  -d后台运行 -L 0.0.0.0 绑定到IPV4


gearmand -L 0.0.0.0 -l /var/log/gearmand.log -d


5、查看是否启动成功


ps -ef | grep gearman


6、查看是否安装成功,查看gearman版本信息


gearmand -V

 

7、MySQL UDF + Trigger同步数据到Gearman (https://github.com/mysqludf)

安装lib_mysqludf_json(lib_mysqludf_json可以把MySQL表的数据以json数据格式输出)


wget https://github.com/mysqludf/lib_mysqludf_json/archive/master.zip


unzip master.zip
cd lib_mysqludf_json-master/
rm -rf lib_mysqludf_json.so


8、编译 mysql_config 这是mysql的配置文件,可以 find /usr -name mysql_config 搜索下在什么位置


gcc $(/usr/local/mysql/bin/mysql_config  --cflags) -shared -fPIC -o lib_mysqludf_json.so lib_mysqludf_json.c


9、拷贝lib_mysqludf_json.so到MySQL的plugin目录

(可以登陆MySQL,输入命令"show variables like '%plugin%'"查看plugin位置)


cp lib_mysqludf_json.so /usr/local/mysql/lib/plugin/

 

演示lib_mysqludf_json功能
登录mysql
mysql -uroot -h127.0.0.1 -p
注册UDF函数
CREATE FUNCTION json_object RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_array RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_members RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_values RETURNS STRING SONAME "lib_mysqludf_json.so";
//json_array|json_members|json_values函数注册方式与json_object一样.
select json_object(id,file_save_type,base_dir) as sys_file_save_config from sys_file_save_config;
ERROR 1123 (HY000): Can't initialize function 'json_object'; Invalid json member name - name cannot be empty


以上错误这样解决,给每个成员名称使用别名即可:


select json_object(id as id ,file_save_type as fileSaveType,app_id as appID) as sys_file_save_config from sys_file_save_config;


10、安装gearman-mysql-udf (https://launchpad.net/gearman-mysql-udf)


 wget https://launchpad.net/gearman-mysql-udf/trunk/0.6/+download/gearman-mysql-udf-0.6.tar.gz


tar zxvf gearman-mysql-udf-0.6.tar.gz 
   cd gearman-mysql-udf-0.6


11、安装libgearman-devel


yum install libgearman-devel -y

 

如果没有yum源,添加epel.repo yum源


[epel]
name=Extra Packages for Enterprise Linux 6 - $basearch
#baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch
mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-6&arch=$basearch
failovermethod=priority
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
[epel-debuginfo]
name=Extra Packages for Enterprise Linux 6 - $basearch - Debug
#baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch/debug
mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-debug-6&arch=$basearch
failovermethod=priority
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
gpgcheck=1
[epel-source]
name=Extra Packages for Enterprise Linux 6 - $basearch - Source
#baseurl=http://download.fedoraproject.org/pub/epel/6/SRPMS
mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-source-6&arch=$basearch
failovermethod=priority
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
gpgcheck=1

   

12、编译安装


(可以登陆MySQL,输入命令"show variables like '%plugin%'"查看plugin位置, mysql_config的配置文件,以及插件库所在路径,编译之后会在此路径生成.so文件)


./configure --with-mysql=/usr/local/mysql/bin/mysql_config --libdir=/usr/local/mysql/lib/plugin/
make && make install

 

演示gearman-mysql-udf功能


mysql -uroot -p
CREATE FUNCTION gman_do_background RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_servers_set RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do_high RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do_low RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do_high_background RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do_low_background RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_sum RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
//函数gman_do|gman_do_high|gman_do_low|gman_do_high_background|gman_do_low_background|gman_sum注册方式类似,请参考gearman-mysql-udf-0.6/README 
//指定gearman job server地址 
SELECT gman_servers_set('127.0.0.1:4730');

 

如果出现异常信息:


ERROR 1126 (HY000): Can't open shared library 'libgearman_mysql_udf.so' (errno: 11 libgearman.so.8: cannot open shared object file: No such file or directory)


表示系统找不到 libgearman.so 文件,一般so都在/usr/local/lib目录下,修改配置文件/etc/ld.so.conf,将/usr/local/lib目录加入进去即可:


$ cat /etc/ld.so.conf
include ld.so.conf.d/*.conf
/usr/local/lib
$ /sbin/ldconfig -v | grep gearman*

 

13、MySQL Trigger调用Gearman UDF实现同步


创建触发器
DELIMITER $$
CREATE TRIGGER test_data_to_redis AFTER UPDATE ON test FOR EACH ROW BEGIN
    SET@ret=gman_do_background('syncToRedis', json_object(NEW.id AS `id`, NEW.phone AS`phone`));
END$$;
DELIMITER $$
CREATE TRIGGER test_data_to_redis2 AFTER INSERT ON test
  FOR EACH ROW BEGIN
    SET @ret=gman_do_background('syncToRedis2', json_object(NEW.id AS `id`, NEW.phone AS`phone`)); 
  END$$
DELIMITER ;
DELIMITER $$
CREATE TRIGGER test_data_to_redis3 BEFORE DELETE ON test
  FOR EACH ROW BEGIN
    SET @ret=gman_do_background('syncToRedis3', json_object(OLD.id AS `id`, OLD.phone AS`phone`)); 
  END$$
DELIMITER ;

 

 说明以及问题:此类采用了gearman官网的java-gearman-service(地址:https://launchpad.net/gearman-java),目前release版本是0.6.6。java-gearman-servic.jar包中,即包括gearman server,还包括client和work客户端API。


 问题:config类为spring注入的配置文件类,在worker.addFunction中,如果通过config类的属性,并且属性是从配置文件来的就会有问题。不知道为啥,写死就是OK的。此类连接远程的gearman job server。

 

 jar包需要添加到本地jar仓库:


mvn install:install-file -Dfile=C:\software\java-gearman-service-0.6.6.jar -DgroupId=org.gearman.jgs -DartifactId=java-gearman-service -Dversion=0.6.6 -Dpackaging=jar
import java.util.concurrent.TimeUnit;
import org.gearman.Gearman;
import org.gearman.GearmanFunction;
import org.gearman.GearmanFunctionCallback;
import org.gearman.GearmanServer;
import org.gearman.GearmanWorker;
/**
 * *ECHO_HOST = "192.168.125.131"为安装了Gearman并开启geramand服务的主机地址
 *int ECHO_PORT = 4730默认端口为4730
 *
 * @author Administrator
 *
 */
public class EchoWorker implements GearmanFunction {
// function name
public static final String ECHO_FUNCTION_NAME = "syncToRedis";
// job server地址
public static final String ECHO_HOST = "192.168.1.245";
// job server监听的端口
public static final int ECHO_PORT = 4730;
public static void main(String[] args) {
// 创建一个Gearman实例
Gearman gearman = Gearman.createGearman();
/*
 * 创建一个jobserver
 * 
 * Parameter 1: job server的IP地址 Parameter 2: job server监听的端口
 * 
 * job server收到client的job,并将其分发给注册worker
 * 
 */
GearmanServer server = gearman.createGearmanServer(EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
// 创建一个Gearman的worker
GearmanWorker worker = gearman.createGearmanWorker(); // 正题来了,创建work节点。
worker.setReconnectPeriod(2, TimeUnit.SECONDS); // 设置超时重连时间
worker.setMaximumConcurrency(5); // 最大并发数
// 告诉工人如何执行工作(主要实现了GearmanFunction接口)
worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
// worker连接服务器
worker.addServer(server);
}
@Override
public byte[] work(String function, byte[] data, GearmanFunctionCallback callback) throws Exception {
// work方法实现了GearmanFunction接口中的work方法,本实例中进行了字符串的反写
if (data != null) {
String str = new String(data);
System.out.println(str);
StringBuffer sb = new StringBuffer(str);
return sb.reverse().toString().getBytes();
} else {
return "未接收到data".getBytes();
}
}
}
import org.gearman.Gearman;  
import org.gearman.GearmanClient;  
import org.gearman.GearmanJobEvent;  
import org.gearman.GearmanJobReturn;  
import org.gearman.GearmanServer;  
public class EchoClient {  
    public static void main(String... args) throws InterruptedException {  
            //创建一个Gearman实例  
            Gearman gearman = Gearman.createGearman();  
            //创建一个Gearman client               
            GearmanClient client = gearman.createGearmanClient();  
            /*  
             * 创建一个jobserver  
             *   
             * Parameter 1: job server的IP地址  
             * Parameter 2: job server监听的端口  
             *   
             *job server收到client的job,并将其分发给注册worker  
             *  
             */  
            GearmanServer server = gearman.createGearmanServer(  
                            EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);  
             // 告诉客户端,提交工作时它可以连接到该服务器  
            client.addServer(server);  
            /*  
             * 向job server提交工作  
             *   
             * Parameter 1: gearman function名字  
             * Parameter 2: 传送给job server和worker的数据  
             *   
             * GearmanJobReturn返回job发热结果  
             */  
            GearmanJobReturn jobReturn = client.submitJob(  
                            EchoWorker.ECHO_FUNCTION_NAME, ("Hello World!").getBytes());  
            //遍历作业事件,直到我们打到最后文件               
            while (!jobReturn.isEOF()) {  
                    //下一个作业事件  
                    GearmanJobEvent event = jobReturn.poll();  
                    switch (event.getEventType()) {  
                    case GEARMAN_JOB_SUCCESS:     //job执行成功  
                            System.out.println(new String(event.getData()));  
                            break;  
                    case GEARMAN_SUBMIT_FAIL:     //job提交失败  
                    case GEARMAN_JOB_FAIL:        //job执行失败  
                            System.err.println(event.getEventType() + ": "  
                                            + new String(event.getData()));  
                    default:  
                    }  
            }  
            //关闭  
            gearman.shutdown();  
    }  
}

 

http://gearman.org/download/


php方案:https://www.tuicool.com/articles/B7Jjaa

相关文章
|
6月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
2月前
|
存储 SQL NoSQL
Redis-常用语法以及java互联实践案例
本文详细介绍了Redis的数据结构、常用命令及其Java客户端的使用,涵盖String、Hash、List、Set、SortedSet等数据类型及操作,同时提供了Jedis和Spring Boot Data Redis的实战示例,帮助开发者快速掌握Redis在实际项目中的应用。
284 1
Redis-常用语法以及java互联实践案例
|
2月前
|
SQL Java 关系型数据库
Java连接MySQL数据库环境设置指南
请注意,在实际部署时应该避免将敏感信息(如用户名和密码)硬编码在源码文件里面;应该使用配置文件或者环境变量等更为安全可靠地方式管理这些信息。此外,在处理大量数据时考虑使用PreparedStatement而不是Statement可以提高性能并防止SQL注入攻击;同时也要注意正确处理异常情况,并且确保所有打开过得资源都被正确关闭释放掉以防止内存泄漏等问题发生。
123 13
|
5月前
|
缓存 监控 NoSQL
Redis 实操要点:Java 最新技术栈的实战解析
本文介绍了基于Spring Boot 3、Redis 7和Lettuce客户端的Redis高级应用实践。内容包括:1)现代Java项目集成Redis的配置方法;2)使用Redisson实现分布式可重入锁与公平锁;3)缓存模式解决方案,包括布隆过滤器防穿透和随机过期时间防雪崩;4)Redis数据结构的高级应用,如HyperLogLog统计UV和GeoHash处理地理位置。文章提供了详细的代码示例,涵盖Redis在分布式系统中的核心应用场景,特别适合需要处理高并发、分布式锁等问题的开发场景。
390 41
|
7月前
|
NoSQL Java API
在Java环境下如何进行Redis数据库的操作
总的来说,使用Jedis在Java环境下进行Redis数据库的操作,是一种简单而高效的方法。只需要几行代码,就可以实现复杂的数据操作。同时,Jedis的API设计得非常直观,即使是初学者,也可以快速上手。
341 94
|
5月前
|
缓存 NoSQL Java
Java Redis 面试题集锦 常见高频面试题目及解析
本文总结了Redis在Java中的核心面试题,包括数据类型操作、单线程高性能原理、键过期策略及分布式锁实现等关键内容。通过Jedis代码示例展示了String、List等数据类型的操作方法,讲解了惰性删除和定期删除相结合的过期策略,并提供了Spring Boot配置Redis过期时间的方案。文章还探讨了缓存穿透、雪崩等问题解决方案,以及基于Redis的分布式锁实现,帮助开发者全面掌握Redis在Java应用中的实践要点。
319 6
|
4月前
|
人工智能 Java 关系型数据库
Java的时间处理与Mysql的时间查询
本文总结了Java中时间与日历的常用操作,包括时间的转换、格式化、日期加减及比较,并介绍了MySQL中按天、周、月、季度和年进行时间范围查询的方法,适用于日常开发中的时间处理需求。
|
7月前
|
存储 NoSQL Redis
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 +  无锁架构 +  EDA架构  + 异步日志 + 集群架构
|
9月前
|
Linux 网络安全 Docker
尼恩一键开发环境: vagrant+java+springcloud+redis+zookeeper镜像下载(&制作详解)
尼恩提供了一系列文章,旨在帮助开发者轻松搭建一键开发环境,涵盖Java分布式、高并发场景下的多种技术组件安装与配置。内容包括但不限于Windows和CentOS虚拟机的安装与排坑指南、MySQL、Kafka、Redis、Zookeeper等关键组件在Linux环境下的部署教程,并附带详细的视频指导。此外,还特别介绍了Vagrant这一虚拟环境部署工具,
尼恩一键开发环境: vagrant+java+springcloud+redis+zookeeper镜像下载(&制作详解)
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
275 0

推荐镜像

更多