【大厂技术内幕】字节跳动原来是这么做数据迁移的!(下)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 【大厂技术内幕】字节跳动原来是这么做数据迁移的!

7.7 热点文章接口

ApHotArticleService

对ApHotArticle操作Service

接口位置:com.heima.migration.service.ApHotArticleService

public interface ApHotArticleService {
    List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery);
    void insert(ApHotArticles apHotArticles);
    /**
     * 热数据 Hbase 同步
     *
     * @param apArticleId
     */
    public void hotApArticleSync(Integer apArticleId);
    void deleteById(Integer id);
    /**
     * 查询过期的数据
     *
     * @return
     */
    public List<ApHotArticles> selectExpireMonth();
    void deleteHotData(ApHotArticles apHotArticle);
}

ApHotArticleServiceImpl

对ApHotArticle的相关操作

代码位置:com.heima.migration.service.impl.ApHotArticleServiceImpl

/**
 * 热点数据操作Service 类
 */
@Service
@Log4j2
public class ApHotArticleServiceImpl implements ApHotArticleService {
    @Autowired
    private ApHotArticlesMapper apHotArticlesMapper;
    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private ArticleQuantityService articleQuantityService;
    @Autowired
    private HBaseStorageClient hBaseStorageClient;
    @Override
    public List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery) {
        return apHotArticlesMapper.selectList(apHotArticlesQuery);
    }
    /**
     * 根据ID删除
     *
     * @param id
     */
    @Override
    public void deleteById(Integer id) {
        log.info("删除热数据,apArticleId:{}", id);
        apHotArticlesMapper.deleteById(id);
    }
    /**
     * 查询一个月之前的数据
     *
     * @return
     */
    @Override
    public List<ApHotArticles> selectExpireMonth() {
        return apHotArticlesMapper.selectExpireMonth();
    }
    /**
     * 删除过去的热数据
     *
     * @param apHotArticle
     */
    @Override
    public void deleteHotData(ApHotArticles apHotArticle) {
        deleteById(apHotArticle.getId());
        String rowKey = DataConvertUtils.toString(apHotArticle.getId());
        hBaseStorageClient.gethBaseClent().deleteRow(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, rowKey);
        MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKey, MongoStorageEntity.class);
        if (null != mongoStorageEntity) {
            mongoTemplate.remove(mongoStorageEntity);
        }
    }
    /**
     * 插入操作
     *
     * @param apHotArticles
     */
    @Override
    public void insert(ApHotArticles apHotArticles) {
        apHotArticlesMapper.insert(apHotArticles);
    }
    /**
     * 热点数据同步方法
     *
     * @param apArticleId
     */
    @Override
    public void hotApArticleSync(Integer apArticleId) {
        log.info("开始将热数据同步,apArticleId:{}", apArticleId);
        ArticleQuantity articleQuantity = getHotArticleQuantity(apArticleId);
        if (null != articleQuantity) {
            //热点数据同步到DB中
            hotApArticleToDBSync(articleQuantity);
            //热点数据同步到MONGO
            hotApArticleMongoSync(articleQuantity);
            log.info("热数据同步完成,apArticleId:{}", apArticleId);
        } else {
            log.error("找不到对应的热数据,apArticleId:{}", apArticleId);
        }
    }
    /**
     * 获取热数据的ArticleQuantity 对象
     *
     * @param apArticleId
     * @return
     */
    private ArticleQuantity getHotArticleQuantity(Integer apArticleId) {
        Long id = Long.valueOf(apArticleId);
        ArticleQuantity articleQuantity = articleQuantityService.getArticleQuantityByArticleId(id);
        if (null == articleQuantity) {
            articleQuantity = articleQuantityService.getArticleQuantityByArticleIdForHbase(id);
        }
        return articleQuantity;
    }
    /**
     * 热数据 到数据库Mysql的同步
     *
     * @param articleQuantity
     */
    public void hotApArticleToDBSync(ArticleQuantity articleQuantity) {
        Integer apArticleId = articleQuantity.getApArticleId();
        log.info("开始将热数据从Hbase同步到mysql,apArticleId:{}", apArticleId);
        if (null == apArticleId) {
            log.error("apArticleId不存在无法进行同步");
            return;
        }
        ApHotArticles apHotArticlesQuery = new ApHotArticles() {{
            setArticleId(apArticleId);
        }};
        List<ApHotArticles> apHotArticlesList = apHotArticlesMapper.selectList(apHotArticlesQuery);
        if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {
            log.info("Mysql数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
        } else {
            ApHotArticles apHotArticles = articleQuantity.getApHotArticles();
            apHotArticlesMapper.insert(apHotArticles);
        }
        log.info("将热数据从Hbase同步到mysql完成,apArticleId:{}", apArticleId);
    }
    /**
     * 热数据向从Hbase到Mongodb同步
     *
     * @param articleQuantity
     */
    public void hotApArticleMongoSync(ArticleQuantity articleQuantity) {
        Integer apArticleId = articleQuantity.getApArticleId();
        log.info("开始将热数据从Hbase同步到MongoDB,apArticleId:{}", apArticleId);
        if (null == apArticleId) {
            log.error("apArticleId不存在无法进行同步");
            return;
        }
        String rowKeyId = DataConvertUtils.toString(apArticleId);
        MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKeyId, MongoStorageEntity.class);
        if (null != mongoStorageEntity) {
            log.info("MongoDB数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
        } else {
            List<StorageData> storageDataList = articleQuantity.getStorageDataList();
            if (null != storageDataList && !storageDataList.isEmpty()) {
                mongoStorageEntity = new MongoStorageEntity();
                mongoStorageEntity.setDataList(storageDataList);
                mongoStorageEntity.setRowKey(rowKeyId);
                mongoTemplate.insert(mongoStorageEntity);
            }
        }
        log.info("将热数据从Hbase同步到MongoDB完成,apArticleId:{}", apArticleId);
    }
}

