Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

  不多说,直接上代码。

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

  一共12列,我们只需提取有用的列:第二列(犯罪类型)、第四列(一周的哪一天)、第五列(具体时间)和第七列(犯罪场所)。

思路分析

基于项目的需求,我们通过以下几步完成:

1、首先根据数据集,分别统计出不同犯罪类别在周时段内发生犯罪次数和不同区域在周时段内发生犯罪的次数。

2、然后根据第一步的输出结果,再按日期统计出每天每种犯罪类别在每个区域发生的犯罪次数。

3、将前两步的输出结果,按需求插入数据库,便于对犯罪数据的分析。

程序开发

我们要编写5个文件:

编写基类,MapReduceJobBase.java

数据处理类,DataFile.java

编写第一个任务类,SanFranciscoCrime.java

编写第二个任务类,SanFranciscoCrimePrepOlap.java

编写第三个任务,插入数据库类,LoadStarDB.java

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

    Hive那边的数据库(注意:下文 LoadStarDB 代码实际是通过 JDBC 连接 MySQL 的 test 库)首先需要创建4个表,

分别为:category(name,cid)、

district(name,did)、

fact(fid,district_id,category_id,time_id,crimes)和

timeperiod(tpid,year,month,week,day)。

编译和执行MapReduce作业

  1、使用MyEclipse将项目编译和打包为crime.jar,使用SSH将crime.jar上传至hadoop的/home/hadoop/目录下。

  2、使用cd /home/hadoop/djt 切换到当前目录,通过命令行执行任务。

 2.1 首先执行第一个作业 SanFranciscoCrime.java。

hadoop    jar    crime.jar     zhouls.bigdata.myMapReduce.SanFranciscoCrime.SanFranciscoCrime

    2.2    然后执行第二个作业SanFranciscoCrimePrepOlap.java。

hadoop    jar    crime.jar    zhouls.bigdata.myMapReduce.SanFranciscoCrime.SanFranciscoCrimePrepOlap

      2.3      最后执行第三个作业LoadStarDB.java,将数据插入数据库。

hadoop     jar     crime.jar     zhouls.bigdata.myMapReduce.SanFranciscoCrime.LoadStarDB

运行结果

任务的最终结果插入数据库,数据结果如下图所示。字段分别为:区域主键district_id、类别主键category_id、时间主键time_id、犯罪次数crimes和主键fid。

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

代码

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;
import java.text.DateFormat;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;

/**
 * Base class of the crime-analysis MapReduce jobs.
 *
 * It defines the column indexes of a parsed crime record and the date formats
 * shared by the job classes, so that the concrete jobs do not repeat them.
 */
public class MapReduceJobBase extends Configured
{

    /** Index of the crime category in the array of parsed columns. */
    protected static final int CATEGORY_COLUMN_INDEX = 1;

    /** Index of the day of the week in the array of parsed columns. */
    protected static final int DAY_OF_WEEK_COLUMN_INDEX = 3;

    /** Index of the date in the array of parsed columns. */
    protected static final int DATE_COLUMN_INDEX = 4;

    /** Index of the district in the array of parsed columns. */
    protected static final int DISTRICT_COLUMN_INDEX = 6;

    /**
     * Format of the dates found in the input data.
     * SimpleDateFormat is not thread-safe; {@link #getDate(String)} locks on
     * this object while parsing.
     */
    protected static final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");

    /** Format used for dates written to the map/reduce job output. */
    protected static final DateFormat outputDateFormat = new SimpleDateFormat("yyyy/MM/dd");

    /**
     * Converts a date string into a {@link Date}.
     *
     * Only the text before the first space is parsed, so a trailing time of
     * day is ignored.
     *
     * @param value string that starts with a date in MM/dd/yyyy form
     * @return the parsed date, or {@code null} when {@code value} consists of
     *         spaces only (splitting then yields no token)
     * @throws ParseException if the first token is not a valid date
     */
    protected static Date getDate(String value) throws ParseException
    {
        Date retVal = null;
        String[] dp = value.split(" ");
        if (dp.length > 0)
        {
            // The formatter keeps mutable parsing state, so concurrent callers
            // must not use it at the same time.
            synchronized (df)
            {
                retVal = df.parse(dp[0]);
            }
        }
        return retVal;
    }

}

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.BufferedReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.opencsv.CSVReader;

