数据同步接口

简介: 数据同步接口

公众号merlinsea


业务场景分析:


   通常我方系统会与第三方系统的数据打交道,但第三方的生产库并不允许我们直接操作。在企业里面,一般都是通过中间表进行同步,即第三方系统将生产数据放入一张与其生产环境隔离的另一个独立的库中的独立的表,再根据接口协议,增加相应的字段。而我方需要读取该中间表中的数据,并对数据进行同步操作。我们需要编写的就是生产者-消费者模型


模型抽取:


   单生产者多消费者模型--->生产者负责批量读取记录---->消费者负责将生产   者读取的记录持久化到我方数据库。

640.jpg


数据库表的设计:


   中间数据库student表设计:中间数据库的表结构除了包含第三方库的基本属性外,还添加了【status状态】信息。

   当status=10I 表示这条记录是这条记录是初始化的记录,没有经过任何处理。

   当status=10D 标识这条记录是处于处理中,即只被生产者读取了,但消费者没有完成这条记录的持久化。

   当status=10F 表示这条记录已经处理完成,即已经被消费者持久化到我方数据库中。

   当status=10E 表示这条记录在生产者或者消费者处理的过程中发生了异常,后续需要人工干预。


生产者业务逻辑QryBusiImpl,主要负责从中间数据库中批量读取数据


/**
 * 生产者业务逻辑
 */
public class QryBusiImpl implements QryBusi {
    private static final Logger LOGGER = LoggerFactory.getLogger(QryBusiImpl.class);
    /**
     * 生产者每次都批量查询count条记录返回
     * @param count 一次查询的数量
     * @return
     */
    @Override
    public List queryList(int count) {
        SqlSession middleSession = SqlSessionUtil.getSqlSession("middle");
        List<Object> objects = null;
        try {
            objects = middleSession.selectList("com.xdclass.middle.mapper.StudentMapper.selectList", count);
        } catch (Exception e) {
            LOGGER.error("查询发生异常=====》", e);
        } finally {
            middleSession.close();
        }
        return objects;
    }
    /**
     * 将生产者将查询到的记录的状态修改为处理中
     * @param data 待修改数据
     * @param status 要修改成的状态
     * @return
     */
    @Override
    public int modifyListStatus(List data, String status) {
        List<Student> students = data;
        students.forEach(student -> {
            student.setDataStatus(status);
            SqlSession middle = SqlSessionUtil.getSqlSession("middle");
            try {
                middle.update("com.xdclass.middle.mapper.StudentMapper.updateByPrimaryKey", student);
                middle.commit();
            } catch (Exception e) {
                LOGGER.error("修改状态失败=======》",e);
            } finally {
                middle.close();
            }
        });
        return 0;
    }
}


生产者线程Producer


核心:

1、持有生产者业务逻辑的实例

2、将读取的数据交给消费者线程持久化

3、修改中间数据库的记录状态为处理中


/**
 * 生产者线程,持有QryBusiImpl业务逻辑类
 */
public class Producer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
    /**
     * 生产者业务逻辑
     */
    private QryBusi qryBusi;
    /**
     * 消费者任务队列
     */
    private LinkedBlockingQueue<Runnable> consumers;
    private ThreadPoolExecutor executor;
    public Producer(QryBusi qryBusi,LinkedBlockingQueue<Runnable> consumers,ThreadPoolExecutor executor) {
        this.qryBusi = qryBusi;
        this.consumers = consumers;
        this.executor = executor;
    }
    @Override
    public void run() {
        while (true) {
            //查询一批数据
            List list = qryBusi.queryList(10);
            try {
                if (list != null && list.size() > 0) {
                    //先将数据修改为处理中
                    qryBusi.modifyListStatus(list,DataStutusConst.DEALING);
                    //获取消费者的一个任务来将读取的数据放入我方数据库
                    Consumer consumer = (Consumer) consumers.take();
                    consumer.setData(list);
                    executor.execute(consumer);
                } else {
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                LOGGER.error("生产者发生异常=======》",e);
                //发生异常做好标记
                qryBusi.modifyListStatus(list, DataStutusConst.ERROR);
            }
        }
    }
}


消费者业务逻辑DealBusiImpl类

/**
 * 消费者业务逻辑
 */
