Writing MapReduce Output to MySQL

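To write MapReduce output to MySQL, the JavaBean must first implement the DBWritable interface and override its methods.

Here is how the official documentation describes the DBWritable interface: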

Objects that are read from/written to a database should implement DBWritable. DBWritable, is similar to Writable except that the write(PreparedStatement) method takes a PreparedStatement, and readFields(ResultSet) takes a ResultSet.

Implementations are responsible for writing the fields of the object to PreparedStatement, and reading the fields of the object from the ResultSet.

Let's try it ourselves:

// JDBC indexes start at 1, for PreparedStatement parameters as well as ResultSet columns
public void write(PreparedStatement statement) throws SQLException {

    // Bind the first value
    statement.setString(1, this.keyword);
    // Bind the second value
    statement.setInt(2, this.sum);
}

/**
 * DBWritable: reads the fields of this object from the ResultSet.
 * @param resultSet the row to read from
 * @throws SQLException if a column cannot be read
 */
public void readFields(ResultSet resultSet) throws SQLException {

    // Read the columns in the same order that write binds them
    this.keyword = resultSet.getString(1);
    this.sum = resultSet.getInt(2);
}
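
The binding order in write can be checked without a database. The sketch below is only an illustration: KeywordCount is a hypothetical stand-in for the bean (the real class would also implement DBWritable), and WriteOrderCheck is a made-up helper that passes a recording proxy in place of a real PreparedStatement.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the bean; the real class would also implement DBWritable.
final class KeywordCount {
    private final String keyword;
    private final int sum;

    KeywordCount(String keyword, int sum) {
        this.keyword = keyword;
        this.sum = sum;
    }

    // Same body as the write method shown above.
    void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, keyword);
        statement.setInt(2, sum);
    }
}

final class WriteOrderCheck {
    // Runs bean.write against a recording proxy and returns each setter call as "index=value".
    static List<String> recordBindings(KeywordCount bean) {
        List<String> calls = new ArrayList<>();
        PreparedStatement recorder = (PreparedStatement) Proxy.newProxyInstance(
                WriteOrderCheck.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, methodArgs) -> {
                    // Record two-argument setters such as setString(int, String) and setInt(int, int).
                    if (method.getName().startsWith("set") && methodArgs != null && methodArgs.length == 2) {
                        calls.add(methodArgs[0] + "=" + methodArgs[1]);
                    }
                    return null; // the setters used here return void
                });
        try {
            bean.write(recorder);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        // prints [1=hadoop, 2=3]
        System.out.println(recordBindings(new KeywordCount("hadoop", 3)));
    }
}
```

The recorded list shows that parameter 1 receives the keyword and parameter 2 receives the sum, which is the order the table columns have to follow.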

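The map phase follows the usual flow: declare the input and output types, override the map method, and write out the data.

The reduce phase follows the usual flow as well: declare the input and output types, override the reduce method, and write out the data. Because DBOutputFormat writes the output key to the database, the bean that implements DBWritable goes in the key position.

The driver is where things differ. Use the configureDB method of the DBConfiguration class to set up the MySQL connection: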

DBConfiguration.configureDB(conf, driverClass,url,user,password);

The four properties that MySQL requires must be declared before this call:

user=root
password=000000
url=jdbc:mysql://192.168.3.119:3306/mydatabase
driverClass=com.mysql.jdbc.Driver

Then, at the final step where the output path would normally be set, call DBOutputFormat instead:

DBOutputFormat.setOutput(job,tableName,fields);

Here tableName is the name of the table in the database,

and fields is the array of column names in that table:

public static String tableName = "test";
public static String [] fields = {"name","age"};
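
To make the role of tableName and fields concrete, here is a rough sketch of the kind of parameterized INSERT statement that gets built from them. InsertStatementSketch and buildInsert are hypothetical names, not Hadoop API, and the exact text Hadoop generates may differ by version (for example in a trailing semicolon).

```java
// Hypothetical helper, not part of Hadoop: shows the general shape of the statement only.
final class InsertStatementSketch {

    // Builds "INSERT INTO <table> (<f1>,<f2>,...) VALUES (?,?,...)" with one placeholder per field.
    static String buildInsert(String table, String[] fieldNames) {
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(table).append(" (");
        sql.append(String.join(",", fieldNames));
        sql.append(") VALUES (");
        for (int i = 0; i < fieldNames.length; i++) {
            sql.append(i == 0 ? "?" : ",?");
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // prints INSERT INTO test (name,age) VALUES (?,?)
        System.out.println(buildInsert("test", new String[] {"name", "age"}));
    }
}
```

The placeholders are filled by position, so the first value bound in write (keyword) lands in the name column and the second (sum) lands in age.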

The complete Driver class:

package com.gis507.test.AISDataToMysqlTest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;


public class Driver {

    public static String driverClass = "com.mysql.jdbc.Driver";
    public static String dbUrl = "jdbc:mysql://192.168.3.119:3306/mydatabase";
    public static String userName = "root";
    public static String passwd = "000000";
    
    public static String inputFilePath = "D:\\AAUser\\dic\\Files\\testFile\\test.txt";
    public static String tableName = "test";
    public static String [] fields = {"name","age"};
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        conf.set("mapreduce.map.memory.mb","1024");
        conf.set("mapreduce.reduce.memory.mb","1024");
        // Pass the fields declared above (dbUrl, userName, passwd)
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        
        Job job = Job.getInstance(conf);

        job.setJarByClass(TestBean.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Redece.class);

        FileInputFormat.setInputPaths(job,new Path(inputFilePath));
        DBOutputFormat.setOutput(job,tableName,fields);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

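Addendum:

The version above hard-codes the four MySQL properties in the source. They can be moved into a configuration file and read from there at runtime.

Put the jdbc.properties file in the project's resources directory: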

user=root
password=000000
url=jdbc:mysql://192.168.3.119:3306/mydatabase
driverClass=com.mysql.jdbc.Driver

Then read the properties file through the class loader:

InputStream is = Driver.class.getClassLoader().getResourceAsStream("jdbc.properties");

Create a Properties instance and load the stream:

Properties pros = new Properties();

pros.load(is);

Read the MySQL property values:

String user = pros.getProperty("user");
String password = pros.getProperty("password");
String url = pros.getProperty("url");
String driverClass = pros.getProperty("driverClass");
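
One thing worth guarding against: getResourceAsStream returns null when the file is not on the classpath, and getProperty returns null for a missing key. A minimal sketch of a loader that fails early in both cases is shown below. JdbcSettings and its load method are hypothetical names introduced here for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: loads the four JDBC settings and rejects missing input or missing keys.
final class JdbcSettings {
    private static final String[] REQUIRED = {"user", "password", "url", "driverClass"};

    static Properties load(InputStream in) {
        if (in == null) {
            throw new IllegalStateException("jdbc.properties not found on the classpath");
        }
        Properties props = new Properties();
        // try-with-resources closes the stream once loading is done
        try (InputStream stream = in) {
            props.load(stream);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("missing property: " + key);
            }
        }
        return props;
    }
}
```

In the driver this could be called as JdbcSettings.load(Driver.class.getClassLoader().getResourceAsStream("jdbc.properties")), after which the four getProperty calls work as before.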

The rest of the code is the same as above. The complete block:

    public static String inputFilePath = "D:\\AAUser\\dic\\Files\\testFile\\test.txt";
    public static String tableName = "test";
    public static String [] fields = {"name","age"};

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Load the configuration file through the class loader
        InputStream is = Driver.class.getClassLoader().getResourceAsStream("jdbc.properties");

        // Create a Properties object
        Properties pros = new Properties();
        // Load the stream
        pros.load(is);

        // Read the properties
        String user = pros.getProperty("user");
        String password = pros.getProperty("password");
        String url = pros.getProperty("url");
        String driverClass = pros.getProperty("driverClass");

        Class.forName(driverClass);

        Configuration conf = new Configuration();
        conf.set("mapreduce.map.memory.mb","1024");
        conf.set("mapreduce.reduce.memory.mb","1024");
        // Set up the MySQL connection
        DBConfiguration.configureDB(conf, driverClass, url, user, password);

        // Get a job instance
        Job job = Job.getInstance(conf);

        job.setJarByClass(TestBean.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Redece.class);

        FileInputFormat.setInputPaths(job,new Path(inputFilePath));
        DBOutputFormat.setOutput(job,tableName,fields);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }