Pig Latin程序设计1

  Pig是一个大规模数据分析平台。Pig的基础结构层包括一个产生MapReduce程序的编译器。在编译器中,大规模并行执行依据存在。Pig的语言包括一个叫Pig Latin的文本语言,此语言有如下特性:
1.易于编程。实现简单的和高度并行的数据分析任务非常容易。
2.自动优化。任务编码的方式允许系统自动去优化执行过程,从而使用户能够专注于逻辑,而非效率
3.可扩展性,用户可以轻松编写自己的函数用于特殊用途的处理。

1 安装

1.安装java,配置环境变量
2.安装pig,配置环境变量
ps:安装一款平台级软件一般都是要配置环境变量用于系统执行时进行命令调用。

2 Pig运行模式

Pig有两种运行模式:Loca模式和MapReduce模式。当Pig在Local模式下运行时,Pig只访问本地一台主机;当Pig在MapReduce模式下运行时,它将访问一个Hadoop集群和HDFS的安装位置。这时,Pig将自动地对这个集群进行分配和回收。因为Pig系统可以自动对MapReduce程序进行优化,所以当用户使用Pig Latin语言进行编程的时候,不必关心程序运行的效率,Pig系统将会自动对程序进行优化,这样可以大了节省编程时间。
Pig的Local模式和MapReduce模式都有三种运行方式,分别为:Grunt Shell方式、脚本文件方式和嵌入式程序方式。

2.1 Local模式

(1) Grunt Shell方式
用户使用Grunt Shell方式时,首先需要使用命令开启Pig的Grunt Shell,只需在Linux终端中输入如下命令并只需即可:
pig -x local
这种方式和普通的Shell命令没什么区别,一条一条输入并只需
(2) 脚本文件方式
使用脚本文件作为批处理作业来运行Pig命令,它实际上就是第一种运行方式中的命令集合,可以使用下面的命令在本地模式下运行Pig脚本
pig -x local script.pig
其中script.pig对应的是Pig脚本,注意,脚本位置要正确指定,否则系统不能识别。
(3) 嵌入式程序方式
可以把pig命令嵌入到主机语言中,并且运行这个嵌入式程序。和运行普通的Java程序相同,这里需要书写特定的Java程序,并且将其编译成class文件或package包,再调用main函数运行程序
用户可以使用下面的命令对Java源文件进行编译
javac -cp pig-*.*-core.jar local.java
这里的pig-*.*-core.jar是Pig安装下面的一个jar包,local.java是用户编写的Java源文件。

2.2 MapReduce模式

  Pig需要把真正的查询转换成相应的MapReduce作业,并提交到Hadoop集群去运行。这种方式和普通的local方式没有多少区别,但是需要将相应的参数指定为mapreduce
(1)Grunt Shell方式
在linux终端下输入
pig -x mapreduce
(2)脚本文件方式
运行pig脚本文件
pig -x mapreduce script.log
(3)嵌入程序方式
javac -cp pig-0.7.0-core.jar mapreduce.java
java -cp pig-0.7.0-core.jar:.mapreduce

3 Pig Latin语言

  Pig Latin语言和传统的数据库语言很相似,但是Pig Latin更侧重于书于数据查询。而不是对数据进行修改和删除等操作。另外Pig Latin可以在Hadoop分布式云平台上运行,它的这个特点可以具有其他数据库无所比你的速度优势,它能在短时间内处理海量的数据。用pig latin语言编写程序时候,不必关心如何让程序更好地在hadoop云平台上运行,因为这些任务都是由pig系统自动分配的。

pig语句通常按照如下的格式来编写。
  1.通过LOAD语句从文件系统读取数据
  2.通过一系列"转换"语句对数据进行处理
  3.通过一条STORE语句把处理结果输出到文件系统中,或者使用DUMP语句把处理结果输出到屏幕上。
LOAD和STORE语句有严格的语法规定。关键是灵活使用转换语句对数据进行处理。

3.1 Pig Latin的使用

3.1.1 运行Pig Latin

