基于SpringCloud+BinLog的广告系统设计总结(三)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 基于SpringCloud+BinLog的广告系统设计总结

只对template.json里定义的字段进行监听

package com.imooc.ad.mysql;
import com.alibaba.fastjson.JSON;
import com.imooc.ad.mysql.constant.OpType;
import com.imooc.ad.mysql.dto.ParseTemplate;
import com.imooc.ad.mysql.dto.TableTemplate;
import com.imooc.ad.mysql.dto.Template;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class TemplateHolder {
    private ParseTemplate template;
    private final JdbcTemplate jdbcTemplate;
    private String SQL_SCHEMA = "select table_schema, table_name, " +
            "column_name, ordinal_position from information_schema.columns " +
            "where table_schema = ? and table_name = ?";
    @Autowired
    public TemplateHolder(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    @PostConstruct
    private void init() {
        loadJson("template.json");
    }
    public TableTemplate getTable(String tableName) {
        return template.getTableTemplateMap().get(tableName);
    }
    private void loadJson(String path) {
        //如果你使用当前类.class.getClassLoader(),可能会导致和当前线程所运行的类加载器不一致
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        InputStream inStream = cl.getResourceAsStream(path);
        try {
            Template template = JSON.parseObject(
                    inStream,
                    Charset.defaultCharset(),
                    Template.class
            );
            this.template = ParseTemplate.parse(template);
            loadMeta();
        } catch (IOException ex) {
            log.error(ex.getMessage());
            throw new RuntimeException("fail to parse json file");
        }
    }
    private void loadMeta() {
        for (Map.Entry<String, TableTemplate> entry :
                template.getTableTemplateMap().entrySet()) {
            TableTemplate table = entry.getValue();
            List<String> updateFields = table.getOpTypeFieldSetMap().get(
                    OpType.UPDATE
            );
            List<String> insertFields = table.getOpTypeFieldSetMap().get(
                    OpType.ADD
            );
            List<String> deleteFields = table.getOpTypeFieldSetMap().get(
                    OpType.DELETE
            );
            jdbcTemplate.query(SQL_SCHEMA, new Object[]{
                    template.getDatabase(), table.getTableName()
            }, (rs, i) -> {
                int pos = rs.getInt("ORDINAL_POSITION");
                String colName = rs.getString("COLUMN_NAME");
                if ((null != updateFields && updateFields.contains(colName))
                        || (null != insertFields && insertFields.contains(colName))
                        || (null != deleteFields && deleteFields.contains(colName))) {
                    table.getPosMap().put(pos - 1, colName);
                }
                return null;
            });
        }
    }
}
package com.imooc.ad.mysql.listener;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.imooc.ad.mysql.constant.Constant;
import com.imooc.ad.mysql.constant.OpType;
import com.imooc.ad.mysql.dto.BinlogRowData;
import com.imooc.ad.mysql.dto.MySqlRowData;
import com.imooc.ad.mysql.dto.TableTemplate;
import com.imooc.ad.sender.ISender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Listener that turns each binlog row event into a {@link MySqlRowData}
 * message and hands it to the configured {@link ISender}.
 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    private final AggregationListener aggregationListener;

    /** Delivery channel; resolved by bean name {@code kafkaSender}. */
    @Resource(name = "kafkaSender")
    private ISender sender;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    /**
     * Registers this listener with the aggregation listener for every entry
     * of {@code Constant.table2Db}.
     */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info");
        // Judging by the map's name, keys are table names and values are
        // database names - verify against Constant.
        Constant.table2Db.forEach(
                (table, db) -> aggregationListener.register(db, table, this));
    }

    /**
     * Wraps the event into the payload to be delivered and sends it.
     *
     * <p>If the template has no field list for the operation type, a warning
     * is logged and nothing is sent.
     *
     * @param eventData the row event to forward
     */
    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTable();
        EventType eventType = eventData.getEventType();

        // Build the payload that will finally be delivered.
        MySqlRowData payload = new MySqlRowData();
        payload.setTableName(table.getTableName());
        payload.setLevel(eventData.getTable().getLevel());
        OpType opType = OpType.to(eventType);
        payload.setOpType(opType);

        // The template must declare a field list for this operation.
        if (table.getOpTypeFieldSetMap().get(opType) == null) {
            log.warn("{} not support for {}", opType, table.getTableName());
            return;
        }

        // Copy each row map so the payload does not share the event's maps.
        for (Map<String, String> changedRow : eventData.getAfter()) {
            payload.getFieldValueMap().add(new HashMap<>(changedRow));
        }

        // Entry point of the delivery logic covered in the next chapter.
        sender.sender(payload);
    }
}
{
  "database": "imooc_ad_data",
  "tableList": [
    {
      "tableName": "ad_plan",
      "level": 2,
      "insert": [
        {"column": "id"},
        {"column": "user_id"},
        {"column": "plan_status"},
        {"column": "start_date"},
        {"column": "end_date"}
      ],
      "update": [
        {"column": "id"},
        {"column": "user_id"},
        {"column": "plan_status"},
        {"column": "start_date"},
        {"column": "end_date"}
      ],
      "delete": [
        {"column": "id"}
      ]
    },
    {
      "tableName": "ad_unit",
      "level": 3,
      "insert": [
        {"column": "id"},
        {"column": "unit_status"},
        {"column": "position_type"},
        {"column": "plan_id"}
      ],
      "update": [
        {"column": "id"},
        {"column": "unit_status"},
        {"column": "position_type"},
        {"column": "plan_id"}
      ],
      "delete": [
        {"column": "id"}
      ]
    },
    {
      "tableName": "ad_creative",
      "level": 2,
      "insert": [
        {"column": "id"},
        {"column": "type"},
        {"column": "material_type"},
        {"column": "height"},
        {"column": "width"},
        {"column": "audit_status"},
        {"column": "url"}
      ],
      "update": [
        {"column": "id"},
        {"column": "type"},
        {"column": "material_type"},
        {"column": "height"},
        {"column": "width"},
        {"column": "audit_status"},
        {"column": "url"}
      ],
      "delete": [
        {"column": "id"}
      ]
    },
    {
      "tableName": "creative_unit",
      "level": 3,
      "insert": [
        {"column": "creative_id"},
        {"column": "unit_id"}
      ],
      "update": [
      ],
      "delete": [
        {"column": "creative_id"},
        {"column": "unit_id"}
      ]
    },
    {
      "tableName": "ad_unit_district",
      "level": 4,
      "insert": [
        {"column": "unit_id"},
        {"column": "province"},
        {"column": "city"}
      ],
      "update": [
      ],
      "delete": [
        {"column": "unit_id"},
        {"column": "province"},
        {"column": "city"}
      ]
    },
    {
      "tableName": "ad_unit_it",
      "level": 4,
      "insert": [
        {"column": "unit_id"},
        {"column": "it_tag"}
      ],
      "update": [
      ],
      "delete": [
        {"column": "unit_id"},
        {"column": "it_tag"}
      ]
    },
    {
      "tableName": "ad_unit_keyword",
      "level": 4,
      "insert": [
        {"column": "unit_id"},
        {"column": "keyword"}
      ],
      "update": [
      ],
      "delete": [
        {"column": "unit_id"},
        {"column": "keyword"}
      ]
    }
  ]
}

