数据同步接口

简介: 数据同步接口

公众号merlinsea


业务场景分析:


   通常我方系统会与第三方系统的数据打交道,但第三方的生产库并不允许我们直接操作。在企业里面,一般都是通过中间表进行同步,即第三方系统将生产数据放入一张与其生产环境隔离的另一个独立的库中的独立的表,再根据接口协议,增加相应的字段。而我方需要读取该中间表中的数据,并对数据进行同步操作。我们需要编写的就是生产者-消费者模型


模型抽取:


   单生产者多消费者模型--->生产者负责批量读取记录---->消费者负责将生产   者读取的记录持久化到我方数据库。

640.jpg


数据库表的设计:


   中间数据库student表设计:中间数据库的表结构除了包含第三方库的基本属性外,还添加了【status状态】信息。

   当status=10I 表示这条记录是这条记录是初始化的记录,没有经过任何处理。

   当status=10D 标识这条记录是处于处理中,即只被生产者读取了,但消费者没有完成这条记录的持久化。

   当status=10F 表示这条记录已经处理完成,即已经被消费者持久化到我方数据库中。

   当status=10E 表示这条记录在生产者或者消费者处理的过程中发生了异常,后续需要人工干预。


生产者业务逻辑QryBusiImpl,主要负责从中间数据库中批量读取数据


/**
 * 生产者业务逻辑
 */
public class QryBusiImpl implements QryBusi {
    private static final Logger LOGGER = LoggerFactory.getLogger(QryBusiImpl.class);
    /**
     * 生产者每次都批量查询count条记录返回
     * @param count 一次查询的数量
     * @return
     */
    @Override
    public List queryList(int count) {
        SqlSession middleSession = SqlSessionUtil.getSqlSession("middle");
        List<Object> objects = null;
        try {
            objects = middleSession.selectList("com.xdclass.middle.mapper.StudentMapper.selectList", count);
        } catch (Exception e) {
            LOGGER.error("查询发生异常=====》", e);
        } finally {
            middleSession.close();
        }
        return objects;
    }
    /**
     * 将生产者将查询到的记录的状态修改为处理中
     * @param data 待修改数据
     * @param status 要修改成的状态
     * @return
     */
    @Override
    public int modifyListStatus(List data, String status) {
        List<Student> students = data;
        students.forEach(student -> {
            student.setDataStatus(status);
            SqlSession middle = SqlSessionUtil.getSqlSession("middle");
            try {
                middle.update("com.xdclass.middle.mapper.StudentMapper.updateByPrimaryKey", student);
                middle.commit();
            } catch (Exception e) {
                LOGGER.error("修改状态失败=======》",e);
            } finally {
                middle.close();
            }
        });
        return 0;
    }
}


生产者线程Producer


核心:

1、持有生产者业务逻辑的实例

2、将读取的数据交给消费者线程持久化

3、修改中间数据库的记录状态为处理中


/**
 * 生产者线程,持有QryBusiImpl业务逻辑类
 */
public class Producer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
    /**
     * 生产者业务逻辑
     */
    private QryBusi qryBusi;
    /**
     * 消费者任务队列
     */
    private LinkedBlockingQueue<Runnable> consumers;
    private ThreadPoolExecutor executor;
    public Producer(QryBusi qryBusi,LinkedBlockingQueue<Runnable> consumers,ThreadPoolExecutor executor) {
        this.qryBusi = qryBusi;
        this.consumers = consumers;
        this.executor = executor;
    }
    @Override
    public void run() {
        while (true) {
            //查询一批数据
            List list = qryBusi.queryList(10);
            try {
                if (list != null && list.size() > 0) {
                    //先将数据修改为处理中
                    qryBusi.modifyListStatus(list,DataStutusConst.DEALING);
                    //获取消费者的一个任务来将读取的数据放入我方数据库
                    Consumer consumer = (Consumer) consumers.take();
                    consumer.setData(list);
                    executor.execute(consumer);
                } else {
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                LOGGER.error("生产者发生异常=======》",e);
                //发生异常做好标记
                qryBusi.modifyListStatus(list, DataStutusConst.ERROR);
            }
        }
    }
}


消费者业务逻辑DealBusiImpl类

/**
 * 消费者业务逻辑
 */