/**
 * Helpers that read and split the text produced by the map/reduce jobs.
 */
public abstract class DataFile
{

    /**
     * Collects the key (the text before the first tab) of every line of a
     * map/reduce output file.
     *
     * @param fn path of the file to read
     * @param fs file system used to open {@code fn}
     * @return the keys in ascending natural order
     * @throws IOException if the file cannot be opened or read
     */
    public static List<String> extractKeys(String fn,FileSystem fs) throws IOException
    {
        List<String> retVal = new ArrayList<String>();
        FSDataInputStream in = fs.open(new Path(fn));
        try
        {
            // Decode explicitly as UTF-8 instead of relying on the platform
            // default charset of whichever machine runs this code.
            BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
            String line;
            while ((line = br.readLine()) != null)
            {
                String[] lp = line.split("\t");
                if (lp.length > 0)
                {
                    retVal.add(lp[0]);
                }
            }
        } finally
        {
            // Closing the underlying stream also covers the failure path,
            // where the original code leaked it.
            in.close();
        }
        Collections.sort(retVal);
        return retVal;
    }

    /**
     * Splits one line of CSV text into its columns.
     *
     * @param line a single CSV record
     * @return the columns as returned by {@code CSVReader.readNext()} for this text
     * @throws IOException if the CSV reader fails
     */
    public static String[] getColumns(String line) throws IOException
    {
        // Read the characters directly; encoding to bytes and decoding again
        // with the platform charset could alter non-ASCII text.
        CSVReader reader = new CSVReader(new java.io.StringReader(line));
        try
        {
            return reader.readNext();
        } finally
        {
            reader.close();
        }
    }

}

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.IOException;

import java.text.MessageFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* 时段系统(bucketed system),在物料需求计划(MRP)、配销资源规划(DRP)或其他时程化(time-phased)的系统里,
* 所有时程化的资料都累积在同一时期,或称时段(buchet)。如果累积的时间是以周为时间单位,此系统就称为周时段(weekly buckets)。
* 周时段(weekly buckets)即是一种以周为单位的统计方式
* @function 统计每个事件在每个周时段内发生的次数
*
*
*/
public class SanFranciscoCrime extends MapReduceJobBase implements Tool
{

private static Logger log = Logger
.getLogger(SanFranciscoCrime.class.getCanonicalName());

/**
* CrimeMapper是一个公共的父类
*/
public static class CrimeMapper extends Mapper<LongWritable, Text, Text, Text>
{

protected int keyID = 0;

protected int valueID = 0;

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
try {
String[] col = DataFile.getColumns(line);
if (col != null)
{
// 防止数组超界
if (col.length >= (DISTRICT_COLUMN_INDEX + 1))
{
//过滤文件第一行头部名称
if (!"date".equalsIgnoreCase(col[valueID]))
{
Text tk = new Text();
tk.set(col[keyID]);
Text tv = new Text();
tv.set(col[valueID]);
context.write(tk, tv);
}
} else
{
log.warning(MessageFormat.format(
"Data {0} did not parse into columns.",
new Object[] { line }));
}
} else
{
log.warning(MessageFormat.format(
"Data {0} did not parse into columns.",
new Object[] { line }));
}
} catch (NumberFormatException nfe)
{
log.log(Level.WARNING, MessageFormat
.format("Expected {0} to be a number.\n",
new Object[] { line }), nfe);
} catch (IOException e) {
log.log(Level.WARNING, MessageFormat.format(
"Cannot parse {0} into columns.\n",
new Object[] { line }), e);
}
}
}

/**
* 输出key为犯罪类别,value为日期
*/
public static class CategoryMapByDate extends CrimeMapper
{
public CategoryMapByDate()
{
keyID = CATEGORY_COLUMN_INDEX;//key为犯罪类别
valueID = DATE_COLUMN_INDEX;//value为日期
}
}

