本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与Spark的两个共享特性(累加器和广播变量)。
键值对(PaiRDD)
1.创建
#在Python中使用第一个单词作为键创建一个pairRDD,使用map()函数
pairs = lines.map(lambda x:(x.split(" ")[0],x))
2.转化(Transformation)
转化操作很多,有reduceByKey,foldByKey(),combineByKey()等,与普通RDD中的reduce()、fold()、aggregate()等类似,只不过是根据键来进行操作。
reduceByKey():与recude()类似,只不过是根据键进行聚合
foldByKey():与fold()类似
combineByKey():与aggregate()类似
#用Python对第二个元素进行筛选
result = pairs.filter(lambda keyValue:len(keyValue[1]) < 20) #在Python中使用reduceByKey()和mapValues()计算每个键对应的平均值
rdd.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) #用Python实现单词计数
rdd.sc.textFile("文件地址")
words = rdd.flatMap(lambda x:x.split(" "))
result = words.map(lambda x:(x,1)).reduceByKey((x,y)=>x+y) #在Python中使用combineByKey()求每个键对应的平均值
sumCount = nums.combineByKey((lambda x:(x,1)),
(lambda x,y:(x[0]+y,x[1]+1)),
(lambda x,y:(x[0]+y[0],x[1]+y[1])))
sumCount.map(lambda key,xy:(key.xy[0]/xy[1])).collectAsMap() #在Python中自定义reduceByKey()的并行度
data = [("a",3),("b",4),("a",1)]
sc.parallelize(data).reduceByKey(lambda x,y:x+y)#默认并行度
sc.parallelize(data).reduceByKey(lambda x,y:x+y,10)#自定义并行度 #在Python中以字符串顺序对整数进行自定义排序
rdd.sortByKey(ascending = True,numPartitions = None,keyFunc = lambda x: str(x))
3.行动操作(Action)
数据分区:数据比较大时,可以用partitionBy()转化为哈希分区。即通过向partitionBy传递一个spark.HashPartitioner对象来实现该操作。在Python中不能将HashPartitioner对象传递给partitionBy,只需要把需要的分区数传递过去(如 rdd.partitionBy(100))。
在spark中,会为生成的结果RDD设好分区方式的操作有:cogroup(),groupWith(),join(),leftOuterJoin(),rightOutJoin,groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues(),flatMapValues(),filter()。最后三种只有当父RDD有分区方式时,结果RDD才会有分区RDD。其他的操作生成的结果都不会存在特定的分区方式。
自定义分区方式:
#Python自定义分区方式
import urlparse def hash_domain(url):
return hash(urlparse.urlparse(url).netloc) rdd.partitionBy(20,hash_domain) #创建20个分区
数据的读取与保存
文件格式
格式名称 | 结构化 | 备注 |
文本文件 |
否 | 普通的文本文件,每行一条记录 |
JSON |
半结构化 | 常见的基于文本的格式,半结构化;大多数库要求每行一条记录 |
CSV |
是 |
常见文本结构 |
SequenceFile |
是 | 一种用于键值对数据的常见Hadoop文件格式 |
Protocol buffers |
是 |
一种快读、节约空间的跨语言格式 |
对象文件 |
是 | 用来将Spark作业中的数据存储下来以让共享的代码读取。改变类的时候回失效。因为它依赖于Java序列化 |
文本文件
#读取文本文件
input=sc.textFile("文件地址")
#保存文本文件
result.saveAsTextFile(outputFile)
JSON
#读取Jason
import json
data = input.map(lambda x: json.loads(x))
#保存
(data.filter(lambda x : x["lovaPandas"]).map(lambda x:json.dumps(x))).saveAsTextFile(outputF
CSV文件
#用textFile读取csv
import csv
import StringIO
def loadRecord(line):
"""解析一行csv记录"""
input = StringIO.StringIO(line)
reader = csv.DictReader(input,filenames =["name","favouriteAnimal"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord) #读取完整csv
def loadRecords(filenameContents):
"""读取给定文件中的所有记录"""
input = StringIO.StringIO(filenameContents[1])
reader = csv.DictReader(input,fieldnames = ["name","favouriteAnimal"])
return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords) #保存csv
def writeRecords(records):
"""写出一些csv记录"""
output = StringIO.StringIO()
writer = csv.DictReader(output,filenames = ["name","favouriteAnimal"])
for record in records:
writer.writerow(record)
return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
SequenceFile
#读取SequenceFile
val data = sc.sequenceFile(inFile,"ord.apache.hadoop.io.Text","org.apache.hadoop.io.InWitable")
对象文件
#对象文件,用Java序列化写的,速度慢,保存用saveAsObjectFile(),读取用 SparkContext中的objectFile()函数接收一个路径,返回对应的RDD。它无法在Python中使用
Spark SQL中的结构化数据
Apache Hive
#Apache Hive
#用Python创建HiveContext并查询数据
from pyspark.sql import HiveContext hiveCtx =HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users ")
firstRow = rows.first()
print firstRow.name
JSON数据
#JSON数据示例
{"user":{"name":"Holden","location":"San Francisco"},"text":"Nice day out today"}
{"user":{"name":"Matei","location":"Berkeley"},"text":"Even nicer here :)"} #在Python中使用SparkSQL读取JSON数据
tweets = hiveCtx.jsonFile("tweets.json")
# UserWarning: jsonFile is deprecated. Use read.json() instead.
# warnings.warn("jsonFile is deprecated. Use read.json() instead.") tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name,text FROM tweets")
这章关于sql的命令比较少,关于SQL的其他命令可以看看Spark的官方文档(PySpark 1.6.1 documentation),讲的比较详细。注意,这是spark 1.6版本,如果你安装的是1.2版本,1.6的有些命令是用不了的,可以先升级再用。
最后再来讲讲Spark中两种类型的共享变量:累加器(accumulator)和广播变量(broadcast variable)
累加器:对信息进行聚合。常见得一个用法是在调试时对作业执行进行计数。举个例子:假设我们从文件中读取呼号列表对应的日志,同时也想知道输入文件中有多少空行,就可以用到累加器。实例:
#一条JSON格式的呼叫日志示例
#数据说明:这是无线电操作者的呼叫日志。呼号由国家分配,每个国家有自己呼号号段,所以可以根据呼号查到相对应的国家。有一些呼叫日志中包含操作者的地理位置,用来帮助确定距离
{"address":"address here","band":"40m","callSigns":"KK6JLK","city":"SUNNYVALE",
"contactlat":"37.384733","contactlong":"-122.032164",
"county":"Santa Clara","dxcc":"","fullname":"MATTHEW McPherrin",
"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}
#在Python中累加空行
file = sc.textFile(inputFile)
#创建Accumulator[int] 并初始化为0
blankLines = sc.accumulator(0) def extractCallSigns(line):
global blankLines #访问全局变量
if (line == ""):
blankLines += 1
return line.split(" ") callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callSigns")
print "Blank Lines:%d " % blankLines.value
我们来看看这段程序,首先创建了一个叫做blankLines的Accumulator[Int]对象,然后在输入中看到空行就+1,执行完转化操作后就打印出累加器中的值。注意:只有在执行完saveAsTextFile()这个action操作后才能看到正确的计数,flatMap()是transformation操作,是惰性的,这点在上一篇博文已经讲过。
但是我们上一篇文章中也提到过reduce()等这样的操作也是聚合操作,那为什么还有累加器这个东西存在呢?因为RDD本身提供的同步机制粒度太粗,尤其在transformation操作中变量状态不能同步,而累加器可以对那些与RDD本身的范围和粒度不一样的值进行聚合,不过它是一个write-only的变量,无法读取这个值,只能在驱动程序中使用value方法来读取累加器的值。
累加器的用法:
- 通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
- Spark闭包里的执行器代码可以使用累加器的 += 方法(在Java中是add)增加累加器的值。
- 驱动器程序可以调用累加器的Value属性来访问累加器的值(在Java中使用value()或setValue())
对于之前的数据,我们可以做进一步计算:
#在Python中使用累加器进行错误计数
#创建用来验证呼号的累加器
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0) def validataSign(sign):
global validSignCount,invalidSignCount
if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z",sign):
validSignCount += 1
return True
else:
invalidSignCount += 1
return False #对与每个呼号的联系次数进行计数
validSigns = callings.filter(validataSign)
contactCount = validSigns.map(lambda sign:(sign,1)).reduceByKey(lambda (x,y):x+y) #强制求值计算计数
contactCount.count()
if validSignCount.value < 0.1 * validSignCount.value:
contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
print "Too many errors: %d in %d" %(invalidSignCount.value,validSignCount.value)
累加器与容错性:
我们知道Spark是分布式计算,当有些机器执行得比较慢或者出错的时候,Spark会自动重新执行这些失败的或比较慢的任务。这样会导致同一个函数可能对同一个数据运行了多次,简单的说就是耗内存,降低了计算速度。在这种情况下,累加器怎么处理呢?
对于要在Action操作中使用的累加器,Spark只会把每个任务对累加器的修改应用一次,一般放在foreach()操作中。而对于Transformation操作中的累加器,可能不止更新一次。所以Transformation中的累加器最好只在调试中使用。
广播变量
广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。广播变量通过两个方面提高数据共享效率:1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。
在Spark中,它会自动的把所有引用到的变量发送到工作节点上,这样做很方便,但是也很低效:一是默认的任务发射机制是专门为小任务进行优化的,二是在实际过程中可能会在多个并行操作中使用同一个变量,而Spark会分别为每个操作发送这个变量。举个例子,假设我们通过呼号的前缀查询国家,用Spark直接实现如下:
#在Python中查询国家
#查询RDD contactCounts中的呼号的对应位置,将呼号前缀读取为国家前缀来进行查询
signPrefixes = loadCallSignTable() def processSignCount(sign_count,signPrefixes):
country = lookupCountry(sign_count[0],signPrefixes)
count = sign_count[1]
return (country,count) countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y)))
数据量小的时候可以运行,但是如果这个表很大,signPrefixes的很容易达到MB级别,从主节点为每个任务发送这样的数组会非常消耗内存,而且如果之后还需要用到signPrefixes这个变量,还需要再向每个节点发送一遍。
如果把signPrefixes变为广播变量,就可以解决这个问题:
#在Python中使用广播变量来查询国家
#查询RDD contactCounts中的呼号的对应位置,将呼号前缀读取为国家前缀来进行查询
signPrefixes = sc.broadcast(loadCallSignTable()) def processSignCount(sign_count,signPrefixes):
country = lookupCountry(sign_count[0],signPrefixes.value)
count = sign_count[1]
return (country,count) countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y))) countryContactCounts.saveAsTextFile(outputDir +"/contries.txt")
总结一下广播变量的过程:
- 通过对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象。任何可序列化的对象都可以这么实现。
- 通过value属性访问该对象的值
- 变量只会发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
广播的优化
如果广播的值比较大,可以选择既快又好的序列化格式。Scala和Java API中默认使用Java序列化库,对于除基本类型的数组以外的任何对象都比较低效。我们可以使用spark.serializer属性选择另一个序列化库来优化序列化过程。(也可以使用reduce()方法为Python的pickle库自定义序列化)
基于分区进行操作
两个函数:map() 和 foreach()
函数名 | 调用所提供的 | 返回的 | 对于RDD[T]的函数签名 |
mapPartitions() | 该分区中元素的迭代器 | 返回的元素的迭代器 | f:(Iterator[T])->Iterator[U] |
mapPartitionsWithIndex() | 分区序号,以及每个分区中的元素的迭代器 | 返回的元素的迭代器 | f:(Int,Iterator[T])->Iterator[U] |
foreachPartitions() | 元素迭代器 | 无 | f:(Iterator[T])->Unit |
示例:我们有一个在线的电台呼号数据,可以通过这个数据库查询日志中记录过的联系人呼号列表。
#在Python中使用共享连接池
def processCallSigns(signs):
"""使用连接池查询呼号"""
#创建一个连接池
http = urllib3.PoolManager()
#与每条呼号记录相关的URL
urls = map(lambda x: "http://73s.com/qsos/%s.json" % x,signs)
#创建请求(非阻塞)
requests = map(lambda x:(x,http.request('GET',x)),urls)
#获取结果
result = map(lambda x:(x[0],json.loads(x[1].data)),requests)
#删除空的结果并返回
return filter(lambda x:x[1] is not None,result) def fetchCallSigns(input):
"""获取呼号"""
return input.mapPartitions(lambda callsigns:processCallSigns(callsigns)) contactsCountList = fetchCallSigns(validSigns)
再举个例子说明一下mapPartitions()的功能:
#在Python中不实用mapPartitions()求平均值
def combineCtrs(c1,c2):
return (c1[0]+c2[0],c1[1]+c2[1]) def basicAvg(nums):
"""计算平均值"""
nums.map(lambda num:(num,1)).reduce(combineCtrs) #在Python中使用mapPartitions()求平均值
def partitionCtr(nums):
"""计算分区的sumCounter"""
sumCount = [0,0]
for num in nums:
sumCount[0] +=num
sumCount[1] +=1
return [sumCount] def fastAvg(nums):
"""计算平均值"""
sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
return sumCount[0]/float(sumCount[1])
数值RDD的操作
方法 | 含义 |
count() |
RDD中的元素个数 |
mean() |
元素的平均值 |
sum() |
总和 |
max() |
最大值 |
min() |
最小值 |
variance() |
元素的方差 |
sampleVariance() |
采样的方差 |
stdev() |
标准差 |
sampleStdev() |
采样的标准差 |
举例:从呼叫日志中移除距离过远的联系点
#用Python移除异常值
#要把String类型的RDD转化为数字数据,这样才能使用统计函数并移除异常值
distanceNumerics = distances.map(lambda string :float(string))
stats = distanceNumerics.stats()
stddev = stdts.stdev()
mean =stats.mean()
reasonableDistances = distanceNumerics.filter(lambda x:math.fabs(x-mean) < 3 * stddev)
print reasonableDistances.collect()
这三章的内容比较实用,在生产中也会有实际应用。下周更新第7-9章,主要讲Spark在集群上的运行、Spark调优与调试和Spark SQL。