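Writing MapReduce Output to MySQL
To write MapReduce output to MySQL, the JavaBean must first implement the DBWritable interface and override its methods.
Here is how the official documentation describes the DBWritable interface: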
Objects that are read from/written to a database should implement DBWritable. DBWritable, is similar to Writable except that the write(PreparedStatement) method takes a PreparedStatement, and readFields(ResultSet) takes a ResultSet.
Implementations are responsible for writing the fields of the object to PreparedStatement, and reading the fields of the object from the ResultSet.
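Let's try it ourselves:
// In JDBC, PreparedStatement parameters and ResultSet columns are indexed starting from 1
public void write(PreparedStatement statement) throws SQLException {
    // Bind the first parameter of the statement
    statement.setString(1, this.keyword);
    // Bind the second parameter of the statement
    statement.setInt(2, this.sum);
}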
/**
 * DBWritable: reads data from the ResultSet and sets it on this object's fields.
 * @param resultSet the result set to read from
 * @throws SQLException if a column cannot be read
 */
public void readFields(ResultSet resultSet) throws SQLException {
    // Read the values from the ResultSet, in the same order that write() binds them
    this.keyword = resultSet.getString(1);
    this.sum = resultSet.getInt(2);
}
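To make the 1-based ordering concrete, here is a small sketch that runs the same write logic against a recording PreparedStatement and shows which value lands at which parameter index. The names WriteOrderSketch, KeywordCount, and boundParameters are made up for illustration, and the Hadoop DBWritable interface is left out on purpose so the snippet compiles with the JDK alone. Treat it as a way to check the ordering, not as the bean itself.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.TreeMap;

public class WriteOrderSketch {
    // Hypothetical stand-in for the bean; the real one would also implement Hadoop's DBWritable.
    static class KeywordCount {
        String keyword;
        int sum;

        KeywordCount(String keyword, int sum) {
            this.keyword = keyword;
            this.sum = sum;
        }

        // Same body as the write method above: parameter 1 is keyword, parameter 2 is sum.
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, this.keyword);
            statement.setInt(2, this.sum);
        }
    }

    // Runs write() against a recording PreparedStatement and returns index -> bound value.
    static Map<Integer, Object> boundParameters(KeywordCount bean) throws SQLException {
        Map<Integer, Object> bound = new TreeMap<>();
        PreparedStatement recorder = (PreparedStatement) Proxy.newProxyInstance(
                WriteOrderSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, methodArgs) -> {
                    // Record two-argument setters such as setString(int, String) and setInt(int, int).
                    if (method.getName().startsWith("set") && methodArgs != null && methodArgs.length == 2) {
                        bound.put((Integer) methodArgs[0], methodArgs[1]);
                    }
                    // The setters used here return void, so null is a valid result.
                    return null;
                });
        bean.write(recorder);
        return bound;
    }

    public static void main(String[] args) throws SQLException {
        // prints {1=hadoop, 2=3}
        System.out.println(boundParameters(new KeywordCount("hadoop", 3)));
    }
}
```

The index used in write decides which column of the target table receives the value, so readFields and write should agree on the order.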
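The map stage follows the usual flow: specify the input and output types, override the map method, and write out the data.
The reduce stage also follows the usual flow: specify the input and output types, override the reduce method, and write out the data. Note that DBOutputFormat writes the reducer's output key to the database, so the key should be the bean that implements DBWritable.
The driver is where things differ. Use the configureDB method of the DBConfiguration class to set the MySQL connection information on the job configuration: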
DBConfiguration.configureDB(conf, driverClass,url,user,password);
Before making that call, declare the four properties MySQL requires:
user=root
password=000000
url=jdbc:mysql://192.168.3.119:3306/mydatabase
driverClass=com.mysql.jdbc.Driver
Then, at the last step, where the output path would normally be set, call DBOutputFormat.setOutput instead:
DBOutputFormat.setOutput(job,tableName,fields);
Here tableName is the name of the table in the database,
and fields is an array holding the names of the table's columns:
public static String tableName = "test";
public static String [] fields = {"name","age"};
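To show how tableName and fields end up being used, the sketch below builds the kind of parameterized INSERT statement that DBOutputFormat derives from them. The helper buildInsert and the class InsertStatementSketch are hypothetical and written only for this illustration. The exact text Hadoop generates may differ in small details such as a trailing semicolon.

```java
public class InsertStatementSketch {
    // Hypothetical helper: one placeholder per column, in the same order as the fields array.
    static String buildInsert(String tableName, String... fields) {
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(tableName);
        sql.append(" (").append(String.join(",", fields)).append(")");
        sql.append(" VALUES (");
        for (int i = 0; i < fields.length; i++) {
            sql.append(i == 0 ? "?" : ",?");
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // prints INSERT INTO test (name,age) VALUES (?,?)
        System.out.println(buildInsert("test", "name", "age"));
    }
}
```

The first placeholder corresponds to the first entry in fields, so with the bean shown earlier keyword would go into name and sum into age.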
The complete code of the Driver class:
package com.gis507.test.AISDataToMysqlTest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class Driver {
public static String driverClass = "com.mysql.jdbc.Driver";
public static String dbUrl = "jdbc:mysql://192.168.3.119:3306/mydatabase";
public static String userName = "root";
public static String passwd = "000000";
public static String inputFilePath = "D:\\AAUser\\dic\\Files\\testFile\\test.txt";
public static String tableName = "test";
public static String [] fields = {"name","age"};
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("mapreduce.map.memory.mb","1024");
conf.set("mapreduce.reduce.memory.mb","1024");
DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
Job job = Job.getInstance(conf);
job.setJarByClass(TestBean.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Redece.class);
FileInputFormat.setInputPaths(job,new Path(inputFilePath));
DBOutputFormat.setOutput(job,tableName,fields);
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
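Addendum:
The version above hard-codes the four MySQL properties in the source. They can be moved into a configuration file and read from there instead.
Put a jdbc.properties file in the project's resources directory: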
user=root
password=000000
url=jdbc:mysql://192.168.3.119:3306/mydatabase
driverClass=com.mysql.jdbc.Driver
Then read the properties file through the class loader:
InputStream is = Driver.class.getClassLoader().getResourceAsStream("jdbc.properties");
Create a Properties instance and load the stream:
Properties pros = new Properties();
pros.load(is);
Read the MySQL property values:
String user = pros.getProperty("user");
String password = pros.getProperty("password");
String url = pros.getProperty("url");
String driverClass = pros.getProperty("driverClass");
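One thing to watch with this approach: getResourceAsStream returns null when the file is not on the classpath, and getProperty returns null for a missing key. The sketch below wraps the loading steps in a helper that fails early in both cases. The names JdbcPropertiesSketch, load, and REQUIRED_KEYS are made up for illustration, and an in-memory stream stands in for the real file so the snippet runs on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class JdbcPropertiesSketch {
    // The four keys used in jdbc.properties above.
    static final String[] REQUIRED_KEYS = {"user", "password", "url", "driverClass"};

    // Loads the properties and fails fast if the stream or a required key is missing.
    static Properties load(InputStream is) throws IOException {
        if (is == null) {
            throw new IOException("jdbc.properties not found on the classpath");
        }
        Properties props = new Properties();
        // try-with-resources closes the stream once loading is done
        try (InputStream in = is) {
            props.load(in);
        }
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                throw new IOException("Missing required property: " + key);
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // In the driver the stream would come from
        // Driver.class.getClassLoader().getResourceAsStream("jdbc.properties").
        String text = "user=root\npassword=000000\n"
                + "url=jdbc:mysql://192.168.3.119:3306/mydatabase\n"
                + "driverClass=com.mysql.jdbc.Driver\n";
        Properties props = load(new ByteArrayInputStream(text.getBytes(StandardCharsets.ISO_8859_1)));
        // prints jdbc:mysql://192.168.3.119:3306/mydatabase
        System.out.println(props.getProperty("url"));
    }
}
```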
The rest of the code is the same as above. The complete code block:
public static String inputFilePath = "D:\\AAUser\\dic\\Files\\testFile\\test.txt";
public static String tableName = "test";
public static String [] fields = {"name","age"};
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// Load the configuration file through the class loader
InputStream is = Driver.class.getClassLoader().getResourceAsStream("jdbc.properties");
// Create a Properties object
Properties pros = new Properties();
// Load the stream
pros.load(is);
// Read the properties
String user = pros.getProperty("user");
String password = pros.getProperty("password");
String url = pros.getProperty("url");
String driverClass = pros.getProperty("driverClass");
Class.forName(driverClass);
Configuration conf = new Configuration();
conf.set("mapreduce.map.memory.mb","1024");
conf.set("mapreduce.reduce.memory.mb","1024");
// Set the MySQL connection information on the configuration
DBConfiguration.configureDB(conf, driverClass, url, user, password);
// Get a Job instance
Job job = Job.getInstance(conf);
job.setJarByClass(TestBean.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Redece.class);
FileInputFormat.setInputPaths(job,new Path(inputFilePath));
DBOutputFormat.setOutput(job,tableName,fields);
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}