编写静态多数据源代码并做定时任务实现数据库数据同步

首先我们在SpringBoot的配置文件中将两个数据源配置出来

# HTTP port of the embedded server.
server:
  port: 8083

spring:
  datasource:
    # Two independent data sources. Each sub-tree is bound onto a data source by
    # @ConfigurationProperties(prefix = "spring.datasource.<name>") in DataSourceConfig.
    # NOTE(review): host addresses and credentials below look like placeholders
    # (e.g. 322 is not a valid IPv4 octet) - replace them and keep real secrets out of version control.
    remote :
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbc-url: jdbc:mysql://21.33.322.22/greer?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
      username: 333
      password: 22@2
    mine :
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbc-url: jdbc:mysql://23.33.212.22/ferer?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
      username: w22
      password: 222
  # Active profile; the PerformanceInterceptor bean in MyConfig is only registered for dev/test.
  profiles:
    active: dev

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # print SQL statements to stdout

logging:
  level:
    # Debug-level logging for the mapper package.
    com.starcpdk.mapper: debug

然后我们可以先从mapper层开始写起 , 下面是我的mapper层结构
(原文此处为mapper层目录结构截图 : com.starcpdk.mapper 包下分为 mine 和 remote 两个子包 , 各自的 xml 映射文件放在对应子包下的 xml 目录中)
下面我们来写配置类进行数据源的配置
首先我们准备一个配置类读取配置文件中的数据库信息,并创建数据源

package com.starcpdk.config;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

/**
 * Declares the two data sources used by the application.
 *
 * <p>Each bean is populated from the matching {@code spring.datasource.*} sub-tree of the
 * application configuration. Exactly one of them ({@code remoteDataSource}) is marked
 * {@link Primary}; marking both as primary would leave by-type injection of {@link DataSource}
 * without a unique candidate.
 */
@Configuration
public class DataSourceConfig {

    /**
     * Data source for the remote database; this is the default (primary) data source.
     *
     * @return a Hikari-backed data source bound to {@code spring.datasource.remote}
     */
    @Primary
    @Bean(name="remoteDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.remote")
    public DataSource getDataSource1(){
        return DataSourceBuilder.create().type(HikariDataSource.class).build();
    }

    /**
     * Data source for the local ("mine") database.
     *
     * <p>Intentionally not {@code @Primary}: it must always be requested by name via
     * {@code @Qualifier("mineDataSource")}.
     *
     * @return a Hikari-backed data source bound to {@code spring.datasource.mine}
     */
    @Bean(name="mineDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.mine")
    public DataSource getDataSource2(){
        return DataSourceBuilder.create().type(HikariDataSource.class).build();
    }
}

接着我们需要针对这两个数据源进行mapper文件的映射

package com.starcpdk.config;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * MyBatis wiring for the local ("mine") data source.
 *
 * <p>Mapper interfaces under {@code com.starcpdk.mapper.mine} are bound to the
 * {@code mineSqlSessionFactory} defined here.
 */
@Configuration
@MapperScan(basePackages = {"com.starcpdk.mapper.mine"}, sqlSessionFactoryRef = "mineSqlSessionFactory")
public class MybatisMineConfig {

    /** The data source registered under the bean name {@code mineDataSource}. */
    @Autowired
    @Qualifier("mineDataSource")
    private DataSource mineDataSource;

    /**
     * Builds the {@link SqlSessionFactory} for the "mine" data source.
     *
     * <p>Not annotated with {@code @Primary}; the remote configuration carries that marker.
     *
     * @return the session factory backed by {@code mineDataSource}
     * @throws Exception if the mapper XML resources cannot be resolved or the factory cannot be built
     */
    @Bean(name="mineSqlSessionFactory")
    public SqlSessionFactory mineSqlSessionFactory() throws Exception {
        PathMatchingResourcePatternResolver xmlResolver = new PathMatchingResourcePatternResolver();
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        // Location of the MyBatis mapper XML files for this data source.
        factoryBean.setMapperLocations(
                xmlResolver.getResources("classpath*:com/starcpdk/mapper/mine/xml/*.xml"));
        factoryBean.setDataSource(mineDataSource);
        return factoryBean.getObject();
    }

