模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)

本案例要实现的目标:

1、模拟修改配置,通过发指令的方式统计一个文件中出现的单词的字数。

案例代码结构如下:

模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)

在整个案例中需要有以下几类文件:

A:worker服务端,用于类似Mapreduce接收jar,接收配置文件,执行业务逻辑

B:程序客户端、用于组装配置文件、发送业务执行的命令(听过socket发送jarfile、jobconf、和job2run的命令)

代码结构,每个包和代码作用介绍

cn.toto.bigdata.mymr.task

TaskProcessor

核心的主体执行程序

 

ProcessLogic

定义客户端调用必须实现的方法,相当于WebService中的接口规范

cn.toto.bigdata.mymr.io

InputFormat

封装读文件的组件(接口用途)

 

DefaultInputFormat

封装读文件的组件的实现类

 

OutPutFormat

封装写文件的组件(接口用途)

 

DefaultOutPutFormat

封装写文件的组件的实现

cn.toto.bigdata.mymr.common

Constants

常量定义

 

Context

应用上下文,用于存储计算单词出现字数的次数的中间变量

cn.toto.bigdata.mymr.userapp

UserLogic

客户端对ProcessLogic规范的实现

 

UserApp

客户端主入口程序

cn.toto.bigdata.mymr.scheduler

Runner

客户端UserApp执行命令是依赖的Runner类,通过这里面的Socket发送命令。

 

WorkerClient

客户端执行时需要用到的client相关的代码

 

WorkerServer

UserApp执行时,需要提前启动的服务端

 

WorkerRunnable

服务端执行的相关逻辑

 

运行条件:

1、将mapreduce-my-demo导出成test.jar放置在E:/test.jar下。

2、需要有用于统计用的文本文件a.txt,文件在E:\a.txt

内容截图类似:

模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)

假设a.txt内容为:

The true

nobility is

in being

superior to

your previous

self guess

No great

discovery

was ever

made without

a bold

Knowledge will

give you

power but

character respect

The sun

is just

rising in

the morning

of another

day I

I figure

life is

a gift

and I

don’t intend

on wasting

3、首先运行:WorkerServer,相当于是启动服务端的代码

4、再次运行:UserApp,相当于是客户端

5、最终的统计结果将显示在:E:/out.txt中。统计结果如下:

nobility     1

but   1

gift   1

wasting    1

rising        1

don't         1

another    1

I        3

your 1

Knowledge       1

sun   1

without    1

life    1

The  2

character 1

and  1

of      1

power       1

just  1

day   1

you   1

on     1

No    1

a       2

give  1

figure        1

previous   1

in      2

will   1

made        1

was  1

is      3

being        1

bold 1

great        1

respect    1

morning   1

the   1

ever 1

superior   1

guess        1

discovery 1

true 1

self   1

to     1

intend       1

 

6、最终的日志将存储在:E:/task/task.log,最终的配置和工作用的jar也将会生成到这个目录下面,效果如下:

模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)

其中job.conf的内容为:

模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)

生成的task.log效果如下:

模拟MapReduce编程的程序案例(用于统计文本中单词出现频率)

 

接着:具体的代码实现如下:

cn.toto.bigdata.mymr.task

TaskProcessor

核心的主体执行程序

 

ProcessLogic

定义客户端调用必须实现的方法,相当于WebService中的接口规范

TaskProcessor代码如下

package cn.toto.bigdata.mymr.task;

 

import java.util.HashMap;

import java.util.logging.FileHandler;

import java.util.logging.Level;

import java.util.logging.Logger;

 

import cn.toto.bigdata.mymr.common.Constants;

import cn.toto.bigdata.mymr.common.Context;

import cn.toto.bigdata.mymr.io.InputFormat;

import cn.toto.bigdata.mymr.io.OutPutFormat;

 

/**

 * 1、核心的主体执行程序

 * 这里是任务执行者

 */

public class TaskProcessor {

 

