公众号merlinsea
业务场景分析:
通常我方系统会与第三方系统的数据打交道,但第三方的生产库并不允许我们直接操作。在企业里面,一般都是通过中间表进行同步,即第三方系统将生产数据放入一张与其生产环境隔离的另一个独立的库中的独立的表,再根据接口协议,增加相应的字段。而我方需要读取该中间表中的数据,并对数据进行同步操作。我们需要编写的就是生产者-消费者模型
模型抽取:
单生产者多消费者模型--->生产者负责批量读取记录---->消费者负责将生产 者读取的记录持久化到我方数据库。
数据库表的设计:
中间数据库student表设计:中间数据库的表结构除了包含第三方库的基本属性外,还添加了【status状态】信息。
当status=10I 表示这条记录是这条记录是初始化的记录,没有经过任何处理。
当status=10D 标识这条记录是处于处理中,即只被生产者读取了,但消费者没有完成这条记录的持久化。
当status=10F 表示这条记录已经处理完成,即已经被消费者持久化到我方数据库中。
当status=10E 表示这条记录在生产者或者消费者处理的过程中发生了异常,后续需要人工干预。
生产者业务逻辑QryBusiImpl,主要负责从中间数据库中批量读取数据
/** * 生产者业务逻辑 */ public class QryBusiImpl implements QryBusi { private static final Logger LOGGER = LoggerFactory.getLogger(QryBusiImpl.class); /** * 生产者每次都批量查询count条记录返回 * @param count 一次查询的数量 * @return */ @Override public List queryList(int count) { SqlSession middleSession = SqlSessionUtil.getSqlSession("middle"); List<Object> objects = null; try { objects = middleSession.selectList("com.xdclass.middle.mapper.StudentMapper.selectList", count); } catch (Exception e) { LOGGER.error("查询发生异常=====》", e); } finally { middleSession.close(); } return objects; } /** * 将生产者将查询到的记录的状态修改为处理中 * @param data 待修改数据 * @param status 要修改成的状态 * @return */ @Override public int modifyListStatus(List data, String status) { List<Student> students = data; students.forEach(student -> { student.setDataStatus(status); SqlSession middle = SqlSessionUtil.getSqlSession("middle"); try { middle.update("com.xdclass.middle.mapper.StudentMapper.updateByPrimaryKey", student); middle.commit(); } catch (Exception e) { LOGGER.error("修改状态失败=======》",e); } finally { middle.close(); } }); return 0; } }
生产者线程Producer
核心:
1、持有生产者业务逻辑的实例
2、将读取的数据交给消费者线程持久化
3、修改中间数据库的记录状态为处理中
/** * 生产者线程,持有QryBusiImpl业务逻辑类 */ public class Producer implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); /** * 生产者业务逻辑 */ private QryBusi qryBusi; /** * 消费者任务队列 */ private LinkedBlockingQueue<Runnable> consumers; private ThreadPoolExecutor executor; public Producer(QryBusi qryBusi,LinkedBlockingQueue<Runnable> consumers,ThreadPoolExecutor executor) { this.qryBusi = qryBusi; this.consumers = consumers; this.executor = executor; } @Override public void run() { while (true) { //查询一批数据 List list = qryBusi.queryList(10); try { if (list != null && list.size() > 0) { //先将数据修改为处理中 qryBusi.modifyListStatus(list,DataStutusConst.DEALING); //获取消费者的一个任务来将读取的数据放入我方数据库 Consumer consumer = (Consumer) consumers.take(); consumer.setData(list); executor.execute(consumer); } else { try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { LOGGER.error("生产者发生异常=======》",e); //发生异常做好标记 qryBusi.modifyListStatus(list, DataStutusConst.ERROR); } } } }
消费者业务逻辑DealBusiImpl类
/** * 消费者业务逻辑 */ public class DealBusiImpl implements DealBusi { private static final Logger LOGGER = LoggerFactory.getLogger(DealBusiImpl.class); /** * 将生产者查询的数据列表写入我方数据库中 * @param data */ @Override public void deal(List data) { //將data转换成我方业务库的实体格式 List<com.xdclass.our.model.Student> students = adapter(data); //处理数据,并入库 students.forEach(student -> { student.setName(student.getName() + "test"); SqlSession ourSession = SqlSessionUtil.getSqlSession("our"); try { ourSession.insert("com.xdclass.our.mapper.StudentMapper.insertSelective", student); ourSession.commit(); //修改中间表的状态 modifyMiddle(student.getId(), DataStutusConst.FINISH); } catch (Exception e) { LOGGER.error("处理数据发生异常=======》", e); //发生异常,修改中间表状态为10E modifyMiddle(student.getId(), DataStutusConst.ERROR); } finally { ourSession.close(); } }); } /** * 修改中间表的数据记录标识为处理完成或者发生异常 * @param id * @param status */ private void modifyMiddle(int id, String status) { Student student = new Student(); student.setId(id); student.setDataStatus(status); student.setDealTime(new Date()); SqlSession middleSession = SqlSessionUtil.getSqlSession("middle"); try { middleSession.update("com.xdclass.middle.mapper.StudentMapper.updateStatusById", student); middleSession.commit(); } catch (Exception e) { LOGGER.error("修改中间表状态失败========>",e); } finally { middleSession.close(); } } /** * 数据适配,将生产者从中间表查询的记录增加响应的属性转为我方表的记录格式 * @param students * @return */ private List<com.xdclass.our.model.Student> adapter(List<Student> students) { List<com.xdclass.our.model.Student> result = new ArrayList<>(); students.forEach(stu -> { com.xdclass.our.model.Student student = new com.xdclass.our.model.Student(); student.setDepartment(stu.getDepartment()); student.setSex(stu.getSex()); student.setId(stu.getId()); student.setName(stu.getName()); student.setBirth(stu.getBirth()); student.setAddTime(new Date()); result.add(student); }); return result; } }
消费者线程
/** * 消费者线程,持有DealBusiImpl业务类 */ public class Consumer implements Runnable { /** * 生产者传入的原始读取批数据 */ private List data; /** * 消费者业务逻辑 */ private DealBusi dealBusi; /** * 消费者的任务队列 */ private LinkedBlockingQueue<Runnable> consumers; public Consumer(DealBusi dealBusi, LinkedBlockingQueue<Runnable> consumers) { this.dealBusi = dealBusi; this.consumers = consumers; } @Override public void run() { try { dealBusi.deal(data); } finally { try { //消费者线程入队 consumers.put(this); } catch (InterruptedException e) { e.printStackTrace(); } } } public void setData(List data) { this.data = data; } }
mian函数启动类
public class Main { public static void main(String[] args) { QryBusi qryBusi = new QryBusiImpl(); DealBusi dealBusi = new DealBusiImpl(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20)); LinkedBlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>(10); for (int i = 0; i < 10; i++) { try { runnables.put(new Consumer(dealBusi,runnables)); } catch (InterruptedException e) { e.printStackTrace(); } } Producer producer = new Producer(qryBusi,runnables,threadPoolExecutor); new Thread(producer).start(); } }
项目结构:
总结:在平时开发中,首先按照面向对象编程的方法抽取出项目的模型,比如本项目中抽取出来生产者消费者模型。然后从模型中抽取出各个不同的组件类,比如本项目中的生产者和消费者。第三抽取出每一个组件类的核心业务逻辑,即service层的功能【这里才是最关键的部分】,比如本项目中的busi层。最后提供一个启动类作为整合项目模型中的组件和各自的核心业务逻辑,比如本例中的main类。
vip算法班双十一优惠价详情
vip算法班详情如下:
奔跑的小梁,公众号:梁霖编程工具库leetcode刷题直播教学,手把手带你刷题,双十一优惠价来啦~~