import
java.net.InetSocketAddress;
import
java.util.List;
import
com.alibaba.fastjson.JSONObject;
import
com.alibaba.otter.canal.client.CanalConnector;
import
com.alibaba.otter.canal.common.utils.AddressUtils;
import
com.alibaba.otter.canal.protocol.Message;
import
com.alibaba.otter.canal.protocol.CanalEntry.Column;
import
com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import
com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import
com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import
com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import
com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import
com.alibaba.otter.canal.client.*;
public
class
CanalClient{
public
static
void
main(String args[]) {
CanalConnector connector = CanalConnectors.newSingleConnector(
new
InetSocketAddress(AddressUtils.getHostIp(),
11111
),
"example"
,
""
,
""
);
int
batchSize =
1000
;
try
{
connector.connect();
connector.subscribe(
".*\\..*"
);
connector.rollback();
while
(
true
) {
Message message = connector.getWithoutAck(batchSize);
long
batchId = message.getId();
int
size = message.getEntries().size();
if
(batchId == -
1
|| size ==
0
) {
try
{
Thread.sleep(
1000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
else
{
printEntry(message.getEntries());
}
connector.ack(batchId);
}
}
finally
{
connector.disconnect();
}
}
private
static
void
printEntry( List<Entry> entrys) {
for
(Entry entry : entrys) {
if
(entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue
;
}
RowChange rowChage =
null
;
try
{
rowChage = RowChange.parseFrom(entry.getStoreValue());
}
catch
(Exception e) {
throw
new
RuntimeException(
"ERROR ## parser of eromanga-event has an error , data:"
+ entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format(
"================> binlog[%s:%s] , name[%s,%s] , eventType : %s"
,
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for
(RowData rowData : rowChage.getRowDatasList()) {
if
(eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
}
else
if
(eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
}
else
{
System.out.println(
"-------> before"
);
printColumn(rowData.getBeforeColumnsList());
System.out.println(
"-------> after"
);
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
private
static
void
printColumn( List<Column> columns) {
for
(Column column : columns) {
System.out.println(column.getName() +
" : "
+ column.getValue() +
" update="
+ column.getUpdated());
}
}
private
static
void
redisInsert( List<Column> columns){
JSONObject json=
new
JSONObject();
for
(Column column : columns) {
json.put(column.getName(), column.getValue());
}
if
(columns.size()>
0
){
RedisUtil.stringSet(
"user:"
+ columns.get(
0
).getValue(),json.toJSONString());
}
}
private
static
void
redisUpdate( List<Column> columns){
JSONObject json=
new
JSONObject();
for
(Column column : columns) {
json.put(column.getName(), column.getValue());
}
if
(columns.size()>
0
){
RedisUtil.stringSet(
"user:"
+ columns.get(
0
).getValue(),json.toJSONString());
}
}
private
static
void
redisDelete( List<Column> columns){
JSONObject json=
new
JSONObject();
for
(Column column : columns) {
json.put(column.getName(), column.getValue());
}
if
(columns.size()>
0
){
RedisUtil.delKey(
"user:"
+ columns.get(
0
).getValue());
}
}
}
RedisUtil 工具类
import
redis.clients.jedis.Jedis;
import
redis.clients.jedis.JedisPool;
import
redis.clients.jedis.JedisPoolConfig;
public
class
RedisUtil {
private
static
String ADDR =
"0.0.0.0"
;
private
static
int
PORT =
6379
;
private
static
int
MAX_ACTIVE =
1024
;
private
static
int
MAX_IDLE =
200
;
private
static
int
MAX_WAIT =
10000
;
protected
static
int
expireTime =
60
*
60
*
24
;
protected
static
JedisPool pool;
/**
* 静态代码,只在初次调用一次
*/
static
{
JedisPoolConfig config =
new
JedisPoolConfig();
config.setMaxTotal(MAX_ACTIVE);
config.setMaxIdle(MAX_IDLE);
config.setMaxWaitMillis(MAX_WAIT);
config.setTestOnBorrow(
false
);
pool =
new
JedisPool(config, ADDR, PORT,
1000
);
}
/**
* 获取jedis实例
*/
protected
static
synchronized
Jedis getJedis() {
Jedis jedis =
null
;
try
{
jedis = pool.getResource();
}
catch
(Exception e) {
e.printStackTrace();
if
(jedis !=
null
) {
pool.returnBrokenResource(jedis);
}
}
return
jedis;
}
/**
* 释放jedis资源
* @param jedis
* @param isBroken
*/
protected
static
void
closeResource(Jedis jedis,
boolean
isBroken) {
try
{
if
(isBroken) {
pool.returnBrokenResource(jedis);
}
else
{
pool.returnResource(jedis);
}
}
catch
(Exception e) {
}
}
/**
* 是否存在key
* @param key
*/
public
static
boolean
existKey(String key) {
Jedis jedis =
null
;
boolean
isBroken =
false
;
try
{
jedis = getJedis();
jedis.select(
0
);
return
jedis.exists(key);
}
catch
(Exception e) {
isBroken =
true
;
}
finally
{
closeResource(jedis, isBroken);
}
return
false
;
}
/**
* 删除key
* @param key
*/
public
static
void
delKey(String key) {
Jedis jedis =
null
;
boolean
isBroken =
false
;
try
{
jedis = getJedis();
jedis.select(
0
);
jedis.del(key);
}
catch
(Exception e) {
isBroken =
true
;
}
finally
{
closeResource(jedis, isBroken);
}
}
/**
* 取得key的值
* @param key
*/
public
static
String stringGet(String key) {
Jedis jedis =
null
;
boolean
isBroken =
false
;
String lastVal =
null
;
try
{
jedis = getJedis();
jedis.select(
0
);
lastVal = jedis.get(key);
jedis.expire(key, expireTime);
}
catch
(Exception e) {
isBroken =
true
;
}
finally
{
closeResource(jedis, isBroken);
}
return
lastVal;
}
/**
* 添加string数据
* @param key
* @param value
*/
public
static
String stringSet(String key, String value) {
Jedis jedis =
null
;
boolean
isBroken =
false
;
String lastVal =
null
;
try
{
jedis = getJedis();
jedis.select(
0
);
lastVal = jedis.set(key, value);
jedis.expire(key, expireTime);
}
catch
(Exception e) {
e.printStackTrace();
isBroken =
true
;
}
finally
{
closeResource(jedis, isBroken);
}
return
lastVal;
}
/**
* 添加hash数据
* @param key
* @param field
* @param value
*/
public
static
void
hashSet(String key, String field, String value) {
boolean
isBroken =
false
;
Jedis jedis =
null
;
try
{
jedis = getJedis();
if
(jedis !=
null
) {
jedis.select(
0
);
jedis.hset(key, field, value);
jedis.expire(key, expireTime);
}
}
catch
(Exception e) {
isBroken =
true
;
}
finally
{
closeResource(jedis, isBroken);
}
}
}