   public static void main(String[] args) throws Exception {

      // 加载用户指定的所有配置参数到上下文对象中,同时读取配置文件

      Context context = new Context();

      //获取上下文中的配置文件

      HashMap<String, String>  conf = context.getConfiguration();

     

      //通过打印日志的方式查看程序运行的结果

      Logger logger = Logger.getLogger("TaskProcessor");

      //设置日志的输出级别是INFO级别

      logger.setLevel(Level.INFO);

      FileHandler fileHandler = new FileHandler("E:/task/task.log");

      fileHandler.setLevel(Level.INFO);

      logger.addHandler(fileHandler);

      logger.info("context:" + context);

      logger.info("conf:" + conf);

     

      //初始化文件读取组件

      //从配置文件中获取用于读取的组件的class信息

      Class<?> forName = Class.forName(conf.get(Constants.INPUT_FORMAT));

      InputFormat inputFormat = (InputFormat) forName.newInstance();

      inputFormat.init(context);

     

      //inputFormat组件读数据,并调用用户逻辑

      Class<?> forName2 = Class.forName(conf.get(Constants.USER_LOGIC));

      ProcessLogic userLogic = (ProcessLogic) forName2.newInstance();

      //对每一行调用用户逻辑,并通过context将用户调用结果存储内部缓存

      while(inputFormat.hasNext()) {

         Integer key = inputFormat.nextKey();

         String value = inputFormat.nextValue();

         userLogic.process(key, value, context);

      }

      userLogic.cleanUp(context);

     

      //替用户输出结果

      Class<?> forName3 = Class.forName(conf.get(Constants.OUTPUT_FORMAT));

      OutPutFormat outputFormat = (OutPutFormat) forName3.newInstance();

      outputFormat.write(context);

   }

}

 

 

ProcessLogic代码如下:

package cn.toto.bigdata.mymr.task;

 

import cn.toto.bigdata.mymr.common.Context;

 

/**

 * 1、规定的业务逻辑编写规范

 * process()   cleanUp都没有写实现,这里的实现在客户端

 */

public abstract class ProcessLogic {

 

   /**

    * 这里的context存储处理后的结果值

    * @param key          :行号

    * @param value        :所在行的一行内容

    * @param context      :应用上下文的内容

    */

   public abstract void process(Integer key,String value,Context context);

  

   /**

    * 通过CleanUp输出处理后的结果

    */

   public void cleanUp(Context context){}

}

 

cn.toto.bigdata.mymr.io

InputFormat

封装读文件的组件(接口用途)

 

DefaultInputFormat

封装读文件的组件的实现类

 

OutPutFormat

封装写文件的组件(接口用途)

 

DefaultOutPutFormat

封装写文件的组件的实现

package cn.toto.bigdata.mymr.io;

 

import cn.toto.bigdata.mymr.common.Context;

 

public abstract class InputFormat {

   

   /**

    * 获取下一行要读的行的位置

    */

   public abstract int nextKey();

 

   /**

    * 获取从文件中读取的到的行的信息

    */

   public abstract String nextValue();

 

   /**

    * 从文件中读取到一行信息

    */

   public abstract String readLine() throws Exception;

  

   /**

    * 判断是否还可以读取到下一行的内容

    */

   public abstract boolean hasNext() throws Exception;

  

   /**

    * 初始化要读取的文件的路径和文件流

    */

   public abstract void init(Context context) throws Exception;

}

package cn.toto.bigdata.mymr.io;

 

import java.io.BufferedReader;

import java.io.FileReader;

 

import cn.toto.bigdata.mymr.common.Constants;

import cn.toto.bigdata.mymr.common.Context;

 

/**

 * 这里是默认的读取的实现类

 */

public class DefaultInputFormat extends InputFormat{

   //这里表示要读取的文件的路径

   private String inputPath;

   private BufferedReader br = null;

   //这里的key是指文本中类似读取到的指针的偏移量,是行号的偏移量

   private int key;

   //这里的value是指一行中的数据

   private String value;

   //默认读取的行是第0

   private int lineNumber = 0;

  

   @Override

   public void init(Context context) throws Exception {

      //获取要读的文件的路径

       this.inputPath = context.getConfiguration().get(Constants.INPUT_PATH);

       //开始初始化输入流,只不过,这个流是从文件中获取的

       this.br = new BufferedReader(new FileReader(inputPath));

   }

  

   @Override

   public int nextKey() {

      return this.key;

   }

 

   @Override

   public String nextValue() {

      return this.value;

   }

  

   @Override

   public boolean hasNext() throws Exception {

      String line = null;

      line = readLine();

     

      //数据读取完成之后行号加一

      this.key = lineNumber++;

      this.value = line;

     

      return null != line;

   }

 

   /**

    * 读取一行数据

    */

   @Override

   public String readLine() throws Exception {

      String line = br.readLine();

      //如果读取到空了之后,将BufferedReader的值变成空

      if (line == null) {

         br.close();

      }

      return line;

   }

}

package cn.toto.bigdata.mymr.io;

 

import cn.toto.bigdata.mymr.common.Context;

 

/**

 * 用于输出结果的类

 */

public abstract class OutPutFormat {

 

   /**

    * 将结果写入文件中

    */

