使用canal进行mysql数据同步到Redis

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介:

1. 可行方案

回归正题:我们的标题为《使用canal进行MySQL数据同步到Redis》,那就先来说说我们的目的:mysql数据同步到Redis,想达到读写分离,Redis只做缓存,MySQL做持久化。刚开始想这样干的时候就去网上收集资料,发现了N多做法:

  1. 先从Redis读取数据,如果没有查询到;便从mysql查询数据,将查询到的内容放到Redis中。对于写操作,先对mysql进行写,写成功对Redis进行写。当然这是一种相对直观而且简单的方法,但是看起来有许多操作需要我们自己去做。

  2. 使用mysql的udf去做,大体的思想是通过数据库中的Trigger调用自定义的函数库来触发对Redis的相应操作,比较麻烦的一点是:自定义的函数库需要我们基于mysql的API进行开发(C++),想想自己的Java程序要去调用这么一堆玩意,本人很不情愿。据了解,该方法也是阿里早起的解决方案,具体的步骤可参照:《【菜鸟玩Linux开发】通过MySQL自动同步刷新Redis》

  3. 通过Gearman去同步,但是通过了解发现,它一般使用在PHP的开发中。

    接下来的两种方案都属于对mysql中的binlog进行解析的方法了。

  4. 使用open-replicator解析binlog,https://github.com/whitesock/open-replicator.

  5. 使用canal进行同步,当然是能够解放双手的工具。

通过大量的资料收集和调查,我使用了canal进行了mysql数据同步到Redis。先简单谈谈canal:

canal主要是基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,核心基本就是模拟mysql中slave节点请求。具体的原理在这里不进行介绍,可以移步《阿里巴巴开源项目: canal 基于mysql数据库binlog的增量订阅&消费》 进行学习。

2. mysql的配置

  • 开启mysql的binlog模块

切换到mysql的安装路径,找到my.cnf(Linux)/my.ini (windows),加入如下内容:

1
2
3
[mysqld]log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id= 1  #配置mysql replaction需要定义,不能和canal的slaveId重复

配置完成后,需要重启数据库。当重启数据库遇到问题时,耐心解决,但需要警告的是,千万别动data文件夹下的文件。当然如果你觉得你比较有“资本”,同时遇到了“mysql 1067 无法启动”的错误,你可以试着备份一下data文件夹下的内容,删除logfile文件,重启数据库即可,但本人极不推荐这样进行操作。就是由于本人之前的无知,根据一个无良博客,误删了ibdata1文件,使得本人造成了很大的损失,mysql下的所有数据库瞬间毁灭。

  • 配置mysql数据库

创建canal用户,用来管理canal的访问权限。我们可以通过对canal用户访问权限的控制,进而控制canal能够获取的内容。

1
2
CREATE USER canal IDENTIFIED BY  'canal' ;  
   GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON 数据库名.表名 TO  'canal' @ '%' ;  -- GRANT ALL PRIVILEGES ON 数据库名.表名 TO  'canal' @ '%'  ;  FLUSH PRIVILEGES;

3. canal配置与部署

  • 下载部署包

下载,解压,我使用的是最新版本1.0.22

1
https: //github.com/alibaba/canal/releases/
  • 配置canal

主要配置的文件有两处,canal/conf/example/instance.properties 和 canal/conf/canal.properties . 而canal.properties 文件我们一般保持默认配置,所以我们仅对instance.properties 进行修改。如果需要对canal进行复杂的配置,可以参考《Canal AdminGuide》

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
## mysql serverId
canal.instance.mysql.slaveId =  1234
 
# position info
canal.instance.master.address = ***.***.***.***: 3306  #改成自己的数据库地址
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
 
# username/password
canal.instance.dbUsername = canal #改成自己的数据库信息 
canal.instance.dbPassword = canal #改成自己的数据库信息 
canal.instance.defaultDatabaseName =  #改成自己的数据库信息
canal.instance.connectionCharset = UTF- 8  #改成自己的数据库信息 
 
# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =
  • 启动canal

./canal/startup.sh
  • 查看启动状态

我们可以通过查看logs/canal/canal.log 和logs/example/example.log日志来判断canal是否启动成功。

canal/logs/canal/canal.log

