spring integration同步数据库数据

简介:

 需求为:当客户已有系统的数据被同步到我方数据库后,若再有新数据,只同步新数据到我方数据库。

解决:因为客户的业务表是不能变动的,我方在客户数据库中新建一状态表,记录哪些数据被更新过。

当客户业务表有新数据插入时,用触发器将新数据id插入到状态表。

 

为方便实例:业务表pp,状态表status

结构为:

pp:

CREATE TABLE `pp` (
  `name` varchar(255) default NULL,
  `address` varchar(255) default NULL,
  `id` int(11) NOT NULL auto_increment,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

status:

CREATE TABLE `status` (
  `id` int(11) NOT NULL auto_increment,
  `status` varchar(255) default 'new',
  `ppid` int(11) NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8;

触发器:

DROP TRIGGER if EXISTS mytrigger
CREATE TRIGGER mytrigger after INSERT on pp
for EACH ROW
BEGIN
 INSERT into `status`(ppid) values(new.id);
END;

 

核心配置:jdbc-inbound-context.xml

Xml代码   收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"   
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.        xmlns:context="http://www.springframework.org/schema/context"   
  5.        xmlns:int="http://www.springframework.org/schema/integration"   
  6.        xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"      
  7.        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"   
  8.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"   
  9.        xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  13.     http://www.springframework.org/schema/integration   
  14.     http://www.springframework.org/schema/integration/spring-integration-2.0.xsd   
  15.     http://www.springframework.org/schema/integration/jdbc   
  16.     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd   
  17.     http://www.springframework.org/schema/jdbc   
  18.     http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd  
  19.      http://www.springframework.org/schema/integration/jms   
  20.     http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">  
  21.     <context:component-scan base-package="com.wisely.inbound"/>  
  22.        
  23.     <int:channel id="target"/>  
  24.       
  25.     <int-jdbc:inbound-channel-adapter channel="target"   
  26.                     data-source="dataSource"  
  27.                     query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  
  28.                     update="update status as st set st.status='old' where ppid in (:ppid)"  
  29.                                        >  
  30.         <!-- 每隔多少毫秒去抓取 -->  
  31.         <int:poller fixed-rate="5000" >  
  32.             <int:transactional/>  
  33.         </int:poller>  
  34.         <!--  指定时刻抓取  
  35.         <int:poller max-messages-per-poll="1">  
  36.             <int:transactional/>  
  37.             <int:cron-trigger expression="0 0 3 * * ?"/>  
  38.         </int:poller>  
  39.         -->  
  40.     </int-jdbc:inbound-channel-adapter>  
  41.     <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>     
  42.      <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>  
  43.      <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">  
  44.         <property name="driverClassName" value="${database.driverClassName}"/>  
  45.         <property name="url" value="${database.url}"/>  
  46.         <property name="username" value="${database.username}"/>  
  47.         <property name="password" value="${database.password}"/>  
  48.     </bean>     
  49.     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  50.         <property name="dataSource" ref="dataSource"/>  
  51.     </bean>      
  52.    </beans>  

 

JdbcMessageHandler:

 

Java代码   收藏代码
  1. package com.wisely.inbound.jdbc;  
  2.   
  3. import java.util.List;  
  4. import java.util.Map;  
  5.   
  6. import org.springframework.integration.annotation.ServiceActivator;  
  7. import org.springframework.stereotype.Component;  
  8.   
  9. @Component  
  10. public class JdbcMessageHandler {  
  11.     @ServiceActivator  
  12.     public void handleJdbcMessage(List<Map<String ,Object>> message){  
  13.         for(Map<String,Object> resultMap:message){  
  14.             System.out.println("组:");  
  15.             for(String column:resultMap.keySet()){  
  16.                 System.out.println("字段:"+column+" 值:"+resultMap.get(column));  
  17.             }  
  18.         }  
  19.     }  
  20. }  

 

测试类:

Java代码   收藏代码
  1. package com.wisely.inbound.jdbc;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class JdbcInbound {  
  7.   
  8.     /** 
  9.      * @param args 
  10.      */  
  11.     public static void main(String[] args) {  
  12.           ApplicationContext context =   
  13.                     new ClassPathXmlApplicationContext("/META-INF/spring/jdbc-inbound-context.xml");  
  14.     }  
  15.   
  16. }  

 

 

若将channel改为jms的通道。配置文件做以下修改:

 

Xml代码   收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"   
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.        xmlns:context="http://www.springframework.org/schema/context"   
  5.        xmlns:int="http://www.springframework.org/schema/integration"   
  6.        xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"      
  7.        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"   
  8.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"   
  9.        xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  13.     http://www.springframework.org/schema/integration   
  14.     http://www.springframework.org/schema/integration/spring-integration-2.0.xsd   
  15.     http://www.springframework.org/schema/integration/jdbc   
  16.     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd   
  17.     http://www.springframework.org/schema/jdbc   
  18.     http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd  
  19.      http://www.springframework.org/schema/integration/jms   
  20.     http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">  
  21.     <context:component-scan base-package="com.wisely.inbound"/>  
  22.        
  23.     <int-jms:channel id="target"  queue-name="jdbc.queue" connection-factory="connectionFactory"/>  
  24.       
  25.     <int-jdbc:inbound-channel-adapter channel="target"   
  26.                                       data-source="dataSource"  
  27.                                       query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  
  28.                                       update="update status as st set st.status='old' where ppid in (:ppid)"  
  29.                                        >  
  30.         <!-- 每隔多少毫秒去抓取 -->  
  31.         <int:poller fixed-rate="5000" >  
  32.             <int:transactional/>  
  33.         </int:poller>  
  34.         <!--  指定时刻抓取  
  35.         <int:poller max-messages-per-poll="1">  
  36.             <int:transactional/>  
  37.             <int:cron-trigger expression="0 0 3 * * ?"/>  
  38.         </int:poller>  
  39.         -->  
  40.     </int-jdbc:inbound-channel-adapter>  
  41.     <!--   
  42.     <int-jms:message-driven-channel-adapter id="queInbound" destination-name="jmsQueue" channel="target"/> 
  43.     -->  
  44.     <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>  
  45.        
  46.      <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>  
  47.      <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">  
  48.         <property name="driverClassName" value="${database.driverClassName}"/>  
  49.         <property name="url" value="${database.url}"/>  
  50.         <property name="username" value="${database.username}"/>  
  51.         <property name="password" value="${database.password}"/>  
  52.     </bean>  
  53.       
  54.     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  55.         <property name="dataSource" ref="dataSource"/>  
  56.     </bean>  
  57.       
  58.       
  59.     <bean id="activeMqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  60.         <property name="brokerURL" value="vm://localhost" />  
  61.     </bean>  
  62.       
  63.     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  
  64.         <property name="sessionCacheSize" value="10" />  
  65.         <property name="cacheProducers" value="false"/>  
  66.         <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>  
  67.     </bean>  
  68.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  69.         <property name="connectionFactory" ref="connectionFactory"/>  
  70.         <property name="defaultDestinationName" value="jmsQueue" />  
  71.     </bean>  
  72. </beans>  

 

目录
相关文章
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
196 61
|
6天前
|
SQL 存储 运维
从建模到运维:联犀如何完美融入时序数据库 TDengine 实现物联网数据流畅管理
本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。
18 1
|
10天前
|
存储 Java easyexcel
招行面试:100万级别数据的Excel,如何秒级导入到数据库?
本文由40岁老架构师尼恩撰写,分享了应对招商银行Java后端面试绝命12题的经验。文章详细介绍了如何通过系统化准备,在面试中展示强大的技术实力。针对百万级数据的Excel导入难题,尼恩推荐使用阿里巴巴开源的EasyExcel框架,并结合高性能分片读取、Disruptor队列缓冲和高并发批量写入的架构方案,实现高效的数据处理。此外,文章还提供了完整的代码示例和配置说明,帮助读者快速掌握相关技能。建议读者参考《尼恩Java面试宝典PDF》进行系统化刷题,提升面试竞争力。关注公众号【技术自由圈】可获取更多技术资源和指导。
|
13天前
|
前端开发 JavaScript 数据库
获取数据库中字段的数据作为下拉框选项
获取数据库中字段的数据作为下拉框选项
44 5
|
10天前
|
NoSQL 关系型数据库 分布式数据库
基于PolarDB的图分析:通过DTS将其它数据库的数据表同步到PolarDB的图
本文介绍了使用DTS任务将数据从MySQL等数据源实时同步到PolarDB-PG的图数据库中的步骤.
|
2月前
|
缓存 关系型数据库 MySQL
高并发架构系列:数据库主从同步的 3 种方案
本文详解高并发场景下数据库主从同步的三种解决方案:数据主从同步、数据库半同步复制、数据库中间件同步和缓存记录写key同步,旨在帮助解决数据一致性问题。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
高并发架构系列:数据库主从同步的 3 种方案
|
2月前
|
关系型数据库 MySQL 数据库
GBase 数据库如何像MYSQL一样存放多行数据
GBase 数据库如何像MYSQL一样存放多行数据
|
2月前
|
关系型数据库 分布式数据库 数据库
云栖大会|从数据到决策:AI时代数据库如何实现高效数据管理?
在2024云栖大会「海量数据的高效存储与管理」专场,阿里云瑶池讲师团携手AMD、FunPlus、太美医疗科技、中石化、平安科技以及小赢科技、迅雷集团的资深技术专家深入分享了阿里云在OLTP方向的最新技术进展和行业最佳实践。
|
3月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
261 2
|
13天前
|
Java 数据库连接 Maven
最新版 | 深入剖析SpringBoot3源码——分析自动装配原理(面试常考)
自动装配是现在面试中常考的一道面试题。本文基于最新的 SpringBoot 3.3.3 版本的源码来分析自动装配的原理,并在文未说明了SpringBoot2和SpringBoot3的自动装配源码中区别,以及面试回答的拿分核心话术。
最新版 | 深入剖析SpringBoot3源码——分析自动装配原理(面试常考)