本案例要实现的目标:
1、模拟修改配置,通过发指令的方式统计一个文件中出现的单词的字数。
案例代码结构如下:
在整个案例中需要有以下几类文件:
A:worker服务端,用于类似Mapreduce接收jar,接收配置文件,执行业务逻辑
B:程序客户端、用于组装配置文件、发送业务执行的命令(听过socket发送jarfile、jobconf、和job2run的命令)
代码结构,每个包和代码作用介绍
cn.toto.bigdata.mymr.task | TaskProcessor | 核心的主体执行程序 |
| ProcessLogic | 定义客户端调用必须实现的方法,相当于WebService中的接口规范 |
cn.toto.bigdata.mymr.io | InputFormat | 封装读文件的组件(接口用途) |
| DefaultInputFormat | 封装读文件的组件的实现类 |
| OutPutFormat | 封装写文件的组件(接口用途) |
| DefaultOutPutFormat | 封装写文件的组件的实现 |
cn.toto.bigdata.mymr.common | Constants | 常量定义 |
| Context | 应用上下文,用于存储计算单词出现字数的次数的中间变量 |
cn.toto.bigdata.mymr.userapp | UserLogic | 客户端对ProcessLogic规范的实现 |
| UserApp | 客户端主入口程序 |
cn.toto.bigdata.mymr.scheduler | Runner | 客户端UserApp执行命令是依赖的Runner类,通过这里面的Socket发送命令。 |
| WorkerClient | 客户端执行时需要用到的client相关的代码 |
| WorkerServer | UserApp执行时,需要提前启动的服务端 |
| WorkerRunnable | 服务端执行的相关逻辑 |
运行条件:
1、将mapreduce-my-demo导出成test.jar放置在E:/test.jar下。 |
2、需要有用于统计用的文本文件a.txt,文件在E:\a.txt 内容截图类似: 假设a.txt内容为: The true nobility is in being superior to your previous self guess No great discovery was ever made without a bold Knowledge will give you power but character respect The sun is just rising in the morning of another day I I figure life is a gift and I don’t intend on wasting |
3、首先运行:WorkerServer,相当于是启动服务端的代码 |
4、再次运行:UserApp,相当于是客户端 |
5、最终的统计结果将显示在:E:/out.txt中。统计结果如下: nobility 1 but 1 gift 1 wasting 1 rising 1 don't 1 another 1 I 3 your 1 Knowledge 1 sun 1 without 1 life 1 The 2 character 1 and 1 of 1 power 1 just 1 day 1 you 1 on 1 No 1 a 2 give 1 figure 1 previous 1 in 2 will 1 made 1 was 1 is 3 being 1 bold 1 great 1 respect 1 morning 1 the 1 ever 1 superior 1 guess 1 discovery 1 true 1 self 1 to 1 intend 1
|
6、最终的日志将存储在:E:/task/task.log,最终的配置和工作用的jar也将会生成到这个目录下面,效果如下: 其中job.conf的内容为: 生成的task.log效果如下: |
接着:具体的代码实现如下:
cn.toto.bigdata.mymr.task | TaskProcessor | 核心的主体执行程序 |
| ProcessLogic | 定义客户端调用必须实现的方法,相当于WebService中的接口规范 |
TaskProcessor代码如下 package cn.toto.bigdata.mymr.task;
import java.util.HashMap; import java.util.logging.FileHandler; import java.util.logging.Level; import java.util.logging.Logger;
import cn.toto.bigdata.mymr.common.Constants; import cn.toto.bigdata.mymr.common.Context; import cn.toto.bigdata.mymr.io.InputFormat; import cn.toto.bigdata.mymr.io.OutPutFormat;
/** * 1、核心的主体执行程序 * 这里是任务执行者 */ public class TaskProcessor {
public static void main(String[] args) throws Exception { // 加载用户指定的所有配置参数到上下文对象中,同时读取配置文件 Context context = new Context(); //获取上下文中的配置文件 HashMap<String, String> conf = context.getConfiguration();
//通过打印日志的方式查看程序运行的结果 Logger logger = Logger.getLogger("TaskProcessor"); //设置日志的输出级别是INFO级别 logger.setLevel(Level.INFO); FileHandler fileHandler = new FileHandler("E:/task/task.log"); fileHandler.setLevel(Level.INFO); logger.addHandler(fileHandler); logger.info("context:" + context); logger.info("conf:" + conf);
//初始化文件读取组件 //从配置文件中获取用于读取的组件的class信息 Class<?> forName = Class.forName(conf.get(Constants.INPUT_FORMAT)); InputFormat inputFormat = (InputFormat) forName.newInstance(); inputFormat.init(context);
//用inputFormat组件读数据,并调用用户逻辑 Class<?> forName2 = Class.forName(conf.get(Constants.USER_LOGIC)); ProcessLogic userLogic = (ProcessLogic) forName2.newInstance(); //对每一行调用用户逻辑,并通过context将用户调用结果存储内部缓存 while(inputFormat.hasNext()) { Integer key = inputFormat.nextKey(); String value = inputFormat.nextValue(); userLogic.process(key, value, context); } userLogic.cleanUp(context);
//替用户输出结果 Class<?> forName3 = Class.forName(conf.get(Constants.OUTPUT_FORMAT)); OutPutFormat outputFormat = (OutPutFormat) forName3.newInstance(); outputFormat.write(context); } }
ProcessLogic代码如下: package cn.toto.bigdata.mymr.task;
import cn.toto.bigdata.mymr.common.Context;
/** * 1、规定的业务逻辑编写规范 * process() 和 cleanUp都没有写实现,这里的实现在客户端 */ public abstract class ProcessLogic {
/** * 这里的context存储处理后的结果值 * @param key :行号 * @param value :所在行的一行内容 * @param context :应用上下文的内容 */ public abstract void process(Integer key,String value,Context context);
/** * 通过CleanUp输出处理后的结果 */ public void cleanUp(Context context){} }
| ||
cn.toto.bigdata.mymr.io | InputFormat | 封装读文件的组件(接口用途) |
| DefaultInputFormat | 封装读文件的组件的实现类 |
| OutPutFormat | 封装写文件的组件(接口用途) |
| DefaultOutPutFormat | 封装写文件的组件的实现 |
package cn.toto.bigdata.mymr.io;
import cn.toto.bigdata.mymr.common.Context;
public abstract class InputFormat {
/** * 获取下一行要读的行的位置 */ public abstract int nextKey();
/** * 获取从文件中读取的到的行的信息 */ public abstract String nextValue();
/** * 从文件中读取到一行信息 */ public abstract String readLine() throws Exception;
/** * 判断是否还可以读取到下一行的内容 */ public abstract boolean hasNext() throws Exception;
/** * 初始化要读取的文件的路径和文件流 */ public abstract void init(Context context) throws Exception; } | ||
package cn.toto.bigdata.mymr.io;
import java.io.BufferedReader; import java.io.FileReader;
import cn.toto.bigdata.mymr.common.Constants; import cn.toto.bigdata.mymr.common.Context;
/** * 这里是默认的读取的实现类 */ public class DefaultInputFormat extends InputFormat{ //这里表示要读取的文件的路径 private String inputPath; private BufferedReader br = null; //这里的key是指文本中类似读取到的指针的偏移量,是行号的偏移量 private int key; //这里的value是指一行中的数据 private String value; //默认读取的行是第0行 private int lineNumber = 0;
@Override public void init(Context context) throws Exception { //获取要读的文件的路径 this.inputPath = context.getConfiguration().get(Constants.INPUT_PATH); //开始初始化输入流,只不过,这个流是从文件中获取的 this.br = new BufferedReader(new FileReader(inputPath)); }
@Override public int nextKey() { return this.key; }
@Override public String nextValue() { return this.value; }
@Override public boolean hasNext() throws Exception { String line = null; line = readLine();
//数据读取完成之后行号加一 this.key = lineNumber++; this.value = line;
return null != line; }
/** * 读取一行数据 */ @Override public String readLine() throws Exception { String line = br.readLine(); //如果读取到空了之后,将BufferedReader的值变成空 if (line == null) { br.close(); } return line; } } | ||
package cn.toto.bigdata.mymr.io;
import cn.toto.bigdata.mymr.common.Context;
/** * 用于输出结果的类 */ public abstract class OutPutFormat {
/** * 将结果写入文件中 */ public abstract void write(Context context) throws Exception;
/** * 关闭流 */ public abstract void cleanUp() throws Exception; }
| ||
package cn.toto.bigdata.mymr.io;
import java.io.BufferedWriter; import java.io.FileWriter; import java.util.HashMap; import java.util.Set; import java.util.Map.Entry;
import cn.toto.bigdata.mymr.common.Constants; import cn.toto.bigdata.mymr.common.Context;
public class DefaultOutPutFormat extends OutPutFormat{ BufferedWriter bw = null;
@Override public void write(Context context) throws Exception { String outputPath = context.getConfiguration().get(Constants.OUTPUT_PATH); HashMap<String, Integer> KVBuffer = context.getKVBuffer(); this.bw = new BufferedWriter(new FileWriter(outputPath)); Set<Entry<String, Integer>> entrySet = KVBuffer.entrySet(); for (Entry<String, Integer> entry : entrySet) { bw.write(entry.getKey() + "\t" + entry.getValue() + "\r"); } bw.flush(); }
@Override public void cleanUp() throws Exception { bw.close(); }
} | ||
cn.toto.bigdata.mymr.common | Constants | 常量定义 |
| Context | 应用上下文,用于存储计算单词出现字数的次数的中间变量 |
package cn.toto.bigdata.mymr.common;
public class Constants {
public static final String JAR_PATH = "jar.path";
public static final String JAR_FILE = "job.jar";
public static final String WORKER_HOST = "worker.host";
public static final String WORKER_PORT = "worker.port";
public static final String CONF_FILE = "job.conf";
public static final String INPUT_FORMAT = "input.format.class";
public static final String OUTPUT_FORMAT = "output.format.class";
public static final String INPUT_PATH = "input.path";
public static final String OUTPUT_PATH = "output.path";
public static final String TASK_PROCESSOR = "cn.toto.bigdata.mymr.task.TaskProcessor";
public static final String USER_LOGIC = "user.logic.class";
public static final String TASK_WORK_DIR = "E:/task";
} | ||
package cn.toto.bigdata.mymr.common;
import java.io.File; import java.io.FileInputStream; import java.io.ObjectInputStream; import java.util.HashMap;
/** * 应用上下文,通过这个内容获取配置文件 * 通过这个上下文最终输出结果 */ public class Context { private HashMap<String, Integer> KVBuffer = new HashMap<String, Integer>(); private HashMap<String, String> conf;
@SuppressWarnings("unchecked") public Context() throws Exception { //加载配置参数 File file = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE); if (file.exists()) { @SuppressWarnings("resource") ObjectInputStream oi = new ObjectInputStream(new FileInputStream(file)); this.conf = (HashMap<String, String>) oi.readObject(); } else { // throw new RuntimeException("read conf failed ...."); } }
/** * 通过这种变量最后输出结果 */ public void write(String k, Integer v) { KVBuffer.put(k, v); }
public HashMap<String, Integer> getKVBuffer() { return KVBuffer; }
public void setKVBuffer(HashMap<String, Integer> tmpKV) { this.KVBuffer = tmpKV; }
/** * 获取配置文件中的信息 */ public HashMap<String, String> getConfiguration() { return conf; }
/** * 在Context()构造函数里面已经有了conf的配置,这里再次传入说明配置可以让用户手动指定 */ public void setConfiguration(HashMap<String, String> configuration) { this.conf = configuration; } } | ||
cn.toto.bigdata.mymr.userapp | UserLogic | 客户端对ProcessLogic规范的实现 |
| UserApp | 客户端主入口程序 |
package cn.toto.bigdata.mymr.userapp;
import java.util.HashMap; import java.util.Set; import java.util.Map.Entry;
import cn.toto.bigdata.mymr.common.Context; import cn.toto.bigdata.mymr.task.ProcessLogic;
public class UserLogic extends ProcessLogic {
private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();
@Override public void process(Integer key, String value, Context context) { String [] words = value.split(" "); for(String word : words) { Integer count = wordCount.get(word); if (count == null) { wordCount.put(word, 1); } else { wordCount.put(word, count + 1); } } }
public void cleanUp(Context context) { Set<Entry<String, Integer>> entrySet = wordCount.entrySet(); for(Entry<String, Integer> entry : entrySet) { context.write(entry.getKey(), entry.getValue()); } } } | ||
package cn.toto.bigdata.mymr.userapp;
import java.util.HashMap;
import cn.toto.bigdata.mymr.common.Constants; import cn.toto.bigdata.mymr.scheduler.Runner;
public class UserApp {
public static void main(String[] args) throws Exception { HashMap<String, String> conf = new HashMap<String,String>(); conf.put(Constants.INPUT_PATH, "E:/a.txt"); conf.put(Constants.OUTPUT_PATH, "E:/out.txt"); conf.put(Constants.INPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultInputFormat"); conf.put(Constants.OUTPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultOutPutFormat"); conf.put(Constants.JAR_PATH, "E:/test.jar"); conf.put(Constants.WORKER_HOST, "localhost"); conf.put(Constants.WORKER_PORT, "9889"); conf.put(Constants.USER_LOGIC, "cn.toto.bigdata.mymr.userapp.UserLogic");
Runner runner = new Runner(conf); runner.submit("localhost", 9889); } } | ||
cn.toto.bigdata.mymr.scheduler | Runner | 客户端UserApp执行命令是依赖的Runner类,通过这里面的Socket发送命令。 |
| WorkerClient | 客户端执行时需要用到的client相关的代码 |
| WorkerServer | UserApp执行时,需要提前启动的服务端 |
| WorkerRunnable | 服务端执行的相关逻辑 |
package cn.toto.bigdata.mymr.scheduler;
import java.io.FileOutputStream; import java.io.ObjectOutputStream; import java.util.HashMap;
import cn.toto.bigdata.mymr.common.Constants;
public class Runner { private HashMap<String, String> conf;
public Runner(HashMap<String, String> conf) { this.conf = conf; }
public void submit(String host,int port) throws Exception { ObjectOutputStream jobConfStream = new ObjectOutputStream(new FileOutputStream(Constants.CONF_FILE)); jobConfStream.writeObject(conf);
WorkerClient workerClient = new WorkerClient(conf); workerClient.submit(); } } | ||
package cn.toto.bigdata.mymr.scheduler;
import java.io.FileInputStream; import java.io.OutputStream; import java.net.Socket; import java.util.HashMap;
import cn.toto.bigdata.mymr.common.Constants;
public class WorkerClient {
private HashMap<String, String> conf; Socket socket = null; OutputStream so = null;
public WorkerClient(HashMap<String, String> conf) { this.conf = conf; }
public void submit() throws Exception { socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT))); so = socket.getOutputStream();
String jarPath = conf.get(Constants.JAR_PATH);
// 发送jar包 byte[] buff = new byte[4096]; FileInputStream jarIns = new FileInputStream(jarPath); so.write("jarfile".getBytes()); int read = 0; while ((read=jarIns.read(buff)) != -1) { so.write(buff,0,read); } jarIns.close(); so.close(); socket.close();
// 发送job.conf文件 socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT))); so = socket.getOutputStream();
FileInputStream confIns = new FileInputStream(Constants.CONF_FILE); so.write("jobconf".getBytes()); while ((read = confIns.read(buff)) != -1) { so.write(buff,0,read); } confIns.close(); so.close(); socket.close();
// 发送启动命令 socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT))); so = socket.getOutputStream(); so.write("job2run".getBytes()); String shell = "java -cp E:/test.jar cn.toto.bigdata.mymr.task.TaskProcessor"; so.write(shell.getBytes()); so.close(); socket.close(); } } | ||
package cn.toto.bigdata.mymr.scheduler;
import java.net.ServerSocket; import java.net.Socket;
public class WorkerServer {
public static void main(String[] args) throws Exception { ServerSocket ssc = new ServerSocket(9889); System.out.println("Worker服务器启动-->9889"); while (true) { Socket accept = ssc.accept(); new Thread(new WorkerRunnable(accept)).start(); } } } | ||
package cn.toto.bigdata.mymr.scheduler;
import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.net.Socket;
import cn.toto.bigdata.mymr.common.Constants;
public class WorkerRunnable implements Runnable { Socket socket; InputStream in = null; volatile long confSize = 0; volatile long jarSize = 0;
public WorkerRunnable(Socket socket) { this.socket = socket; }
@Override public void run() { try { this.in = socket.getInputStream(); byte[] protocal = new byte[7]; int read = in.read(protocal, 0, 7); if (read < 7) { System.out.println("客户端请求不符合协议规范......"); return; } String command = new String(protocal); switch (command) { case "jarfile": receiveJarFile(); break; case "jobconf": receiveConfFile(); break; case "job2run": runJob(); break; default: System.out.println("客户端请求不符合协议规范....."); socket.close(); break; }
} catch (Exception e) { e.printStackTrace(); }
}
private void receiveConfFile() throws Exception { System.out.println("开始接收conf文件"); FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE); byte[] buff = new byte[4096]; int read = 0; while ((read = in.read(buff)) != -1) { confSize += read; fo.write(buff, 0, read); } fo.flush(); fo.close(); in.close(); socket.close();
}
private void receiveJarFile() throws Exception { System.out.println("开始接收jar文件"); FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE); byte[] buff = new byte[4096]; int read = 0; while ((read = in.read(buff)) != -1) { jarSize += read; fo.write(buff, 0, read); } fo.flush(); fo.close(); in.close(); socket.close();
}
private void runJob() throws Exception {
byte[] buff = new byte[4096]; int read = in.read(buff); String shell = new String(buff, 0, read); System.out.println("接收到启动命令......." + shell); in.close(); socket.close(); Thread.sleep(500);
File jarFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE); File confFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE); System.out.println("jarfile 存在?" + jarFile.exists()); System.out.println("confFile 存在?" + confFile.exists()); System.out.println("jarfile可读?" + jarFile.canRead()); System.out.println("jarfile可写?" + jarFile.canWrite()); System.out.println("confFile可读?" + confFile.canRead()); System.out.println("confFile可写?" + confFile.canWrite());
System.out.println("jarFile.length():" + jarFile.length()); System.out.println("confFile.length():" + confFile.length());
/*if (jarFile.length() == jarSize && confFile.length() == confSize) { System.out.println("jar 和 conf 文件已经准备就绪......"); }*/ System.out.println("开始启动数据处理TaskProcessor......");
Process exec = Runtime.getRuntime().exec(shell); int waitFor = exec.waitFor();
InputStream errStream = exec.getErrorStream(); BufferedReader errReader = new BufferedReader(new InputStreamReader(errStream)); String inLine = null; /* * InputStream stdStream = exec.getInputStream(); BufferedReader * stdReader = new BufferedReader(new InputStreamReader(stdStream)); * while ((inLine = stdReader.readLine()) != null) { * System.out.println(inLine); } */ while ((inLine = errReader.readLine()) != null) { System.out.println(inLine); }
if (waitFor == 0) { System.out.println("task成功运行完毕....."); } else { System.out.println("task异常退出......"); }
}
} |