/**
* 输出key为犯罪区域,value为日期
*/
public static class DistrictMapByDate extends CrimeMapper
{
public DistrictMapByDate()
{
keyID = DISTRICT_COLUMN_INDEX;//key为犯罪区域
valueID = DATE_COLUMN_INDEX;//value为日期
}
}

/**
* 统计并解析 Mapper 端的输出结果
*/
public static class CrimeReducerByWeek extends Reducer<Text, Text, Text, Text>
{

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{

List<String> incidents = new ArrayList<String>();
// 将values放入incidents列表中
for (Text value : values)
{
incidents.add(value.toString());
}
if (incidents.size() > 0)
{
//对incidents列表排序
Collections.sort(incidents);
java.util.Map<Integer, Integer> weekSummary = new HashMap<Integer, Integer>();
//因为是对1-3月数据分析,周时段(weekly buckets)最大为15,所以weekSummary长度为15即可
for (int i = 0; i < 16; i++)
{
weekSummary.put(i, 0);
}
//统计每个周时段(weekly buckets)内,该事件发生的次数
for (String incidentDay : incidents)
{
try
{
Date d = getDate(incidentDay);
Calendar cal = Calendar.getInstance();
cal.setTime(d);
int week = cal.get(Calendar.WEEK_OF_MONTH);//这个月的第几周
int month = cal.get(Calendar.MONTH);//第几个月,从0开始
//如果累积的时间是以周为时间单位,此系统就称为周时段(weekly buckets)。
//周时段的计算公式,最大为15,它只是一种统计方式,不必深究
int bucket = (month * 5) + week;
//统计每个周时段内,该事件发生的次数
if (weekSummary.containsKey(bucket))
{
weekSummary.put(bucket, new Integer(weekSummary
.get(bucket).intValue() + 1));
} else
{
weekSummary.put(bucket, new Integer(1));
}
} catch (ParseException pe)
{
log.warning(MessageFormat.format("Invalid date {0}",
new Object[] { incidentDay }));
}
}
// 将该事件在每个周时段内发生的次数生成字符串输出
StringBuffer rpt = new StringBuffer();
boolean first = true;
for (int week : weekSummary.keySet())
{
if (first)
{
first = false;
} else
{
rpt.append(",");
}
rpt.append(new Integer(weekSummary.get(week)).toString());
}
String list = rpt.toString();
Text tv = new Text();
tv.set(list);
//value为0-15周时段内,该事件发生的次数
context.write(key, tv);
}
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf1 = new Configuration();

Path out1 = new Path(args[1]);

FileSystem hdfs1 = out1.getFileSystem(conf1);
if (hdfs1.isDirectory(out1))
{
hdfs1.delete(out1, true);
}

// 任务1
Job job1 = new Job(conf1, "crime");
job1.setJarByClass(SanFranciscoCrime.class);

job1.setMapperClass(CategoryMapByDate.class);
job1.setReducerClass(CrimeReducerByWeek.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
// 任务2
Configuration conf2 = new Configuration();
Path out2 = new Path(args[2]);
FileSystem hdfs2 = out2.getFileSystem(conf2);
if (hdfs2.isDirectory(out2))
{
hdfs2.delete(out2, true);
}
Job job2 = new Job(conf2, "crime");
job2.setJarByClass(SanFranciscoCrime.class);

job2.setMapperClass(DistrictMapByDate.class);
job2.setReducerClass(CrimeReducerByWeek.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job2, new Path(args[0]));
FileOutputFormat.setOutputPath(job2, new Path(args[2]));

// 构造一个 cJob1
ControlledJob cJob1 = new ControlledJob(conf1);
//设置 MapReduce job1
cJob1.setJob(job1);

// 构造一个 cJob2
ControlledJob cJob2 = new ControlledJob(conf2);
//设置 MapReduce job2
cJob2.setJob(job2);

//cJob2.addDependingJob(cJob1);// cjob2依赖cjob1

// 定义job管理对象
JobControl jobControl = new JobControl("12");

//把两个构造的job加入到JobControl中
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);

//启动线程运行任务
Thread t = new Thread(jobControl);
t.start();
while (true)
{
if (jobControl.allFinished())
{
jobControl.stop();
break;
}

}
return 0;

}

public static void main(String[] args) throws Exception
{
String[] args0 =
{
"hdfs://HadoopMaster:9000/middle/crime/crime.csv",
"hdfs://HadoopMaster:9000/middle/test/out1/",
"hdfs://HadoopMaster:9000/middle/test/out2/" };
int ec = ToolRunner.run(new Configuration(), new SanFranciscoCrime(), args0);
System.exit(ec);
}

}

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.IOException;

