张坤的个人博客

  • 首页
  • 分类
  • 标签
  • 日志

  • 搜索
Jenkins RabbitMQ Zookeeper IDEA Logstash Kibana ELK NIO Netty Spring Cloud Golang DataX Elasticsearch React Native Mysql H2 Socket Spring Boot Kafka Mybatis Sqlmap Vue Postgresql Docker Vert.x Flutter Flink Redis

Java数据同步

发表于 2020-05-29 | 分类于 默认分类 | 0 | 阅读次数 34

Java数据同步

项目基于Spring Boot,数据库同步涉及到多个数据源,配置多个数据源。

在application.properties中配置两个数据源,remote.wz和local.wz

spring.datasource.remote.wz.driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver
spring.datasource.remote.wz.jdbc-url=jdbc:sqlserver://localhost:1433;databaseName=szf
spring.datasource.remote.wz.username=sa
spring.datasource.remote.wz.password=sa

spring.datasource.local.wz.driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver
spring.datasource.local.wz.jdbc-url=jdbc:sqlserver://localhost:1433;databaseName=WZ
spring.datasource.local.wz.username=sa
spring.datasource.local.wz.password=sa

在java类中配置这两个数据源

remote.wz

@Configuration
@MapperScan(basePackages = {"com.demo.dataimport.dao.remotewz"}, sqlSessionFactoryRef = "remoteWzSqlSessionFactory", sqlSessionTemplateRef = "remoteWzSqlSessionTemplate")
public class RemoteWzConfig {

