java里面的设计模式:模板模式
把骨架(这个骨架就用通用算法进行抽象出来)定义好,具体实现交给子类去实现。
意思是说在模板里面只要把过程给定义好,具体怎么实现,这个模板方法是不关注的,具体的实现是又子类来完成的,可以有多个子类,每个子类实现的功能可以都不一样。
定义一个模板类:
package com.ruozedata.pattern.template;
public abstract class Mapper {
//setUp mapper clearUp三个方法都是抽象方法
//后面会用具体的子类来实现它
/**
* 初始化的操作 : 打开冰箱门
*/
abstract void setUp();
/**
* 具体的业务逻辑 : 把大象、狗、猪等放进去
*/
abstract void mapper();
/**
* 资源释放的操作 : 关上冰箱门
*/
abstract void clearUp();
/**
* 定义我们的模板方法的执行流程
* 这个run方法,会调用前面的方法,定义执行顺序:初始化、执行、结束
*/
public void run(){
setUp();
mapper();
clearUp();
}
}
定义一个子类,来实现模板抽象类里的抽象方法:
package com.ruozedata.pattern.template;
public class SubMapper extends Mapper{
void setUp() {
System.out.println("SubMapper.setUp");
}
void mapper() {
System.out.println("SubMapper.mapper");
}
void clearUp() {
System.out.println("SubMapper.clearUp");
}
}
再定义一个子类,来实现模板抽象类里的抽象方法,和上面的子类类似,但是可以实现不同的功能:
package com.ruozedata.pattern.template;
public class SubMapper2 extends Mapper{
void setUp() {
System.out.println("SubMapper2.setUp");
}
void mapper() {
System.out.println("SubMapper2.mapper");
}
void clearUp() {
System.out.println("SubMapper2.clearUp");
}
}
再定义一个类,去调用上面的已经实现好的子类:
package com.ruozedata.pattern.template;
public class Client {
public static void main(String[] args) {
SubMapper subMapper = new SubMapper();
subMapper.run();
SubMapper2 subMapper2 = new SubMapper2();
subMapper2.run();
}
}
运行结果:
SubMapper.setUp
SubMapper.mapper
SubMapper.clearUp
SubMapper2.setUp
SubMapper2.mapper
SubMapper2.clearUp
使用HDFS API来完成词频(wordcount wc)统计
功能拆解
词频统计就是给你一个或者一批文件,让你统计每个单词出现的次数。
当拿到一个功能的时候,千万不要想着代码怎么写,而是要进行功能、需求的分析:用中文描述清楚这个事情是做什么的 1 2 3 4等步骤,把步骤写清楚。然后再是开发:把1234翻译成代码而已。上层的架构,包括上面每个步骤用什么技术框架去实现,这才是重要的。所以思路一定要理清楚。
现在分析这个:使用HDFS API来完成词频(wordcount wc)统计
另外大数据里的东西就是三段论:
1.输入
2.处理
3.输出
所有的都是上面的流程。
对上面进行功能拆解:
第一步:输入:要去使用HDFS 的 API去读取文件;
第二步:处理:词频
1.读文件进来的内容是一行一行的,按照某个指定的分隔符进行行内容的拆分,就变成一堆单词了
2.给每个单词赋上出现的次数为1,如下:
比如这一行单词: wc,word,hello,word ,按照逗号分割,每个单词赋上出现的次数为1
(wc,1)
(word,1)
(hello,1)
(word,1)
上面每个单词出现的次数都是1,我们要的是每个单词出现的总次数,那么怎么给它们累加起来?
3.把上面的分割后的单词放到一个缓存中,如放到map中,map<单词,次数>,当word出现一次的时候是map<word,1>,当出现两次的时候就是map<word,2>,这个map就是缓存。
4.把这个map的缓存中的内容遍历处理 这个就是 词频。
第三步:输出 可以按照你想输出的地方进行输出
1.打印到本地
2.写到本地文件系统
3.写到HDFS文件系统
4.写到MySQL数据库…
上面的骨架已经定义好,下面来进行实现。
代码实现
1.首先定义一个抽象类或者接口Mapper,只是定义了功能,但并不关注具体怎么实现。
package com.ruozedata.hadoop.hdfs;
public interface Mapper {
/**
* map 一一操作 对每个元素进行操作
* 现在读进来是一行数据,对读进来的每行数据进行操作
*/
public void map(String line,Context context);
}
这个接口,只是定义了一个map方法,它的功能是传进来一行数据line,中间处理过程数据以及结果数据会放在context缓存中,所以可以理解line是一行数据,context是一个缓存,临时存放数据的一个东西。
2.定义这个缓存Context
这个缓存Context ,它有一个cacheMap 对象,这个对象是一个HashMap实例,有两个参数,第一个是key,第二个是value,可以理解为可以存放<key,value>的数据,就是缓存。
代码如下:
package com.ruozedata.hadoop.hdfs;
import java.util.HashMap;
import java.util.Map;
public class Context {
private Map<Object,Object> cacheMap = new HashMap<Object, Object>();
//get方法
public Map<Object, Object> getCacheMap() {
return cacheMap;
}
/**
* set方法
* 把数据写入到缓存中
* @param key 单词
* @param value 次数
*/
public void write(Object key, Object value) {
cacheMap.put(key, value);
}
/**
* 从缓存中获取单词对应的次数
* @param key 单词
* @return 次数
*/
public Object get(Object key) {
return cacheMap.get(key);
}
}
3.定义一个类WordCountMapper ,来实现上面的接口Mapper,具体怎么实现是由WordCountMapper来完成。
传进去一行数据和一个缓存,把这行数据按照空格进行分割,分割后是一个数组,然后对这个数组进行遍历,然后根据key的值去缓存中看看有没有对应的value,如果没有,则把这个单词,也就是key放进缓存中,并且value值给1。如果有这个单词key,那么把对应的value取出来再加1,然后再放进缓存中去。
package com.ruozedata.hadoop.hdfs;
public class WordCountMapper implements Mapper {
public void map(String line, Context context) {
String[] splits = line.split(" ");
for (String word : splits) {
Object value = context.get(word);
if (null == value){ //单词不存在的情况
context.write(word,1);
} else { //单词存在的情况 先把读取出来的值加1,去然后再写进去
context.write(word,Integer.parseInt(value.toString()) + 1);
}
}
}
}
4.定义一个类HDFSWCAPI01,读取文件,处理数据,然后输出。
定义一个configuration,然后给它配置相关的hdfs地址等,然后定义一个fileSystem,有了configuration,就有了fileSystem入口。
package com.ruozedata.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import javax.swing.plaf.synth.ColorType;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
public class HDFSWCAPI01 {
public static void main(String[] args) throws Exception{
//Configuration和FileSystem
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
configuration.set("dfs.replication","1");
System.setProperty("HADOOP_USER_NAME","ruoze");
FileSystem fileSystem = FileSystem.get(configuration);
//读取数据 input
Path input = new Path("/hdfsapi/test3/");
WordCountMapper mapper = new WordCountMapper();
Context context = new Context();
//远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true);
//如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加
while (iterator.hasNext()){
LocatedFileStatus status = iterator.next();
FSDataInputStream in = fileSystem.open(status.getPath());
BufferedReader read = new BufferedReader(new InputStreamReader(in));
String line = "";
while ((line = read.readLine()) != null){
System.out.println(line);
mapper.map(line,context);
}
read.close();
in.close();
//从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。
Map<Object, Object> cacheMap = context.getCacheMap();
for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) {
System.out.println(entry.getKey() + "\t" + entry.getValue());
}
}
}
}
忽略单词大小写以及多态的使用
如果忽略单词大小写去统计wc,只需要把上面的WordCountMapper的复制一份,CaseIgnoreWordCountMapper只需要加上line.toLowerCase()。
package com.ruozedata.hadoop.hdfs;
public class CaseIgnoreWordCountMapper implements Mapper {
public void map(String line, Context context) {
String[] splits = line.toLowerCase().split(" ");
for (String word : splits) {
Object value = context.get(word);
if (null == value){ //单词不存在的情况
context.write(word,1);
} else { //单词存在的情况 先把读取出来的值加1,去然后再写进去
context.write(word,Integer.parseInt(value.toString()) + 1);
}
}
}
}
然后加入了Mapper mapper = new CaseIgnoreWordCountMapper();进行调用,会忽略大小写,这个也是多态的使用。
package com.ruozedata.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
public class HDFSWCAPI01 {
public static void main(String[] args) throws Exception{
//Configuration和FileSystem
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
configuration.set("dfs.replication","1");
System.setProperty("HADOOP_USER_NAME","ruoze");
FileSystem fileSystem = FileSystem.get(configuration);
//读取数据 input
Path input = new Path("/hdfsapi/test3/");
//WordCountMapper mapper = new WordCountMapper(); --这个不会忽略大小写
//下面这个加了Mapper,为多态的使用,如果把Mapper换成CaseIgnoreWordCountMapper,则不是
//CaseIgnoreWordCountMapper这个会忽略大小写
Mapper mapper = new CaseIgnoreWordCountMapper();
Context context = new Context();
//远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true);
//如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加
while (iterator.hasNext()){
LocatedFileStatus status = iterator.next();
FSDataInputStream in = fileSystem.open(status.getPath());
BufferedReader read = new BufferedReader(new InputStreamReader(in));
String line = "";
while ((line = read.readLine()) != null){
System.out.println(line);
mapper.map(line,context);
}
read.close();
in.close();
System.out.println("\n\n");
//TODO... 后面可以考虑把结果写入到hdfs的某个文件中去
//Path result = new Path("/hdfsapi/result/result.txt");
//从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。
Map<Object, Object> cacheMap = context.getCacheMap();
for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) {
System.out.println(entry.getKey() + "\t" + entry.getValue());
}
}
}
}
代码改造
上面的写法还不够灵活,可以通过配置文件的形式进行改造,把需要的东西放在配置文件中,需要什么读什么,读进来之后通过反射的方式进行处理。
1.在resources中创建一个文件:wc.properties
INPUT_PATH=/hdfsapi/test3/
OUTPUT_PATH=/hdfsapi/result/
HDFS_URI=hdfs://hadoop001:9000
##具体用哪个子类来实现,写这个类,底层要用反射才能实现
MAPPER_CLASS=com.ruozedata.hadoop.hdfs.WordCountMapper
2.创建一个工具类ParamsUtils去读取上面的配置文件
package com.ruozedata.hadoop.hdfs;
import java.io.IOException;
import java.util.Properties;
public class ParamsUtils {
private static Properties properties = new Properties();
static {
try {
properties.load(ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties"));
} catch (IOException e) {
e.printStackTrace();
}
}
//get方法
public static Properties getProperties() {
return properties;
}
public static void main(String[] args) {
System.out.println(getProperties().getProperty("MAPPER_CLASS"));
System.out.println(getProperties().getProperty("INPUT_PATH"));
}
}
上面这个getProperties().getProperty(“MAPPER_CLASS”)都是写死的,可以用一个常量类来封装一下,不封装也可以。
package com.ruozedata.hadoop.hdfs;
public class Constants {
public static final String INPUT_PATH = "INPUT_PATH";
public static final String OUTPUT_PATH = "OUTPUT_PATH";
public static final String HDFS_URI = "HDFS_URI";
public static final String MAPPER_CLASS = "MAPPER_CLASS";
}
然后再调用常量类:
package com.ruozedata.hadoop.hdfs;
import java.io.IOException;
import java.util.Properties;
public class ParamsUtils {
private static Properties properties = new Properties();
static {
try {
properties.load(ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties"));
} catch (IOException e) {
e.printStackTrace();
}
}
//get方法
public static Properties getProperties() {
return properties;
}
public static void main(String[] args) {
// System.out.println(getProperties().getProperty("MAPPER_CLASS"));
// System.out.println(getProperties().getProperty("INPUT_PATH"));
System.out.println(getProperties().getProperty(Constants.MAPPER_CLASS));
System.out.println(getProperties().getProperty(Constants.INPUT_PATH));
System.out.println(getProperties().getProperty(Constants.HDFS_URI));
System.out.println(getProperties().getProperty(Constants.OUTPUT_PATH));
}
}
输出结果:
com.ruozedata.hadoop.hdfs.WordCountMapper
/hdfsapi/test3/
hdfs://hadoop001:9000
/hdfsapi/result/
最后再进行测试,HDFSWCAPI02 再HDFSWCAPI01基础上只是修改了通过properties 拿到input,以及通过反射拿到MAPPER_CLASS类,其他都不需要修改。
package com.ruozedata.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Properties;
public class HDFSWCAPI02 {
public static void main(String[] args) throws Exception{
//拿到配置
Properties properties = ParamsUtils.getProperties();
Path input = new Path(properties.getProperty(Constants.INPUT_PATH));
//Configuration和FileSystem
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://hadoop001:9000");
configuration.set("dfs.replication","1");
System.setProperty("HADOOP_USER_NAME","ruoze");
FileSystem fileSystem = FileSystem.get(configuration);
//因为上面有了input,所以这里不需要了
// //读取数据 input
// Path input = new Path("/hdfsapi/test3/");
//因为配置文件中有MAPPER_CLASS,在下面会用到,所以在这里需要通过反射把它给拿出来
Class<?> aClass = Class.forName(properties.getProperty(Constants.MAPPER_CLASS));
//然后通过aClass反射,去new一个instance:aClass.newInstance()
//因前面Class<?> 的类型是不明确的,所以在这里
// 需要在aClass.newInstance()前面加一个强制转换(Mapper)成Mapper类型,就是转成它的父类
Mapper mapper = (Mapper) aClass.newInstance();
Context context = new Context();
//远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true);
//如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加
while (iterator.hasNext()){
LocatedFileStatus status = iterator.next();
FSDataInputStream in = fileSystem.open(status.getPath());
BufferedReader read = new BufferedReader(new InputStreamReader(in));
String line = "";
while ((line = read.readLine()) != null){
System.out.println(line);
mapper.map(line,context);
}
read.close();
in.close();
System.out.println("\n\n");
//TODO... 后面可以考虑把结果写入到hdfs的某个文件中去
//Path result = new Path("/hdfsapi/result/result.txt");
//从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。
Map<Object, Object> cacheMap = context.getCacheMap();
for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) {
System.out.println(entry.getKey() + "\t" + entry.getValue());
}
}
}
}
这样的话所有的输入的东西都在配置文件中完成,包括用哪个类也指定好了,上面统计是区分大小写的,如果要调用区分大小写的类,只需要在wc.properties中把:
MAPPER_CLASS=com.ruozedata.hadoop.hdfs.WordCountMapper
修改成即可:
MAPPER_CLASS=com.ruozedata.hadoop.hdfs.CaseIgnoreWordCountMapper