spring integration同步数据库数据

简介:

 需求为:当客户已有系统的数据被同步到我方数据库后,若再有新数据,只同步新数据到我方数据库。

解决:因为客户的业务表是不能变动的,我方在客户数据库中新建一状态表,记录哪些数据被更新过。

当客户业务表有新数据插入时,用触发器将新数据id插入到状态表。

 

为方便实例:业务表pp,状态表status

结构为:

pp:

CREATE TABLE `pp` (
  `name` varchar(255) default NULL,
  `address` varchar(255) default NULL,
  `id` int(11) NOT NULL auto_increment,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

status:

CREATE TABLE `status` (
  `id` int(11) NOT NULL auto_increment,
  `status` varchar(255) default 'new',
  `ppid` int(11) NOT NULL,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8;

触发器:

DROP TRIGGER if EXISTS mytrigger
CREATE TRIGGER mytrigger after INSERT on pp
for EACH ROW
BEGIN
 INSERT into `status`(ppid) values(new.id);
END;

 

核心配置:jdbc-inbound-context.xml

Xml代码   收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"   
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.        xmlns:context="http://www.springframework.org/schema/context"   
  5.        xmlns:int="http://www.springframework.org/schema/integration"   
  6.        xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"      
  7.        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"   
  8.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"   
  9.        xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  13.     http://www.springframework.org/schema/integration   
  14.     http://www.springframework.org/schema/integration/spring-integration-2.0.xsd   
  15.     http://www.springframework.org/schema/integration/jdbc   
  16.     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd   
  17.     http://www.springframework.org/schema/jdbc   
  18.     http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd  
  19.      http://www.springframework.org/schema/integration/jms   
  20.     http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">  
  21.     <context:component-scan base-package="com.wisely.inbound"/>  
  22.        
  23.     <int:channel id="target"/>  
  24.       
  25.     <int-jdbc:inbound-channel-adapter channel="target"   
  26.                     data-source="dataSource"  
  27.                     query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  
  28.                     update="update status as st set st.status='old' where ppid in (:ppid)"  
  29.                                        >  
  30.         <!-- 每隔多少毫秒去抓取 -->  
  31.         <int:poller fixed-rate="5000" >  
  32.             <int:transactional/>  
  33.         </int:poller>  
  34.         <!--  指定时刻抓取  
  35.         <int:poller max-messages-per-poll="1">  
  36.             <int:transactional/>  
  37.             <int:cron-trigger expression="0 0 3 * * ?"/>  
  38.         </int:poller>  
  39.         -->  
  40.     </int-jdbc:inbound-channel-adapter>  
  41.     <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>     
  42.      <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>  
  43.      <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">  
  44.         <property name="driverClassName" value="${database.driverClassName}"/>  
  45.         <property name="url" value="${database.url}"/>  
  46.         <property name="username" value="${database.username}"/>  
  47.         <property name="password" value="${database.password}"/>  
  48.     </bean>     
  49.     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  50.         <property name="dataSource" ref="dataSource"/>  
  51.     </bean>      
  52.    </beans>  

 

JdbcMessageHandler:

 

Java代码   收藏代码
  1. package com.wisely.inbound.jdbc;  
  2.   
  3. import java.util.List;  
  4. import java.util.Map;  
  5.   
  6. import org.springframework.integration.annotation.ServiceActivator;  
  7. import org.springframework.stereotype.Component;  
  8.   
  9. @Component  
  10. public class JdbcMessageHandler {  
  11.     @ServiceActivator  
  12.     public void handleJdbcMessage(List<Map<String ,Object>> message){  
  13.         for(Map<String,Object> resultMap:message){  
  14.             System.out.println("组:");  
  15.             for(String column:resultMap.keySet()){  
  16.                 System.out.println("字段:"+column+" 值:"+resultMap.get(column));  
  17.             }  
  18.         }  
  19.     }  
  20. }  

 

测试类:

Java代码   收藏代码
  1. package com.wisely.inbound.jdbc;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class JdbcInbound {  
  7.   
  8.     /** 
  9.      * @param args 
  10.      */  
  11.     public static void main(String[] args) {  
  12.           ApplicationContext context =   
  13.                     new ClassPathXmlApplicationContext("/META-INF/spring/jdbc-inbound-context.xml");  
  14.     }  
  15.   
  16. }  

 

 

若将channel改为jms的通道。配置文件做以下修改:

 

Xml代码   收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"   
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.        xmlns:context="http://www.springframework.org/schema/context"   
  5.        xmlns:int="http://www.springframework.org/schema/integration"   
  6.        xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"      
  7.        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"   
  8.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"   
  9.        xsi:schemaLocation="http://www.springframework.org/schema/beans   
  10.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  13.     http://www.springframework.org/schema/integration   
  14.     http://www.springframework.org/schema/integration/spring-integration-2.0.xsd   
  15.     http://www.springframework.org/schema/integration/jdbc   
  16.     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.0.xsd   
  17.     http://www.springframework.org/schema/jdbc   
  18.     http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd  
  19.      http://www.springframework.org/schema/integration/jms   
  20.     http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd">  
  21.     <context:component-scan base-package="com.wisely.inbound"/>  
  22.        
  23.     <int-jms:channel id="target"  queue-name="jdbc.queue" connection-factory="connectionFactory"/>  
  24.       
  25.     <int-jdbc:inbound-channel-adapter channel="target"   
  26.                                       data-source="dataSource"  
  27.                                       query="select p.id as ppid,p.name as ppname from pp p,status s where p.id=s.ppid and s.status='new'"  
  28.                                       update="update status as st set st.status='old' where ppid in (:ppid)"  
  29.                                        >  
  30.         <!-- 每隔多少毫秒去抓取 -->  
  31.         <int:poller fixed-rate="5000" >  
  32.             <int:transactional/>  
  33.         </int:poller>  
  34.         <!--  指定时刻抓取  
  35.         <int:poller max-messages-per-poll="1">  
  36.             <int:transactional/>  
  37.             <int:cron-trigger expression="0 0 3 * * ?"/>  
  38.         </int:poller>  
  39.         -->  
  40.     </int-jdbc:inbound-channel-adapter>  
  41.     <!--   
  42.     <int-jms:message-driven-channel-adapter id="queInbound" destination-name="jmsQueue" channel="target"/> 
  43.     -->  
  44.     <int:service-activator input-channel="target" ref="jdbcMessageHandler"/>  
  45.        
  46.      <context:property-placeholder location="classpath*:META-INF/spring/*.properties"/>  
  47.      <bean class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" id="dataSource">  
  48.         <property name="driverClassName" value="${database.driverClassName}"/>  
  49.         <property name="url" value="${database.url}"/>  
  50.         <property name="username" value="${database.username}"/>  
  51.         <property name="password" value="${database.password}"/>  
  52.     </bean>  
  53.       
  54.     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  55.         <property name="dataSource" ref="dataSource"/>  
  56.     </bean>  
  57.       
  58.       
  59.     <bean id="activeMqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  60.         <property name="brokerURL" value="vm://localhost" />  
  61.     </bean>  
  62.       
  63.     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  
  64.         <property name="sessionCacheSize" value="10" />  
  65.         <property name="cacheProducers" value="false"/>  
  66.         <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>  
  67.     </bean>  
  68.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  69.         <property name="connectionFactory" ref="connectionFactory"/>  
  70.         <property name="defaultDestinationName" value="jmsQueue" />  
  71.     </bean>  
  72. </beans>  

 

目录
相关文章
|
11天前
|
人工智能 运维 Java
Spring AI Alibaba Admin 开源!以数据为中心的 Agent 开发平台
Spring AI Alibaba Admin 正式发布!一站式实现 Prompt 管理、动态热更新、评测集构建、自动化评估与全链路可观测,助力企业高效构建可信赖的 AI Agent 应用。开源共建,现已上线!
1029 34
|
14天前
|
NoSQL Java 数据库连接
《深入理解Spring》Spring Data——数据访问的统一抽象与极致简化
Spring Data通过Repository抽象和方法名派生查询,简化数据访问层开发,告别冗余CRUD代码。支持JPA、MongoDB、Redis等多种存储,统一编程模型,提升开发效率与架构灵活性,是Java开发者必备利器。(238字)
|
2月前
|
数据采集 关系型数据库 MySQL
python爬取数据存入数据库
Python爬虫结合Scrapy与SQLAlchemy,实现高效数据采集并存入MySQL/PostgreSQL/SQLite。通过ORM映射、连接池优化与批量提交,支持百万级数据高速写入,具备良好的可扩展性与稳定性。
|
2月前
|
人工智能 Java 关系型数据库
使用数据连接池进行数据库操作
使用数据连接池进行数据库操作
72 11
|
3月前
|
存储 数据管理 数据库
数据字典是什么?和数据库、数据仓库有什么关系?
在数据处理中,你是否常困惑于字段含义、指标计算或数据来源?数据字典正是解答这些问题的关键工具,它清晰定义数据的名称、类型、来源、计算方式等,服务于开发者、分析师和数据管理者。本文详解数据字典的定义、组成及其与数据库、数据仓库的关系,助你夯实数据基础。
数据字典是什么?和数据库、数据仓库有什么关系?
|
2月前
|
缓存 Java 应用服务中间件
Spring Boot配置优化:Tomcat+数据库+缓存+日志,全场景教程
本文详解Spring Boot十大核心配置优化技巧,涵盖Tomcat连接池、数据库连接池、Jackson时区、日志管理、缓存策略、异步线程池等关键配置,结合代码示例与通俗解释,助你轻松掌握高并发场景下的性能调优方法,适用于实际项目落地。
391 4
|
2月前
|
监控 Cloud Native Java
Spring Integration 企业集成模式技术详解与实践指南
本文档全面介绍 Spring Integration 框架的核心概念、架构设计和实际应用。作为 Spring 生态系统中的企业集成解决方案,Spring Integration 基于著名的 Enterprise Integration Patterns(EIP)提供了轻量级的消息驱动架构。本文将深入探讨其消息通道、端点、过滤器、转换器等核心组件,以及如何构建可靠的企业集成解决方案。
166 0
|
3月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL数据库的WAL日志与数据写入的过程
PostgreSQL中的WAL(预写日志)是保证数据完整性的关键技术。在数据修改前,系统会先将日志写入WAL,确保宕机时可通过日志恢复数据。它减少了磁盘I/O,提升了性能,并支持手动切换日志文件。WAL文件默认存储在pg_wal目录下,采用16进制命名规则。此外,PostgreSQL提供pg_waldump工具解析日志内容。
228 0
|
10天前
|
XML 前端开发 Java
一文搞懂 Spring Boot 自动配置原理
Spring Boot 自动配置原理揭秘:通过 `@EnableAutoConfiguration` 加载 `META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports` 中的配置类,结合 `@Conditional` 按条件注入 Bean,实现“开箱即用”。核心在于约定大于配置,简化开发。
217 0

热门文章

最新文章

下一篇
开通oss服务