spark教程(四)-python基础编程

hadoop 是 java 开发的,原生支持 java;spark 是 scala 开发的,原生支持 scala;

spark 还支持 java、python、R,本文只介绍 python

spark 1.x 和 spark 2.x 用法略有不同,spark 1.x 的用法大部分也适用于 spark 2.x 

 

Pyspark

python + spark,简单来说,想用 python 操作 spark,就必须用 pyspark 模块

 

RDD

spark 最重要的一个概念叫 RDD,Resilient Distributed Dataset,弹性分布式数据集   

RDD 可以从 hadoop 获取数据,也可以从其他地方获取数据,也可以从一种 RDD 转换成 另一种 RDD;

 

Python 编程基本语法

1. 首先创建 SparkSession

在 spark1.x 中是创建 SparkContext

在 spark2.x 中创建 SparkSession

 

2. 然后创建 RDD

spark 是以 RDD 概念为中心运行的,RDD 是一个容错的、可以被并行操作的元素集合。

创建 RDD 有两种方式:

1. 在驱动程序中并行化一个已经存在的集合    【内存中的数据】

2. 从外部存储系统引入数据,生成 RDD      【外部存储介质中的数据,注意 spark 本身没有存储功能】

  // 这个存储系统可以是一个共享文件系统,如 hdfs、hbase

详见我的博客 RDD 认知

 

3. 操作 RDD

RDD 的操作有两种方式:转换 和 行动,而且 转换 是 惰性的

可以根据 是否有返回 判断是哪个操作,行动 有返回值,转换无返回值

详见官网 RDD

 

3.1 RDD 缓存

我们可以把 RDD 缓存到 内存中, 这其实就是 行动 操作

distFile = sc.textFile('README.md')
m = distFile.map(lambda x: len(x))      # map 是 转换 操作,并不立即执行
m.cache()       # 把 map 的输出缓存到内存中,其实 cache 就是 执行 操作

或者 m.persist() 

 

3.2  转换 操作

惰性,无返回值

map(func[, preservesPartitioning=False]):把一个序列中的元素逐个送入 map,经 func 处理后,返回一个新的 序列

rdd = sc.parallelize([2, 3, 4])
rdd.map(lambda x: x + 1).collect()          # [3, 4, 5]

filter(func):类似 map,func 是个过滤函数

rdd = sc.parallelize([2, 3, 4])
rdd.map(lambda x: x > 3).collect()          # [False, False, True]

flatMap(func[, preservesPartitioning=False]):也类似 map,只是 它会把 每次经过 func 处理的结果进行 合并,输入和输出的 list 长度可能不同

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()            # [1, 1, 2, 1, 2, 3]
# range(1, 2): 1
# range(1, 3): 1, 2
# range(1, 4): 1, 2, 3

### vs map
rdd.map(lambda x: range(1, x)).collect()                # [[1], [1, 2], [1, 2, 3]]

mapPartitions(func [, preservesPartitioning=False]) :map的一个变种,map 是把序列的单个元素送入 func ,而 mapPartitions 是把 序列分区后 每个 分区 整体送入 func

rdd = sc.parallelize([1,2,3,4,5], 3)    # 分 3 个区
def f(iterator): yield sum(iterator)    # 必须是生成器,即 yield,不能 return
rdd.mapPartitions(f).collect()          # [1, 5, 9]

mapPartitionsWithIndex(func [, preservesPartitioning=False]) :func 有两个参数,分片的序号 和 迭代器,返回 分片序号,也必须是 迭代器

rdd = sc.parallelize(range(15), 13)    # 分 13 个区
def f(splitIndex, iterator): yield splitIndex
rdd.mapPartitionsWithIndex(f).collect() # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

cartesian(otherDataset):利用两个 序列 生成 笛卡尔內积 的数据集

x = sc.parallelize([1,2,3])
y = sc.parallelize([4,5])
x.cartesian(y).collect()        # [(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]

 

以下方法只适用 key-value 数据

mapValues(func):根据 func 处理 value

rdd = sc.parallelize([(1, [1,2,3]), (3, ['a', 'b'])])
rdd.mapValues(len).collect()            # [(1, 3), (3, 2)]  计算 value 的长度

reduceByKey(func [, numPartitions=None, partitionFunc=<function portable_hash at 0x7fa664f3cb90>]):针对 k-v 对的处理方法,把 key 相同的 value 进行 reduce,然后重新组成 key-reduce 对

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
def f(x, y): return x + y
rdd.reduceByKey(f).collect()        # [('a', 2), ('b', 1)]

sortByKey([ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fa665048c80>]):根据 key 进行排序,默认升序,numPartitions 代表分区数,keyfunc 是处理 key 的,在 排序过程中对 key 进行处理

tmp = [('a', 4), ('b', 3), ('c', 2), ('D', 1)]
sc.parallelize(tmp).sortByKey(True, 1).collect()    # 升序[('D', 1), ('a', 4), ('b', 3), ('c', 2)] 1代表分区数
sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect()   # 升序[('a', 4), ('b', 3), ('c', 2), ('D', 1)] D跑到后面了