import java.net.URI;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @function 统计每天每种犯罪类型在每个区域发生的次数
*
*
*/
public class SanFranciscoCrimePrepOlap extends MapReduceJobBase implements Tool
{

private static Logger log = Logger.getLogger(SanFranciscoCrimePrepOlap.class.getCanonicalName());
private static List<String> categories = null;
private static List<String> districts = null;
private static final java.util.Map<String, Integer> categoryLookup = new HashMap<String, Integer>();
private static final java.util.Map<String, Integer> districtLookup = new HashMap<String, Integer>();
public static abstract class Map extends Mapper<LongWritable, Text, Text, Text>
{
protected int keyID = 0;
protected int valueID = 0;
protected int value2ID = 0;

/**
* @function 将key值转换为规范的数据格式
* @param value 包含不规范的 key值
* @return 返回规范的key值
* @throws ParseException
*/
protected abstract String formatKey(String value) throws ParseException;

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
try
{
String[] col = DataFile.getColumns(line);//将读取的每行数据转换为数组
if (col != null)
{
if (col.length >= (DISTRICT_COLUMN_INDEX + 1))
{
Text tk = new Text();
tk.set(formatKey(col[keyID]));//将日期作为key值
Text tv = new Text();
StringBuffer sv = new StringBuffer();
sv.append("\"");
sv.append(col[valueID]);//犯罪区域
sv.append("\"");
sv.append(",");
sv.append("\"");
sv.append(col[value2ID]);//犯罪类型
sv.append("\"");
tv.set(sv.toString());
context.write(tk, tv);
} else
{
log.warning(MessageFormat.format("Data {0} did not parse into columns.", new Object[]{line}));
}
} else
{
log.warning(MessageFormat.format("Data {0} did not parse into columns.", new Object[]{line}));
}
} catch (NumberFormatException nfe)
{
log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a number.\n", new Object[]{line}), nfe);
} catch (IOException e)
{
log.log(Level.WARNING, MessageFormat.format("Cannot parse {0} into columns.\n", new Object[]{line}), e);
} catch (ParseException e)
{
log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a date but it was not.\n", new Object[]{line}), e);
}
}
}

/**
* @function 将 map 输入数据的日期作为key,犯罪区域和犯罪类型作为value,然后输出
*/
public static class DateMapByCategoryAndDistrict extends Map
{
public DateMapByCategoryAndDistrict()
{
keyID = DATE_COLUMN_INDEX;//代表日期下标
valueID = DISTRICT_COLUMN_INDEX;//代表犯罪区域下标
value2ID = CATEGORY_COLUMN_INDEX;//代表犯罪类型下标
}

@Override
protected String formatKey(String value) throws ParseException
{
return outputDateFormat.format(getDate(value));
}
}

