1. Creating the RDD with parallelize():
words = sc.parallelize([("hadoop", 1), ("is", 1), ("good", 1),
                        ("spark", 1), ("is", 1), ("fast", 1),
                        ("spark", 1), ("is", 1), ("better", 1)])
wordsres1 = words.groupByKey()
wordsres1.collect()
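(The code above assumes an existing SparkContext named sc, as the pyspark shell provides automatically. A minimal sketch of creating one yourself in a standalone script, assuming local mode and a hypothetical application name, could look like this:)
from pyspark import SparkContext
# Assumption: running outside the pyspark shell, so sc must be created explicitly
sc = SparkContext("local", "groupByKey-vs-reduceByKey-demo")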
2. groupByKey() result:
[('hadoop', <pyspark.resultiterable.ResultIterable at 0x7effad99feb8>),
('is', <pyspark.resultiterable.ResultIterable at 0x7effad99f828>),
('good', <pyspark.resultiterable.ResultIterable at 0x7effad99fcf8>),
('spark', <pyspark.resultiterable.ResultIterable at 0x7effad99fda0>),
('fast', <pyspark.resultiterable.ResultIterable at 0x7effad99fbe0>),
('better', <pyspark.resultiterable.ResultIterable at 0x7effad99fd68>)]
With groupByKey, the original RDD elements are grouped by key, so each key is mapped to the collection of its values:
("hadoop",1) |
("is",(1,1,1)) |
("spark",(1,1) |
("good",1) |
("fast",1) |
("better",1) |
3. reduceByKey() result:
wordsres2 = words.reduceByKey(lambda a, b: a + b)
wordsres2.collect()
# Result:
[('hadoop', 1),
('is', 3),
('good', 1),
('spark', 2),
('fast', 1),
('better', 1)]
With reduceByKey, the values of the original RDD are summed for each key, giving the following result:
("hadoop",1) | ("is",3) |
("spark",2) | ("good",1) |
("fast",1) | ("better",1) |