8 定时同步数据

8.1 全量数据从mysql同步到HBase

@Component
@DisallowConcurrentExecution
@Log4j2
/**
 * 全量数据从mysql 同步到HBase
 */
public class MigrationDbToHBaseQuartz extends AbstractJob {
    @Autowired
    private ArticleQuantityService articleQuantityService;
    @Override
    public String[] triggerCron() {
        /**
         * 2019/8/9 10:15:00
         * 2019/8/9 10:20:00
         * 2019/8/9 10:25:00
         * 2019/8/9 10:30:00
         * 2019/8/9 10:35:00
         */
        return new String[]{"0 0/5 * * * ?"};
    }
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("开始进行数据库到HBASE同步任务");
        articleQuantityService.dbToHbase();
        log.info("数据库到HBASE同步任务完成");
    }
}

8.2 定期删除过期的数据

/**
 * 定期删除过期的数据
 */
@Component
@Log4j2
public class MigrationDeleteHotDataQuartz extends AbstractJob {
    @Autowired
    private ApHotArticleService apHotArticleService;
    @Override
    public String[] triggerCron() {
        /**
         * 2019/8/9 22:30:00
         * 2019/8/10 22:30:00
         * 2019/8/11 22:30:00
         * 2019/8/12 22:30:00
         * 2019/8/13 22:30:00
         */
        return new String[]{"0 30 22 * * ?"};
    }
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        long cutrrentTime = System.currentTimeMillis();
        log.info("开始删除数据库过期数据");
        deleteExpireHotData();
        log.info("删除数据库过期数据结束,耗时:{}", System.currentTimeMillis() - cutrrentTime);
    }
    /**
     * 删除过期的热数据
     */
    public void deleteExpireHotData() {
        List<ApHotArticles> apHotArticlesList = apHotArticleService.selectExpireMonth();
        if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {
            for (ApHotArticles apHotArticle : apHotArticlesList) {
                apHotArticleService.deleteHotData(apHotArticle);
            }
        }
    }
}

9 消息接收同步数据

9.1 文章审核成功同步

9.1.1 消息发送

(1)消息名称定义及消息发送方法声明

maven_test.properties

kafka.topic.article-audit-success=kafka.topic.article.audit.success.sigle.test

kafka.properties

kafka.topic.article-audit-success=${kafka.topic.article-audit-success}

com.heima.common.kafka.KafkaTopicConfig新增属性

/**
     * 审核成功
     */
String articleAuditSuccess;

com.heima.common.kafka.KafkaSender

/**
     * 发送审核成功消息
     */
public void sendArticleAuditSuccessMessage(ArticleAuditSuccess message) {
    ArticleAuditSuccessMessage temp = new ArticleAuditSuccessMessage();
    temp.setData(message);
    this.sendMesssage(kafkaTopicConfig.getArticleAuditSuccess(), UUID.randomUUID().toString(), temp);
}

(2)修改自动审核代码,自媒体要修改

在审核成功后,发送消息

自媒体

//文章审核成功
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.MEDIA);
articleAuditSuccess.setChannelId(apArticle.getChannelId());
kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);

9.1.2消息接收

/**
 * 热点文章监听类
 */