    /**
     * Wraps the "mine" session factory in a {@link SqlSessionTemplate}.
     *
     * @param sqlSessionFactory the factory registered as {@code mineSqlSessionFactory}
     * @return a template that is thread-safe and may be shared by several DAOs
     */
    @Bean(name="mineSqlSessionTemplate")
    public SqlSessionTemplate mineSqlSessionTemplate(@Qualifier("mineSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory);
        return template;
    }
}
package com.starcpdk.config;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * MyBatis wiring for the remote data source.
 *
 * <p>Mapper interfaces under {@code com.starcpdk.mapper.remote} are bound to the
 * {@code remoteSqlSessionFactory} defined here. Both beans in this class are {@code @Primary}.
 */
@Configuration
@MapperScan(basePackages = {"com.starcpdk.mapper.remote"}, sqlSessionFactoryRef  = "remoteSqlSessionFactory")
public class MybatisRemoteConfig {

    /** The data source registered under the bean name {@code remoteDataSource}. */
    @Autowired
    @Qualifier("remoteDataSource")
    private DataSource remoteDataSource;

    /**
     * Builds the {@link SqlSessionFactory} for the remote data source.
     *
     * <p>This is the programmatic counterpart of declaring a {@code SqlSessionFactoryBean} in XML
     * with a {@code dataSource} and a {@code mapperLocations} property.
     *
     * @return the session factory backed by {@code remoteDataSource}
     * @throws Exception if the mapper XML resources cannot be resolved or the factory cannot be built
     */
    @Bean(name="remoteSqlSessionFactory")
    @Primary
    public SqlSessionFactory remoteSqlSessionFactory() throws Exception {
        PathMatchingResourcePatternResolver xmlResolver = new PathMatchingResourcePatternResolver();
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        // Location of the MyBatis mapper XML files for this data source.
        factoryBean.setMapperLocations(
                xmlResolver.getResources("classpath*:com/starcpdk/mapper/remote/xml/*.xml"));
        factoryBean.setDataSource(remoteDataSource);
        return factoryBean.getObject();
    }

    /**
     * Wraps the remote session factory in a {@link SqlSessionTemplate}.
     *
     * @param sqlSessionFactory the factory registered as {@code remoteSqlSessionFactory}
     * @return a template that is thread-safe and may be shared by several DAOs
     */
    @Bean(name="remoteSqlSessionTemplate")
    @Primary
    public SqlSessionTemplate remoteSqlSessionTemplate(@Qualifier("remoteSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory);
        return template;
    }
}

最后我们再写一个自己的config配置类就可以了 , 在这个配置类中我们可以写定时任务 , 这里我写的定时任务是靠springboot自带的定时任务实现的

package com.starcpdk.config;

import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
import com.starcpdk.pojo.Maindata;
import com.starcpdk.service.mine.MaindataMineService;
import com.starcpdk.service.remote.MaindataRemoteService;
import com.starcpdk.util.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;

/**
 * Application configuration that registers the MyBatis-Plus performance interceptor (dev/test
 * profiles only) and hosts the scheduled job copying rows from the remote database into the
 * local ("mine") database.
 *
 * <p>Known limitation: the job fetches every pending row in a single list. With a very large
 * backlog (the accompanying article mentions millions of rows) this can exhaust the heap.
 */
@Configuration
@EnableScheduling   // enables processing of @Scheduled methods
@Slf4j
public class MyConfig {

    @Resource
    MaindataMineService maindataMineService;

    @Resource
    MaindataRemoteService maindataRemoteService;

    /**
     * Registers the MyBatis-Plus SQL performance interceptor for the dev and test profiles.
     *
     * @return the configured interceptor
     */
    @Bean
    @Profile({"dev", "test"})
    public PerformanceInterceptor performanceInterceptor() {
        PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor();
        // Maximum statement time in milliseconds.
        // NOTE(review): the original comment claimed slower SQL "is not executed"; confirm the
        // exact behaviour for the MyBatis-Plus version in use. 100 ms is tight for bulk queries.
        performanceInterceptor.setMaxTime(100);
        performanceInterceptor.setFormat(true);
        return performanceInterceptor;
    }