public static class Reduce extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
// 分配和初始化犯罪类型所在区域的二维数组
int[][] crimes = new int[categories.size()][districts.size()];
for (int i = 0; i < categories.size(); i++)
{
for (int j = 0; j < districts.size(); j++)
{
crimes[i][j] = 0;
}
}
//统计犯罪类型/区域二维数组的值(即每种犯罪类型在每个区域发生的次数)
for (Text crime:values)
{
String[] cols = DataFile.getColumns(crime.toString());
if (cols.length == 2)
{
if (categoryLookup.containsKey(cols[1]))
{
if (districtLookup.containsKey(cols[0]))
{
int cat = categoryLookup.get(cols[1]);
int dist = districtLookup.get(cols[0]);
crimes[cat][dist]++;
} else
{
log.warning(MessageFormat.format("District {0} not found.", new Object[]{cols[0]}));
}
} else
{
log.warning(MessageFormat.format("Category {0} not found.", new Object[]{cols[1]}));
}
} else
{
log.warning(MessageFormat.format("Input {0} was in unexpected format", new Object[]{crime}));
}
}
//将非0二维数组的犯罪类别下标,犯罪区域下标,犯罪次数作为value输出
for (int i = 0; i < categories.size(); i++)
{
for (int j = 0; j < districts.size(); j++)
{
if (crimes[i][j] > 0)
{
StringBuffer sv = new StringBuffer();
sv.append(new Integer(i).toString());//犯罪类别下标
sv.append(",");
sv.append(new Integer(j).toString());//犯罪区域下标
sv.append(",");
sv.append(new Integer(crimes[i][j]));//犯罪次数
Text tv = new Text();
tv.set(sv.toString());
context.write(key, tv);
}
}
}
}
}
/**
* @function 加载已经生成的 犯罪类别数据和犯罪区域数据,并将这些数据排序后存入Map
* @param categoryReport SanFranciscoCrime job任务输出犯罪类别的文件路径
* @param districtReport SanFranciscoCrime job任务输出犯罪区域的文件路径
* @throws IOException
*/
private static void setup(String categoryReport, String districtReport,FileSystem fs) throws IOException
{
categories = DataFile.extractKeys(categoryReport,fs);
districts = DataFile.extractKeys(districtReport,fs);
int i = 0;
for (String category : categories)
{
categoryLookup.put(category, i++);
}
i = 0;
for (String district : districts)
{
districtLookup.put(district, i++);
}
}
@Override
public int run(String[] arg0) throws Exception
{
// TODO Auto-generated method stub
Configuration conf = new Configuration();

Path out = new Path(arg0[3]);

FileSystem hdfs = out.getFileSystem(conf);
if (hdfs.isDirectory(out))
{
hdfs.delete(out, true);
}

// 任务1
Job job = new Job(conf, "SanFranciscoCrimePrepOlap");
job.setJarByClass(SanFranciscoCrimePrepOlap.class);

job.setMapperClass(DateMapByCategoryAndDistrict.class);//Mapper
job.setReducerClass(Reduce.class);//Reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[3]));
job.waitForCompletion(true);//提交任务
return 0;
}

public static void main(String[] args) throws Exception
{
String[] args0 = {
"hdfs://HadoopMaster:9000/middle/crime/crime.csv",
"hdfs://HadoopMaster:9000/middle/test/out1/part-r-00000",
"hdfs://HadoopMaster:9000/middle/test/out2/part-r-00000",
"hdfs://HadoopMaster:9000/middle/test/out3/"};
if (args0.length == 4)
{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);
//调用setup
setup(args0[1], args0[2],fs);
//执行MapReduce任务
int ec = ToolRunner.run(conf, new SanFranciscoCrimePrepOlap(), args0);
System.exit(ec);
} else
{
System.err.println("\nusage: bin/hadoop jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar SanFranciscoCrimePrepOlap path/to/category/report path/to/district/report path/to/input/data path/to/output/data");
}
}
}

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.BufferedReader;

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/***
 * Reads the output of the map/reduce jobs and inserts it into a MySQL
 * database with the tables category, district, timeperiod and fact.
 */
public class LoadStarDB
{

    private Connection db = null;// JDBC connection to MySQL

    // Last primary key handed out per table; assumes the tables number their
    // rows from 1 after being truncated -- TODO confirm against the schema.
    private Map<String, Integer> lastPrimaryKey = new HashMap<String, Integer>();

    private List<String> categories = null;// crime categories

    private List<String> districts = null;// districts

    // Maps a date to the primary key of its timeperiod row.
    private final java.util.Map<Date, Integer> timeperiodLookup = new HashMap<Date, Integer>();

