hadoop处理Excel通话记录

前面我们所写的MapReduce程序,输入都是文本文件;但在实际工作中,我们难免会碰到需要处理其他格式数据的情况。下面以处理Excel数据为例进行说明。

1、项目需求

现有一份刘超与家庭成员之间的通话记录,存储在Excel文件中,如下面的数据集所示。我们需要基于这份数据,统计每个月每个家庭成员给自己打电话的次数,并按月份输出到不同文件。

下面是部分数据,数据格式:编号  联系人  电话  时间

hadoop处理Excel通话记录

2、分析

统计每个月每个家庭成员给自己打电话的次数,这一点很简单,我们之前已经写过几个类似的程序。实现需求的麻烦之处在于:文件的输入是Excel文件,而输出要按月份写到不同文件。这就需要我们实现自定义的InputFormat和OutputFormat。

3、实现

首先,输入文件是Excel格式,我们可以借助POI来解析Excel文件。先来实现一个Excel解析类(ExcelParser):

package com.buaa;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row; /**
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelParser
* @Description 解析excel
* @Author 刘吉超
* @Date 2016-04-24 16:59:28
*/
public class ExcelParser {
    private static final Log logger = LogFactory.getLog(ExcelParser.class);

    /**
     * Parses the first sheet of an HSSF (.xls) workbook into one tab-separated line per row.
     *
     * <p>Only boolean, numeric and string cells are emitted. Every emitted value is followed by a
     * tab, so a non-empty line always ends with a trailing tab. Cells of any other type (for
     * example blank or formula cells) are skipped, which shifts the position of every later
     * column on that row. Numeric cells are rendered as Java doubles (e.g. {@code 1.0}).
     *
     * <p>Parsing is best-effort: if reading the workbook throws an {@link IOException}, the
     * failure is logged together with its stack trace and an empty array is returned.
     *
     * @param is data source containing an .xls workbook
     * @return one string per sheet row, in sheet iteration order; never {@code null}
     */
    public static String[] parseExcelData(InputStream is) {
        List<String> resultList = new ArrayList<String>();
        try {
            HSSFWorkbook workbook = new HSSFWorkbook(is);
            HSSFSheet sheet = workbook.getSheetAt(0);
            for (Iterator<Row> rowIterator = sheet.iterator(); rowIterator.hasNext();) {
                resultList.add(formatRow(rowIterator.next()));
            }
        } catch (IOException e) {
            // The stream is already open at this point, so this is a read/format failure rather
            // than a missing file; pass the exception so the stack trace is kept in the log.
            logger.error("IO Exception while reading Excel workbook", e);
        }
        return resultList.toArray(new String[0]);
    }

    /** Concatenates the supported cell values of one row, each followed by a tab. */
    private static String formatRow(Row row) {
        StringBuilder rowString = new StringBuilder();
        for (Iterator<Cell> colIterator = row.cellIterator(); colIterator.hasNext();) {
            Cell cell = colIterator.next();
            switch (cell.getCellType()) {
                case Cell.CELL_TYPE_BOOLEAN:
                    rowString.append(cell.getBooleanCellValue()).append('\t');
                    break;
                case Cell.CELL_TYPE_NUMERIC:
                    rowString.append(cell.getNumericCellValue()).append('\t');
                    break;
                case Cell.CELL_TYPE_STRING:
                    rowString.append(cell.getStringCellValue()).append('\t');
                    break;
                default:
                    // Blank, formula and error cells are skipped entirely (no tab is written).
                    break;
            }
        }
        return rowString.toString();
    }
}

然后,我们需要定义一个从Excel读取数据的InputFormat类,命名为ExcelInputFormat,实现代码如下:

package com.buaa;