@Component
@Log4j2
public class MigrationAuditSucessArticleListener implements KafkaListener<String, String> {
    /**
     * 通用转换mapper
     */
    @Autowired
    ObjectMapper mapper;
    /**
     * kafka 主题 配置
     */
    @Autowired
    KafkaTopicConfig kafkaTopicConfig;
    @Autowired
    private ArticleQuantityService articleQuantityService;
    @Override
    public String topic() {
        return kafkaTopicConfig.getArticleAuditSuccess();
    }
    /**
     * 监听消息
     *
     * @param data
     * @param consumer
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
        log.info("kafka接收到审核通过消息:{}", data);
        String value = (String) data.value();
        if (null != value) {
            ArticleAuditSuccessMessage message = null;
            try {
                message = mapper.readValue(value, ArticleAuditSuccessMessage.class);
            } catch (IOException e) {
                e.printStackTrace();
            }
            ArticleAuditSuccess auto = message.getData();
            if (null != auto) {
                //调用方法 将HBAESE中的热数据进行同步
                Integer articleId = auto.getArticleId();
                if (null != articleId) {
                    articleQuantityService.dbToHbase(articleId);
                }
            }
        }
    }
}

9.2 热点文章同步

创建监听类:com.heima.migration.kafka.listener.MigrationHotArticleListener

/**
 * 热点文章监听类
 */
@Component
@Log4j2
public class MigrationHotArticleListener implements KafkaListener<String, String> {
    /**
     * 通用转换mapper
     */
    @Autowired
    ObjectMapper mapper;
    /**
     * kafka 主题 配置
     */
    @Autowired
    KafkaTopicConfig kafkaTopicConfig;
    /**
     * 热点文章service注入
     */
    @Autowired
    private ApHotArticleService apHotArticleService;
    @Override
    public String topic() {
        return kafkaTopicConfig.getHotArticle();
    }
    /**
     * 监听消息
     *
     * @param data
     * @param consumer
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
        log.info("kafka接收到热数据同步消息:{}", data);
        String value = (String) data.value();
        if (null != value) {
            ApHotArticleMessage message = null;
            try {
                message = mapper.readValue(value, ApHotArticleMessage.class);
            } catch (IOException e) {
                e.printStackTrace();
            }
            Integer articleId = message.getData().getArticleId();
            if (null != articleId) {
                //调用方法 将HBAESE中的热数据进行同步
                apHotArticleService.hotApArticleSync(articleId);
            }
        }
    }
}
目录
相关文章
|
2月前
|
存储 运维 OLAP
【Meetup回顾 第1期】竟是这样的国产数据库,YashanDB技术内幕曝光
YashanDB是一款基于统一内核,支持单机/主备、共享集群、分布式等多种部署方式,覆盖OLTP/HTAP/OLAP交易和分析混合负载场景的新型数据库系统;YashanDB同时提供开发平台、运维平台和迁移平台3大工具平台以满足数据全生命周期管理。
49 2
【Meetup回顾 第1期】竟是这样的国产数据库,YashanDB技术内幕曝光
|
7月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
1121 0
深入浅出阿里数据同步神器:Canal原理+配置+实战全网最全解析!
canal 翻译为管道,主要用途是基于 MySQL 数据库的增量日志 Binlog 解析,提供增量数据订阅和消费。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
|
缓存 监控 供应链
36【学习心得】学习心得-数据同步
【学习心得】学习心得-数据同步
36【学习心得】学习心得-数据同步
|
存储 NoSQL 关系型数据库
【大厂技术内幕】字节跳动原来是这么做数据迁移的!(上)
【大厂技术内幕】字节跳动原来是这么做数据迁移的!
425 0
【大厂技术内幕】字节跳动原来是这么做数据迁移的!(上)
|
存储 分布式数据库 Hbase
【大厂技术内幕】字节跳动原来是这么做数据迁移的!(中)
【大厂技术内幕】字节跳动原来是这么做数据迁移的!
186 0
【大厂技术内幕】字节跳动原来是这么做数据迁移的!(中)
|
存储 机器学习/深度学习 cobar
|
SQL NoSQL 架构师
【重磅】直播预告:宽表数据库领域国际技术大牛开讲
本次非常有幸邀请到两位宽表数据库领域的国际技术大牛,给中文Cassandra社区做技术直播讲座: 1、Jonathan Ellis(Apache Cassandra开源项目最重要的技术元老、开山鼻祖之一,曾以一己之力为Cassandra开源项目贡献了近50%的代码,曾任Apache Cassandra项目主席) 2、邓为:DataStax领航架构师。 直播时间:4月27日16:00-17:30。搜索钉钉群名称『Cassandra+Spark社区大群』提前入群。 文末附开发者福利。
【重磅】直播预告:宽表数据库领域国际技术大牛开讲
|
Cloud Native 数据库 C++
Meetup 报名 | 从数据库到架构,OceanBase CTO 杨传辉邀你聊透分布式
6 月 19 日(本周六),北京 Meetup 开始报名啦!从数据库到架构,我们来聊透分布式
Meetup 报名 | 从数据库到架构,OceanBase CTO 杨传辉邀你聊透分布式
|
NoSQL
【直播预告】饿了么高级架构师陈东明:MongoDB是如何逐步提高可靠性的
讲述MongoDB架构,以及该架构下引发MongoDB的多种种丢数据的异常、脏读异常、陈旧读异常,MongoDB是如何致力于引入新版本的复制协议逐步消除了所有这些异常,最终让MongoDB达到一个high level的一致性和可靠性,成为一个可信任的数据库。
11358 0