采用OpenReplicator解析MySQL binlog

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS Agent(兼容Hermes Agent),2核4GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。

Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。
Open Replicator项目地址:https://github.com/whitesock/open-replicator

binlog事件分析结构图

这里写图片描述

在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。

这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):

{
  "eventId": 1,
  "databaseName": "canal_test",
  "tableName": "`company`",
  "eventType": 2,
  "timestamp": 1477033198000,
  "timestampReceipt": 1477033248780,
  "binlogName": "mysql-bin.000006",
  "position": 353,
  "nextPostion": 468,
  "serverId": 2,
  "before": null,
  "after": null,
  "isDdl": true,
  "sql": "DROP TABLE `company` /* generated by server */"
}

DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):

{
  "eventId": 0,
  "databaseName": "canal_test",
  "tableName": "person",
  "eventType": 24,
  "timestamp": 1477030734000,
  "timestampReceipt": 1477032161988,
  "binlogName": "mysql-bin.000006",
  "position": 242,
  "nextPostion": 326,
  "serverId": 2,
  "before": {
    "id": "3",
    "sex": "f",
    "address": "shanghai",
    "age": "23",
    "name": "zzh3"
  },
  "after": {
    "id": "3",
    "sex": "m",
    "address": "shanghai",
    "age": "23",
    "name": "zzh3"
  },
  "isDdl": false,
  "sql": null
}

相关的类文件如下:
CDCEvent.java

package or;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;

public class CDCEvent {
    private long eventId = 0;//事件唯一标识
    private String databaseName = null;
    private String tableName = null;
    private int eventType = 0;//事件类型
    private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]
    private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]
    private String binlogName = null;// binlog file name
    private long position = 0;
    private long nextPostion = 0;
    private long serverId = 0;
    private Map<String,String> before = null;
    private Map<String,String> after = null;
    private Boolean isDdl= null;
    private String sql = null;

    private static AtomicLong uuid = new AtomicLong(0);
    public CDCEvent(){}

    public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){
        this.init(are);
        this.databaseName = databaseName;
        this.tableName = tableName;
    }

    private void init(final BinlogEventV4 be){
        this.eventId = uuid.getAndAdd(1);
        BinlogEventV4Header header = be.getHeader();

        this.timestamp = header.getTimestamp();
        this.eventType = header.getEventType();
        this.serverId = header.getServerId();
        this.timestampReceipt = header.getTimestampOfReceipt();
        this.position = header.getPosition();
        this.nextPostion = header.getNextPosition();
        this.binlogName = header.getBinlogFileName();
    }

    @Override
    public String toString(){
        StringBuilder builder = new StringBuilder();
        builder.append("{ eventId:").append(eventId);
        builder.append(",databaseName:").append(databaseName);
        builder.append(",tableName:").append(tableName);
        builder.append(",eventType:").append(eventType);
        builder.append(",timestamp:").append(timestamp);
        builder.append(",timestampReceipt:").append(timestampReceipt);
        builder.append(",binlogName:").append(binlogName);
        builder.append(",position:").append(position);
        builder.append(",nextPostion:").append(nextPostion);
        builder.append(",serverId:").append(serverId);
        builder.append(",isDdl:").append(isDdl);
        builder.append(",sql:").append(sql);
        builder.append(",before:").append(before);
        builder.append(",after:").append(after).append("}");

        return builder.toString();
    }
// 省略Getter和Setter方法    
}

open-replicator的解析主要是通过注册Listener的形式实现的,整个过程最重要的步骤在下面:
InstanceListener.java

package or;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import or.keeper.TableInfoKeeper;
import or.manager.CDCEventManager;
import or.model.ColumnInfo;
import or.model.TableInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.BinlogEventListener;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEvent;
import com.google.code.or.binlog.impl.event.XidEvent;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;
import com.google.code.or.common.util.MySQLConstants;