public class DealBusiImpl implements DealBusi {
    private static final Logger LOGGER = LoggerFactory.getLogger(DealBusiImpl.class);
    /**
     * 将生产者查询的数据列表写入我方数据库中
     * @param data
     */
    @Override
    public void deal(List data) {
        //將data转换成我方业务库的实体格式
        List<com.xdclass.our.model.Student> students = adapter(data);
        //处理数据,并入库
        students.forEach(student -> {
            student.setName(student.getName() + "test");
            SqlSession ourSession = SqlSessionUtil.getSqlSession("our");
            try {
                ourSession.insert("com.xdclass.our.mapper.StudentMapper.insertSelective", student);
                ourSession.commit();
                //修改中间表的状态
                modifyMiddle(student.getId(), DataStutusConst.FINISH);
            } catch (Exception e) {
                LOGGER.error("处理数据发生异常=======》", e);
                //发生异常,修改中间表状态为10E
                modifyMiddle(student.getId(), DataStutusConst.ERROR);
            } finally {
                ourSession.close();
            }
        });
    }
    /**
     * 修改中间表的数据记录标识为处理完成或者发生异常
     * @param id
     * @param status
     */
    private void modifyMiddle(int id, String status) {
        Student student = new Student();
        student.setId(id);
        student.setDataStatus(status);
        student.setDealTime(new Date());
        SqlSession middleSession = SqlSessionUtil.getSqlSession("middle");
        try {
            middleSession.update("com.xdclass.middle.mapper.StudentMapper.updateStatusById", student);
            middleSession.commit();
        } catch (Exception e) {
            LOGGER.error("修改中间表状态失败========>",e);
        } finally {
            middleSession.close();
        }
    }
    /**
     * 数据适配,将生产者从中间表查询的记录增加响应的属性转为我方表的记录格式
     * @param students
     * @return
     */
    private List<com.xdclass.our.model.Student> adapter(List<Student> students) {
        List<com.xdclass.our.model.Student> result = new ArrayList<>();
        students.forEach(stu -> {
            com.xdclass.our.model.Student student = new com.xdclass.our.model.Student();
            student.setDepartment(stu.getDepartment());
            student.setSex(stu.getSex());
            student.setId(stu.getId());
            student.setName(stu.getName());
            student.setBirth(stu.getBirth());
            student.setAddTime(new Date());
            result.add(student);
        });
        return result;
    }
}


消费者线程

/**
 * 消费者线程,持有DealBusiImpl业务类
 */
public class Consumer implements Runnable {
    /**
     * 生产者传入的原始读取批数据
     */
    private List data;
    /**
     * 消费者业务逻辑
     */
    private DealBusi dealBusi;
    /**
     * 消费者的任务队列
     */
    private LinkedBlockingQueue<Runnable> consumers;
    public Consumer(DealBusi dealBusi, LinkedBlockingQueue<Runnable> consumers) {
        this.dealBusi = dealBusi;
        this.consumers = consumers;
    }
    @Override
    public void run() {
        try {
            dealBusi.deal(data);
        } finally {
            try {
                //消费者线程入队
                consumers.put(this);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public void setData(List data) {
        this.data = data;
    }
}


mian函数启动类

public class Main {
    public static void main(String[] args) {
        QryBusi qryBusi = new QryBusiImpl();
        DealBusi dealBusi = new DealBusiImpl();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
        LinkedBlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>(10);
        for (int i = 0; i < 10; i++) {
            try {
                runnables.put(new Consumer(dealBusi,runnables));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Producer producer = new Producer(qryBusi,runnables,threadPoolExecutor);
        new Thread(producer).start();
    }
}


项目结构:

640.jpg


总结:在平时开发中,首先按照面向对象编程的方法抽取出项目的模型,比如本项目中抽取出来生产者消费者模型。然后从模型中抽取出各个不同的组件类,比如本项目中的生产者和消费者。第三抽取出每一个组件类的核心业务逻辑,即service层的功能【这里才是最关键的部分】,比如本项目中的busi层。最后提供一个启动类作为整合项目模型中的组件和各自的核心业务逻辑,比如本例中的main类。


vip算法班双十一优惠价详情


vip算法班详情如下:

奔跑的小梁,公众号:梁霖编程工具库leetcode刷题直播教学,手把手带你刷题,双十一优惠价来啦~~


相关文章
|
2月前
|
开发框架 .NET API
实现数据同步的webapi接口
【10月更文挑战第10天】本文介绍了一个在ASP.NET Core中使用C#实现数据同步的Web API接口示例。首先定义了用户数据模型和Entity Framework Core数据库上下文,然后创建了一个控制器来处理客户端的数据同步请求:通过清空现有数据并添加新数据实现同步。注意实际应用需考虑更多细节如错误处理等。
|
canal 监控 安全
谷粒学院——Day17【数据同步工具、SpringCloud【GateWay网关】、权限管理功能(接口)】
谷粒学院——Day17【数据同步工具、SpringCloud【GateWay网关】、权限管理功能(接口)】
316 0
谷粒学院——Day17【数据同步工具、SpringCloud【GateWay网关】、权限管理功能(接口)】
|
4月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
138 1
|
3月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
810 4
|
4月前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步
|
2月前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
671 0
|
4月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
434 1
|
4月前
|
SQL canal 关系型数据库
(二十四)全解MySQL之主从篇:死磕主从复制中数据同步原理与优化
兜兜转转,经过《全解MySQL专栏》前面二十多篇的内容讲解后,基本对MySQL单机模式下的各方面进阶知识做了详细阐述,同时在前面的《分库分表概念篇》、《分库分表隐患篇》两章中也首次提到了数据库的一些高可用方案,但前两章大多属于方法论,并未涵盖真正的实操过程。接下来的内容,会以目前这章作为分割点,开启MySQL高可用方案的落地实践分享的新章程!
2117 1
|
4月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
866 0

热门文章

最新文章

下一篇
DataWorks