如何在hadoop中使用外部的python程序文件

业务场景大概是这样,我需要在公司hadoop集群上对博文进行结巴分词。我的数据是存储在hive表格中的,数据量涉及到五百万用户三个月内发的所有博文。

首先对于数据来说,很简单,在hive表格中就是两列,一列代表的是uid,一列代表的是博文内容。举个例子如下:

uid     content
12345   今天天气真好啊
23456   中午的食物真不错啊
...     ...

对于hive表格,我在使用hadoop的时候,方法一般使用的是hive+python的形式,也就是从hive中一行行的读取数据,每一行都经过python文件进行处理,然后将结果插入到另一个新的表格中去。

在这个过程中,我们在python文件中需要导入结巴分词(或者其他比较好的分词包),即 import jieba;但是在运行程序的时候发现程序报错,通过查询错误(查询的方法很笨拙,第一感觉就是包出了问题,只要程序中没有包的时候没有错误,加入结巴分词有了错误,就知道错在哪里了)也就是说在公司集群的python运行环境下并没有结巴分词。

因为使用的是公司的集群,我没有权限加载新的包,这就需要我找到别的办法。

通过谷歌,我知道的绝大多是的解决办法都是基于hadoop mapreduce的(简单来讲,就是通过一个mapper文件,一个reducer文件实现一个程序的执行)我在这里是实现了一个简单的python版本的mapreduce程序

正如我在开头提到的,我在这里使用的是hive+python的方式处理hive表格,正式的名称我们可以称它为hadoop streaming(从本质上来讲hadoop streaming 和mapreudce是一样的,只不过使用的方式不同)。也正是基于两者的本质是一样的,所以经过我的苦思冥想,终于灵光一现,从hadoop mapreduce中的解决办法中类比出了在hadoo streaming 中的解决办法。 简单讲一下,mapreduce中解决办法就是在最后的执行程序中加上一行

hadoop -file jieba.mod

从本质上来讲,这句话的作用就是把本地的文件传输到集群上面,让这个hadoop集群有能力使用这个文件。我为什么这么理解呢? 从上面那个简单的python版mapreduce中的程序中,我们知道最后的执行命令大概是这样的:

$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH \
    -output $OUTPUT_PATH \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -file ./mapper.py \
    -file ./reducer.py

具体上面的命令是什么意思,在这里就不详细讲了,可以去我那边文章中学一下。我这边就从感觉上讲一下这个东西。我们在程序中是使用了mapper文件和mapper文件的。这个文件也是在本地上,如果我们想要在集群上使用这两个文件,就需要我们使用命令将这两个文件从本地上传到集群上。我们使用的命令就是hadoop -file.
类比的,我们就知道hadoop -file jieba.mod 的作用就是把结巴压缩包上传到集群上去。

这一点就是最关键的地方!

在hadoop streaming中(hive+python),我们在使用hive UDF(hive 自定义函数)的时候,一定会用到add file这个命令。比如

add file ./process.py

process.py就是python处理数据的文件。我们为什么使用到这个命令呢?就是因为这个处理文件是在本地的。如果想要在集群上使用这个命令,就需要把这个文件上传到集群上去,在streaming我们就是使用add file 这个命令来把文件上传到集群上去的。

这样,我们就把两者的联系在了一起,也就是说,如果我们想把结巴分词这个包上传到集群上去,在mapreduce中我们使用的是haddop -file,在streaming中我们就要使用的是add file.

逻辑理清了,接下来就非常清楚了,就是写代码的工作了。

当然具体的博文数据肯定是比这个更加复杂的,比如在博文中会有着表情内容等等。我们假设上面的这博文表格名称为 temp_zida_blog_content.

大致的思想就是这样,接下来上代码:

首先我们从官网下载结巴分词包,点这里.
下载的文件名字为:jieba-0.39.zip 可能版本不同,名字不同。解压之后,进入这个文件,找到一个文件夹名字为'jieba'(不带版本号)(在这里一定要注意我们一定要要进入到解压缩之后的文件中找到这个不带版本号的文件,这个才是我们要压缩的文件,如果我们直接使用从官网下载之后的压缩文件,是会出错误的),对这个不带版本号的文件夹进行压缩,并改成mod后缀名:

zip -r jieba.zip jieba
mv jieba.zip jieba.mod

为了在python中使用结巴分词,我们在python代码中引入的就是jieba.mod。需要注意的一点就是这个jieba.mod需要和python代码在同一个目录下,所以为了处理数据方便,最好是把处理数据需要的所有代码和相应配置文件全部放在一个文件夹中。

这样在python代码中,我们就可以这样引入结巴分词包:

zida.py

import sys,math,random
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('./') #这个步骤非常的重要,点名了导入文件的路径
import zipimport

importer = zipimport.zipimporter('jieba.mod')
jieba = importer.load_module('jieba')
import jieba
import re 

##下面的代码就是对每一行导进来的hive表格内容进行处理,然后返回相应的处理之后的数据

正如我之前所说的,只是python中引入这个包,是不够的,因为我们现在python文件中使用的包是从本地中导进去的,那么也就是说并不能适用在公司的hadoop的集群中,所以我们还需要在shell脚本中进行相应的处理(这一步也就是我这个业务场景和我在谷歌找到解决办法很不一样的地方,他们都是在执行mapreduce的那一步加入了本地相应的文件)

##在下面这个函数中,最重要的就是add file ./jieba.mod;,它代表的就是使用上本地的包文件。这一步其实很容易通过dd file ./zida.py这行代码来理解。因为首先我们的shell脚本是在集群上运行的,为了处理hive表格数据,我们使用本地的python脚本,为了让python脚本能够在集群上跑,我们使用了这行代码,类似于把python脚本抛到集群上。类比去想,如果我们想在python脚本中使用相应的包,那么只需要在这里把相应的包抛上去就好了,然后在python脚本中导入就好,因为已经被抛到集群中了,当然可以被引用。


function process_data(){
cat <<EOF
add file ./jieba.mod; 
add file ./zida.py;
INSERT OVERWRITE TABLE temp_zida_uids_bowen_content
    select transform(tmp.*) using 'python zida.py test'
    AS uid,bowen
    FROM(
        select a.uid,b.extend,b.content from 
        (select distinct(uid) from temp_zida_uids_day)a
        left outer join
        (select uid,extend,content from ods_tblog_content where dt>=dt_ods_tblog_content_90days_ago and dt<=dt_ods_tblog_content)b
        on a.uid=b.uid
    )tmp
EOF
}

hive -e "`process_data`"
echo "process_data"

其实可以引申一下,如果我们想在python脚本中使用或者说读进来本地的文件,比如说停用词,我们还需要在shell脚本中add这个停用词文件,然后才能从python脚本中读取。具体可以看我这里的讲解如何在处理hive表格的python代码中导入外部文件

公司的python运行环境比较老旧了,还是2的版本,不支持中文,搞得我代码正确的情况下,失败了好多次,浪费了大量时间,一定要记住在python脚本文件最上面写上:

#coding=utf-8

接下来我列取一些我参考的网页
这个网页是我主要参考的部分-给了我很多的思考-使用Hadoop的MapReduce和jieba分词统计西游记中的词频-这个网页中很厉害,它不只是导入了结巴分词,还导入了Jieba.analyse,同时进行了相应的改变,比我这个要复杂一些
How can I include a python package with Hadoop streaming job?
[How to use Cascading with Hadoop Streaming]-谷歌上其他的解决办法基本上都是基于这个网页的解答,给了我很多启发(http://eigenjoy.com/2009/11/18/how-to-use-cascading-with-hadoop-streaming/)-从这个网页学到了在下载完结巴分词之后,解压之后还需要进去,把没有带版本号的那个文件夹进行一个压缩.

上一篇:用Less定义常用的CSS3效果函数及常用颜色搭配(让CSS写起来更有趣)


下一篇:[C#]使用Redis来存储键值对(Key-Value Pair)