public class InstanceListener implements BinlogEventListener{
    private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);

    @Override
    public void onEvents(BinlogEventV4 be) {
        if(be == null){
            logger.error("binlog event is null");
            return;
        }

        int eventType = be.getHeader().getEventType();
        switch(eventType){
            case MySQLConstants.FORMAT_DESCRIPTION_EVENT:
            {
                logger.trace("FORMAT_DESCRIPTION_EVENT");
                break;
            }
            case MySQLConstants.TABLE_MAP_EVENT://每次ROW_EVENT前都伴随一个TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId
            {
                TableMapEvent tme = (TableMapEvent)be;
                TableInfoKeeper.saveTableIdMap(tme);
                logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());
                break;
            }
            case MySQLConstants.DELETE_ROWS_EVENT:
            {
                DeleteRowsEvent dre = (DeleteRowsEvent) be;
                long tableId = dre.getTableId();
                logger.trace("DELETE_ROW_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Row> rows = dre.getRows();
                for(Row row:rows){
                    List<Column> before = row.getColumns();
                    Map<String,String> beforeMap = getMap(before,databaseName,tableName);
                    if(beforeMap !=null && beforeMap.size()>0){
                        CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);
                        cdcEvent.setIsDdl(false);
                        cdcEvent.setSql(null);
                        cdcEvent.setBefore(beforeMap);
                        CDCEventManager.queue.addLast(cdcEvent);
                        logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
                break;
            }
            case MySQLConstants.UPDATE_ROWS_EVENT:
            {
                UpdateRowsEvent upe = (UpdateRowsEvent)be;
                long tableId = upe.getTableId();
                logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Pair<Row>> rows = upe.getRows();
                for(Pair<Row> p:rows){
                    List<Column> colsBefore = p.getBefore().getColumns();
                    List<Column> colsAfter = p.getAfter().getColumns();

                    Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
                    Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
                    if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
                        CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);
                        cdcEvent.setIsDdl(false);
                        cdcEvent.setSql(null);
                        cdcEvent.setBefore(beforeMap);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.queue.addLast(cdcEvent);
                        logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
                break;
            }
            case MySQLConstants.WRITE_ROWS_EVENT:
            {
                WriteRowsEvent wre = (WriteRowsEvent)be;
                long tableId = wre.getTableId();
                logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

                TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                List<Row> rows = wre.getRows();
                for(Row row: rows){
                    List<Column> after = row.getColumns();
                    Map<String,String> afterMap = getMap(after,databaseName,tableName);
                    if(afterMap!=null && afterMap.size()>0){
                        CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);
                        cdcEvent.setIsDdl(false);
                        cdcEvent.setSql(null);
                        cdcEvent.setAfter(afterMap);
                        CDCEventManager.queue.addLast(cdcEvent);
                        logger.info("cdcEvent:{}",cdcEvent);
                    }
                }
                break;
            }
            case MySQLConstants.QUERY_EVENT:
            {
                QueryEvent qe = (QueryEvent)be;
                TableInfo tableInfo = createTableInfo(qe);
                if(tableInfo == null)
                    break;
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();
                logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);

                CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);
                cdcEvent.setIsDdl(true);
                cdcEvent.setSql(qe.getSql().toString());

                CDCEventManager.queue.addLast(cdcEvent);
                logger.info("cdcEvent:{}",cdcEvent);

                break;
            }
            case MySQLConstants.XID_EVENT:{
                XidEvent xe = (XidEvent)be;
                logger.trace("XID_EVENT: xid:{}",xe.getXid());
                break;
            }
            default:
            {
                logger.trace("DEFAULT:{}",eventType);
                break;
            }
        }

    }

    /**
     * ROW_EVENT中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,
     * 然后跟取回的List<Column>进行映射。
     * 
     * @param cols
     * @param databaseName
     * @param tableName
     * @return
     */
    private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){
        Map<String,String> map = new HashMap<>();
        if(cols == null || cols.size()==0){
            return null;
        }

        String fullName = databaseName+"."+tableName;
        List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
        if(columnInfoList == null)
            return null;
        if(columnInfoList.size() != cols.size()){
            TableInfoKeeper.refreshColumnsMap();
            if(columnInfoList.size() != cols.size())
            {
                logger.warn("columnInfoList.size is not equal to cols.");
                return null;
            }
        }

        for(int i=0;i<columnInfoList.size(); i++){
            if(cols.get(i).getValue()==null)
                map.put(columnInfoList.get(i).getName(),"");
            else
                map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
        }

        return map;
    }

    /**
     * 从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,
     * 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中
     * 
     * @param qe
     * @return
     */ 
    private TableInfo createTableInfo(QueryEvent qe){
        String sql = qe.getSql().toString().toLowerCase();

        TableInfo ti = new TableInfo();
        String databaseName = qe.getDatabaseName().toString();
        String tableName = null;
        if(checkFlag(sql,"table")){
            tableName = getTableName(sql,"table");
        } else if(checkFlag(sql,"truncate")){
            tableName = getTableName(sql,"truncate");
        } else{
            logger.warn("can not find table name from sql:{}",sql);
            return null;
        }
        ti.setDatabaseName(databaseName);
        ti.setTableName(tableName);
        ti.setFullName(databaseName+"."+tableName);

        return ti;
    }

    private boolean checkFlag(String sql, String flag){
        String[] ss = sql.split(" ");
        for(String s:ss){
            if(s.equals(flag)){
                return true;
            }
        }
        return false;
    }

    private String getTableName(String sql, String flag){
        String[] ss = sql.split("\\.");
        String tName = null;
        if (ss.length > 1) {
            String[] strs = ss[1].split(" ");
            tName = strs[0];
        } else {
            String[] strs = sql.split(" ");
            boolean start = false;
            for (String s : strs) {
                if (s.indexOf(flag) >= 0) {
                    start = true;
                    continue;
                }
                if (start && !s.isEmpty()) {
                    tName = s;
                    break;
                }
            }
        }
        tName.replaceAll("`", "").replaceAll(";", "");

        //del "("[create table person(....]
        int index = tName.indexOf('(');
        if(index>0){
            tName = tName.substring(0, index);
        }

        return tName;
    }
}