   public abstract void write(Context context) throws Exception;

 

   /**

    * 关闭流

    */

    public abstract void cleanUp() throws Exception;

}

 

package cn.toto.bigdata.mymr.io;

 

import java.io.BufferedWriter;

import java.io.FileWriter;

import java.util.HashMap;

import java.util.Set;

import java.util.Map.Entry;

 

import cn.toto.bigdata.mymr.common.Constants;

import cn.toto.bigdata.mymr.common.Context;

 

public class DefaultOutPutFormat extends OutPutFormat{

    BufferedWriter bw = null;

        

         @Override

         public void write(Context context) throws Exception {

             String outputPath = context.getConfiguration().get(Constants.OUTPUT_PATH);

             HashMap<String, Integer> KVBuffer = context.getKVBuffer();

             this.bw = new BufferedWriter(new FileWriter(outputPath));

             Set<Entry<String, Integer>> entrySet = KVBuffer.entrySet();

             for (Entry<String, Integer> entry : entrySet) {

                            bw.write(entry.getKey() + "\t" + entry.getValue() + "\r");

                   }

             bw.flush();

         }

 

         @Override

         public void cleanUp() throws Exception {

                   bw.close();

         }

 

}

cn.toto.bigdata.mymr.common

Constants

常量定义

 

Context

应用上下文,用于存储计算单词出现字数的次数的中间变量

package cn.toto.bigdata.mymr.common;

 

public class Constants {

 

   public static final String JAR_PATH = "jar.path";

  

   public static final String JAR_FILE = "job.jar";

  

   public static final String WORKER_HOST = "worker.host";

  

   public static final String WORKER_PORT = "worker.port";

  

   public static final String CONF_FILE = "job.conf";

  

   public static final String INPUT_FORMAT = "input.format.class";

  

   public static final String OUTPUT_FORMAT = "output.format.class";

  

   public static final String INPUT_PATH = "input.path";

  

   public static final String OUTPUT_PATH = "output.path";

  

   public static final String TASK_PROCESSOR = "cn.toto.bigdata.mymr.task.TaskProcessor";

 

   public static final String USER_LOGIC = "user.logic.class";

  

   public static final String TASK_WORK_DIR = "E:/task";

  

  

}

package cn.toto.bigdata.mymr.common;

 

import java.io.File;

import java.io.FileInputStream;

import java.io.ObjectInputStream;

import java.util.HashMap;

 

/**

 * 应用上下文,通过这个内容获取配置文件

 * 通过这个上下文最终输出结果

 */

public class Context {

         private HashMap<String, Integer> KVBuffer = new HashMap<String, Integer>();

         private HashMap<String, String> conf;

        

         @SuppressWarnings("unchecked")

         public Context() throws Exception {

                   //加载配置参数

                   File file = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);

                   if (file.exists()) {

                            @SuppressWarnings("resource")

                            ObjectInputStream oi = new ObjectInputStream(new FileInputStream(file));

                            this.conf = (HashMap<String, String>) oi.readObject();

                   } else {

                            // throw new RuntimeException("read conf failed ....");

                   }

         }

        

         /**

          * 通过这种变量最后输出结果

          */

         public void write(String k, Integer v) {

                   KVBuffer.put(k, v);

         }

 

         public HashMap<String, Integer> getKVBuffer() {

                   return KVBuffer;

         }

 

         public void setKVBuffer(HashMap<String, Integer> tmpKV) {

                   this.KVBuffer = tmpKV;

         }

 

         /**

          * 获取配置文件中的信息

          */

         public HashMap<String, String> getConfiguration() {

                   return conf;

         }

 

         /**

          * 在Context()构造函数里面已经有了conf的配置,这里再次传入说明配置可以让用户手动指定

          */

         public void setConfiguration(HashMap<String, String> configuration) {

                   this.conf = configuration;

         }

}

cn.toto.bigdata.mymr.userapp

UserLogic

客户端对ProcessLogic规范的实现

 

UserApp

客户端主入口程序

package cn.toto.bigdata.mymr.userapp;

 

import java.util.HashMap;

import java.util.Set;

import java.util.Map.Entry;

 

import cn.toto.bigdata.mymr.common.Context;

import cn.toto.bigdata.mymr.task.ProcessLogic;

 

public class UserLogic extends ProcessLogic {

 

         private HashMap<String, Integer> wordCount = new HashMap<String, Integer>();

        

         @Override

         public void process(Integer key, String value, Context context) {

                   String [] words = value.split(" ");

                   for(String word : words) {

                            Integer count = wordCount.get(word);

                            if (count == null) {

                                     wordCount.put(word, 1);

                            } else {

                                     wordCount.put(word, count + 1);

                            }

                   }

         }

 

