7.7 热点文章接口
ApHotArticleService
对ApHotArticle操作Service
接口位置:com.heima.migration.service.ApHotArticleService
public interface ApHotArticleService { List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery); void insert(ApHotArticles apHotArticles); /** * 热数据 Hbase 同步 * * @param apArticleId */ public void hotApArticleSync(Integer apArticleId); void deleteById(Integer id); /** * 查询过期的数据 * * @return */ public List<ApHotArticles> selectExpireMonth(); void deleteHotData(ApHotArticles apHotArticle); }
ApHotArticleServiceImpl
对ApHotArticle的相关操作
代码位置:com.heima.migration.service.impl.ApHotArticleServiceImpl
/** * 热点数据操作Service 类 */ @Service @Log4j2 public class ApHotArticleServiceImpl implements ApHotArticleService { @Autowired private ApHotArticlesMapper apHotArticlesMapper; @Autowired private MongoTemplate mongoTemplate; @Autowired private ArticleQuantityService articleQuantityService; @Autowired private HBaseStorageClient hBaseStorageClient; @Override public List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery) { return apHotArticlesMapper.selectList(apHotArticlesQuery); } /** * 根据ID删除 * * @param id */ @Override public void deleteById(Integer id) { log.info("删除热数据,apArticleId:{}", id); apHotArticlesMapper.deleteById(id); } /** * 查询一个月之前的数据 * * @return */ @Override public List<ApHotArticles> selectExpireMonth() { return apHotArticlesMapper.selectExpireMonth(); } /** * 删除过去的热数据 * * @param apHotArticle */ @Override public void deleteHotData(ApHotArticles apHotArticle) { deleteById(apHotArticle.getId()); String rowKey = DataConvertUtils.toString(apHotArticle.getId()); hBaseStorageClient.gethBaseClent().deleteRow(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, rowKey); MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKey, MongoStorageEntity.class); if (null != mongoStorageEntity) { mongoTemplate.remove(mongoStorageEntity); } } /** * 插入操作 * * @param apHotArticles */ @Override public void insert(ApHotArticles apHotArticles) { apHotArticlesMapper.insert(apHotArticles); } /** * 热点数据同步方法 * * @param apArticleId */ @Override public void hotApArticleSync(Integer apArticleId) { log.info("开始将热数据同步,apArticleId:{}", apArticleId); ArticleQuantity articleQuantity = getHotArticleQuantity(apArticleId); if (null != articleQuantity) { //热点数据同步到DB中 hotApArticleToDBSync(articleQuantity); //热点数据同步到MONGO hotApArticleMongoSync(articleQuantity); log.info("热数据同步完成,apArticleId:{}", apArticleId); } else { log.error("找不到对应的热数据,apArticleId:{}", apArticleId); } } /** * 获取热数据的ArticleQuantity 对象 * * @param apArticleId * @return */ private ArticleQuantity getHotArticleQuantity(Integer apArticleId) { Long id = Long.valueOf(apArticleId); ArticleQuantity articleQuantity = articleQuantityService.getArticleQuantityByArticleId(id); if (null == articleQuantity) { articleQuantity = articleQuantityService.getArticleQuantityByArticleIdForHbase(id); } return articleQuantity; } /** * 热数据 到数据库Mysql的同步 * * @param articleQuantity */ public void hotApArticleToDBSync(ArticleQuantity articleQuantity) { Integer apArticleId = articleQuantity.getApArticleId(); log.info("开始将热数据从Hbase同步到mysql,apArticleId:{}", apArticleId); if (null == apArticleId) { log.error("apArticleId不存在无法进行同步"); return; } ApHotArticles apHotArticlesQuery = new ApHotArticles() {{ setArticleId(apArticleId); }}; List<ApHotArticles> apHotArticlesList = apHotArticlesMapper.selectList(apHotArticlesQuery); if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) { log.info("Mysql数据已同步过不需要再次同步,apArticleId:{}", apArticleId); } else { ApHotArticles apHotArticles = articleQuantity.getApHotArticles(); apHotArticlesMapper.insert(apHotArticles); } log.info("将热数据从Hbase同步到mysql完成,apArticleId:{}", apArticleId); } /** * 热数据向从Hbase到Mongodb同步 * * @param articleQuantity */ public void hotApArticleMongoSync(ArticleQuantity articleQuantity) { Integer apArticleId = articleQuantity.getApArticleId(); log.info("开始将热数据从Hbase同步到MongoDB,apArticleId:{}", apArticleId); if (null == apArticleId) { log.error("apArticleId不存在无法进行同步"); return; } String rowKeyId = DataConvertUtils.toString(apArticleId); MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKeyId, MongoStorageEntity.class); if (null != mongoStorageEntity) { log.info("MongoDB数据已同步过不需要再次同步,apArticleId:{}", apArticleId); } else { List<StorageData> storageDataList = articleQuantity.getStorageDataList(); if (null != storageDataList && !storageDataList.isEmpty()) { mongoStorageEntity = new MongoStorageEntity(); mongoStorageEntity.setDataList(storageDataList); mongoStorageEntity.setRowKey(rowKeyId); mongoTemplate.insert(mongoStorageEntity); } } log.info("将热数据从Hbase同步到MongoDB完成,apArticleId:{}", apArticleId); } }
8 定时同步数据
8.1 全量数据从mysql同步到HBase
@Component @DisallowConcurrentExecution @Log4j2 /** * 全量数据从mysql 同步到HBase */ public class MigrationDbToHBaseQuartz extends AbstractJob { @Autowired private ArticleQuantityService articleQuantityService; @Override public String[] triggerCron() { /** * 2019/8/9 10:15:00 * 2019/8/9 10:20:00 * 2019/8/9 10:25:00 * 2019/8/9 10:30:00 * 2019/8/9 10:35:00 */ return new String[]{"0 0/5 * * * ?"}; } @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { log.info("开始进行数据库到HBASE同步任务"); articleQuantityService.dbToHbase(); log.info("数据库到HBASE同步任务完成"); } }
8.2 定期删除过期的数据
/** * 定期删除过期的数据 */ @Component @Log4j2 public class MigrationDeleteHotDataQuartz extends AbstractJob { @Autowired private ApHotArticleService apHotArticleService; @Override public String[] triggerCron() { /** * 2019/8/9 22:30:00 * 2019/8/10 22:30:00 * 2019/8/11 22:30:00 * 2019/8/12 22:30:00 * 2019/8/13 22:30:00 */ return new String[]{"0 30 22 * * ?"}; } @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { long cutrrentTime = System.currentTimeMillis(); log.info("开始删除数据库过期数据"); deleteExpireHotData(); log.info("删除数据库过期数据结束,耗时:{}", System.currentTimeMillis() - cutrrentTime); } /** * 删除过期的热数据 */ public void deleteExpireHotData() { List<ApHotArticles> apHotArticlesList = apHotArticleService.selectExpireMonth(); if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) { for (ApHotArticles apHotArticle : apHotArticlesList) { apHotArticleService.deleteHotData(apHotArticle); } } } }
9 消息接收同步数据
9.1 文章审核成功同步
9.1.1 消息发送
(1)消息名称定义及消息发送方法声明
maven_test.properties
kafka.topic.article-audit-success=kafka.topic.article.audit.success.sigle.test
kafka.properties
kafka.topic.article-audit-success=${kafka.topic.article-audit-success}
com.heima.common.kafka.KafkaTopicConfig新增属性
/** * 审核成功 */ String articleAuditSuccess;
com.heima.common.kafka.KafkaSender
/** * 发送审核成功消息 */ public void sendArticleAuditSuccessMessage(ArticleAuditSuccess message) { ArticleAuditSuccessMessage temp = new ArticleAuditSuccessMessage(); temp.setData(message); this.sendMesssage(kafkaTopicConfig.getArticleAuditSuccess(), UUID.randomUUID().toString(), temp); }
(2)修改自动审核代码,自媒体要修改
在审核成功后,发送消息
自媒体
//文章审核成功 ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess(); articleAuditSuccess.setArticleId(apArticle.getId()); articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.MEDIA); articleAuditSuccess.setChannelId(apArticle.getChannelId()); kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);
9.1.2消息接收
/** * 热点文章监听类 */ @Component @Log4j2 public class MigrationAuditSucessArticleListener implements KafkaListener<String, String> { /** * 通用转换mapper */ @Autowired ObjectMapper mapper; /** * kafka 主题 配置 */ @Autowired KafkaTopicConfig kafkaTopicConfig; @Autowired private ArticleQuantityService articleQuantityService; @Override public String topic() { return kafkaTopicConfig.getArticleAuditSuccess(); } /** * 监听消息 * * @param data * @param consumer */ @Override public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) { log.info("kafka接收到审核通过消息:{}", data); String value = (String) data.value(); if (null != value) { ArticleAuditSuccessMessage message = null; try { message = mapper.readValue(value, ArticleAuditSuccessMessage.class); } catch (IOException e) { e.printStackTrace(); } ArticleAuditSuccess auto = message.getData(); if (null != auto) { //调用方法 将HBAESE中的热数据进行同步 Integer articleId = auto.getArticleId(); if (null != articleId) { articleQuantityService.dbToHbase(articleId); } } } } }
9.2 热点文章同步
创建监听类:com.heima.migration.kafka.listener.MigrationHotArticleListener
/** * 热点文章监听类 */ @Component @Log4j2 public class MigrationHotArticleListener implements KafkaListener<String, String> { /** * 通用转换mapper */ @Autowired ObjectMapper mapper; /** * kafka 主题 配置 */ @Autowired KafkaTopicConfig kafkaTopicConfig; /** * 热点文章service注入 */ @Autowired private ApHotArticleService apHotArticleService; @Override public String topic() { return kafkaTopicConfig.getHotArticle(); } /** * 监听消息 * * @param data * @param consumer */ @Override public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) { log.info("kafka接收到热数据同步消息:{}", data); String value = (String) data.value(); if (null != value) { ApHotArticleMessage message = null; try { message = mapper.readValue(value, ApHotArticleMessage.class); } catch (IOException e) { e.printStackTrace(); } Integer articleId = message.getData().getArticleId(); if (null != articleId) { //调用方法 将HBAESE中的热数据进行同步 apHotArticleService.hotApArticleSync(articleId); } } } }