1
2
3
2016 - 12 - 29  14 : 03 : 00.956  [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2016 - 12 - 29  14 : 03 : 01.071  [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[ 192.168 . 1.99 : 11111 ]
2016 - 12 - 29  14 : 03 : 01.628  [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ....


canal/logs/example/example.log

1
2
3
4
2016 - 12 - 29  14 : 03 : 01.357  [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from  class  path resource [canal.properties]
2016 - 12 - 29  14 : 03 : 01.362  [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from  class  path resource [example/instance.properties]
2016 - 12 - 29  14 : 03 : 01.535  [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance  for  1 -example 
2016 - 12 - 29  14 : 03 : 01.555  [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

4. Java连接canal执行同步操作

在maven项目中中加载canal和redis依赖包.

1
2
3
4
5
6
7
8
9
10
dependency>    
     <groupId>redis.clients</groupId>    
     <artifactId>jedis</artifactId>    
     <version> 2.4 . 2 </version>    
</dependency> 
<dependency>    
     <groupId>com.alibaba.otter</groupId>    
     <artifactId>canal.client</artifactId>    
     <version> 1.0 . 22 </version>    
</dependency>

建立canal客户端,从canal中获取数据,并将数据更新至Redis.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
import  java.net.InetSocketAddress;  
import  java.util.List;  
 
import  com.alibaba.fastjson.JSONObject;
import  com.alibaba.otter.canal.client.CanalConnector;  
import  com.alibaba.otter.canal.common.utils.AddressUtils;  
import  com.alibaba.otter.canal.protocol.Message;  
import  com.alibaba.otter.canal.protocol.CanalEntry.Column;  
import  com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
import  com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
import  com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
import  com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
import  com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
import  com.alibaba.otter.canal.client.*;  
 
public  class  CanalClient{  
    public  static  void  main(String args[]) {  
        CanalConnector connector = CanalConnectors.newSingleConnector( new  InetSocketAddress(AddressUtils.getHostIp(),  
                11111 ),  "example" "" "" );  
        int  batchSize =  1000 ;  
        try  {  
            connector.connect();  
            connector.subscribe( ".*\\..*" );  
            connector.rollback();    
            while  ( true ) {  
                Message message = connector.getWithoutAck(batchSize);  // 获取指定数量的数据  
                long  batchId = message.getId();  
                int  size = message.getEntries().size();  
                if  (batchId == - 1  || size ==  0 ) {  
                    try  {  
                        Thread.sleep( 1000 );  
                    catch  (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                else  {  
                    printEntry(message.getEntries());  
                }  
                connector.ack(batchId);  // 提交确认  
                // connector.rollback(batchId); // 处理失败, 回滚数据  
            }  
        finally  {  
            connector.disconnect();  
        }  
    }  
 
    private  static  void  printEntry( List<Entry> entrys) {  
        for  (Entry entry : entrys) {  
            if  (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
                continue ;  
            }  
            RowChange rowChage =  null ;  
            try  {  
                rowChage = RowChange.parseFrom(entry.getStoreValue());  
            catch  (Exception e) {  
                throw  new  RuntimeException( "ERROR ## parser of eromanga-event has an error , data:"  + entry.toString(),  
                        e);  
            }  
            EventType eventType = rowChage.getEventType();  
            System.out.println(String.format( "================> binlog[%s:%s] , name[%s,%s] , eventType : %s" ,  
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                    eventType));  
 
            for  (RowData rowData : rowChage.getRowDatasList()) {  
                if  (eventType == EventType.DELETE) {  
                    redisDelete(rowData.getBeforeColumnsList());  
                else  if  (eventType == EventType.INSERT) {  
                    redisInsert(rowData.getAfterColumnsList());  
                else  {  
                    System.out.println( "-------> before" );  
                    printColumn(rowData.getBeforeColumnsList());  
                    System.out.println( "-------> after" );  
                    redisUpdate(rowData.getAfterColumnsList());  
                }  
            }  
        }  
    }  
 
    private  static  void  printColumn( List<Column> columns) {  
        for  (Column column : columns) {  
            System.out.println(column.getName() +  " : "  + column.getValue() +  "    update="  + column.getUpdated());  
        }  
    }  
 
   private  static  void  redisInsert( List<Column> columns){
       JSONObject json= new  JSONObject();
       for  (Column column : columns) {  
           json.put(column.getName(), column.getValue());  
        }  
       if (columns.size()> 0 ){
           RedisUtil.stringSet( "user:" + columns.get( 0 ).getValue(),json.toJSONString());
       }
    }
 
   private  static   void  redisUpdate( List<Column> columns){
       JSONObject json= new  JSONObject();
       for  (Column column : columns) {  
           json.put(column.getName(), column.getValue());  
        }  
       if (columns.size()> 0 ){
           RedisUtil.stringSet( "user:" + columns.get( 0 ).getValue(),json.toJSONString());
       }
   }
 
    private  static   void  redisDelete( List<Column> columns){
        JSONObject json= new  JSONObject();
           for  (Column column : columns) {  
               json.put(column.getName(), column.getValue());  
            }  
           if (columns.size()> 0 ){
               RedisUtil.delKey( "user:" + columns.get( 0 ).getValue());
           }
    }   
}  
 
RedisUtil 工具类
 
import  redis.clients.jedis.Jedis;
import  redis.clients.jedis.JedisPool;
import  redis.clients.jedis.JedisPoolConfig;
 
public  class  RedisUtil {
 
     // Redis服务器IP
     private  static  String ADDR =  "0.0.0.0" ;
 
     // Redis的端口号
     private  static  int  PORT =  6379 ;
 
     // 访问密码
     //private static String AUTH = "admin";
 
     // 可用连接实例的最大数目,默认值为8;
     // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
     private  static  int  MAX_ACTIVE =  1024 ;
 
     // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
     private  static  int  MAX_IDLE =  200 ;
 
     // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
     private  static  int  MAX_WAIT =  10000 ;
 
     // 过期时间
     protected  static  int   expireTime =  60  60  * 24 ;
 
     // 连接池
     protected  static  JedisPool pool;
 
     /**
      * 静态代码,只在初次调用一次
      */
     static  {
         JedisPoolConfig config =  new  JedisPoolConfig();
         //最大连接数
         config.setMaxTotal(MAX_ACTIVE);
         //最多空闲实例
         config.setMaxIdle(MAX_IDLE);
         //超时时间
         config.setMaxWaitMillis(MAX_WAIT);
         //
         config.setTestOnBorrow( false );
         pool =  new  JedisPool(config, ADDR, PORT,  1000 );
     }
 
     /**
      * 获取jedis实例
      */
     protected  static  synchronized  Jedis getJedis() {
         Jedis jedis =  null ;
         try  {
             jedis = pool.getResource();
         catch  (Exception e) {
             e.printStackTrace();
             if  (jedis !=  null ) {
                 pool.returnBrokenResource(jedis);
             }
         }
         return  jedis;
     }
 
     /**
      * 释放jedis资源
      * @param jedis
      * @param isBroken
      */
     protected  static  void  closeResource(Jedis jedis,  boolean  isBroken) {
         try  {
             if  (isBroken) {
                 pool.returnBrokenResource(jedis);
             else  {
                 pool.returnResource(jedis);
             }
         catch  (Exception e) {
 
         }
     }
 
     /**
      * 是否存在key
      * @param key
      */
     public  static  boolean  existKey(String key) {
         Jedis jedis =  null ;
         boolean  isBroken =  false ;
         try  {
             jedis = getJedis();
             jedis.select( 0 );
             return  jedis.exists(key);
         catch  (Exception e) {
             isBroken =  true ;
         finally  {
             closeResource(jedis, isBroken);
         }
         return  false ;
     }
 
     /**
      * 删除key
      * @param key
      */
     public  static  void  delKey(String key) {
         Jedis jedis =  null ;
         boolean  isBroken =  false ;
         try  {
             jedis = getJedis();
             jedis.select( 0 );
             jedis.del(key);
         catch  (Exception e) {
             isBroken =  true ;
         finally  {
             closeResource(jedis, isBroken);
         }
     }
 
     /**
      * 取得key的值
      * @param key
      */
     public  static  String stringGet(String key) {
         Jedis jedis =  null ;
         boolean  isBroken =  false ;
         String lastVal =  null ;
         try  {
             jedis = getJedis();
             jedis.select( 0 );
             lastVal = jedis.get(key);
             jedis.expire(key, expireTime);
         catch  (Exception e) {
             isBroken =  true ;
         finally  {
             closeResource(jedis, isBroken);
         }
         return  lastVal;
     }
 
     /**
      * 添加string数据
      * @param key
      * @param value
      */
     public  static  String stringSet(String key, String value) {
         Jedis jedis =  null ;
         boolean  isBroken =  false ;
         String lastVal =  null ;
         try  {
             jedis = getJedis();
             jedis.select( 0 );
             lastVal = jedis.set(key, value);
             jedis.expire(key, expireTime);
         catch  (Exception e) {
             e.printStackTrace();
             isBroken =  true ;
         finally  {
             closeResource(jedis, isBroken);
         }
         return  lastVal;
     }
 
     /**
      *  添加hash数据
      * @param key
      * @param field
      * @param value
      */
     public  static  void  hashSet(String key, String field, String value) {
         boolean  isBroken =  false ;
         Jedis jedis =  null ;
         try  {
             jedis = getJedis();
             if  (jedis !=  null ) {
                 jedis.select( 0 );
                 jedis.hset(key, field, value);
                 jedis.expire(key, expireTime);
             }
         catch  (Exception e) {
             isBroken =  true ;
         finally  {
             closeResource(jedis, isBroken);
         }
     }
}

至此,我们利用canal进行了mysql数据同步到Redis的任务,可以根据不同的需求将代码进行修改置于需要的位置。

参考:

 z转载:http://blog.csdn.net/tb3039450/article/details/53928351













本文转自yunlielai51CTO博客,原文链接:http://blog.51cto.com/4925054/1910483,如需转载请自行联系原作者

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
14天前
|
NoSQL 关系型数据库 Redis
《docker高级篇(大厂进阶):1.Docker复杂安装详说》包括:安装mysql主从复制、安装redis集群
《docker高级篇(大厂进阶):1.Docker复杂安装详说》包括:安装mysql主从复制、安装redis集群
62 14
|
11天前
|
关系型数据库 MySQL 应用服务中间件
《docker基础篇:8.Docker常规安装简介》包括:docker常规安装总体步骤、安装tomcat、安装mysql、安装redis
《docker基础篇:8.Docker常规安装简介》包括:docker常规安装总体步骤、安装tomcat、安装mysql、安装redis
51 7
|
1月前
|
NoSQL Java 关系型数据库
Liunx部署java项目Tomcat、Redis、Mysql教程
本文详细介绍了如何在 Linux 服务器上安装和配置 Tomcat、MySQL 和 Redis,并部署 Java 项目。通过这些步骤,您可以搭建一个高效稳定的 Java 应用运行环境。希望本文能为您在实际操作中提供有价值的参考。
138 26
|
20天前
|
NoSQL 关系型数据库 MySQL
Linux安装jdk、mysql、redis
Linux安装jdk、mysql、redis
151 7
|
2月前
|
存储 NoSQL 关系型数据库
MySQL和Redis的区别
**MySQL和Redis的区别** MySQL和Redis都是流行的数据存储解决方案,但它们在设计、用途和特性上有显著区别。理解这些区别有助于选择合适的数据库来满足不同的应用需求。本文将详细介绍MySQL和Redis的区别,包括它们的架构、使用场景、性能和其他关键特性。 ### 一、基本概述 **MySQL**: MySQL是一个关系型数据库管理系统(RDBMS),使用结构化查询语言(SQL)进行数据管理。它支持事务、复杂查询和多种存储引擎,广泛应用于各种Web应用、企业系统和数据分析项目。 **Redis**: Redis是一个基于内存的键值数据库,通常被称为NoSQL数
161 4
|
21天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
47 3
|
21天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
54 3
|
21天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE &#39;log_%&#39;;`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
72 2
|
1月前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
227 15
|
28天前
|
SQL 关系型数据库 MySQL
数据库数据恢复—Mysql数据库表记录丢失的数据恢复方案
Mysql数据库故障: Mysql数据库表记录丢失。 Mysql数据库故障表现: 1、Mysql数据库表中无任何数据或只有部分数据。 2、客户端无法查询到完整的信息。