    /**
     * Scheduled job (every day at 09:27:00) that copies rows from the remote database into the
     * local one.
     *
     * <p>When the local table already has data, {@code getAllData} is used with the local and
     * remote maximum datetimes; when the local maximum is {@code null},
     * {@code getAllDataWithMineIsNull} is used instead.
     */
    @Scheduled(cron = "0 27 9 * * ?")
    private void insertData() {
        log.info("定时任务开始时间: {}", LocalDateTime.now());

        // Query each boundary once and reuse the value for both logging and the query parameters.
        String mineDatetime = maindataMineService.maxMineDatetime();
        String remoteDatetime = maindataRemoteService.maxRemoteDatetime();
        log.info("mineDateTimeMax{}", mineDatetime);
        log.info("remoteDateTimeMax{}", remoteDatetime);

        HashMap<String, String> map = new HashMap<>();
        map.put("mineDatetime", mineDatetime);
        map.put("remoteDatetime", remoteDatetime);

        List<Maindata> list = mineDatetime != null
                ? maindataRemoteService.getAllData(map)
                : maindataRemoteService.getAllDataWithMineIsNull(map);

        // Log the row count only: the previous call had no "{}" placeholder, so nothing was
        // logged, and dumping the full list would be excessive for large result sets.
        log.info("list: {}", list == null ? 0 : list.size());

        if (list != null) {
            for (Maindata maindata : list) {
                maindataMineService.insertData(toInsertMap(maindata));
            }
        }

        log.info("定时任务结束时间: {}", LocalDateTime.now());
    }

    /** Converts one remote row into the parameter map expected by {@code insertData}. */
    private HashMap<String, Object> toInsertMap(Maindata maindata) {
        HashMap<String, Object> insertMap = new HashMap<>();
        insertMap.put("mdId", maindata.getMdId());
        insertMap.put("sid", maindata.getSid());
        // NOTE(review): "sId" is always sent as an empty string next to "sid"; confirm the
        // mapper really needs both keys.
        insertMap.put("sId", "");
        insertMap.put("stationId", maindata.getStationId());
        insertMap.put("mdValue", maindata.getMdValue());
        insertMap.put("mdDatetime", maindata.getMdDatetime());
        insertMap.put("mdSn", maindata.getMdSn());
        return insertMap;
    }

}

在定时任务中我写的代码会导致内存溢出
首先我看到堆内存溢出的问题时 , 考虑到HashMap的new过程放到了循环里面 , 大量循环会导致内存溢出 , 因为map对象键相同 , 值会覆盖 , 所以我们可以将HashMap的new对象过程放到循环外面 , 这样整个程序中就只会存在一个map对象了

但是 , 我们发现它还是会内存溢出 , 于是我们就定位到了list集合
我们查询数据库得到的list数据集合中有三百多万条数据 , 也就是说有三百多万个对象存在list集合中 , 这样同样会导致内存溢出

所以我只能通过优化代码实现
我以时间作为限制条件 , 让它每次只查询两天的数据进行同步 , 同步完之后再查询接下来两天的数据

package com.starcpdk.config;

import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
import com.starcpdk.pojo.Maindata;
import com.starcpdk.service.mine.MaindataMineService;
import com.starcpdk.service.remote.MaindataRemoteService;
import com.starcpdk.util.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;

/**
 * Application configuration that registers the MyBatis-Plus performance interceptor (dev/test
 * profiles only) and hosts the scheduled job copying rows from the remote database into the
 * local ("mine") database in bounded time windows, so that only a limited slice of rows is held
 * in memory at once.
 */
@Configuration
@EnableScheduling   // enables processing of @Scheduled methods
@Slf4j
public class MyConfig {

    @Resource
    MaindataMineService maindataMineService;

    @Resource
    MaindataRemoteService maindataRemoteService;

    /**
     * Registers the MyBatis-Plus SQL performance interceptor for the dev and test profiles.
     *
     * @return the configured interceptor
     */
    @Bean
    @Profile({"dev", "test"})
    public PerformanceInterceptor performanceInterceptor() {
        PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor();
        // Maximum statement time in milliseconds.
        // NOTE(review): the original comment claimed slower SQL "is not executed"; confirm the
        // exact behaviour for the MyBatis-Plus version in use. 100 ms is tight for bulk queries.
        performanceInterceptor.setMaxTime(100);
        performanceInterceptor.setFormat(true);
        return performanceInterceptor;
    }