    private final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");// format of Date values sent to the database

    private final DateFormat kdf = new SimpleDateFormat("yyyy/MM/dd");// format of the date keys in the map/reduce output

    /***
     * Inserts one row into a table.
     *
     * The values are bound as statement parameters, so text containing
     * quotes is stored correctly and cannot alter the SQL.
     *
     * @param table name of the table
     * @param row column names and their values
     * @return the primary key assumed for the new row (a per-table counter)
     * @throws SQLException if the insert fails
     */
    private int insert(String table, DataRecord row) throws SQLException
    {
        StringBuilder sql = new StringBuilder();
        sql.append("insert into ");
        sql.append(table);
        sql.append(" (");
        StringBuilder marks = new StringBuilder();
        List<Object> values = new java.util.ArrayList<Object>();
        for (Map.Entry<String, Object> column : row.entrySet())
        {
            if (!values.isEmpty())
            {
                sql.append(",");
                marks.append(",");
            }
            sql.append(column.getKey());
            marks.append("?");
            Object v = column.getValue();
            if (v instanceof Date)
            {
                // Dates are sent as MM/dd/yyyy text, the same form that
                // DataRecord.toString() produces.
                v = df.format((Date) v);
            }
            values.add(v);
        }
        sql.append(") values (").append(marks).append(")");

        java.sql.PreparedStatement ps = db.prepareStatement(sql.toString());
        try
        {
            for (int i = 0; i < values.size(); i++)
            {
                ps.setObject(i + 1, values.get(i));
            }
            ps.executeUpdate();
        } finally
        {
            ps.close();
        }

        Integer last = lastPrimaryKey.get(table);
        int retVal = (last == null) ? 1 : last + 1;
        lastPrimaryKey.put(table, retVal);
        return retVal;
    }

    /***
     * Inserts one crime category.
     *
     * @param category value of the name column
     * @return primary key of the new row
     * @throws SQLException if the insert fails
     */
    private int insertCategory(String category) throws SQLException
    {
        DataRecord dr = new DataRecord();
        dr.put("name", category);
        return insert("category", dr);
    }

    /***
     * Inserts one district.
     *
     * @param district value of the name column
     * @return primary key of the new row
     * @throws SQLException if the insert fails
     */
    private int insertDistrict(String district) throws SQLException
    {
        DataRecord dr = new DataRecord();
        dr.put("name", district);
        return insert("district", dr);
    }

    /***
     * Splits a date into the columns year, month, week and day.
     *
     * @param dr record that receives the four columns
     * @param d date to split
     */
    private void setTimePeriod(DataRecord dr, Date d)
    {
        Calendar cal = Calendar.getInstance();
        cal.setTime(d);
        dr.put("year", cal.get(Calendar.YEAR));
        // NOTE(review): Calendar.MONTH is zero-based, so January is stored
        // as 0 -- confirm that consumers of the timeperiod table expect this.
        dr.put("month", cal.get(Calendar.MONTH));
        dr.put("week", cal.get(Calendar.WEEK_OF_MONTH));
        dr.put("day", cal.get(Calendar.DAY_OF_MONTH));
    }

    /***
     * Returns the primary key of the timeperiod row of a date, inserting the
     * row first when this loader has not seen the date yet.
     *
     * @param d the date
     * @return primary key of the timeperiod row
     * @throws SQLException if the insert fails
     */
    private int insertTimePeriod(Date d) throws SQLException
    {
        Integer known = timeperiodLookup.get(d);
        if (known != null)
        {
            return known;
        }
        DataRecord dr = new DataRecord();
        setTimePeriod(dr, d);
        int retVal = insert("timeperiod", dr);
        timeperiodLookup.put(d, retVal);
        return retVal;
    }

