Key-Value Pair RDDs

1. Creating a pair RDD

Creating one directly

pairRDD = sc.parallelize([(2,5),(8,9),(4,5)])

pairRDD.collect()
[(2, 5), (8, 9), (4, 5)]

Loading from a file

rdd = sc.textFile(r"file:///C:\Users\86178\Desktop\SPARK\word.txt")
pairRDD = rdd.flatMap(lambda x:x.split())
pairRDD.collect()

['pan', 'hello', 'hadoop', 'fan', 'hello', 'python', 'panda', 'good']

pairRDD = pairRDD.map(lambda x:(x,1))
pairRDD.collect()

[('pan', 1), ('hello', 1), ('hadoop', 1), ('fan', 1), ('hello', 1), ('python', 1), ('panda', 1), ('good', 1)]

Creating one from a list

rdd = sc.parallelize(['pan', 'hello', 'hadoop', 'fan', 'hello', 'python', 'panda', 'good'])
pairRDD = rdd.map(lambda x:(x,1))
pairRDD.collect()

[('pan', 1), ('hello', 1), ('hadoop', 1), ('fan', 1), ('hello', 1), ('python', 1), ('panda', 1), ('good', 1)]

2. groupByKey()

>>> words = sc.parallelize([('pan', 1), ('hello', 1), ('hadoop', 1), ('fan', 1), ('hello', 1), ('python', 1), ('panda', 1), ('good', 1)])
>>> word1 = words.groupByKey()
>>> word1.foreach(print)
('hadoop', <pyspark.resultiterable.ResultIterable object at 0x00000174798B18D0>)
('python', <pyspark.resultiterable.ResultIterable object at 0x00000174798B17B8>)
('panda', <pyspark.resultiterable.ResultIterable object at 0x00000174798B1898>)
('good', <pyspark.resultiterable.ResultIterable object at 0x00000174798B17B8>)
('hello', <pyspark.resultiterable.ResultIterable object at 0x0000023F5F6418D0>)
('fan', <pyspark.resultiterable.ResultIterable object at 0x0000023F5F641898>)
('pan', <pyspark.resultiterable.ResultIterable object at 0x00000228D8D418D0>)


>>> word1.mapValues(list).foreach(print)

('hadoop', [1])
('python', [1])
('panda', [1])
('good', [1])
('hello', [1, 1])
('fan', [1])
('pan', [1])
>>>

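groupByKey() groups the values that share the same key.

The result is not a dict but a new RDD of (key, ResultIterable) pairs. Each ResultIterable has to be converted, for example with mapValues(list), before its contents can be printed.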
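
To make the grouping concrete, here is a minimal plain-Python sketch of what groupByKey() followed by mapValues(list) computes. It only models the semantics on an ordinary list; the helper name group_by_key is made up for this illustration and is not part of PySpark.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Plain-Python model of groupByKey(): collect the values of each key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    # Like the RDD result, this is a collection of (key, values) pairs
    return list(groups.items())

words = [('pan', 1), ('hello', 1), ('hadoop', 1), ('fan', 1),
         ('hello', 1), ('python', 1), ('panda', 1), ('good', 1)]
grouped = group_by_key(words)
print(dict(grouped)['hello'])  # [1, 1]
```

Only 'hello' appears twice in the input, so it is the only key whose list holds two values.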

3. reduceByKey()

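reduceByKey works on the same per-key groups of values as groupByKey, but merges each group into a single value.

reduceByKey(func) returns a new key-value RDD in which the values of each key have been merged with func.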

>>> words = sc.parallelize([('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2), ('hello', 1), ('python',5), ('good', 1)])
>>> words1 = words.reduceByKey(lambda a,b:a+b)
>>> words1.foreach(print)

('hadoop', 1)
('python', 5)
('good', 1)
('hello', 2)
('pan', 3)

Difference between reduceByKey and groupByKey

>>> rdd = sc.parallelize(['pan', 'pan','fan', 'good','fan','pan'])
>>> pairRDD = rdd.map(lambda x:(x,1))
>>> wordgroup = pairRDD.groupByKey().map(lambda x:(x[0],sum(x[1])))
>>> wordgroup.foreach(print)
('fan', 2)
('good', 1)
('pan', 3)

>>> # For an average, divide the sum of the grouped values by their count
>>> wordavg = pairRDD.groupByKey().map(lambda x:(x[0],sum(x[1])/len(x[1])))


>>> wordreduce = pairRDD.reduceByKey(lambda a,b:a+b)
>>> wordreduce.foreach(print)
('fan', 2)
('pan', 3)
('good', 1)

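Summary:

  1. For sums, reduceByKey and groupByKey give the same result. reduceByKey merges values inside each partition before the shuffle, so it usually moves less data.

  2. groupByKey is convenient for averages, because all values of a key are available at once.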
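
The difference in data movement can be sketched in plain Python. The split of the six pairs into two partitions below is an assumption made for illustration, and reduce_locally is a hypothetical helper, not a Spark function; the sketch only counts how many records would have to cross the shuffle in each style.

```python
def reduce_locally(partition, func):
    """Merge the values of each key inside one partition."""
    merged = {}
    for key, value in partition:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

def add(a, b):
    return a + b

# Assumed split of the six ('word', 1) pairs across two partitions
partitions = [
    [('pan', 1), ('pan', 1), ('fan', 1)],
    [('good', 1), ('fan', 1), ('pan', 1)],
]

# groupByKey style: every record crosses the shuffle unchanged
shuffled_group = [pair for part in partitions for pair in part]

# reduceByKey style: each partition is pre-combined before the shuffle
shuffled_reduce = [pair for part in partitions
                   for pair in reduce_locally(part, add)]

# The final per-key sums are the same either way
totals_group = dict(reduce_locally(shuffled_group, add))
totals_reduce = dict(reduce_locally(shuffled_reduce, add))

print(len(shuffled_group), len(shuffled_reduce))  # 6 5
print(totals_group == totals_reduce)              # True
```

On data with many repeated keys per partition the gap is much larger than in this tiny example.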

4. keys() and values()

>>> words = sc.parallelize([('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2), ('hello', 1), ('python',5), ('good', 1)])
>>> words.keys().foreach(print)
pan
python
pan
hello
good
hadoop
hello

>>> word = words.keys()
>>> word.distinct().collect()
['hadoop', 'python', 'good', 'hello', 'pan']
>>> word.distinct().count()
5

>>> words.values().foreach(print)
5
1
1
1
1
2
1

5. sortByKey() and sortBy()

sortByKey() sorts by key. Its ascending parameter defaults to True (ascending order); pass False for descending order.

>>> words = sc.parallelize([('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2), ('hello', 1), ('python',5), ('good', 1)])
>>> words.sortByKey(False).foreach(print)
('hello', 1)
('hello', 1)
('hadoop', 1)
('pan', 1)
('pan', 2)
('python', 5)
('good', 1)

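To sort by value, use sortBy. The printed order can still look wrong: when the RDD has several partitions, foreach(print) runs on each partition separately, so the lines appear partition by partition rather than in one sorted sequence (the sortByKey output above shows the same effect). To see the whole RDD in order, either set the number of partitions to 1, as below, or bring the sorted result to the driver with collect().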

>>> words.sortBy(lambda x:x[1]).foreach(print)
('pan', 1)
('hello', 1)
('hadoop', 1)
('hello', 1)
('good', 1)
('pan', 2)
('python', 5)
>>> words = sc.parallelize([('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2), ('hello', 1), ('python',5), ('good', 1)],1)

Alternatively, use words = words.repartition(1).
>>> words.glom().collect()
[[('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2), ('hello', 1), ('python', 5), ('good', 1)]]
>>> words.sortBy(lambda x:x[1]).foreach(print)
('pan', 1)
('hello', 1)
('hadoop', 1)
('hello', 1)
('good', 1)
('pan', 2)
('python', 5)
>>> words.sortBy(lambda x:x[1],False).foreach(print)
('python', 5)
('pan', 2)
('pan', 1)
('hello', 1)
('hadoop', 1)
('hello', 1)
('good', 1)
>>>
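
Why several partitions affect the printed order can be sketched in plain Python. Spark's sort routes records to partitions by key range and sorts inside each partition; the boundaries [1, 2] below are assumed for illustration (Spark samples its own), and range_partition_sort is a hypothetical helper rather than a PySpark call.

```python
import bisect

def range_partition_sort(pairs, bounds, keyfunc):
    """Model of a distributed sort: route by key range, then sort each partition."""
    partitions = [[] for _ in range(len(bounds) + 1)]
    for pair in pairs:
        partitions[bisect.bisect_left(bounds, keyfunc(pair))].append(pair)
    return [sorted(part, key=keyfunc) for part in partitions]

words = [('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2),
         ('hello', 1), ('python', 5), ('good', 1)]

def by_value(pair):
    return pair[1]

parts = range_partition_sort(words, bounds=[1, 2], keyfunc=by_value)

# Reading the partitions in order, as collect() does, gives one sorted list
collected = [pair for part in parts for pair in part]
print(collected == sorted(words, key=by_value))  # True
```

Each partition is sorted and the partitions are ordered among themselves, so only the order in which they are read matters. Printing from each partition independently loses that order, while reading them in sequence keeps it.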

6. mapValues(func)

Applies func to every value and leaves the keys unchanged.

>>> words = sc.parallelize([('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2), ('hello', 1), ('python',5), ('good', 1)])
>>> words.mapValues(lambda x:x+10).collect()
[('pan', 11), ('hello', 11), ('hadoop', 11), ('pan', 12), ('hello', 11), ('python', 15), ('good', 11)]

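7. flatMapValues(func)

Runs mapValues(func) first and then flattens the result: func must return an iterable for each value, and every element of that iterable is paired with the original key.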
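
Since this section has no example, here is a minimal plain-Python sketch of the flattening step. The input pairs are made up for illustration, and flat_map_values is a hypothetical stand-in that models what rdd.flatMapValues(func) returns.

```python
def flat_map_values(pairs, func):
    """Model of flatMapValues(): apply func to each value, then flatten,
    repeating the key for every element func produced."""
    return [(key, item) for key, value in pairs for item in func(value)]

# Made-up input: each value is a space-separated string
pairs = [('a', 'x y'), ('b', 'z')]
result = flat_map_values(pairs, lambda v: v.split())
print(result)  # [('a', 'x'), ('a', 'y'), ('b', 'z')]
```

With mapValues alone the first pair would stay ('a', ['x', 'y']); flattening turns it into two pairs with the same key.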

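8. Partitioning and partitionBy

data = ['Hadoop','Spark','Hive','spoon']

rdd = sc.parallelize(data,2)		# 2 partitions; if omitted, local mode usually defaults to the number of CPU cores



rdd.glom().collect()			# show the contents of each partition

len(rdd.glom().collect())		# number of partitions

rdd1 = rdd.repartition(3)		# redistribute the data into 3 partitions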
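
The snippet above only uses repartition. partitionBy(numPartitions, partitionFunc) works on key-value pairs instead and places each pair in partition partitionFunc(key) % numPartitions. The plain-Python sketch below models that rule; partition_by is a hypothetical helper, and routing by word length is an arbitrary choice made so that the result is easy to follow.

```python
def partition_by(pairs, num_partitions, partition_func):
    """Model of partitionBy(): a pair goes to partition_func(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions

data = ['Hadoop', 'Spark', 'Hive', 'spoon']
pairs = [(word, 1) for word in data]

# Arbitrary partition function for illustration: route by word length
parts = partition_by(pairs, 2, len)
print(parts)
# [[('Hadoop', 1), ('Hive', 1)], [('Spark', 1), ('spoon', 1)]]
```

The even-length words land in partition 0 and the odd-length words in partition 1, which is the same layout glom().collect() would display for a real RDD partitioned this way.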

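9. join, leftOuterJoin and rightOuterJoin

join keeps only the keys that appear in both RDDs. leftOuterJoin keeps every key of the left RDD and rightOuterJoin keeps every key of the right RDD; a value missing on the other side is filled with None.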

>>> words = sc.parallelize([('pan', 1), ('hello', 1),('panda',2)])
>>> word1 = sc.parallelize([('panda','np')])
>>> word2 = words.join(word1)
>>> word2.collect()
[('panda', (2, 'np'))]
>>>
>>> word1.leftOuterJoin(words).collect()
[('panda', ('np', 2))]

>>> words.leftOuterJoin(word1).collect()
[('panda', (2, 'np')), ('pan', (1, None)), ('hello', (1, None))]
>>> word1.rightOuterJoin(words).collect()
[('panda', ('np', 2)), ('pan', (None, 1)), ('hello', (None, 1))]

>>> words.rightOuterJoin(word1).collect()
[('panda', (2, 'np'))]
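
The three joins can be modeled on ordinary lists to check the outputs above. This is a sketch of the semantics only; the function names are made up for the illustration and the nested loop ignores everything Spark does to distribute the work.

```python
def join(left, right):
    """Inner join: one output pair per matching (left, right) combination."""
    return [(k, (lv, rv)) for k, lv in left for rk, rv in right if k == rk]

def left_outer_join(left, right):
    """Keep every left key; unmatched keys get None on the right side."""
    right_keys = {k for k, _ in right}
    unmatched = [(k, (lv, None)) for k, lv in left if k not in right_keys]
    return join(left, right) + unmatched

def right_outer_join(left, right):
    """Mirror image of the left outer join, with the value order swapped back."""
    return [(k, (b, a)) for k, (a, b) in left_outer_join(right, left)]

words = [('pan', 1), ('hello', 1), ('panda', 2)]
word1 = [('panda', 'np')]

print(join(words, word1))              # [('panda', (2, 'np'))]
print(left_outer_join(words, word1))
# [('panda', (2, 'np')), ('pan', (1, None)), ('hello', (1, None))]
print(right_outer_join(word1, words))
# [('panda', ('np', 2)), ('pan', (None, 1)), ('hello', (None, 1))]
```

In every case the value from the RDD on which the method is called comes first in the result tuple.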

10. countByKey()

>>> words = sc.parallelize([('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2), ('hello', 1), ('python',5), ('good', 1)])
>>> words.countByKey()
defaultdict(<class 'int'>, {'pan': 2, 'hello': 2, 'hadoop': 1, 'python': 1, 'good': 1})
>>> words.countByKey().items()
dict_items([('pan', 2), ('hello', 2), ('hadoop', 1), ('python', 1), ('good', 1)])
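
Note that countByKey() counts how many records each key has and ignores the values, which is easy to confuse with a per-key sum. A minimal plain-Python comparison, using collections.Counter as a stand-in for the Spark calls:

```python
from collections import Counter

words = [('pan', 1), ('hello', 1), ('hadoop', 1), ('pan', 2),
         ('hello', 1), ('python', 5), ('good', 1)]

# countByKey(): number of records per key, values ignored
counts = Counter(key for key, _ in words)

# reduceByKey(lambda a, b: a + b): sum of the values per key
sums = Counter()
for key, value in words:
    sums[key] += value

print(counts['python'], sums['python'])  # 1 5
print(counts['pan'], sums['pan'])        # 2 3
```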

VI. Practice

1. Reading a file and converting it to key-value pairs

>>> rdd = sc.textFile(r"file:///C:\Users\86178\Desktop\SPARK\word.txt")
>>> rdd1 = rdd.flatMap(lambda x:x.split())
>>> rdd2 = rdd1.map(lambda x:(x,1))
>>> rdd2.reduceByKey(lambda a,b:a+b).collect()
[('python', 2), ('panda', 1), ('fan', 2), ('hello', 2), ('spark', 2)]
>>>

2. Computing the average daily sales of the Spark and Hadoop books over two days

# Method 1 (not recommended: the divisor 2 is hard-coded)
>>> word = sc.parallelize([('spark',2),('hadoop',4),('spark',6),('hadoop',6)])
>>> word1 = word.reduceByKey(lambda a,b:a+b)
>>> word2 = word1.map(lambda x:(x[0],x[1]/2))
>>> word2.collect()
[('hadoop', 5.0), ('spark', 4.0)]


# Method 2 (not recommended: the sum and the count are computed separately)
>>> wordgroup = word.groupByKey().map(lambda x:(x[0],sum(x[1])))
>>> wordgroup.collect()
[('hadoop', 10), ('spark', 8)]
>>> wordgroup = word.groupByKey().map(lambda x:(x[0],len(x[1])))
>>> wordgroup.collect()
[('hadoop', 2), ('spark', 2)]


# Method 3 (recommended)
>>> wordgroup = word.groupByKey().map(lambda x:(x[0],sum(x[1])/len(x[1])))
>>> wordgroup.collect()
[('hadoop', 5.0), ('spark', 4.0)]


# Method 4 (same idea as Method 3)
>>> wordgroup = word.groupByKey().map(lambda x:(x[0],sum(x[1]),len(x[1])))
>>> wordgroup.collect()
[('hadoop', 10, 2), ('spark', 8, 2)]
>>> wordgroup.map(lambda x:(x[0],x[1]/x[2])).collect()
[('hadoop', 5.0), ('spark', 4.0)]
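
A further option, not among the methods above, is to carry a (sum, count) pair per key so the average needs neither a hard-coded divisor nor a full grouping. In PySpark this would be a mapValues to (v, 1), a reduceByKey that adds the pairs component-wise, and a final mapValues that divides. The plain-Python sketch below models those three steps; reduce_by_key is a hypothetical helper.

```python
def reduce_by_key(pairs, func):
    """Model of reduceByKey(): merge the values of each key with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

word = [('spark', 2), ('hadoop', 4), ('spark', 6), ('hadoop', 6)]

# Step 1: turn each value v into (v, 1)
with_counts = [(key, (value, 1)) for key, value in word]

# Step 2: add sums and counts component-wise per key
sum_count = reduce_by_key(with_counts,
                          lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Step 3: divide sum by count
averages = {key: total / count for key, (total, count) in sum_count.items()}
print(averages)  # {'spark': 4.0, 'hadoop': 5.0}
```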

3. Finding the top values

As a first step, the two files can be loaded together into a single RDD:

>>> rdd = sc.textFile(r"file:///C:\Users\86178\Desktop\SPARK\数据集\file*.txt")
>>> rdd.collect()
['15,594,564,126', '789,157,259,115', '894,115,157,267', '5456,5,6,2', '494,199,1,2597', '4969,45,69,25', '', '', '12,56', '4564,461,2369,16', '49,6,56,65', '659,652,166,64', '6559,65,6,4', '599,56', '6561,127,489,145', '', '14']

The same job as a standalone script:

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('spark').setMaster('local[1]')
sc = SparkContext(conf=conf)


rdd = sc.textFile(r"file:///C:\Users\86178\Desktop\SPARK\数据集\求TOP值\file*.txt")
# keep only complete records with exactly four comma-separated fields
rdd1 = rdd.filter(lambda x:len(x.split(','))==4)
# take the third field as an integer (int() is safer than eval() on file input)
rdd2 = rdd1.map(lambda x:int(x.split(',')[2]))
rdd3 = rdd2.repartition(1)
# sort in descending order
rdd4 = rdd3.sortBy(lambda x:x,False)
rdd4.foreach(print)
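
When only the few largest values are needed, a full sort is more work than necessary; on an RDD, top(n) returns the n largest elements directly. The plain-Python sketch below applies the same filtering to the lines shown in the collect() output above and picks the three largest third-field values with heapq.nlargest. The choice of n = 3 and the helper name third_field_values are assumptions for illustration.

```python
import heapq

lines = ['15,594,564,126', '789,157,259,115', '894,115,157,267', '5456,5,6,2',
         '494,199,1,2597', '4969,45,69,25', '', '', '12,56', '4564,461,2369,16',
         '49,6,56,65', '659,652,166,64', '6559,65,6,4', '599,56',
         '6561,127,489,145', '', '14']

def third_field_values(lines):
    """Yield the third field of every complete four-field record as an int."""
    for line in lines:
        fields = line.split(',')
        if len(fields) == 4:
            yield int(fields[2])

top3 = heapq.nlargest(3, third_field_values(lines))
print(top3)  # [2369, 564, 489]
```

Empty lines and records with fewer than four fields are skipped, exactly as the filter step does in the script.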

4. Secondary sort (standalone application)

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('spark').setMaster('local[1]')
sc = SparkContext(conf=conf)


# The lines above are the setup that every standalone application needs

rdd = sc.textFile(r"file:///C:\Users\86178\Desktop\SPARK\数据集\二次排序\file*.txt")
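# split each line once and convert both columns to integers
# (assumes every line holds two whitespace-separated integers)
rdd1 = rdd.map(lambda x:x.split()).map(lambda p:(int(p[0]),int(p[1])))

# sort in descending order by the first column, then by the second
rdd2 = rdd1.sortBy(lambda x:x,False)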
rdd2.foreach(print)

Running the script produces the following result:

[Screenshot of the program output]
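
A secondary sort relies on tuples comparing field by field: the first column decides the order and the second column breaks ties. The plain-Python sketch below shows this on made-up input lines, assuming both columns are integers, and also shows why the columns should be converted before sorting.

```python
# Made-up input lines, assuming two whitespace-separated integers per line
lines = ['5 3', '1 6', '4 9', '8 3', '4 7', '5 6', '3 2']

pairs = [tuple(int(field) for field in line.split()) for line in lines]

# Descending by the first column, ties broken by the second column
ordered = sorted(pairs, reverse=True)
print(ordered)
# [(8, 3), (5, 6), (5, 3), (4, 9), (4, 7), (3, 2), (1, 6)]

# Strings compare character by character, so '9' sorts above '10'
print(sorted(['9', '10'], reverse=True))  # ['9', '10']
print(sorted([9, 10], reverse=True))      # [10, 9]
```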

5. Sorting the contents of files

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('spark').setMaster('local[1]')
sc = SparkContext(conf=conf)

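# running counter used to number the sorted values;
# it only numbers correctly because the data is in a single partition (see repartition(1) below)
index = 0
def getindex():
    global index
    index+=1
    return index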

rdd = sc.textFile(r"file:///C:\Users\86178\Desktop\SPARK\数据集\文件排序\file*.txt")
rdd1 = rdd.filter(lambda x:len(x)>0)
rdd2 = rdd1.map(lambda x:int(x.strip()))

rdd3 = rdd2.repartition(1)
rdd4 = rdd3.sortBy(lambda x:x)
rdd5 = rdd4.map(lambda x:(getindex(),x))
rdd5.foreach(print)
rdd5.saveAsTextFile(r"file:///C:\Users\86178\Desktop\SPARK\数据集\文件排序\export")

Running the script produces the following result:

[Screenshot of the program output]
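
The numbering step can also be done without a global counter. On an RDD, zipWithIndex() pairs each element with its position, giving (value, index) tuples that start at 0. The plain-Python sketch below shows the equivalent with enumerate on made-up numbers, producing the same (rank, value) shape as the script.

```python
# Made-up values standing in for the integers read from the files
values = [33, 37, 12, 40, 4, 16, 39, 5, 1, 45, 25]

# Sort ascending, then number the values starting from 1
ranked = [(rank, value) for rank, value in enumerate(sorted(values), start=1)]

print(ranked[0])   # (1, 1)
print(ranked[-1])  # (11, 45)
```

Because the rank comes from the position in the sorted sequence rather than from shared state, the result does not depend on how the data is partitioned.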