用户可以通过多种方式使用Pig Latin语句,可以按照如下方式使用:
1.Pig对所有语句的语法和语义进行确认
2.如果遇到DUMP或STORE命令,Pig将顺序执行上面所有语句
  也就是说,如果没有遇到DUMP或STORE命令,pig只进行语法确认,但不执行的。

3.1.2 数据流语言

  编写pig latin程序包含一系列步骤,每个步骤都是一个单独的高级数据转换,这个转换支持关系型操作,诸如filter、union、group和join,一个处理查询日志的pig latin可能是这样的:

 log = LOAD 'excite-small.log' AS (user, time, query);
grpd = GROUP log BY user;
cntd = FOREACH grpd GENERATE group,COUNT(log);
DUMP cntd;

pig latin是一个数据流语言,数据流语言对根据算法思考的程序员更友好。

3.1.3 数据类型

Pig在数据类型上的设计理念总结为一句口号:pig吃任何东西,输入数据可以支持任何格式,pig天生支持那些流行的格式,如制表符分隔的文本文件,用户也可以增加函数支持其他的数据格式文件,pig不需要元数据或者数据的schma,但如果有也可以利用。

3.1.4 用户定义函数

pig是面向多种应用程序而设计的————处理日志数据、自然语言处理、分析网络图等,这些计算大多都需要定制化处理。

3.1.5 管理Grunt shell

  除了运行pig latin语句之外,grunt shell还支持一些基本的使用命令,例如输入help打印这些实用命令帮助信息,输入quit可以退出grunt shell,键入kill加上hadoop作业的id可以终止hadoop作业。用set命令可以设置pig参数。
下面的命令是grunt shell下运行的
set debug on
set job.name 'my job'
  参数debug声明是否打开或者关闭调试级别的日志,job.name取一个单引号字符串作为pig程序的hadoop作业名。
  grunt shell还支持文件级的使用命令,例如ls和cp,这些命令大体上是HDFS文件系统shell命令的一个子集。
下面是这些文件命令的列表

实用命令    help    pig使用帮助
quit 退出grunt shell
kill jobid 杀掉hadoop任务
set debug [on|off] 开启或关闭debug日志
set job.name '任务名称'
文件命令 cat, cd, cp, copyFromLocal, copyToLocal, ls, mkdir, mv, pwd, rm, rmf, exec, run

  两个exec和run命令,它们在grunt shell内部运行pig脚本,命令exec执行pig脚本的空间在grunt shell之外。在脚本中定义的别名对shell不可见,反之亦然。命令run执行pig脚本的空间与grunt shell相同,这也被称为交互模式,它和手动脚本逐行输入grunt shell的效果一样。

3.1.6 通过grunt学习pig latin

pig下自带一个数据集tutorial/data/excite-small.log文件中数据分为3列,中间按制表符分隔。

LOAD命令

将数据装载到一个变量指向的内存中。
grunt > log = LOAD 'excite-small.log' AS (user, time, query);
注意:log与=之间有一个空格,=与LOAD之间可以也可以没有空格
注意输入语句后,grunt shell解析语句,但不会去实际执行它们,直到使用DUMP和STORE命令请求其结果,DUMP打出变量的内容,STORE将内容存储在一个文件中。在显式请求最终结果前,pig不会执行任何命令。这是很有意义的,因为我们处理大数据集。没有足够的内存空间用于装载数据,而且任何情况下,我们希望在花费时间和资源去实际执行之前,验证计划的逻辑的正确性是有必要的。
当DUMP一个别名时,内容应该足够小,可以打印在屏幕上,通常多数情况下STORE文件

LIMIT命令

当DUMP一个别名时,内容应该足够小,可以打印在屏幕上,可以使用LIMIT命令,限制输出的行数。
例如:
lmt = LIMIT log 7//限制输出LIMIT的前7行
DUMP lmt
(2A9EABFB35F5B954,970916105432,+md foods +proteins)
(BED75271605EBD0C,970916001949,yahoo chat)
(BED75271605EBD0C,970916001954,yahoo chat)
(BED75271605EBD0C,970916003523,yahoo chat)
(BED75271605EBD0C,970916011322,yahoo search)
(BED75271605EBD0C,970916011404,yahoo chat)
(BED75271605EBD0C,970916011422,yahoo chat)