         public void cleanUp(Context context) {

                   Set<Entry<String, Integer>> entrySet = wordCount.entrySet();

                   for(Entry<String, Integer> entry : entrySet) {

                            context.write(entry.getKey(), entry.getValue());

                   }

         }

}

package cn.toto.bigdata.mymr.userapp;

 

import java.util.HashMap;

 

import cn.toto.bigdata.mymr.common.Constants;

import cn.toto.bigdata.mymr.scheduler.Runner;

 

public class UserApp {

   

         public static void main(String[] args) throws Exception {

                   HashMap<String, String> conf = new HashMap<String,String>();

                   conf.put(Constants.INPUT_PATH, "E:/a.txt");

                   conf.put(Constants.OUTPUT_PATH, "E:/out.txt");

                   conf.put(Constants.INPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultInputFormat");

                   conf.put(Constants.OUTPUT_FORMAT, "cn.toto.bigdata.mymr.io.DefaultOutPutFormat");

                   conf.put(Constants.JAR_PATH, "E:/test.jar");

                   conf.put(Constants.WORKER_HOST, "localhost");

                   conf.put(Constants.WORKER_PORT, "9889");

                   conf.put(Constants.USER_LOGIC, "cn.toto.bigdata.mymr.userapp.UserLogic");

                  

                   Runner runner = new Runner(conf);

                   runner.submit("localhost", 9889);

         }

}

cn.toto.bigdata.mymr.scheduler

Runner

客户端UserApp执行命令是依赖的Runner类,通过这里面的Socket发送命令。

 

WorkerClient

客户端执行时需要用到的client相关的代码

 

WorkerServer

UserApp执行时,需要提前启动的服务端

 

WorkerRunnable

服务端执行的相关逻辑

package cn.toto.bigdata.mymr.scheduler;

 

import java.io.FileOutputStream;

import java.io.ObjectOutputStream;

import java.util.HashMap;

 

import cn.toto.bigdata.mymr.common.Constants;

 

public class Runner {

    private HashMap<String, String> conf;

        

    public Runner(HashMap<String, String> conf) {

             this.conf = conf;

    }

   

    public void submit(String host,int port) throws Exception {

             ObjectOutputStream jobConfStream = new ObjectOutputStream(new FileOutputStream(Constants.CONF_FILE));

                   jobConfStream.writeObject(conf);

                  

                   WorkerClient workerClient = new WorkerClient(conf);

                   workerClient.submit();

    }

}

package cn.toto.bigdata.mymr.scheduler;

 

import java.io.FileInputStream;

import java.io.OutputStream;

import java.net.Socket;

import java.util.HashMap;

 

import cn.toto.bigdata.mymr.common.Constants;

 

public class WorkerClient {

   

         private HashMap<String, String> conf;

         Socket socket = null;

         OutputStream so = null;

        

         public WorkerClient(HashMap<String, String> conf) {

                   this.conf = conf;

         }

        

         public void submit() throws Exception {

                   socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT)));

                   so = socket.getOutputStream();

 

                   String jarPath = conf.get(Constants.JAR_PATH);

 

                   // 发送jar包

                   byte[] buff = new byte[4096];

                   FileInputStream jarIns = new FileInputStream(jarPath);

                   so.write("jarfile".getBytes());

                   int read = 0;

                   while ((read=jarIns.read(buff)) != -1) {

                            so.write(buff,0,read);

                   }

                   jarIns.close();

                   so.close();

                   socket.close();

                  

                   // 发送job.conf文件

                   socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT)));

                   so = socket.getOutputStream();

                  

                   FileInputStream confIns = new FileInputStream(Constants.CONF_FILE);

                   so.write("jobconf".getBytes());

                   while ((read = confIns.read(buff)) != -1) {

                            so.write(buff,0,read);

                   }

                   confIns.close();

                   so.close();

                   socket.close();

 

                   // 发送启动命令

                   socket = new Socket(conf.get(Constants.WORKER_HOST), Integer.parseInt(conf.get(Constants.WORKER_PORT)));

                   so = socket.getOutputStream();

                   so.write("job2run".getBytes());

                   String shell = "java -cp E:/test.jar cn.toto.bigdata.mymr.task.TaskProcessor";

                   so.write(shell.getBytes());

                   so.close();

                   socket.close();

         }

}

package cn.toto.bigdata.mymr.scheduler;

 

import java.net.ServerSocket;

import java.net.Socket;

 

public class WorkerServer {

 