    /***
     * Inserts one row into the fact table.
     *
     * @param districtId foreign key of the district
     * @param categoryId foreign key of the crime category
     * @param timeId foreign key of the date
     * @param crimes number of crimes of this category in this district on this date
     * @throws SQLException if the insert fails
     */
    private void insertFact(int districtId, int categoryId, int timeId, int crimes) throws SQLException
    {
        DataRecord dr = new DataRecord();
        dr.put("district_id", districtId);
        dr.put("category_id", categoryId);
        dr.put("time_id", timeId);
        dr.put("crimes", crimes);
        insert("fact", dr);
    }

    /***
     * Reads the category and district keys from the SanFranciscoCrime job
     * output and inserts them into their tables in sorted order.
     *
     * @param categoryReport path of the category report
     * @param districtReport path of the district report
     * @param fs file system used to open both reports
     * @throws IOException if a report cannot be read
     * @throws SQLException if an insert fails
     */
    private void setup(String categoryReport, String districtReport,FileSystem fs) throws IOException, SQLException
    {
        categories = DataFile.extractKeys(categoryReport,fs);
        districts = DataFile.extractKeys(districtReport,fs);
        for (String category : categories)
        {
            insertCategory(category);
        }
        for (String district : districts)
        {
            insertDistrict(district);
        }
    }

    /***
     * Removes all rows from a table.
     *
     * @param name name of the table
     * @throws SQLException if the statement fails
     */
    private void truncate(String name) throws SQLException
    {
        Statement s = db.createStatement();
        try
        {
            s.execute("truncate table ".concat(name));
        } finally
        {
            s.close();
        }
    }

    /***
     * Empties the four tables of the star schema.
     *
     * @throws SQLException if a truncate fails
     */
    private void reset() throws SQLException
    {
        truncate("fact");
        truncate("category");
        truncate("district");
        truncate("timeperiod");
    }

    /***
     * Closes the database connection if it is open.
     */
    private void close()
    {
        if (db != null)
        {
            try
            {
                db.close();
            } catch (SQLException e)
            {
                e.printStackTrace();
            }
            db = null;
        }
    }

    /***
     * Connects to the database, empties the tables and loads the category
     * and district dimensions.
     *
     * @param categoryReport path of the category report
     * @param districtReport path of the district report
     * @param dbhost database host, optionally with port
     * @param dbname database name
     * @param dbuser user name
     * @param dbpassword password
     * @param fs file system used to open the reports
     * @throws ClassNotFoundException if the MySQL driver is not on the class path
     * @throws SQLException if connecting or loading fails
     * @throws IOException if a report cannot be read
     */
    private LoadStarDB(String categoryReport, String districtReport,
            String dbhost, String dbname, String dbuser, String dbpassword,FileSystem fs)
            throws ClassNotFoundException, SQLException, IOException
    {
        Class.forName("com.mysql.jdbc.Driver");
        // Build the URL from the arguments; the original pattern had no
        // placeholders, so host, database, user and password were ignored.
        String cs = "jdbc:mysql://" + dbhost + "/" + dbname + "?autoReconnect=true";
        db = DriverManager.getConnection(cs, dbuser, dbpassword);
        boolean ready = false;
        try
        {
            reset();
            setup(categoryReport, districtReport,fs);
            ready = true;
        } finally
        {
            if (!ready)
            {
                // Do not leave the connection open when loading fails.
                close();
            }
        }
    }

    /***
     * Reads the SanFranciscoCrimePrepOlap job output and fills the
     * timeperiod and fact tables.
     *
     * Each line is expected as {@code date<TAB>categoryIndex,districtIndex,crimes};
     * lines of another shape are reported on stderr and skipped.
     *
     * @param dataFile path of the job output file
     * @param fs file system used to open {@code dataFile}
     * @throws IOException if the file cannot be read
     * @throws ParseException if a date key is not in yyyy/MM/dd form
     */
    private void processData(String dataFile,FileSystem fs) throws IOException,ParseException
    {
        FSDataInputStream in = fs.open(new Path(dataFile));
        try
        {
            BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
            String line;
            while ((line = br.readLine()) != null)
            {
                String[] lp = line.split("\t");
                // Both the date key and the value part are needed.
                if (lp.length < 2)
                {
                    System.err.println("invalid data: " + line);
                    continue;
                }
                Date d = kdf.parse(lp[0]);
                String[] data = DataFile.getColumns(lp[1]);
                if (data == null || data.length != 3)
                {
                    System.err.println("invalid data: " + line);
                    continue;
                }
                try
                {
                    // The job emits zero-based list positions; the keys used
                    // for the dimension rows start at 1.
                    int categoryId = Integer.parseInt(data[0]) + 1;
                    int districtId = Integer.parseInt(data[1]) + 1;
                    int crimes = Integer.parseInt(data[2]);
                    int timeId = insertTimePeriod(d);
                    insertFact(districtId, categoryId, timeId, crimes);
                } catch (NumberFormatException nfe)
                {
                    System.err.println("invalid data: " + line);
                } catch (SQLException e)
                {
                    e.printStackTrace();
                }
            }
        } finally
        {
            in.close();
        }
    }