    /**
     * Scheduled job (every day at 09:27:00) that synchronizes remote rows into the local
     * database, one time window after another.
     */
    @Scheduled(cron = "0 27 9 * * ?")
    private void insertData() {
        log.info("定时任务开始时间: {}", LocalDateTime.now());

        // Query each boundary once and reuse the value for logging and for the sync itself.
        String mineDatetime = maindataMineService.maxMineDatetime();
        String remoteDatetime = maindataRemoteService.maxRemoteDatetime();
        log.info("mineDateTimeMax{}", mineDatetime);
        log.info("remoteDateTimeMax{}", remoteDatetime);

        if (isEmpty(remoteDatetime)) {
            log.info("remote max datetime is empty, nothing to synchronize");
        } else {
            syncInWindows(mineDatetime, remoteDatetime);
        }

        log.info("定时任务结束时间: {}", LocalDateTime.now());
    }

    /**
     * Copies remote rows into the local database in consecutive windows until the cursor
     * reaches {@code remoteDatetime}.
     *
     * <p>The previous loop compared strings with {@code !=} and never advanced, so it could not
     * terminate, and it passed a {@code null} start to {@code DateUtils.getNextDay} when the
     * local table was empty. Here the cursor advances to the end of each window and the loop
     * stops as soon as no progress can be made.
     *
     * <p>NOTE(review): this assumes the datetime strings share one fixed-width format so that
     * lexicographic order equals chronological order, that {@code getAllData} selects rows
     * after {@code mineDatetime} up to and including {@code remoteDatetime}, and that
     * {@code DateUtils.getNextDay(start, "2")} returns {@code start} moved forward in the same
     * format. Confirm against the mapper XML and {@code DateUtils}.
     *
     * @param mineDatetime   latest datetime present locally, or {@code null}/empty if none
     * @param remoteDatetime latest datetime present remotely; must not be empty
     */
    private void syncInWindows(String mineDatetime, String remoteDatetime) {
        HashMap<String, String> map = new HashMap<>();
        String cursor = mineDatetime;

        if (isEmpty(cursor)) {
            // Local table is empty: seed it with the earliest remote rows, then continue from there.
            String remoteMin = maindataRemoteService.minRemoteDatetime();
            map.put("mineDatetime", cursor);
            map.put("remoteDatetime", remoteMin);
            copyToMine(maindataRemoteService.getAllDataWithMineIsNull(map));
            cursor = remoteMin;
        }

        while (!isEmpty(cursor) && cursor.compareTo(remoteDatetime) < 0) {
            String windowEnd = DateUtils.getNextDay(cursor, "2");
            if (windowEnd == null || windowEnd.compareTo(cursor) <= 0) {
                // Guard against an endless loop if the window end cannot be advanced.
                log.warn("cannot advance sync window beyond {}, aborting", cursor);
                return;
            }
            if (windowEnd.compareTo(remoteDatetime) > 0) {
                windowEnd = remoteDatetime;
            }

            map.put("mineDatetime", cursor);
            map.put("remoteDatetime", windowEnd);
            copyToMine(maindataRemoteService.getAllData(map));

            // Advance by window rather than by re-querying the local maximum, so that a window
            // without any rows cannot stall the loop.
            cursor = windowEnd;
        }
    }

    /** Inserts every row of one fetched batch into the local database; a null batch is a no-op. */
    private void copyToMine(List<Maindata> list) {
        if (list == null) {
            return;
        }
        // One parameter map is reused for the whole batch; every key is overwritten per row.
        HashMap<String, Object> insertMap = new HashMap<>();
        for (Maindata maindata : list) {
            insertMap.put("mdId", maindata.getMdId());
            insertMap.put("sid", maindata.getSid());
            // NOTE(review): "sId" is always sent as an empty string next to "sid"; confirm the
            // mapper really needs both keys.
            insertMap.put("sId", "");
            insertMap.put("stationId", maindata.getStationId());
            insertMap.put("mdValue", maindata.getMdValue());
            insertMap.put("mdDatetime", maindata.getMdDatetime());
            insertMap.put("mdSn", maindata.getMdSn());
            maindataMineService.insertData(insertMap);
        }
    }

    /** Returns true for {@code null} or the empty string. */
    private static boolean isEmpty(String value) {
        return value == null || value.isEmpty();
    }

}

如上代码依旧可以优化 , 我们可以采用批量数据导入的方式进行 , 这样的话我们分批进行批量数据导入效率会更高~

上一篇:SpringBoot+ShardingSphere彻底解决生产环境数据库字段加解密问题


下一篇:发送纯文本qq邮件