sc.parallelize(tmp).sortByKey(False, 2, keyfunc=lambda k:k.lower()).collect()# 降序[('D', 1), ('c', 2), ('b', 3), ('a', 4)]

keyfunc 只在 排序过程中起作用,在输出时 keyfunc 不起作用

join(otherDataset [, numPartitions=None]):将 两个 k-v RDD 中 共有的 key 的 value 交叉组合

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.join(y).collect()     # [('a', (1, 2)), ('a', (1, 3))]

 

3.3 行动 操作

有返回值

collect:返回 RDD 中的数据

count:返回 RDD 中元素个数

first:返回 RDD 中第一个元素

max. min.sum:不解释

take(n):返回 RDD 中 前 n 个元素

takeOrdered(n [, key=None]):对 RDD 先进行排序,然后取排序后的 前 n 个数据,key 表示先经过 keyfunc 处理后再进行排序,最终返回的还是原数据

sc.parallelize([9,7,3,2,6,4]).takeOrdered(3)    # [2, 3, 4]
sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x: -x)   # [9, 7, 6]
## 过程如下
#  9,  7,  3,  2,  6,  4  ## 原数据
# -9, -7, -3, -2, -6, -4  ## 经过 keyfunc 处理后的数据
# -9, -7, -6, -4, -3, -2  ## 对处理后的数据升序排序
# -9, -7, -6              ## 取前3个
#  9,  7,  6              ## 对应到原数据

也就是说,keyfunc 只在排序时起作用,在输出时不起作用

foreach(func):运行 func 函数 并行处理 RDD 的所有元素

sc.parallelize([1, 2, 3, 4, 5]).foreach(print)  # 并行打印,不按顺序输出
# 1
# 2
# 4
# 5
# 3

reduce(func):把 RDD 中前两个元素送入 func,得到一个 value,把这个 value 和 下一个元素 送入 func,直至最后一个元素

sc.parallelize([1,2,3,4,5]).reduce(lambda x, y: x + y)  # 15 求和

fold:与 reduce 类似,fold 是有一个 基数,然后 把每个元素 和 基数 送入 func,然后替换该基数,循环,直到最后一个元素

x = sc.parallelize([1,2,3])
neutral_zero_value = 0  # 0 for sum, 1 for multiplication
y = x.fold(neutral_zero_value, lambda obj, accumulated: accumulated + obj) # computes cumulative sum
print(x.collect())  # [1,2,3]
print(y)            # 6

aggregate:对每个分区进行聚合,然后聚合每个分区的聚合结果,详见我的博客 aggregate

countByValue:统计相同元素的个数

sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items()    # [(1, 2), (2, 4), (3, 3), (5, 1)]

# 输入 k-v 不按 value 统计,按 k-v 统计
sc.parallelize([('a', 1), ('b', 1)]).countByValue().items()     # [(('a', 1), 1), (('b', 1), 1)]

saveAsTextFile(path [, compressionCodecClass=None]):把 RDD 存储到文件系统中

counts.saveAsTextFile('/usr/lib/spark/out')

输入必须是 路径,且该路径不能事先存在

 

以下方法只适用 key-value 数据

countByKey:统计相同 key 的个数,返回 key-count 

sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey()   # defaultdict(<type 'int'>, {'a': 2, 'b': 1})

dictdata= sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey()
dictdata.items()        # [('a', 2), ('b', 1)]

 

Python 脚本

如何运行 python 脚本?如何 在 python 中 调用 spark?,这两个问题答案相同。

 

首先需要配置 /etc/profile

# python can call pyspark directly
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH

python 的搜索路径 ,加上 spark 中 python 和 pyspark,以及 py4j-0.10.4-src.zip,他的作用是 负责 python 和 java 之间的 转换。

 

python 脚本 test1.py

from __future__ import print_function
from pyspark import *
import os
print(os.environ['SPARK_HOME'])
print(os.environ['HADOOP_HOME'])
if __name__ == '__main__':
    sc = SparkContext("spark://hadoop10:7077")
    rdd = sc.parallelize("hello Pyspark world".split(' '))
    counts = rdd.map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile('/usr/lib/spark/out')
    counts.foreach(print)

    sc.stop()

 

命令行执行

bin/spark-submit test1.py

或者 之间运行 py 文件

python test1.py

 

脚本模式 通过 http://192.168.10.10:8080/ 查看任务

spark教程(四)-python基础编程

 

 

 

参考资料:

https://www.cnblogs.com/yangzhang-home/p/6056133.html  快速入门

https://blog.csdn.net/kl28978113/article/details/80361452  较全教程

http://spark.apache.org/docs/latest/    spark 2.4.4 官网

http://spark.apache.org/docs/latest/api/python/index.html    spark 2.4.4 python API

https://www.cnblogs.com/Vito2008/p/5216324.html

https://blog.csdn.net/proplume/article/details/79798289

https://www.iteblog.com/archives/1396.html#aggregate  RDD 操作 API

https://www.cnblogs.com/yxpblog/p/5269314.html    RDD 操作 API

上一篇:java spark转换算子union、intersection、subtract


下一篇:spark教程(五)-action 操作 group 系列