    /***
     * Loads the job output into the database using the fixed paths and
     * connection settings of this example; the command-line arguments are
     * not used.
     *
     * @param args ignored
     * @throws IOException if the file system cannot be obtained
     */
    public static void main(String[] args) throws IOException
    {
        // The report paths point at the directories the SanFranciscoCrime and
        // SanFranciscoCrimePrepOlap examples write to (/middle/test/outN).
        String[] args0 =
        {
                "hdfs://HadoopMaster:9000/middle/test/out1/part-r-00000",
                "hdfs://HadoopMaster:9000/middle/test/out2/part-r-00000",
                "hdfs://HadoopMaster:9000/middle/test/out3/part-r-00000",
                "192.168.80.128:3306",
                "test",
                "root",
                "root"};
        if (args0.length == 7)
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);
            LoadStarDB m = null;
            try
            {
                m = new LoadStarDB(args0[0], args0[1], args0[3],args0[4], args0[5], args0[6],fs);
                m.processData(args0[2],fs);
            } catch (ClassNotFoundException e)
            {
                e.printStackTrace();
            } catch (SQLException e)
            {
                e.printStackTrace();
            } catch (IOException e)
            {
                e.printStackTrace();
            } catch (ParseException e)
            {
                e.printStackTrace();
            } finally
            {
                if (m != null)
                {
                    m.close();
                }
            }
        } else {
            System.err
                    .println("\nusage: java -jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar com.dynamicalsoftware.olap.etl.LoadStarDB path/to/category/report path/to/district/report path/to/star/data dbhost dbname dbuser dbpassword\n");
        }
    }

    /***
     * One row to insert: column names mapped to their values.
     */
    class DataRecord extends HashMap<String, Object>
    {
        /**
         * Renders the record as {@code (columns) values (values)} text.
         * Text values are quoted but not escaped, so this form is suitable
         * for display only; insert() binds the values as parameters.
         */
        @Override
        public String toString()
        {
            StringBuilder retVal = new StringBuilder();
            // column list
            retVal.append("(");
            boolean first = true;
            for (String key : keySet())
            {
                if (first)
                {
                    first = false;
                } else
                {
                    retVal.append(",");
                }
                retVal.append(key);
            }
            // value list, in the same order as the column list
            retVal.append(") values (");
            first = true;
            for (String key : keySet())
            {
                Object o = get(key);
                if (first)
                {
                    first = false;
                } else
                {
                    retVal.append(",");
                }
                if (o instanceof Long)
                {
                    retVal.append(((Long) o).toString());
                } else if (o instanceof Integer)
                {
                    retVal.append(((Integer) o).toString());
                } else if (o instanceof Date)
                {
                    Date d = (Date) o;
                    retVal.append("'");
                    retVal.append(df.format(d));
                    retVal.append("'");
                } else if (o instanceof String)
                {
                    retVal.append("'");
                    retVal.append(o.toString());
                    retVal.append("'");
                }
            }
            retVal.append(")");
            return retVal.toString();
        }
    }

}

上一篇:hql语句拼接的替换方式


下一篇:Hadoop MapReduce编程 API入门系列之MapReduce多种输入格式(十七)