import java.io.IOException;
import java.io.InputStream; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; /**
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelInputFormat
* @Description TODO
* @Author 刘吉超
* @Date 2016-04-28 17:31:54
*/
public class ExcelInputFormat extends FileInputFormat<LongWritable,Text>{
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException { return new ExcelRecordReader();
} public class ExcelRecordReader extends RecordReader<LongWritable, Text> {
private LongWritable key = new LongWritable(-1);
private Text value = new Text();
private InputStream inputStream;
private String[] strArrayofLines; @Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
// 分片
FileSplit split = (FileSplit) genericSplit;
// 获取配置
Configuration job = context.getConfiguration(); // 分片路径
Path filePath = split.getPath(); FileSystem fileSystem = filePath.getFileSystem(job); inputStream = fileSystem.open(split.getPath()); // 调用解析excel方法
this.strArrayofLines = ExcelParser.parseExcelData(inputStream);
} @Override
public boolean nextKeyValue() throws IOException, InterruptedException {
int pos = (int) key.get() + 1; if (pos < strArrayofLines.length){ if(strArrayofLines[pos] != null){
key.set(pos);
value.set(strArrayofLines[pos]); return true;
}
} return false;
} @Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
} @Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
} @Override
public float getProgress() throws IOException, InterruptedException {
return 0;
} @Override
public void close() throws IOException {
if (inputStream != null) {
inputStream.close();
}
}
}
}

接下来,我们要定义一个ExcelOutputFormat类,用于实现按月份输出到不同文件中。它所用到的MailRecordWriter类(负责把键值对逐行写入单个文件)的代码也一并列在后面:

package com.buaa;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils; /**
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelOutputFormat
* @Description TODO
* @Author 刘吉超
* @Date 2016-04-28 17:24:23
*/
public class ExcelOutputFormat extends FileOutputFormat<Text, Text> {
    // Writer created on the first getRecordWriter() call and returned on every later call.
    private MultiRecordWriter writer = null;

    /**
     * Returns a writer that routes each record to the file named by
     * {@link #generateFileNameForKeyValue}, created inside the task's output directory.
     */
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job) throws IOException,
            InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /**
     * Resolves the directory new files are created in: the committer's work path when the
     * committer is a {@link FileOutputCommitter}, otherwise the job's output path.
     *
     * @throws IOException if the committer is not a FileOutputCommitter and no output path is set
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            return ((FileOutputCommitter) committer).getWorkPath();
        }
        Path outputPath = getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("没有定义输出目录");
        }
        return outputPath;
    }

    /**
     * Determines the output file name (including extension) for a record.
     *
     * <p>The key is split on tabs and its second field is used, so a key such as
     * {@code "name\t12"} goes to {@code 12.txt}. A key without a tab makes this method throw
     * {@link ArrayIndexOutOfBoundsException}.
     *
     * <p>NOTE(review): with more than one reduce task, different tasks can produce the same file
     * name; confirm how the committer merges such files before increasing the reducer count.
     *
     * @param key   record key, expected to look like {@code name\tmonth}
     * @param value record value (unused)
     * @param conf  job configuration (unused)
     * @return the base file name for this record
     */
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        String[] records = key.toString().split("\t");
        return records[1] + ".txt";
    }

    /**
     * RecordWriter that lazily opens one underlying {@link MailRecordWriter} per output file
     * name and forwards each record to the writer for its file.
     */
    public class MultiRecordWriter extends RecordWriter<Text, Text> {
        // Underlying writers, keyed by output file name.
        private HashMap<String, RecordWriter<Text, Text>> recordWriters = null;
        private TaskAttemptContext job = null;
        // Directory in which the per-key files are created.
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            this.recordWriters = new HashMap<String, RecordWriter<Text, Text>>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<Text, Text> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        /**
         * Creates the file {@code workPath/baseName} (plus the codec extension when output
         * compression is enabled; gzip is the default codec) and wraps it in a tab-separated
         * {@link MailRecordWriter}. Fails if the file already exists.
         */
        private RecordWriter<Text, Text> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            String keyValueSeparator = "\t";
            if (getCompressOutput(job)) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                return new MailRecordWriter<Text, Text>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            }
            Path file = new Path(workPath, baseName);
            FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
            return new MailRecordWriter<Text, Text>(fileOut, keyValueSeparator);
        }

        /**
         * Closes every underlying writer. A failure in one writer does not prevent the others
         * from being closed; the first IOException encountered is rethrown afterwards.
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            IOException firstFailure = null;
            for (RecordWriter<Text, Text> rw : this.recordWriters.values()) {
                try {
                    rw.close(context);
                } catch (IOException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
            this.recordWriters.clear();
            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }
}
package com.buaa;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext; /**
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName MailRecordWriter
* @Description TODO
* @Author 刘吉超
* @Date 2016-04-24 16:59:23
*/
public class MailRecordWriter< K, V > extends RecordWriter< K, V > {
// 编码
private static final String utf8 = "UTF-8";
// 换行
private static final byte[] newline;
static {
try {
newline = "\n".getBytes(utf8);//换行符 "/n"不对
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
// 输出数据
protected DataOutputStream out;
// 分隔符
private final byte[] keyValueSeparator; public MailRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
} public MailRecordWriter(DataOutputStream out) {
this(out, "/t");
} private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
} public synchronized void write(K key, V value) throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
} public synchronized void close(TaskAttemptContext context) throws IOException {
out.close();
}
}