public class DealBusiImpl implements DealBusi {
    private static final Logger LOGGER = LoggerFactory.getLogger(DealBusiImpl.class);
    /**
     * 将生产者查询的数据列表写入我方数据库中
     * @param data
     */
    @Override
    public void deal(List data) {
        //將data转换成我方业务库的实体格式
        List<com.xdclass.our.model.Student> students = adapter(data);
        //处理数据,并入库
        students.forEach(student -> {
            student.setName(student.getName() + "test");
            SqlSession ourSession = SqlSessionUtil.getSqlSession("our");
            try {
                ourSession.insert("com.xdclass.our.mapper.StudentMapper.insertSelective", student);
                ourSession.commit();
                //修改中间表的状态
                modifyMiddle(student.getId(), DataStutusConst.FINISH);
            } catch (Exception e) {
                LOGGER.error("处理数据发生异常=======》", e);
                //发生异常,修改中间表状态为10E
                modifyMiddle(student.getId(), DataStutusConst.ERROR);
            } finally {
                ourSession.close();
            }
        });
    }
    /**
     * 修改中间表的数据记录标识为处理完成或者发生异常
     * @param id
     * @param status
     */
    private void modifyMiddle(int id, String status) {
        Student student = new Student();
        student.setId(id);
        student.setDataStatus(status);
        student.setDealTime(new Date());
        SqlSession middleSession = SqlSessionUtil.getSqlSession("middle");
        try {
            middleSession.update("com.xdclass.middle.mapper.StudentMapper.updateStatusById", student);
            middleSession.commit();
        } catch (Exception e) {
            LOGGER.error("修改中间表状态失败========>",e);
        } finally {
            middleSession.close();
        }
    }
    /**
     * 数据适配,将生产者从中间表查询的记录增加响应的属性转为我方表的记录格式
     * @param students
     * @return
     */
    private List<com.xdclass.our.model.Student> adapter(List<Student> students) {
        List<com.xdclass.our.model.Student> result = new ArrayList<>();
        students.forEach(stu -> {
            com.xdclass.our.model.Student student = new com.xdclass.our.model.Student();
            student.setDepartment(stu.getDepartment());
            student.setSex(stu.getSex());
            student.setId(stu.getId());
            student.setName(stu.getName());
            student.setBirth(stu.getBirth());
            student.setAddTime(new Date());
            result.add(student);
        });
        return result;
    }
}


消费者线程

/**
 * 消费者线程,持有DealBusiImpl业务类
 */
public class Consumer implements Runnable {
    /**
     * 生产者传入的原始读取批数据
     */
    private List data;
    /**
     * 消费者业务逻辑
     */
    private DealBusi dealBusi;
    /**
     * 消费者的任务队列
     */
    private LinkedBlockingQueue<Runnable> consumers;
    public Consumer(DealBusi dealBusi, LinkedBlockingQueue<Runnable> consumers) {
        this.dealBusi = dealBusi;
        this.consumers = consumers;
    }
    @Override
    public void run() {
        try {
            dealBusi.deal(data);
        } finally {
            try {
                //消费者线程入队
                consumers.put(this);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public void setData(List data) {
        this.data = data;
    }
}


mian函数启动类

public class Main {
    public static void main(String[] args) {
        QryBusi qryBusi = new QryBusiImpl();
        DealBusi dealBusi = new DealBusiImpl();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
        LinkedBlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>(10);
        for (int i = 0; i < 10; i++) {
            try {
                runnables.put(new Consumer(dealBusi,runnables));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Producer producer = new Producer(qryBusi,runnables,threadPoolExecutor);
        new Thread(producer).start();
    }
}


项目结构:

640.jpg


总结:在平时开发中,首先按照面向对象编程的方法抽取出项目的模型,比如本项目中抽取出来生产者消费者模型。然后从模型中抽取出各个不同的组件类,比如本项目中的生产者和消费者。第三抽取出每一个组件类的核心业务逻辑,即service层的功能【这里才是最关键的部分】,比如本项目中的busi层。最后提供一个启动类作为整合项目模型中的组件和各自的核心业务逻辑,比如本例中的main类。


vip算法班双十一优惠价详情


vip算法班详情如下:

奔跑的小梁,公众号:梁霖编程工具库leetcode刷题直播教学,手把手带你刷题,双十一优惠价来啦~~


相关文章
|
2月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
197 4
|
2月前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步
|
2月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
139 1
|
2月前
|
SQL canal 关系型数据库
(二十四)全解MySQL之主从篇:死磕主从复制中数据同步原理与优化
兜兜转转,经过《全解MySQL专栏》前面二十多篇的内容讲解后,基本对MySQL单机模式下的各方面进阶知识做了详细阐述,同时在前面的《分库分表概念篇》、《分库分表隐患篇》两章中也首次提到了数据库的一些高可用方案,但前两章大多属于方法论,并未涵盖真正的实操过程。接下来的内容,会以目前这章作为分割点,开启MySQL高可用方案的落地实践分享的新章程!
781 1
|
2月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
364 0
|
3月前
|
分布式计算 关系型数据库 MySQL
MySQL超时参数优化与DataX高效数据同步实践
通过合理设置MySQL的超时参数,可以有效地提升数据库的稳定性和性能。而DataX作为一种高效的数据同步工具,可以帮助企业轻松实现不同数据源之间的数据迁移。无论是优化MySQL参数还是使用DataX进行数据同步,都需要根据具体的应用场景来进行细致的配置和测试,以达到最佳效果。
|
3月前
|
分布式计算 关系型数据库 MySQL
MaxCompute产品使用合集之用flink mysql的数据同步到mc的Transaction Table2.0,时间会比mysql的时间多8小时,是什么导致的
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
SQL Kubernetes 关系型数据库
实时计算 Flink版产品使用合集之如何实现MySQL单表数据同步到多个表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之使用 MySQL CDC 进行数据同步时,设置 server_id 参数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
下一篇
无影云桌面