Pig的读写操作命令,LIMIT虽然不是一个读写命令,但是和它们经常一起使用,所以也列出。
LOAD 命令
alias = LOAD 'file' [USING function] [AS shcema]
alias是一个变量名,把文件装载到一个关系上。如果不用USING选选来特别指定,默认情况下使用PigStorage装载函数(可以写,也可以不写),数据可以使用AS选项给出schema
例如:log1 =LOAD 'excite-small.log' USING PigStorage AS (user, time, query);

LIMIT命令
alias = LIMIT alias n;
限制元组个数为n,LIMIT返回alias的前n个元组。

DUMP命令
DUMP alias;
显示一个关系的内容,这个关系应该足够小,方便调试

STORE命令

STORE alias INTO 'directory' [USING function]
将一个关系中的数据存储到一个目录中,这个目录在该命令执行时必须不能存在,Pig会生成这个目录,并把关系存储在以part-nnnnn为名的文件中,除非使用USING选项特别指定,否则默认使用PigStorage存储函数。

例子:

 grunt>log = LOAD 'ABC.txt' as (user:chararray, time:long, query:chararray);
grunt>grpd = GROUP log BY user;
grunt>cntd = FOREACH grpd GENERATE group, COUNT(log);
grunt>STORE cntd INTO 'output';

  上面相当于对一个表进行统计
  需要指出的是:在Pig Latin与SQL查询之间有两个主要的不同。如前所述,pig latin是一种数据处理语言,它使用的是一个数据处理步骤的序列,而不是用子句组成的复杂SQL查询。比如DUMP (LIMIT log 7)是非法的。另一个区别是:SQL语句里面通常有固定的schema,在SQL中,我们在数据填充之前定义关系的schema,pig则在schema上采用宽松处理,可以使用,也可以不适用schema。
基于创建关系所用的操作,pig会尽可能多地找出它的schema,可以使用DESCRIBE命令将pig的schema暴露给所有关系。这有助于理解pig语句对数据的处理过程。

DESCRIBE命令
例如:log(相当于一个数据结构)
DESCRIBE log
得到:
log:{user: chararray,time: long,query: chararray}
它表示log这个"表",的schema是user,time和query组成。

DESCRIBE grpd;
得到:
grpd:{group:chararray,log:{(user: chararray,time: long,query: chararray)}}
这个数据结构是很复杂的,会在后面讲

ILLUSTRATE命令
DESCRIBE可以告诉一个关系schema,但是ILLUSTRATE可以通过一个样例的执行,进一步来显示pig如何计算这个关系,但pig只模拟一小部分样例来提高执行速度。

例子:

Pig Latin程序设计1

  ILLUSTRATE命令显示出在cntd之前有4次转换,每个表的标题行描述了转换之后所输出关系,表的其他部分显示了样例数据。就像一个大鹅罩下面放了不同的窝一样,一个窝里面放的是小鸭子,一个窝里面放的是小鸡,窝就相当于包。包是pig里面一种数据结构。

Pig Latin的诊断运算符:

DESCRIBE命令
DESCRIBE alias
显示一个关系的schema

EXPLAIN
显示用于计算一个关系的执行计划

ILLUSTRATE
ILLUSTRATE alias
逐步显示数据如何被转换,从load命令开始直到最终的结果关系,为了使显示和处理可管理,只有一个输入数据的样本用于摸你执行。不过pig的初始样本有时并不理想,无法让脚步生成有意义的数据,Pig就会伪造一些类似的初始数据。为了让ILLUSTRATE能够运行,第一步load命令必须包含一个schema,后续的转换一定不能包含LIMIT或SPLIT运算符,也不能包含嵌套的FOREACH运算符,或者使用map数据类型。

