spring integration同步数据库数据

简介:

 需求为:当客户已有系统的数据被同步到我方数据库后,若再有新数据,只同步新数据到我方数据库。

解决:因为客户的业务表是不能变动的,我方在客户数据库中新建一状态表,记录哪些数据被更新过。

当客户业务表有新数据插入时,用触发器将新数据id插入到状态表。

 

为方便实例:业务表pp,状态表status

结构为:

pp:

CREATE TABLE `pp` (
  `name` varchar(255) default NULL,
  `address` varchar(255) default NULL,
  `id` int(11) NOT NULL auto_increment,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

status:

CREATE TABLE `status` (
  `id` int(11) NOT NULL auto_increment,
  `status` varchar(255) default 'new',
  `ppid` int(11) NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8;

触发器:

DROP TRIGGER if EXISTS mytrigger
CREATE TRIGGER mytrigger after INSERT on pp
for EACH ROW
BEGIN
 INSERT into `status`(ppid) values(new.id);
END;

 

核心配置:jdbc-inbound-context.xml

Xml代码   收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"   
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.        xmlns:context="http://www.springframework.org/schema/context"   
  5.        xmlns:int="http://www.springframework.org/schema/integration"   
  6.        xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"      
  7.        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"   
  8.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"   
  9.        xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  13.     http://www.springframework.org/schema/integration   
  14.     http://www.springframework.org/schema/integration/spring-integration-2.0.xsd   
  15.     http://www.springframework.org/schema/integration/jdbc   
  16.     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd   
  17.     http://www.springframework.org/schema/jdbc   
  18.     http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd  
  19.      http://www.springframework.org/schema/integration/jms   
  20.     http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">  
  21.     <context:component-scan base-package="com.wisely.inbound"/>  
  22.        
  23.     <int:channel id="target"/>  
  24.       
  25.     <int-jdbc:inbound-channel-adapter channel="target"   
  26.                     data-source="dataSource"  
  27.                     query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  
  28.                     update="update status as st set st.status='old' where ppid in (:ppid)"  
  29.                                        >  
  30.         <!-- 每隔多少毫秒去抓取 -->  
  31.         <int:poller fixed-rate="5000" >  
  32.             <int:transactional/>  
  33.         </int:poller>  
  34.         <!--  指定时刻抓取  
  35.         <int:poller max-messages-per-poll="1">  
  36.             <int:transactional/>  
  37.             <int:cron-trigger expression="0 0 3 * * ?"/>  
  38.         </int:poller>  
  39.         -->  
  40.     </int-jdbc:inbound-channel-adapter>  
  41.     <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>     
  42.      <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>  
  43.      <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">  
  44.         <property name="driverClassName" value="${database.driverClassName}"/>  
  45.         <property name="url" value="${database.url}"/>  
  46.         <property name="username" value="${database.username}"/>  
  47.         <property name="password" value="${database.password}"/>  
  48.     </bean>     
  49.     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  50.         <property name="dataSource" ref="dataSource"/>  
  51.     </bean>      
  52.    </beans>  

 

JdbcMessageHandler:

 

Java代码   收藏代码
  1. package com.wisely.inbound.jdbc;  
  2.   
  3. import java.util.List;  
  4. import java.util.Map;  
  5.   
  6. import org.springframework.integration.annotation.ServiceActivator;  
  7. import org.springframework.stereotype.Component;  
  8.   
  9. @Component  
  10. public class JdbcMessageHandler {  
  11.     @ServiceActivator  
  12.     public void handleJdbcMessage(List<Map<String ,Object>> message){  
  13.         for(Map<String,Object> resultMap:message){  
  14.             System.out.println("组:");  
  15.             for(String column:resultMap.keySet()){  
  16.                 System.out.println("字段:"+column+" 值:"+resultMap.get(column));  
  17.             }  
  18.         }  
  19.     }  
  20. }  

 

测试类:

Java代码   收藏代码
  1. package com.wisely.inbound.jdbc;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class JdbcInbound {  
  7.   
  8.     /** 
  9.      * @param args 
  10.      */  
  11.     public static void main(String[] args) {  
  12.           ApplicationContext context =   
  13.                     new ClassPathXmlApplicationContext("/META-INF/spring/jdbc-inbound-context.xml");  
  14.     }  
  15.   
  16. }  

 

 

若将channel改为jms的通道。配置文件做以下修改:

 

Xml代码   收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"   
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.        xmlns:context="http://www.springframework.org/schema/context"   
  5.        xmlns:int="http://www.springframework.org/schema/integration"   
  6.        xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"      
  7.        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"   
  8.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"   
  9.        xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  13.     http://www.springframework.org/schema/integration   
  14.     http://www.springframework.org/schema/integration/spring-integration-2.0.xsd   
  15.     http://www.springframework.org/schema/integration/jdbc   
  16.     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd   
  17.     http://www.springframework.org/schema/jdbc   
  18.     http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd  
  19.      http://www.springframework.org/schema/integration/jms   
  20.     http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">  
  21.     <context:component-scan base-package="com.wisely.inbound"/>  
  22.        
  23.     <int-jms:channel id="target"  queue-name="jdbc.queue" connection-factory="connectionFactory"/>  
  24.       
  25.     <int-jdbc:inbound-channel-adapter channel="target"   
  26.                                       data-source="dataSource"  
  27.                                       query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  
  28.                                       update="update status as st set st.status='old' where ppid in (:ppid)"  
  29.                                        >  
  30.         <!-- 每隔多少毫秒去抓取 -->  
  31.         <int:poller fixed-rate="5000" >  
  32.             <int:transactional/>  
  33.         </int:poller>  
  34.         <!--  指定时刻抓取  
  35.         <int:poller max-messages-per-poll="1">  
  36.             <int:transactional/>  
  37.             <int:cron-trigger expression="0 0 3 * * ?"/>  
  38.         </int:poller>  
  39.         -->  
  40.     </int-jdbc:inbound-channel-adapter>  
  41.     <!--   
  42.     <int-jms:message-driven-channel-adapter id="queInbound" destination-name="jmsQueue" channel="target"/> 
  43.     -->  
  44.     <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>  
  45.        
  46.      <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>  
  47.      <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">  
  48.         <property name="driverClassName" value="${database.driverClassName}"/>  
  49.         <property name="url" value="${database.url}"/>  
  50.         <property name="username" value="${database.username}"/>  
  51.         <property name="password" value="${database.password}"/>  
  52.     </bean>  
  53.       
  54.     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  55.         <property name="dataSource" ref="dataSource"/>  
  56.     </bean>  
  57.       
  58.       
  59.     <bean id="activeMqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  60.         <property name="brokerURL" value="vm://localhost" />  
  61.     </bean>  
  62.       
  63.     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  
  64.         <property name="sessionCacheSize" value="10" />  
  65.         <property name="cacheProducers" value="false"/>  
  66.         <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>  
  67.     </bean>  
  68.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  69.         <property name="connectionFactory" ref="connectionFactory"/>  
  70.         <property name="defaultDestinationName" value="jmsQueue" />  
  71.     </bean>  
  72. </beans>  

 

目录
相关文章
|
25天前
|
存储 Oracle 关系型数据库
Dataphin常见问题之想要周期执行任务如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。
|
1月前
|
存储 关系型数据库 MySQL
如何处理爬取到的数据,例如存储到数据库或文件中?
【2月更文挑战第23天】【2月更文挑战第73篇】如何处理爬取到的数据,例如存储到数据库或文件中?
|
1月前
|
SQL 开发框架 JavaScript
在 Vue 中进行数据持久化时,有哪些常用的数据库框架?
在 Vue 中进行数据持久化时,有哪些常用的数据库框架?
46 3
|
1月前
|
存储 数据管理 数据处理
数据之光:探索数据库技术的演进之路
数据之光:探索数据库技术的演进之路
60 1
|
1月前
|
存储 运维 关系型数据库
数据的力量:构筑现代大型网站之数据库基础与应用
数据的力量:构筑现代大型网站之数据库基础与应用
49 0
|
28天前
|
SQL Java 数据库连接
从来没想到我们会扒拉nohup文件去找我们想要的数据,然后往数据库中添加。。。...
从来没想到我们会扒拉nohup文件去找我们想要的数据,然后往数据库中添加。。。...
17 0
|
2天前
|
SQL 关系型数据库 MySQL
关系型数据库插入数据的语句
使用SQL的`INSERT INTO`语句向关系型数据库的`students`表插入数据。例如,插入一个`id`为1,`name`为&#39;张三&#39;,`age`为20的记录:`INSERT INTO students (id, name, age) VALUES (1, &#39;张三&#39;, 20)。如果`id`自增,则可简化为`INSERT INTO students (name, age) VALUES (&#39;张三&#39;, 20)`。
5 2
|
2天前
|
SQL 存储 Oracle
关系型数据库查询数据的语句
本文介绍了关系型数据库中的基本SQL查询语句,包括选择所有或特定列、带条件查询、排序、分组、过滤分组、表连接、限制记录数及子查询。SQL还支持窗口函数、存储过程等高级功能,是高效管理数据库的关键。建议深入学习SQL及相应数据库系统文档。
6 2
|
6天前
|
数据采集 前端开发 Java
数据塑造:Spring MVC中@ModelAttribute的高级数据预处理技巧
数据塑造:Spring MVC中@ModelAttribute的高级数据预处理技巧
20 3
|
6天前
|
前端开发 Java Spring
数据之桥:深入Spring MVC中传递数据给视图的实用指南
数据之桥:深入Spring MVC中传递数据给视图的实用指南
20 3