
    为了方便 MapReduce 直接访问关系型数据库(MySQL、Oracle),Hadoop 提供了 DBInputFormat 和 DBOutputFormat 两个类。通过 DBInputFormat 类把数据库表数据读入到 HDFS,通过 DBOutputFormat 类把 MapReduce 产生的结果集导入到数据库表中。
执行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,通常是因为程序找不到mysql驱动包。解决方法是让每一个tasktracker执行MapReduce程序时都能够找到该驱动包。



(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/

b)在mr程序提交job前,加入语句:DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);

mysql数据库存储到hadoop hdfs


-- Demo table used by the MapReduce examples: rows are (id, title, content).
DROP TABLE IF EXISTS `wu_testhadoop`;
CREATE TABLE `wu_testhadoop` (
  -- The id column is required by AUTO_INCREMENT=3 below and by the
  -- three-value INSERT statements; its exact definition is reconstructed.
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of wu_testhadoop
-- ----------------------------
INSERT INTO `wu_testhadoop` VALUES ('1', '123', '122312');
INSERT INTO `wu_testhadoop` VALUES ('2', '123', '123456');







 public class MyWritable implements Writable {
// Some data
private int counter;
private long timestamp; public void write(DataOutput out) throws IOException {
} public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
} public static MyWritable read(DataInput in) throws IOException {
MyWritable w = new MyWritable();
return w;


public class MyWritable implements Writable, DBWritable {
// Some data
private int counter;
private long timestamp; //Writable#write() implementation
public void write(DataOutput out) throws IOException {
} //Writable#readFields() implementation
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
} public void write(PreparedStatement statement) throws SQLException {
statement.setInt(1, counter);
statement.setLong(2, timestamp);
} public void readFields(ResultSet resultSet) throws SQLException {
counter = resultSet.getInt(1);
timestamp = resultSet.getLong(2);


package com.wyg.hadoop.mysql.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable; public class DBRecord implements Writable, DBWritable{
private int id;
private String title;
private String content;
public int getId() {
return id;
} public void setId(int id) {
this.id = id;
} public String getTitle() {
return title;
} public void setTitle(String title) {
this.title = title;
} public String getContent() {
return content;
} public void setContent(String content) {
this.content = content;
} @Override
public void readFields(ResultSet set) throws SQLException {
this.id = set.getInt("id");
this.title = set.getString("title");
this.content = set.getString("content");
} @Override
public void write(PreparedStatement pst) throws SQLException {
pst.setInt(1, id);
pst.setString(2, title);
pst.setString(3, content);
} @Override
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.title = Text.readString(in);
this.content = Text.readString(in);
} @Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, this.title);
Text.writeString(out, this.content);
} @Override
public String toString() {
return this.id + " " + this.title + " " + this.content;


package com.wyg.hadoop.mysql.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter; import com.wyg.hadoop.mysql.bean.DBRecord; @SuppressWarnings("deprecation")
public class DBRecordMapper extends MapReduceBase
        implements Mapper<LongWritable, DBRecord, LongWritable, Text> {

    /**
     * Emits one pair per input record: the record's id as the key and the
     * record's {@code toString()} text as the value. The incoming key is not
     * used.
     *
     * @param key       input key (ignored)
     * @param value     database record to convert
     * @param collector receives the (id, text) pair
     * @param reporter  progress reporter (unused)
     * @throws IOException if the collector fails
     */
    @Override
    public void map(LongWritable key, DBRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        final LongWritable recordId = new LongWritable(value.getId());
        final Text recordText = new Text(value.toString());
        collector.collect(recordId, recordText);
    }
}


package com.wyg.hadoop.mysql.db;
import java.io.IOException; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat; import com.wyg.hadoop.mysql.bean.DBRecord;
import com.wyg.hadoop.mysql.mapper.DBRecordMapper; public class DBAccess {
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(DBAccess.class);
Path path = new Path("hdfs://");
FileOutputFormat.setOutputPath(conf, path);
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://你的ip:3306/数据库名","username","password");
String [] fields = {"id", "title", "content"};
DBInputFormat.setInput(conf, DBRecord.class, "wu_testhadoop",
null, "id", fields);


1	1 123 122312
2	2 123 123456




package com.wyg.hadoop.mysql.mapper;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator; import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import com.wyg.hadoop.mysql.bean.DBRecord; public class WriteDB {
// Map处理过程
public static class Map extends MapReduceBase implements Mapper<Object, Text, Text, DBRecord> {
private final static DBRecord one = new DBRecord(); private Text word = new Text(); @Override public void map(Object key, Text value, OutputCollector<Text, DBRecord> output, Reporter reporter) throws IOException { String line = value.toString();
String[] infos = line.split(" ");
String id = infos[0].split(" ")[1];
one.setId(new Integer(id));
output.collect(word, one);
} } public static class Reduce extends MapReduceBase implements
Reducer<Text, DBRecord, DBRecord, Text> {
public void reduce(Text key, Iterator<DBRecord> values,
OutputCollector<DBRecord, Text> collector, Reporter reporter)
throws IOException {
DBRecord record = values.next();
collector.collect(record, new Text());


package com.wyg.hadoop.mysql.db;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat; import com.wyg.hadoop.mysql.bean.DBRecord;
import com.wyg.hadoop.mysql.mapper.WriteDB; public class DBInsert {
public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WriteDB.class);
// 设置输入输出类型 conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(DBOutputFormat.class); // 不加这两句,通只是,可是网上给的样例没有这两句。
//Text, DBRecord
// 设置Map和Reduce类
// 设置输如文件夹
FileInputFormat.setInputPaths(conf, new Path("hdfs://"));
// 建立数据库连接
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://数据库ip:3306/数据库名称","username","password");
String[] fields = {"id","title","content" };
DBOutputFormat.setOutput(conf, "wu_testhadoop", fields);
} }