把监听的数据分发出去并接收数据

package com.imooc.ad.sender.kafka;
import com.alibaba.fastjson.JSON;
import com.imooc.ad.mysql.dto.MySqlRowData;
import com.imooc.ad.sender.ISender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
 * Publishes {@link MySqlRowData} messages to Kafka as JSON and, for
 * demonstration, consumes them again from the same topic.
 */
@Component("kafkaSender")
public class KafkaSender implements ISender {

    /** Topic the row data is published to; read from {@code adconf.kafka.topic}. */
    @Value("${adconf.kafka.topic}")
    private String topic;

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Serializes the row data to JSON and sends it to the configured topic.
     *
     * @param rowData the payload to publish
     */
    @Override
    public void sender(MySqlRowData rowData) {
        kafkaTemplate.send(
                topic, JSON.toJSONString(rowData)
        );
    }

    /**
     * Consumes a record, parses its value back into {@link MySqlRowData}
     * and prints it. Records with a null value are ignored.
     *
     * <p>The listener subscribes to the same {@code adconf.kafka.topic}
     * property the producer uses, so the two cannot drift apart when the
     * topic name is changed in configuration.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(topics = "${adconf.kafka.topic}", groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<?, ?> record) {
        Optional.ofNullable(record.value())
                .map(Object::toString)
                .ifPresent(payload -> {
                    MySqlRowData rowData = JSON.parseObject(
                            payload,
                            MySqlRowData.class
                    );
                    System.out.println("kafka processMysqlRowData: " +
                            JSON.toJSONString(rowData));
                });
    }
}

 

目录
相关文章
|
9月前
|
消息中间件 NoSQL Java
Spring Cloud项目实战Spring Cloud视频教程 含源码
Spring Cloud项目实战Spring Cloud视频教程 含源码
133 1
|
1月前
|
人工智能 安全 Java
AI 时代:从 Spring Cloud Alibaba 到 Spring AI Alibaba
本次分享由阿里云智能集团云原生微服务技术负责人李艳林主讲,主题为“AI时代:从Spring Cloud Alibaba到Spring AI Alibaba”。内容涵盖应用架构演进、AI agent框架发展趋势及Spring AI Alibaba的重磅发布。分享介绍了AI原生架构与传统架构的融合,强调了API优先、事件驱动和AI运维的重要性。同时,详细解析了Spring AI Alibaba的三层抽象设计,包括模型支持、工作流智能体编排及生产可用性构建能力,确保安全合规、高效部署与可观测性。最后,结合实际案例展示了如何利用私域数据优化AI应用,提升业务价值。
134 4
|
2月前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
207 5
|
4月前
|
负载均衡 Java API
【Spring Cloud生态】Spring Cloud Gateway基本配置
【Spring Cloud生态】Spring Cloud Gateway基本配置
80 0
|
6月前
|
Java Spring
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
|
6月前
|
Java Spring 容器
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
|
6月前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
|
6月前
|
SQL Java 数据库连接
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
|
6月前
|
Java 开发工具 Spring
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
|
6月前
|
NoSQL Java Redis
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常