MapReduce编程实践——WordCount运行实例(Python实现)

一、实验目的

  1. 通过实验掌握基本的 MapReduce 编程方法;
  2. 掌握用 MapReduce 解决一些常见数据处理问题的方法,包括数据合并、数据去重、数据排序和数据挖掘等。

二、实验平台

  • 操作系统:Ubuntu 18.04(或 Ubuntu 16.04)
  • Hadoop 版本:3.2.2

三、实验内容和要求

1. 任务要求

首先我们在本地创建两个文件,即文件A和B。
对于两个输入文件,即文件A和文件B,请编写 MapReduce 程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。下面是输入文件和输出文件的一个样例,以供参考。

文件A的内容如下:

China is my motherland
I love China

文件B的内容如下:

I am from China

根据输入文件A和B合并得到的程序应该输出如下形式的结果:

I			2
is			1
China		3
my			1
love		1
am			1
from		1
motherland	1

2. 编写Map处理逻辑

编写 Map 的 Python 代码如下(mapper.py):

#!/usr/bin/env python3
# encoding=utf-8

import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print("%s\t%s" % (word, 1))

3. 编写Reduce处理逻辑

编写 Reduce 的 Python 代码如下(reducer.py):

#!/usr/bin/env python3
# encoding=utf-8

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print("%s\t%s" % (current_word, current_count))
        current_count = count
        current_word = word

if word == current_word:
    print("%s\t%s" % (current_word, current_count))

4. 简单测试

简单在本地测试一下,运行如下代码:

cat A B | python3 mapper.py | python3 reducer.py

输出如下:

MapReduce编程实践——WordCount运行实例(Python实现)

文末我会介绍如何将 Python 程序应用于 HDFS 文件系统中。


四、在HDFS中运行Python程序

首先启动 Hadoop:

cd /usr/local/hadoop
sbin/start-dfs.sh

创建 input 文件夹,把我们的数据文件传进去(注意这里你的 A、B 数据文件所处的位置):

bin/hdfs dfs -mkdir /input
bin/hdfs dfs -copyFromLocal /usr/local/hadoop/MapReduce/python/A /input
bin/hdfs dfs -copyFromLocal /usr/local/hadoop/MapReduce/python/B /input

确保 output 文件夹之前不存在:

bin/hdfs dfs -rm -r /output

我们只需要使用 Hadoop 提供的 Jar 包来为我们的 Python 程序提供一个接口就好了,这里我们所使用的 Jar 包一般在此目录下:

ls /usr/local/hadoop/share/hadoop/tools/lib/

找到名为 hadoop-streaming-x.x.x.jar 的包:

hadoop@fzqs-Laptop:/usr/local/hadoop/MapReduce/sample3$ ls /usr/local/hadoop/share/hadoop/tools/lib/

hadoop-streaming-3.2.2.jar

调用此包,把我们本地的 Python 文件作为参数传进去即可(注意这里我的 streaming 包是 3.2.2,看你自己的版本号):

/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.2.2.jar \
-file /usr/local/hadoop/MapReduce/sample1/mapper.py -mapper /usr/local/hadoop/MapReduce/sample1/mapper.py \
-file /usr/local/hadoop/MapReduce/sample1/reducer.py -reducer /usr/local/hadoop/MapReduce/sample1/reducer.py \
-input /input/*         -output /output

查看我们的输出:

bin/hdfs dfs -cat /output/*

输出正确,执行成功:

MapReduce编程实践——WordCount运行实例(Python实现)


五、总结

上一篇:python全栈开发从入门到放弃之初识面向对象


下一篇:第六篇.bootstrap表格