最后,我们来编写Mapper类,实现map()函数;编写Reducer类,实现reduce()函数;以及作业的运行代码:

package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; /**
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelContactCount
* @Description TODO
* @Author 刘吉超
* @Date 2016-04-24 16:34:24
*/
public class ExcelContactCount extends Configured implements Tool { public static class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws InterruptedException, IOException {
Text pkey = new Text();
Text pvalue = new Text();
// 1.0, 老爸, 13999123786, 2014-12-20
String line = value.toString(); String[] records = line.split("\\s+");
// 获取月份
String[] months = records[3].split("-");
// 昵称+月份
pkey.set(records[1] + "\t" + months[1]);
// 手机号
pvalue.set(records[2]); context.write(pkey, pvalue);
}
} public static class PhoneReducer extends Reducer<Text, Text, Text, Text> { protected void reduce(Text Key, Iterable<Text> Values, Context context) throws IOException, InterruptedException {
Text phone = Values.iterator().next();
int phoneToal = 0; for(java.util.Iterator<Text> its = Values.iterator();its.hasNext();its.next()){
phoneToal++;
} Text pvalue = new Text(phone + "\t" + phoneToal); context.write(Key, pvalue);
}
} @Override
@SuppressWarnings("deprecation")
public int run(String[] args) throws Exception {
// 读取配置文件
Configuration conf = new Configuration(); // 判断输出路径,如果存在,则删除
Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
} // 新建任务
Job job = new Job(conf,"Call Log");
job.setJarByClass(ExcelContactCount.class); // 输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1])); // Mapper
job.setMapperClass(PhoneMapper.class);
// Reduce
job.setReducerClass(PhoneReducer.class); // 输出key类型
job.setOutputKeyClass(Text.class);
// 输出value类型
job.setOutputValueClass(Text.class); // 自定义输入格式
job.setInputFormatClass(ExcelInputFormat.class);
// 自定义输出格式
job.setOutputFormatClass(ExcelOutputFormat.class); return job.waitForCompletion(true) ? 0:1;
} public static void main(String[] args) throws Exception {
String[] args0 = {
"hdfs://ljc:9000/buaa/phone/phone.xls",
"hdfs://ljc:9000/buaa/phone/out/"
};
int ec = ToolRunner.run(new Configuration(), new ExcelContactCount(), args0);
System.exit(ec);
}
}

4、结果

hadoop处理Excel通话记录

通过这份数据很容易看出,刘超1月份与姐姐的通话次数最多,为19次。

如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。

实现代码及数据:下载

上一篇:css 图片置灰


下一篇:django之class Meta