上面所涉及到的TableInfo .java如下:

package or.model;

public class TableInfo {

    private String databaseName;
    private String tableName;
    private String fullName;
    // 省略Getter和Setter

    @Override
    public boolean equals(Object o){
        if(this == o)
            return true;
        if(o == null || this.getClass()!=o.getClass())
            return false;
        TableInfo tableInfo = (TableInfo)o;
        if(!this.databaseName.equals(tableInfo.getDatabaseName()))
            return false;
        if(!this.tableName.equals(tableInfo.getTableName()))
            return false;
        if(!this.fullName.equals(tableInfo.getFullName()))
            return false;
        return true;
    }

    @Override
    public int hashCode(){
        int result = this.tableName.hashCode();
        result = 31*result+this.databaseName.hashCode();
        result = 31*result+this.fullName.hashCode();
        return result;
    }
}

接着需要有个地方保存从TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper .java

package or.keeper;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import or.MysqlConnection;
import or.model.ColumnInfo;
import or.model.TableInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.impl.event.TableMapEvent;

public class TableInfoKeeper {

    private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);

    private static Map<Long,TableInfo> tabledIdMap = new ConcurrentHashMap<>();
    private static Map<String,List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();

    static{
        columnsMap = MysqlConnection.getColumns();
    }

    public static void saveTableIdMap(TableMapEvent tme){
        long tableId = tme.getTableId();
        tabledIdMap.remove(tableId);

        TableInfo table = new TableInfo();
        table.setDatabaseName(tme.getDatabaseName().toString());
        table.setTableName(tme.getTableName().toString());
        table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());

        tabledIdMap.put(tableId, table);
    }

    public static synchronized void refreshColumnsMap(){
        Map<String,List<ColumnInfo>> map = MysqlConnection.getColumns();
        if(map.size()>0){
//          logger.warn("refresh and clear cols.");
            columnsMap = map;
//          logger.warn("refresh and switch cols:{}",map);
        }
        else
        {
            logger.error("refresh columnsMap error.");
        }
    }

    public static TableInfo getTableInfo(long tableId){
        return tabledIdMap.get(tableId);
    }

    public static List<ColumnInfo> getColumns(String fullName){
        return columnsMap.get(fullName);
    }
}

正如上面InstanceListener中提到的,有些信息需要直接从MySQL中读取,比如数据库表的列信息,相关的类MysqlConnection如下:

package or;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import or.model.BinlogInfo;
import or.model.BinlogMasterStatus;
import or.model.ColumnInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MysqlConnection {
    private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

    private static Connection conn;

    private static String host;
    private static int port;
    private static String user;
    private static String password;

    public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){
        try {
            if(conn == null || conn.isClosed()){
                Class.forName("com.mysql.jdbc.Driver");

                host = hostArg;
                port = portArg;
                user = userArg;
                password = passwordArg;

                conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);
                logger.info("connected to mysql:{} : {}",user,password);
            }
        } catch (ClassNotFoundException e) {
            logger.error(e.getMessage(),e);
        } catch (SQLException e) {
            logger.error(e.getMessage(),e);
        }
    }

    public static Connection getConnection(){
        try {
            if(conn == null || conn.isClosed()){
                setConnection(host,port,user,password);
            }
        } catch (SQLException e) {
            logger.error(e.getMessage(),e);
        }
        return conn;
    }

    /**
     * 获取Column信息
     * 
     * @return
     */
    public static Map<String,List<ColumnInfo>> getColumns(){
        Map<String,List<ColumnInfo>> cols = new HashMap<>();
        Connection conn = getConnection();

        try {
            DatabaseMetaData metaData = conn.getMetaData();
            ResultSet r = metaData.getCatalogs();
            String tableType[] = {"TABLE"};
            while(r.next()){
                String databaseName = r.getString("TABLE_CAT");
                ResultSet result = metaData.getTables(databaseName, null, null, tableType);
                while(result.next()){
                    String tableName = result.getString("TABLE_NAME");
//                  System.out.println(result.getInt("TABLE_ID"));
                    String key = databaseName +"."+tableName;
                    ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);
                    cols.put(key, new ArrayList<ColumnInfo>());
                    while(colSet.next()){
                        ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));
                        cols.get(key).add(columnInfo);
                    }

                }
            }
        } catch (SQLException e) { 
            logger.error(e.getMessage(),e);
        }   
        return cols;
    }

    /**
     * 参考
     * mysql> show binary logs
     *  +------------------+-----------+
     *  | Log_name         | File_size |
     *  +------------------+-----------+
     *  | mysql-bin.000001 |       126 |
     *  | mysql-bin.000002 |       126 |
     *  | mysql-bin.000003 |      6819 |
     *  | mysql-bin.000004 |      1868 |
     *  +------------------+-----------+
     */
    public static List<BinlogInfo> getBinlogInfo(){
        List<BinlogInfo> binlogList = new ArrayList<>();

        Connection conn = null;
        Statement statement = null;
        ResultSet resultSet = null;

        try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery("show binary logs");
            while(resultSet.next()){
                BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));
                binlogList.add(binlogInfo);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
        } finally{
            try {
                if(resultSet != null)
                    resultSet.close();
                if(statement != null)
                    statement.close();
                if(conn != null)
                    conn.close();
            } catch (SQLException e) {
                logger.error(e.getMessage(),e);
            }
        }

        return binlogList;
    }

    /**
     * 参考:  
     * mysql> show master status;
     *  +------------------+----------+--------------+------------------+
     *  | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
     *  +------------------+----------+--------------+------------------+
     *  | mysql-bin.000004 |     1868 |              |                  |
     *  +------------------+----------+--------------+------------------+
     * @return
     */
    public static BinlogMasterStatus getBinlogMasterStatus(){
        BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();

        Connection conn = null;
        Statement statement = null;
        ResultSet resultSet = null;

        try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery("show master status");
            while(resultSet.next()){
                binlogMasterStatus.setBinlogName(resultSet.getString("File"));
                binlogMasterStatus.setPosition(resultSet.getLong("Position"));
            }
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
        } finally{
            try {
                if(resultSet != null)
                    resultSet.close();
                if(statement != null)
                    statement.close();
                if(conn != null)
                    conn.close();
            } catch (SQLException e) {
                logger.error(e.getMessage(),e);
            }
        }

        return binlogMasterStatus;
    }

    /**
     * 获取open-replicator所连接的mysql服务器的serverid信息
     * @return
     */
    public static int getServerId(){
        int serverId=6789;
        Connection conn = null;
        Statement statement = null;
        ResultSet resultSet = null;

        try {
            conn = getConnection();
            statement = conn.createStatement();
            resultSet = statement.executeQuery("show variables like 'server_id'");
            while(resultSet.next()){
                serverId = resultSet.getInt("Value");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
        } finally{
            try {
                if(resultSet != null)
                    resultSet.close();
                if(statement != null)
                    statement.close();
                if(conn != null)
                    conn.close();
            } catch (SQLException e) {
                logger.error(e.getMessage(),e);
            }
        }

        return serverId;
    }
}

上面代码设计的附加类(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)

package or.model;
public class BinlogInfo {
    private String binlogName;
    private Long fileSize;
    // 省略Getter和Setter
}

package or.model;
public class BinlogMasterStatus {
    private String binlogName;
    private long position;
 // 省略Getter和Setter
}

package or.model;
public class ColumnInfo {
    private String name;
    private String type;
 // 省略Getter和Setter
}

最后还要有个地方存储解析之后的事件信息,这里简要设计下,采用一个ConcurrentLinkedDeque好了(CDCEventManager.java)

package or.manager;
import java.util.concurrent.ConcurrentLinkedDeque;
import or.CDCEvent;
public class CDCEventManager {
    public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();
}

所有的准备工作都完成了,下面可以解析binlog日志了:

package or.test;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import or.CDCEvent;
import or.InstanceListener;
import or.MysqlConnection;
import or.OpenReplicatorPlus;
import or.manager.CDCEventManager;
import or.model.BinlogMasterStatus;

import com.google.code.or.OpenReplicator;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

public class OpenReplicatorTest {
    private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
    private static final String host = "10.198.197.60";
    private static final int port = 3306;
    private static final String user = "****";
    private static final String password = "****";

