Only the fields defined in template.json are listened to — columns not declared there are ignored by the binlog parser.
package com.imooc.ad.mysql; import com.alibaba.fastjson.JSON; import com.imooc.ad.mysql.constant.OpType; import com.imooc.ad.mysql.dto.ParseTemplate; import com.imooc.ad.mysql.dto.TableTemplate; import com.imooc.ad.mysql.dto.Template; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.List; import java.util.Map; @Slf4j @Component public class TemplateHolder { private ParseTemplate template; private final JdbcTemplate jdbcTemplate; private String SQL_SCHEMA = "select table_schema, table_name, " + "column_name, ordinal_position from information_schema.columns " + "where table_schema = ? and table_name = ?"; @Autowired public TemplateHolder(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @PostConstruct private void init() { loadJson("template.json"); } public TableTemplate getTable(String tableName) { return template.getTableTemplateMap().get(tableName); } private void loadJson(String path) { //如果你使用当前类.class.getClassLoader(),可能会导致和当前线程所运行的类加载器不一致 ClassLoader cl = Thread.currentThread().getContextClassLoader(); InputStream inStream = cl.getResourceAsStream(path); try { Template template = JSON.parseObject( inStream, Charset.defaultCharset(), Template.class ); this.template = ParseTemplate.parse(template); loadMeta(); } catch (IOException ex) { log.error(ex.getMessage()); throw new RuntimeException("fail to parse json file"); } } private void loadMeta() { for (Map.Entry<String, TableTemplate> entry : template.getTableTemplateMap().entrySet()) { TableTemplate table = entry.getValue(); List<String> updateFields = table.getOpTypeFieldSetMap().get( OpType.UPDATE ); List<String> insertFields = table.getOpTypeFieldSetMap().get( OpType.ADD ); List<String> deleteFields = 
table.getOpTypeFieldSetMap().get( OpType.DELETE ); jdbcTemplate.query(SQL_SCHEMA, new Object[]{ template.getDatabase(), table.getTableName() }, (rs, i) -> { int pos = rs.getInt("ORDINAL_POSITION"); String colName = rs.getString("COLUMN_NAME"); if ((null != updateFields && updateFields.contains(colName)) || (null != insertFields && insertFields.contains(colName)) || (null != deleteFields && deleteFields.contains(colName))) { table.getPosMap().put(pos - 1, colName); } return null; }); } } }
package com.imooc.ad.mysql.listener; import com.github.shyiko.mysql.binlog.event.EventType; import com.imooc.ad.mysql.constant.Constant; import com.imooc.ad.mysql.constant.OpType; import com.imooc.ad.mysql.dto.BinlogRowData; import com.imooc.ad.mysql.dto.MySqlRowData; import com.imooc.ad.mysql.dto.TableTemplate; import com.imooc.ad.sender.ISender; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; @Slf4j @Component public class IncrementListener implements Ilistener { @Resource(name = "kafkaSender") private ISender sender; private final AggregationListener aggregationListener; @Autowired public IncrementListener(AggregationListener aggregationListener) { this.aggregationListener = aggregationListener; } @Override @PostConstruct public void register() { log.info("IncrementListener register db and table info"); Constant.table2Db.forEach((k, v) -> aggregationListener.register(v, k, this)); } @Override public void onEvent(BinlogRowData eventData) { TableTemplate table = eventData.getTable(); EventType eventType = eventData.getEventType(); // 包装成最后需要投递的数据 MySqlRowData rowData = new MySqlRowData(); rowData.setTableName(table.getTableName()); rowData.setLevel(eventData.getTable().getLevel()); OpType opType = OpType.to(eventType); rowData.setOpType(opType); // 取出模板中该操作对应的字段列表 List<String> fieldList = table.getOpTypeFieldSetMap().get(opType); if (null == fieldList) { log.warn("{} not support for {}", opType, table.getTableName()); return; } for (Map<String, String> afterMap : eventData.getAfter()) { Map<String, String> _afterMap = new HashMap<>(); for (Map.Entry<String, String> entry : afterMap.entrySet()) { String colName = entry.getKey(); String colValue = entry.getValue(); _afterMap.put(colName, colValue); } 
rowData.getFieldValueMap().add(_afterMap); } //这里是下一章节的主逻辑的起点 sender.sender(rowData); } }
{ "database": "imooc_ad_data", "tableList": [ { "tableName": "ad_plan", "level": 2, "insert": [ {"column": "id"}, {"column": "user_id"}, {"column": "plan_status"}, {"column": "start_date"}, {"column": "end_date"} ], "update": [ {"column": "id"}, {"column": "user_id"}, {"column": "plan_status"}, {"column": "start_date"}, {"column": "end_date"} ], "delete": [ {"column": "id"} ] }, { "tableName": "ad_unit", "level": 3, "insert": [ {"column": "id"}, {"column": "unit_status"}, {"column": "position_type"}, {"column": "plan_id"} ], "update": [ {"column": "id"}, {"column": "unit_status"}, {"column": "position_type"}, {"column": "plan_id"} ], "delete": [ {"column": "id"} ] }, { "tableName": "ad_creative", "level": 2, "insert": [ {"column": "id"}, {"column": "type"}, {"column": "material_type"}, {"column": "height"}, {"column": "width"}, {"column": "audit_status"}, {"column": "url"} ], "update": [ {"column": "id"}, {"column": "type"}, {"column": "material_type"}, {"column": "height"}, {"column": "width"}, {"column": "audit_status"}, {"column": "url"} ], "delete": [ {"column": "id"} ] }, { "tableName": "creative_unit", "level": 3, "insert": [ {"column": "creative_id"}, {"column": "unit_id"} ], "update": [ ], "delete": [ {"column": "creative_id"}, {"column": "unit_id"} ] }, { "tableName": "ad_unit_district", "level": 4, "insert": [ {"column": "unit_id"}, {"column": "province"}, {"column": "city"} ], "update": [ ], "delete": [ {"column": "unit_id"}, {"column": "province"}, {"column": "city"} ] }, { "tableName": "ad_unit_it", "level": 4, "insert": [ {"column": "unit_id"}, {"column": "it_tag"} ], "update": [ ], "delete": [ {"column": "unit_id"}, {"column": "it_tag"} ] }, { "tableName": "ad_unit_keyword", "level": 4, "insert": [ {"column": "unit_id"}, {"column": "keyword"} ], "update": [ ], "delete": [ {"column": "unit_id"}, {"column": "keyword"} ] } ] }
Dispatch the captured binlog data to Kafka and consume it back from the same topic.
package com.imooc.ad.sender.kafka; import com.alibaba.fastjson.JSON; import com.imooc.ad.mysql.dto.MySqlRowData; import com.imooc.ad.sender.ISender; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import java.util.Optional; @Component("kafkaSender") public class KafkaSender implements ISender { @Value("${adconf.kafka.topic}") private String topic; private final KafkaTemplate<String, String> kafkaTemplate; @Autowired public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @Override public void sender(MySqlRowData rowData) { kafkaTemplate.send( topic, JSON.toJSONString(rowData) ); } @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search") public void processMysqlRowData(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); MySqlRowData rowData = JSON.parseObject( message.toString(), MySqlRowData.class ); System.out.println("kafka processMysqlRowData: " + JSON.toJSONString(rowData)); } } }