mysql大数据量导入
mysql大数据量导入
数据生成
测试数据5000万
import java.io.*;
import java.util.Random;
public class GenerateDataTest {
public static void main(String[] args) throws IOException {
File file = new File("E:importData2.txt");
if (!file.exists()) {
file.createNewFile();
}
FileOutputStream fos = new FileOutputStream(file);
OutputStreamWriter osw = new OutputStreamWriter(fos, "utf-8");
BufferedWriter out = new BufferedWriter(osw);
Random r = new Random();
long mobile=13000000000L;
long i = 0L;
while(i<50000000L){
mobile++;
i++;
out.write(Long.toString(i));
out.write(",");
out.write(getChineseFamilyName()+getChineseGivenName());
out.write(",");
out.write(Long.toString(mobile));
out.newLine();
if(i%100000 ==0){
out.flush();
}
}
out.close();
System.out.println("数据生成到"+file);
}
public static String getChineseFamilyName(){
String str = null;
Random random=new Random(System.currentTimeMillis());
/* 598 百家姓 */
String[] Surname= {"赵","钱","孙","李","周","吴","郑","王","冯","陈","褚","卫","蒋","沈","韩","杨","朱","秦","尤","许",
"何","吕","施","张","孔","曹","严","华","金","魏","陶","姜","戚","谢","邹","喻","柏","水","窦","章","云","苏","潘","葛","奚","范","彭","郎",
"鲁","韦","昌","马","苗","凤","花","方","俞","任","袁","柳","酆","鲍","史","唐","费","廉","岑","薛","雷","贺","倪","汤","滕","殷",
"罗","毕","郝","邬","安","常","乐","于","时","傅","皮","卞","齐","康","伍","余","元","卜","顾","孟","平","黄","和",
"穆","萧","尹","姚","邵","湛","汪","祁","毛","禹","狄","米","贝","明","臧","计","伏","成","戴","谈","宋","茅","庞","熊","纪","舒",
"屈","项","祝","董","梁","杜","阮","蓝","闵","席","季","麻","强","贾","路","娄","危","江","童","颜","郭","梅","盛","林","刁","钟",
"徐","邱","骆","高","夏","蔡","田","樊","胡","凌","霍","虞","万","支","柯","昝","管","卢","莫","经","房","裘","缪","干","解","应",
"宗","丁","宣","贲","邓","郁","单","杭","洪","包","诸","左","石","崔","吉","钮","龚","程","嵇","邢","滑","裴","陆","荣","翁","荀",
"羊","于","惠","甄","曲","家","封","芮","羿","储","靳","汲","邴","糜","松","井","段","富","巫","乌","焦","巴","弓","牧","隗","山",
"谷","车","侯","宓","蓬","全","郗","班","仰","秋","仲","伊","宫","宁","仇","栾","暴","甘","钭","厉","戎","祖","武","符","刘","景",
"詹","束","龙","叶","幸","司","韶","郜","黎","蓟","溥","印","宿","白","怀","蒲","邰","从","鄂","索","咸","籍","赖","卓","蔺","屠",
"蒙","池","乔","阴","郁","胥","能","苍","双","闻","莘","党","翟","谭","贡","劳","逄","姬","申","扶","堵","冉","宰","郦","雍","却",
"璩","桑","桂","濮","牛","寿","通","边","扈","燕","冀","浦","尚","农","温","别","庄","晏","柴","瞿","阎","充","慕","连","茹","习",
"宦","艾","鱼","容","向","古","易","慎","戈","廖","庾","终","暨","居","衡","步","都","耿","满","弘","匡","国","文","寇","广","禄",
"阙","东","欧","殳","沃","利","蔚","越","夔","隆","师","巩","厍","聂","晁","勾","敖","融","冷","訾","辛","阚","那","简","饶","空",
"曾","毋","沙","乜","养","鞠","须","丰","巢","关","蒯","相","查","后","荆","红","游","郏","竺","权","逯","盖","益","桓","公","仉",
"督","岳","帅","缑","亢","况","郈","有","琴","归","海","晋","楚","闫","法","汝","鄢","涂","钦","商","牟","佘","佴","伯","赏","墨",
"哈","谯","篁","年","爱","阳","佟","言","福","南","火","铁","迟","漆","官","冼","真","展","繁","檀","祭","密","敬","揭","舜","楼",
"疏","冒","浑","挚","胶","随","高","皋","原","种","练","弥","仓","眭","蹇","覃","阿","门","恽","来","綦","召","仪","风","介","巨",
"木","京","狐","郇","虎","枚","抗","达","杞","苌","折","麦","庆","过","竹","端","鲜","皇","亓","老","是","秘","畅","邝","还","宾",
"闾","辜","纵","侴","万俟","司马","上官","欧阳","夏侯","诸葛","闻人","东方","赫连","皇甫","羊舌","尉迟","公羊","澹台","公冶","宗正",
"濮阳","淳于","单于","太叔","申屠","公孙","仲孙","轩辕","令狐","钟离","宇文","长孙","慕容","鲜于","闾丘","司徒","司空","兀官","司寇",
"南门","呼延","子车","颛孙","端木","巫马","公西","漆雕","车正","壤驷","公良","拓跋","夹谷","宰父","谷梁","段干","百里","东郭","微生",
"梁丘","左丘","东门","西门","南宫","第五","公仪","公乘","太史","仲长","叔孙","屈突","尔朱","东乡","相里","胡母","司城","张廖","雍门",
"毋丘","贺兰","綦毋","屋庐","独孤","南郭","北宫","王孙"};
int index=random.nextInt(Surname.length-1);
str = Surname[index]; //获得一个随机的姓氏
return str;
}
// getChineseGivenName方法具体实现如下
public static String getChineseGivenName() {
String str = null;
int highPos, lowPos;
Random random = new Random();
highPos = (176 + Math.abs(random.nextInt(71)));//区码,0xA0打头,从第16区开始,即0xB0=11*16=176,16~55一级汉字,56~87二级汉字
random=new Random();
lowPos = 161 + Math.abs(random.nextInt(94));//位码,0xA0打头,范围第1~94列
byte[] bArr = new byte[2];
bArr[0] = (new Integer(highPos)).byteValue();
bArr[1] = (new Integer(lowPos)).byteValue();
try {
str = new String(bArr, "GB2312");//区位码组合成汉字
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return str;
}
}
使用springbatch
依赖
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${version}</version>
</dependency>
或(springboot)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
创建springbatch数据库
springbatch初始化脚本
-- Autogenerated: do not edit this file
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME(6) DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME(6) NOT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, ‘0‘ as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, ‘0‘ as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, ‘0‘ as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
删除语句
-- Autogenerated: do not edit this file
DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_CONTEXT ;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_CONTEXT ;
DROP TABLE IF EXISTS BATCH_STEP_EXECUTION ;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_PARAMS ;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION ;
DROP TABLE IF EXISTS BATCH_JOB_INSTANCE ;
DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_SEQ ;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_SEQ ;
DROP TABLE IF EXISTS BATCH_JOB_SEQ ;
业务表建表语句
-- auto-generated definition
create table user
(
id varchar(32) not null comment ‘主键‘
primary key,
name varchar(32) null comment ‘用户名‘,
mobile varchar(32) null comment ‘手机号‘
)
comment ‘测试用户表‘;
实体
import java.io.Serializable;
public class User implements Serializable {
private static final Long serialVersionUID = 8904705906008476310L;
private String id;
private String name;
private String mobile;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
}
job配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 定义 dataSource -->
<!-- <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
<property name="url" value="jdbc:oracle:thin:@192.168.221.130:1521/orcl" />
<property name="username" value="springbatch" />
<property name="password" value="springbatch" />
</bean>
-->
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver" />
<property name="url" value="jdbc:mysql://127.0.0.1:3306/spring-batch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true" />
<property name="username" value="root" />
<property name="password" value="root" />
</bean>
<!-- 定义 jdbc 事务管理器 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- 定义 jobRepository, 用来持久化 job -->
<!--mysql-->
<batch:job-repository id="jobRepository" data-source="dataSource" transaction-manager="transactionManager"/>
<!--oracle-->
<batch:job-repository id="jobRepository" isolation-level-for-create="READ_COMMITTED" data-source="dataSource" transaction-manager="transactionManager"/>
<!-- 定义 jobLauncher, 来用运行 job -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- 定义 job -->
<batch:job id="loadFileJob" job-repository="jobRepository">
<batch:step id="loadFileStep">
<batch:tasklet>
<batch:chunk reader="loadFileReader" processor="loadFileProcessor" writer="loadFileWriter" commit-interval="5000"/>
</batch:tasklet>
</batch:step>
</batch:job>
<!-- 定义 reader -->
<bean id="loadFileReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<!-- 指定要读取的文件 -->
<property name="resource" value="file:///E:importData.txt" />
<!-- 忽略以 header 和 footer 开头的行-->
<property name="comments">
<list>
<value>header</value>
<value>footer</value>
</list>
</property>
<!-- 文件编码方式 -->
<property name="encoding" value="UTF-8" />
<!-- 文件不存在则报错 -->
<property name="strict" value="true" />
<!-- 指定如何将行转成对象 -->
<property name="lineMapper" ref="lineMapper" />
</bean>
<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper" scope="step">
<property name="fieldSetMapper" ref="fieldSetMapper" />
<property name="lineTokenizer" ref="lineTokenizer" />
</bean>
<bean id="fieldSetMapper" class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper" scope="step">
<property name="targetType" value="com.lan.dto.User" />
</bean>
<bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" scope="step">
<property name="delimiter" value="," />
<property name="names" value="id,name,mobile" />
</bean>
<!-- 定义 processor -->
<bean id="loadFileProcessor" class="org.springframework.batch.item.support.PassThroughItemProcessor" scope="step"/>
<!-- 定义 writer -->
<bean id="loadFileWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter" scope="step">
<property name="sql" value="insert into user values (:id,:name,:mobile)"/>
<property name="jdbcTemplate" ref="jdbcTemplate"/>
<property name="itemSqlParameterSourceProvider" ref="itemSqlParameterSourceProvider"/>
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate" scope="step">
<constructor-arg ref="dataSource"></constructor-arg>
</bean>
<bean id="itemSqlParameterSourceProvider" class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" scope="step"/>
</beans>
测试类
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class App {
public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
ApplicationContext context = new ClassPathXmlApplicationContext("jobs/lan/importdataJob.xml");
// job 和 job 参数
Map<String,JobParameter> parameters = new HashMap<>();
JobParameters jobParameters = new JobParameters(parameters);
Job job = context.getBean(Job.class);
// 运行 job
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
jobLauncher.run(job, jobParameters);
}
}
耗时
_5000万条,24m15s657ms