         public static void main(String[] args) throws Exception {

                   ServerSocket ssc = new ServerSocket(9889);

                   System.out.println("Worker服务器启动-->9889");

                   while (true) {

                            Socket accept = ssc.accept();

                            new Thread(new WorkerRunnable(accept)).start();

                   }

         }

}

package cn.toto.bigdata.mymr.scheduler;

 

import java.io.BufferedReader;

import java.io.File;

import java.io.FileOutputStream;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.net.Socket;

 

import cn.toto.bigdata.mymr.common.Constants;

 

public class WorkerRunnable implements Runnable {

         Socket socket;

         InputStream in = null;

         volatile long confSize = 0;

         volatile long jarSize = 0;

 

         public WorkerRunnable(Socket socket) {

                   this.socket = socket;

         }

 

         @Override

         public void run() {

                   try {

                            this.in = socket.getInputStream();

                            byte[] protocal = new byte[7];

                            int read = in.read(protocal, 0, 7);

                            if (read < 7) {

                                     System.out.println("客户端请求不符合协议规范......");

                                     return;

                            }

                            String command = new String(protocal);

                            switch (command) {

                            case "jarfile":

                                     receiveJarFile();

                                     break;

                            case "jobconf":

                                     receiveConfFile();

                                     break;

                            case "job2run":

                                     runJob();

                                     break;

                            default:

                                     System.out.println("客户端请求不符合协议规范.....");

                                     socket.close();

                                     break;

                            }

 

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

 

         }

 

         private void receiveConfFile() throws Exception {

                   System.out.println("开始接收conf文件");

                   FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);

                   byte[] buff = new byte[4096];

                   int read = 0;

                   while ((read = in.read(buff)) != -1) {

                            confSize += read;

                            fo.write(buff, 0, read);

                   }

                   fo.flush();

                   fo.close();

                   in.close();

                   socket.close();

 

         }

 

         private void receiveJarFile() throws Exception {

                   System.out.println("开始接收jar文件");

                   FileOutputStream fo = new FileOutputStream(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE);

                   byte[] buff = new byte[4096];

                   int read = 0;

                   while ((read = in.read(buff)) != -1) {

                            jarSize += read;

                            fo.write(buff, 0, read);

                   }

                   fo.flush();

                   fo.close();

                   in.close();

                   socket.close();

 

         }

 

         private void runJob() throws Exception {

 

                   byte[] buff = new byte[4096];

                   int read = in.read(buff);

                   String shell = new String(buff, 0, read);

                   System.out.println("接收到启动命令......." + shell);

                   in.close();

                   socket.close();

                   Thread.sleep(500);

 

                   File jarFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.JAR_FILE);

                   File confFile = new File(Constants.TASK_WORK_DIR + "/" + Constants.CONF_FILE);

                   System.out.println("jarfile 存在?" + jarFile.exists());

                   System.out.println("confFile 存在?" + confFile.exists());

                   System.out.println("jarfile可读?" + jarFile.canRead());

                   System.out.println("jarfile可写?" + jarFile.canWrite());

                   System.out.println("confFile可读?" + confFile.canRead());

                   System.out.println("confFile可写?" + confFile.canWrite());

 

                   System.out.println("jarFile.length():" + jarFile.length());

                   System.out.println("confFile.length():" + confFile.length());

 

                   /*if (jarFile.length() == jarSize && confFile.length() == confSize) {

                            System.out.println("jar 和 conf 文件已经准备就绪......");

                   }*/

                   System.out.println("开始启动数据处理TaskProcessor......");

 

                   Process exec = Runtime.getRuntime().exec(shell);

                   int waitFor = exec.waitFor();

                  

                   InputStream errStream = exec.getErrorStream();

                   BufferedReader errReader = new BufferedReader(new InputStreamReader(errStream));

                   String inLine = null;

                   /*

                    * InputStream stdStream = exec.getInputStream(); BufferedReader

                    * stdReader = new BufferedReader(new InputStreamReader(stdStream));

                    * while ((inLine = stdReader.readLine()) != null) {

                    * System.out.println(inLine); }

                    */

                   while ((inLine = errReader.readLine()) != null) {

                            System.out.println(inLine);

                   }

                  

                   if (waitFor == 0) {

                            System.out.println("task成功运行完毕.....");

                   } else {

                            System.out.println("task异常退出......");

                   }

 

         }

 

}

 

上一篇:Solidity智能合约编程漏洞及对策


下一篇:专访厦门第二医院影像科主任郭岗:基于 IBM 推出的 AI 集成解决方案,如何给医生减负增效?