    public static void main(String[] args){
        OpenReplicator or = new OpenReplicator ();
        or.setUser(user);
        or.setPassword(password);
        or.setHost(host);
        or.setPort(port);
        MysqlConnection.setConnection(host, port, user, password);

//      or.setServerId(MysqlConnection.getServerId());
        //配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId

        BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
        or.setBinlogFileName(bms.getBinlogName());
//      or.setBinlogFileName("mysql-bin.000004");
        or.setBinlogPosition(4);
        or.setBinlogEventListener(new InstanceListener());
        try {
            or.start();
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
        }

        Thread thread = new Thread(new PrintCDCEvent());
        thread.start();
    }

    public static class PrintCDCEvent implements Runnable{
        @Override
        public void run() {
            while(true){
                if(CDCEventManager.queue.isEmpty() == false)
                {
                    CDCEvent ce = CDCEventManager.queue.pollFirst();
                    Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
                    String prettyStr1 = gson.toJson(ce);
                    System.out.println(prettyStr1); 
                }
                else{
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }       
    }
}

时间运行旧了会遇到这样一个问题:

16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog
java.io.EOFException: null
    at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]
    at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]
16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from 10.198.197.60:3306

初步解决方案(extends OpenReplicator然后添加重试机制):

package or;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.OpenReplicator;

public class OpenReplicatorPlus extends OpenReplicator{
    private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);
    private volatile boolean autoRestart = true;
    @Override
    public void stopQuietly(long timeout, TimeUnit unit){
        super.stopQuietly(timeout, unit);
        if(autoRestart){
            try {
                TimeUnit.SECONDS.sleep(10);
                logger.error("Restart OpenReplicator");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

最后只需将OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改为OpenReplicator or = new OpenReplicatorPlus ();即可。

大功告成~~


参考资料
1. 谈谈对Canal(增量数据订阅与消费)的理解
2. MySQL主备复制原理、实现及异常处理
3. https://github.com/whitesock/open-replicator

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
9月前
|
SQL 运维 关系型数据库
深入探讨MySQL的二进制日志(binlog)选项
总结而言,对MySQL binlogs深度理解并妥善配置对数据库运维管理至关重要;它不仅关系到系统性能优化也是实现高可靠性架构设计必须考虑因素之一。通过精心规划与周密部署可以使得该机能充分发挥作用而避免潜在风险带来影响。
307 6
|
10月前
|
存储 SQL 关系型数据库
MySQL中binlog、redolog与undolog的不同之处解析
每个都扮演回答回溯与错误修正机构角色: BinLog像历史记载员详细记载每件大大小小事件; RedoLog则像紧急救援队伍遇见突發情況追踪最后活动轨迹尽力补救; UndoLog就类似时间机器可倒带历史让一切归位原始样貌同时兼具平行宇宙观察能让多人同时看见各自期望看见历程而互不干扰.
574 9
|
11月前
|
存储 SQL 关系型数据库
MySQL的Redo Log与Binlog机制对照分析
通过合理的配置和细致的管理,这两种日志机制相互配合,能够有效地提升MySQL数据库的可靠性和稳定性。
363 10
|
11月前
|
存储 SQL 关系型数据库
MySQL 核心知识与索引优化全解析
本文系统梳理了 MySQL 的核心知识与索引优化策略。在基础概念部分,阐述了 char 与 varchar 在存储方式和性能上的差异,以及事务的 ACID 特性、并发事务问题及对应的隔离级别(MySQL 默认 REPEATABLE READ)。 索引基础部分,详解了 InnoDB 默认的 B+tree 索引结构(多路平衡树、叶子节点存数据、双向链表支持区间查询),区分了聚簇索引(数据与索引共存,唯一)和二级索引(数据与索引分离,多个),解释了回表查询的概念及优化方法,并分析了 B+tree 作为索引结构的优势(树高低、效率稳、支持区间查询)。 索引优化部分,列出了索引创建的六大原则
306 2
|
11月前
|
存储 SQL 关系型数据库
MySQL 核心知识与性能优化全解析
我整理的这份内容涵盖了 MySQL 诸多核心知识。包括查询语句的书写与执行顺序,多表查询的连接方式及内、外连接的区别。还讲了 CHAR 和 VARCHAR 的差异,索引的类型、底层结构、聚簇与非聚簇之分,以及回表查询、覆盖索引、左前缀原则和索引失效情形,还有建索引的取舍。对比了 MyISAM 和 InnoDB 存储引擎的不同,提及性能优化的多方面方法,以及超大分页处理、慢查询定位与分析等,最后提到了锁和分库分表可参考相关资料。
236 0
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
559 2
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
922 140
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
1484 29
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
582 4
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~

热门文章

最新文章

推荐镜像

更多