    /**
     * 远程工商
     */
    @Bean("remoteWzDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.remote.wz")
    public DataSource remoteGsDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("remoteWzSqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory1(@Qualifier("remoteWzDataSource") DataSource ds1) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(ds1);
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/remotewz/*.xml"));
        return factoryBean.getObject();

    }

    @Bean(name = "remoteWzTransactionManager")
    public DataSourceTransactionManager testTransactionManager(@Qualifier("remoteWzDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "remoteWzSqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("remoteWzSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

local.wz

@Configuration
@MapperScan(basePackages = {"com.demo.dataimport.dao.localwz"}, sqlSessionFactoryRef = "localWzSqlSessionFactory", sqlSessionTemplateRef = "localWzSqlSessionTemplate")
public class LocalWzConfig {

    /**
     * 远程工商
     */
    @Bean("localWzDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.local.wz")
    public DataSource remoteGsDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("localWzSqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory1(@Qualifier("localWzDataSource") DataSource ds1) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(ds1); // 使用titan数据源, 连接titan库
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/localwz/*.xml"));
        return factoryBean.getObject();

    }

    @Bean(name = "localWzTransactionManager")
    public DataSourceTransactionManager testTransactionManager(@Qualifier("localWzDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "localWzSqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("localWzSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

remote.wz的dao接口就放在com.demo.dataimport.dao.remotewz包下,mapper.xml放在resources目录的mapper/remotewz目录下

local.wz的dao在com.demo.dataimport.dao.localwz包下,mapper.xml放在resources目录的mapper/localwz目录下

定时同步数据

每天晚上10点钟开始同步,最好是确定一个唯一标识,比如时间或者自增字段,我下面这个例子就是根据自增字段和时间戳来同步一个简单demo,自增字段负责insert,时间戳负责update。通过判断以前已经同步过的数据的时间戳是否大于。没有用到多线程啥的。因为这个自增字段在数据库中时decimal类型,所以java类型用的是BigDecimal而不是Integer。

gsRemoteDao代表远程库,gsLocalDao代表本地库,需要将远程库的数据同步到本地库

// 可通过配置文件设置
@Value("${querysize}")
private String querySizeStr;

@Scheduled(cron = "0 0 22 * * ?")
@PostConstruct	// 第一次启动就同步一次
public void syncGsDatabase() {
    // 每次从远程库查多少条,按批次同步
    int querySize = Integer.valueOf(querySizeStr);
    
    // 远程数据库最后一条数据
    GsDO lastRemoteGs = gsRemoteDao.getLastRow();
    // 远程库最后一条数据的自增标识
    BigDecimal endIns = lastRemoteGs.getIns();
    
    // 如果是第一次同步数据,默认从第一条开始同步
    BigDecimal beginIns;
    String updatetime = null;

    // 获取本地最后一条数据
    GsDO localDO = gsLocalDao.getLastRow();

    // 本地还没开始同步数据
    if (localDO == null) {
        // 默认开始位置
        beginIns = gsRemoteDao.getFirstRow().getIns().subtract(BigDecimal.valueOf(1));
    } else {
        // 更新部分的结束位置,插入部分的开始位置
        beginIns = localDO.getIns();
        // 本地最后一条数据的最后插入时间
        updatetime = gsLocalDao.getLastUpdateTime().getUpdatetime();
    }

    // 更新数据程序部分,如果updatetime为null就说明是第一次同步,就跳过更新部分
    if (updatetime != null) {
        // currUpdatedIns是游标,查询的时候LIMIT #{currUpdatedIns}, #{querySize},需要每次修改currUpdatedIns来查询下一个分页数据
        BigDecimal currUpdatedIns = BigDecimal.valueOf(0);
        GsDO param = new GsDO();
        while (beginIns.compareTo(currUpdatedIns) >= 0) {
            // 通过上次本地同步的最后一条数据的更新时间来判断远程库中是否有更新数据
            param.setUpdatetime(updatetime);
            // 设置上限,beginIns后面的数据是本地没有的,需要的是insert而不是update
            param.setMaxIns(beginIns);
            // 批量查
            param.setLimit(querySize);
            // 重置游标为上一次查询最大值的自增标识
            param.setIns(currUpdatedIns);
            
            // 查出远程数据库上的已更新的数据
            List<GsDO> updateList = gsRemoteDao.list(param);
            
            // 更新旧数据
            for (GsDO gsDO : updateList) {
                gsLocalDao.updateByPrimaryKeySelective(gsDO);
            }
		
            // 按自增标识排序,获取最大的自增标识,在下一次查询使用它来做分页
            Optional<GsDO> o = updateList.stream().max(Comparator.comparing(GsDO::getIns));
            if (o.isPresent()) {
                currUpdatedIns = o.get().getIns();
            } else {
                break;
            }
        }
    }

    // 插入数据程序部分,基本逻辑和更新部分差不多
    GsDO param = new GsDO();
    while (endIns.compareTo(beginIns) > 0) {
        param.setIns(beginIns);
        param.setLimit(querySize);
        // 获取新数据
        List<GsDO> insertList = gsRemoteDao.list(param);
        for (GsDO gsDO : insertList) {
            // 插入
            gsLocalDao.insertSelective(gsDO);
        }

        // 获取新数据最大的自增标识,用来做分页
        Optional<GsDO> o = insertList.stream().max(Comparator.comparing(GsDO::getIns));
        if (o.isPresent()) {
            beginIns = o.get().getIns();
        } else {
            break;
        }
    }
}

部分Sql代码实现

gsRemoteDao.getLastRow

<select id="getLastRow" resultMap="BaseResultMap">
    SELECT
    <include refid="Base_Column_List"/>
    FROM sszt_jbxx110497 WHERE ins = (SELECT MAX(ins) FROM sszt_jbxx110497)
    LIMIT 0, 1
</select>

gsRemoteDao.getFirstRow

<select id="getFirstRow" resultMap="BaseResultMap">
    SELECT
    <include refid="Base_Column_List"/>
    FROM sszt_jbxx110497 WHERE ins = (SELECT MIN(ins) FROM sszt_jbxx110497)
    LIMIT 0, 1
</select>

gsRemoteDao.list

<select id="list" parameterType="com.ketu.dataimport.model.GsDO" resultMap="BaseResultMap">
    SELECT
    <include refid="Base_Column_List"/>
    FROM sszt_jbxx110497
    <include refid="BaseWhere"/>
    ORDER BY ins ASC
    LIMIT 0, #{limit}
</select>

gsLocalDao.getLastRow

<select id="getLastRow" resultMap="BaseResultMap">
    SELECT
    <include refid="Base_Column_List"/>
    FROM sszt_jbxx110497 WHERE ins = (SELECT MAX(ins) FROM sszt_jbxx110497)
</select>

gsLocalDao.getLastUpdateTime

<select id="getLastUpdateTime" resultMap="BaseResultMap">
    SELECT TOP 1
    <include refid="Base_Column_List"/>
    FROM sszt_jbxx110497 ORDER BY updatetime DESC
</select>

gsLocalDao.updateByPrimaryKeySelective

<update id="updateByPrimaryKeySelective" parameterType="com.ketu.dataimport.model.GsDO">
    update sszt_jbxx110497
    <set>
        <if test="jyfw != null">
            JYFW = #{jyfw,jdbcType=VARCHAR},
        </if>
       	...
    </set>
    where RECORDID = #{recordid,jdbcType=VARCHAR}
</update>
# Jenkins # RabbitMQ # Zookeeper # IDEA # Logstash # Kibana # ELK # NIO # Netty # Spring Cloud # Golang # DataX # Elasticsearch # React Native # Mysql # H2 # Socket # Spring Boot # Kafka # Mybatis # Sqlmap # Vue # Postgresql # Docker # Vert.x # Flutter # Flink # Redis
Redis字符串详解
登录验证码开发记录
  • 文章目录
  • 站点概览
会Coding的猴子

会Coding的猴子

57 日志
19 分类
28 标签
RSS
Github
Creative Commons
© 2021 会Coding的猴子
由 Halo 强力驱动
|
主题 - NexT.Gemini v5.1.4

湘ICP备18011740号