FOREACH命令
作用是基于数据的列进行数据转换,就是对数据列进行处理比如统计,就像SQL语句里的组函数一样
alias = FOREACH 关系 GENERATE group,count(log)
generate就是生成的意思,计算生成

注释:
多行注释:/*
.....
*/
单行注释:--
DUMP A;--打印A的内容

大小写相关性:
除了pig的命令及参数名不区分外,其他的都区分大小写。
Pig Latin详述

数据类型和schema

Pig Latin的基本数据类型
int 有符号32位整数
long 有符号64位
float 32位浮点
double 64位浮点
chararray Unicode或UTF-8编码的字符 字符数组:就是数组的每一个元素占一个char长度 如hello world
bytearray 二进制对象,字节数组 字节数组就是数组的每一个元素站一个字节长度 3个复杂数据类型是:元组(tuple)、包(bag)和映射表(map)
tuple 有序字段集 (19,2)
bag 元组集合 {(12,3),(23,13),(4,4)}
map 键值对集合 [key#value] 键必须是唯一的病危chararray型的字符串,值可以是任何类型

数据模式:

pig的数据模式包括:关系、包、元组和域

域:filed,就是一个字段
元组:相当于一条记录,就是所有域的有序集合
包:所有元组的集合
一个关系就是由元组组成的包,pig的关系相当于关系数据库中的表,包中的元组就是表中的行。但和关系表中不同的是,pig不需要每一个元组包含相同数目的域或者相同位置的域,也不需要具有相同的数据类型。
关系是无序的,因此pig不能保证元组按特定的顺序来执行。

表达式和函数
表达式在各种语言都有见到,pig也不例外
介绍几个特别的:
正则表达式匹配:x matches regex
或且非:用and or not

一些内置的函数
AVG 在一个单列的包中计算数字的平均值
CONCAT 连接两个字符串chararray或两个bytearray
COUNT 计算一个分类后的包中的元组个数
DIFF 比较一个元组中的两个字段
MAX 计算在一个单列包中的最大值。该列必须为一个数字类型或者一个chararray
MIN 计算在一个单列包中的最小值,该列必须为一个数字类型或者一个chararray
SIZE 计算元素的个数,对于一个包,它计算元组的个数,对于一个元组,计算元素的个数,对于一个chararray,计算字符的个数,对于bytearray计算字节个数,对于一个数字标量,总是返回1.
SUM 计算在一个单列包中的数值的总和
TOKENIZE将一个字符串拆分为一个由单词组成的包,每个单词为包中的一个元组,单词的分隔符为空格、双引号、逗号、圆括号和星号*
IsEmpty 检查一个bag或map是否为空

关系型运算符
UNION
求两个数据集合的并集
首先编写两个数据文件A:
0,1,2
1,3,4
数据文件B:
0,5,2
1,7,8
运行pig:
xuqiang@ubuntu:~/hadoop/src/pig/pig-0.8.1/tutorial/pigtmp$ pig -x local

2011-06-05 18:46:54,039 [main] INFO org.apache.pig.Main - Logging error messages to: /home/xuqiang/hadoop/src/pig/pig-0.8.1/tutorial/pigtmp/pig_1307324814030.log
2011-06-05 18:46:54,324 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt>
加载数据A:

grunt> a = load 'A' using PigStorage(',') as (a1:int, a2:int, a3:int);

加载数据B:
grunt> b = load 'B' using PigStorage(',') as (b1:int, b2:int, b3:int);

求a,b的并集:
grunt> c = union a, b;
grunt> dump c;
(0,5,2)
(1,7,8)
(0,1,2)
(1,3,4)
可以发现a和b的数据合在一起了。
UNION支持两个数据集有重复的数据,比如a和b的数据有一部分重复,那么打出来的数据是重复的,只是简单的合并,不会只打印a或b的重复数据。
SPLIT运算符
撕开的意思,就是把一个数据集按照条件分开。
将c分割为d和e,其中d的第一列数据值为0,e的第一列的数据为1($0表示数据集的第一列):
grunt> split c into d if $0 == 0, e if $0 == 1;
查看d:
grunt> dump d;
(0,1,2)
(0,5,2)
查看e:
(1,3,4)
(1,7,8)

FILTER运算符
筛选运算符,筛选出一部分数据
选择c中的一部分数据:

grunt> f = filter c by $1 > 3;
查看数据f:
grunt> dump f;
(0,5,2)
(1,7,8)

组运算符
组运算符是对分类进行操作的。
GROUP运算符
对数据进行分组:
grunt> g = group c by $2;
查看g:
grunt> dump g;
(2,{(0,1,2),(0,5,2)})
(4,{(1,3,4)})
(8,{(1,7,8)})

GROUP alias ALL
把关系中的所有元组都放在一个大的包中以便进行聚类分析,因为函数是对包处理,而不是对关系处理
grunt> h = group c all;

grunt> dump h;
(all,{(0,1,2),(1,3,4),(0,5,2),(1,7,8)})

查看h中元素个数:
grunt> i = foreach h generate COUNT($1);
查看元素个数:
grunt> dump i;
这里可能出现Could not resolve counter using imported: [, org.apache.pig.built in., org.apache.pig.impl.builtin. ]的情况,这是需要使用register命令来注册pig对应的jar版本。

COGROUP运算符
grunt> j = COGROUP a by $2, b by $2;
对两个关系进行连接
DUMP j;
(2,{(0,1,2)},{(0,5,2)})
(4,{(1,3,4)},{})
{8,{},{(1,7,8)}}
GROUP生成的字段通常是两个,COGROUP会是三个(如果COGROUP两个以上关系,还会更多)。第一个字段为组键,第二三个字段为包。这些包源自进行匹配时候显示的。

INNER关键字
用于COGROUP时,忽略一个关系而言并不存在的组键
(2,{(0,1,2)},{(0,5,2)})
(4,{(1,3,4)},{})
{8,{},{(1,7,8)}}
上面的结果,显然对于组键为4的b来说包不存在,可以用INNER来忽略掉。
grunt> j = COGROUP a by $2, b by $2 INNER;
DUMP j;
(2,{(0,1,2)},{(0,5,2)})
{8,{},{(1,7,8)}}
类似也可以对a使用INNER,当然也可以同时使用INNER,得到的结果就是组键同时存在且相等的结果。

JOIN运算符

  从概念上将,COGROUP默认是外连接,就是把组键只要存在的,都列出来,而INNER关键字可以把它修改为左外连接,右外连接和内连接。在Pig中,另一个做内连接的方法是使用JOIN运算符,JOIN和inner COGROUP的主要区别是JOIN生成一个输出记录平坦集合。
grunt> j = join a by $2, b by $2;
输出结果是:
(0,1,2,0,5,2)
可以发现,flatten(平坦化)的结果是新的数据结构,新的关系,字段是两个表的和。

FOREACH ...GENERATE运算符
最关键的是GENERATE运算符,它能生成相应的字段结果
取出c的第二列$1和$1 * $2,将这两列保存在k中:
grunt> k = foreach c generate $1, $1 * $2;
查看k的内容:
grunt> dump k;
(5,10)
(7,56)
(1,2)
(3,12)

FOREACH后面通常跟着一个别名(一个关系的名字)和关键字GENERATE。在GENERATE之后的表达式控制输出结果。最简单的情况下,我们使用FOREACH将一个关系中的特定列输出,还可以应用任意的表达式进行输出。

FOREACH的投影功能(取分量)
就像几何向量的投影功能类似一样,也就是取一个向量的分量
k = FOREACH g GENERATE group, c.a1;
(2,{(0),(0)})
(4,{(1)})
(8,{(1)})

要得到每个包中的两个字段
k = FOREACH g GENERATE group, c.(a1,a2);
(2,{(0,1),(0,5)})
(4,{(1,3)})
(8,{(1,7)})

FLATEN函数
这个函数的功能是将新生成的数据关系(比如上面的k)的字段是包的类型的字段进行解套,就是去掉(),{}这些东西,比如字段如果是
(a,{b,c})
{b,c}是第二个字段,而且是包类型,经过flatten后,去掉外壳,就变成了(a,b,c),相当于将原来的字段进行组合,形成新的关系。FLATTEN使用时,是进行交叉积的。
注意:进行FLATTEN时候,FLATTEN的参数是什么可以用DESCRIBE来查看相应的数据结构,比如对于g,用DESCRIBE后,为
grunt> DESCRIBE g;
g: {group: int,c: {(a1: int,a2: int,a3: int)}}
那么可以FLATTEN的对象就是group和c

如果FLATTEN超过两个包,那么结果是两个包内的元组进行组合
例如:假设一个元组是
(4,{(4,2),(4,3)},{(6),(9)})
结构是:
{group:int,a :{a1:int,a2:int},b:{b1:int}}
{}包含的是包级别,()包含的是元组级别

对第二个包和第三个包进行FLATTEN结果是第二个包里的元组和第三个包里的元组进行组合,也就是2*2四种情况
(4,4,2,6)
(4,4,2,9)
(4,4,3,6)
(4,4,3,9)

GENERATE必须始终出现在嵌套块的末尾,如果有嵌套块的话
例如:
m = FOREACH g{
tmp = FILTER a BY a1 >= a2;
GENERATE group, tmp, a3;
}

DISTINCT 函数
用于去除重复元组
alias = DISTINCT alias
SAMPLE函数
随机采样一个关系
alias = SAMPLE alias factor
factor是采样因子,也就是采样百分比
例如:f = SAMPLE c 0.02;
注意:概率是金丝的,不会精确等于多少

ORDER BY,进行排序ASC 升序,DESC降序
grunt > n = ORDER c BY a1 DESC;
不写默认是ASC,也就是升序
STREAM 处理外部脚本关系

4 用户定义函数(UDF)

用户自定义函数是对pig里面的函数的扩展。很有用
pig能够支持两种类型的UDFs:eval和load/store,其中load/store的自定义函数主要是用来加载和保存特定的数据格式;eval自定义函数主要用来进行常规的数据转换。

1. eval

如果想要实现自定义的eval类型的函数,那么基本的做法是首先编写一个类继承自EvalFunc<T>这个抽象类,同时需要重写这个类的一方法:

abstract public T exec(Tuple input) throws IOException;

该方法传入的类型是Tuple类型。

如果调用udf时使用的是:udf(ARG1, ARG2);那么调用input.get(0)将得到ARG1,同理input.get(1)得到的是ARG2,input.getSize()得到传递的参数的数量,这里就是2.

下面我们就开始编写udf UPPER.java,将UPPER.java文件保存到myudfs目录下:

package myudfs;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException; public class UPPER extends EvalFunc<String>
{
public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0)
return null;
try{
String str = (String)input.get(0);
return str.toUpperCase();
}catch(Exception e){
throw WrappedIOException.wrap("Caught exception processing input row ", e);
}
}
} 编译该文件,同时生成该jar文件: xuqiang@ubuntu:~/hadoop/src/pig/pig-0.8.1/myudfs$ javac -cp ../pig.jar UPPER.java
xuqiang@ubuntu:~/hadoop/src/pig/pig-0.8.1/myudfs$ cd .. xuqiang@ubuntu:~/hadoop/src/pig/pig-0.8.1$ jar -cf myudfs.jar myudfs
准备student_data文件:
student1,1,1
studetn2,2,2
student3,3,3
student4,4,4
在pig中测试该udf: xuqiang@ubuntu:~/hadoop/src/pig/pig-0.8.1$ pig -x local 注册该udf: grunt> register myudfs.jar
加载数据: grunt> A = load 'student_data' using PigStorage(',') as (name:chararray, age:int,gpa:double);
grunt> B = FOREACH A GENERATE myudfs.UPPER(name);
grunt> dump B;
这时将输出:
(STUDENT1)
(STUDETN2)
(STUDENT3)
(STUDENT4)
上一篇:我是如何利用Github Pages搭建起我的博客,细数一路的坑


下一篇:利用python进行数据分析之数据规整化