pyspark 包介绍
内容
PySpark是针对Spark的Python API。根据网上提供的资料,现在汇总一下这些类的基本用法,并举例说明如何具体使用。也是总结一下经常用到的这些公有类的使用方式。方便初学者查询及使用。
Public 类们:
- SparkContext:
Spark 功能的主入口。
- RDD:
弹性分布式数据集,就是在Spark中的基础抽象
- Broadcast:
一个在task之间重用的广播变量。
- Accumulator:
一个“add-only” 共享变量,task只能增加值。
- SparkConf:
用于配置Spark.
- SparkFiles:
在job中访问文件。
- StorageLevel:
更细粒度的缓存持久化级别。
- 将分为两篇介绍这些类的内容,这里首先介绍SparkConf类
- 1. class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)
-
配置一个Spark应用,一般用来设置各种Spark的键值对作为参数。
大多数时候,使用SparkConf()来创建SparkConf对象,也用于载入来自spark.* Java系统的属性值。此时,在SparkConf对象上设置的任何参数都有高于系统属性的优先级。
对于单元测试,也能调用SparkConf(false)来略过额外的配置,无论系统属性是什么都可以获得相同的配置。
这个类中的设值方法都是支持链式结构的,例如,你可以这样编写配置conf.setMaster(“local”).setAppName(“My app”)。
注意:
一旦SparkConf对象被传递给Spark,它就被复制并且不能被其他人修改。
- contains(key)
-
配置中是否包含一个指定键。
- get(key, defaultValue=None)
-
获取配置的某些键值,或者返回默认值。
- getAll()
-
得到所有的键值对的list。
- set(key, value)
-
设置配置属性。
- setAll(pairs)
-
通过传递一个键值对的list,为多个参数赋值。
- etAppName(value)
-
设置应用名称
- setExecutorEnv(key=None, value=None, pairs=None)
-
设置环境变量复制给执行器。
- setIfMissing(key, value)
-
如果没有,则设置一个配置属性。
- setMaster(value)
-
设置主连接地址。
- setSparkHome(value)
-
设置工作节点上的Spark安装路径。
- toDebugString()
-
返回一个可打印的配置版本。
- 2. class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
-
Spark功能的主入口,SparkContext 代表到Spark 集群的连接,并且在集群上能创建RDD和broadcast。
- PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
- accumulator(value, accum_param=None)
-
用指定的初始化值创建一个Accumulator累加器。使用AccumulatorParam对象定义如何添加数据类型的值。默认AccumulatorParams为整型和浮点型。如果其他类型需要自定义。
- addFile(path, recursive=False)
-
使用在每个节点上的Spark job添加文件下载。这里path 参数可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者URI。
在Spark的job中访问文件,使用L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}可以找到下载位置。
如果递归选项被设置为“TRUE”则路径能被指定。当前路径仅仅支持Hadoop文件系统。
>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("")
>>> sc.addFile(path)
>>> def func(iterator):
... with open(SparkFiles.get("test.txt")) as testFile:
... fileVal = int(testFile.readline())
... return [x * fileVal for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]
- addPyFile(path)
-
为所有将在SparkContext上执行的任务添加一个a.py或者.zip的附件。这里path 参数可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者FTP URI。
- applicationId
-
Spark应用的唯一ID,它的格式取决于调度器实现。
- 本地模式下像这样的ID‘local-1433865536131’
- 模式下像这样的ID‘application_1433865536131_34483’
>>> sc.applicationId
u'local-...'
- binaryFiles(path, minPartitions=None)
-
注意
- 从HDFS上读取二进制文件的路径,本地文件系统(在所有节点上都可用),或者其他hadoop支持的文件系统URI党组偶一个二进制数组。每个文件作为单独的记录,并且返回一个键值对,这个键就是每个文件的了路径,值就是每个文件的内容。
- 小文件优先选择,大文件也可以,但是会引起性能问题。
- binaryRecords(path, recordLength)
- path – 输入文件路径
- recordLength – 分割记录的长度(位数)
-
注意
从平面二进制文件中载入数据,假设每个记录都是一套指定数字格式的数字(ByteBuffer),并且每个记录位数的数是恒定的。
- broadcast(value)
-
广播一个制度变量到集群,返回一个L{Broadcast<pyspark.broadcast.Broadcast>} 对象在分布式函数中读取。这个变量将只发一次给每个集群。
- cancelAllJobs()
-
取消所有已排程的或者正在运行的job。
- cancelJobGroup(groupId)
-
取消指定组的已激活job,查看SparkContext.setJobGroup更多信息。
- defaultMinPartitions
-
当不被用户指定时,默认Hadoop RDDs 为最小分区。
- defaultParallelism
-
当不被用户指定时,默认并行级别执行。(例如reduce task)
- dump_profiles(path)
-
转存配置信息到目录路径下。
- emptyRDD()
-
创建没有分区或者元素的RDD。
- getConf()
- getLocalProperty(key)
-
在当前线程中得到一个本地设置属性。
- classmethod getOrCreate(conf=None)
- 参数:conf – SparkConf (optional)
-
获取或者实例化一个SparkContext并且注册为单例模式对象。
- hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)、
-
用任意来自HDFS的键和值类读取一个老的Hadoop输入格式,本地系统(所有节点可用),或者任何支持Hadoop的文件系统的URI。这个机制是与sc.sequenceFile是一样的。
Hadoop 配置可以作为Python的字典传递。这将被转化成Java中的配置。
参数:
- path – Hadoop文件路径
- inputFormatClass – 输入的Hadoop文件的规范格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
- keyClass – 可写键类的合格类名 (例如“org.apache.hadoop.io.Text”)
- valueClass –可写值类的合格类名 (e.g. “org.apache.hadoop.io.LongWritable”)
- keyConverter – (默认为none)
- valueConverter – (默认为none)
- conf – Hadoop配置,作为一个字典传值 (默认为none)
- batchSize – Python对象的数量代表一个单一的JAVA对象 (默认 0, 表示自动匹配batchSize)
- hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
-
读取Hadoop输入格式用任意键值类。与上面的类相似。
参数:
- inputFormatClass – 输入的Hadoop文件的规范格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
- keyClass – 可写键类的合格类名 (例如“org.apache.hadoop.io.Text”)
- valueClass –可写值类的合格类名 (e.g. “org.apache.hadoop.io.LongWritable”)
- keyConverter – (默认为none)
- valueConverter – (默认为none)
- conf – Hadoop配置,作为一个字典传值 (默认为none)
- batchSize – Python对象的数量代表一个单一的JAVA对象 (默认 0, 表示自动匹配batchSize)
- newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
-
与上面的功能类似.
- newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
-
任意Hadoop的配置作为参数传递。
- parallelize(c, numSlices=None)
-
分配一个本Python集合构成一个RDD。如果输入代表了一个性能范围,建议使用xrange。
>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
- pickleFile(name, minPartitions=None)
-
载入使用RDD.saveAsPickleFile方法保存的RDD。
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- range(start, end=None, step=1, numSlices=None)
-
创建一个int类型元素组成的RDD,从开始值到结束(不包含结束),里面都是按照步长增长的元素。这就要用到Python内置的函数range()。如果只有一个参数调用,这个参数就表示结束值,开始值默认为0.
参数:
- start –起始值
- end – 结束值(不包含)
- step – 步长(默认: 1)
- numSlices –RDD分区数量(切片数)
返回值:RDD
>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]
- runJob(rdd, partitionFunc, partitions=None, allowLocal=False)
-
执行指定的partitionFunc 在指定的分区,返回一个元素数组。如果不指定分区,则将运行在所有分区上。
>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25] >>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
- sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
-
读取Hadoop 的SequenceFile,机制如下:
1.一个Java RDD通过SequenceFile或者其他输入格式创建,需要键值的可写类参数。
2.序列化
3.如果失败,则对每个键值调用‘toString’。
4.在Python上,PickleSerializer用来反序列化。
参数:
path –序列化文件路径
keyClass – 可用键类(例如 “org.apache.hadoop.io.Text”)
valueClass – 可用值类 (例如 “org.apache.hadoop.io.LongWritable”)
keyConverter –
valueConverter –
minSplits – 数据集最低分割数(默认 min(2, sc.defaultParallelism))
batchSize – 代表一个JAVA对象Python对象的数量 (默认0, 自动)
setCheckpointDir(dirName)
设定作为检查点的RDD的目录,如果运行在集群上,则目录一定时HDFS路径。
- setJobGroup(groupId, description, interruptOnCancel=False)
-
分配一个组ID给所有被这个线程开启的job。
通常,一个执行单位由多个Spark 的action或者job组成。应用程序可以将所有把所有job组成一个组,给一个组的描述。一旦设置好,Spark的web UI 将关联job和组。
应用使用SparkContext.cancelJobGroup来取消组。
>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
... sleep(100)
... raise Exception("Task should have been cancelled")
>>> def start_job(x):
... global result
... try:
... sc.setJobGroup("job_to_cancel", "some description")
... result = sc.parallelize(range(x)).map(map_func).collect()
... except Exception as e:
... result = "Cancelled"
... lock.release()
>>> def stop_job():
... sleep(5)
... sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled如果对于job组,interruptOnCancel被设定为True,那么那么取消job将在执行线程中调用Thread.interrupt()。这对于确保任务实时停止是有作用的。但是默认情况下,HDFS可以通过标记节点为dead状态来停止线程。
- setLocalProperty(key, value)
-
设定本地影响提交工作的属性,例如Spark 公平调度池。
- setLogLevel(logLevel)
-
控制日志级别。重写任何用户自定义的日志设定。有效的日志级别包括:ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN。
- classmethod setSystemProperty(key, value)
-
设定Java系统属性,例如spark.executor.memory,这一定要在实例化SparkContext之前被激活。
- show_profiles()
-
打印配置信息到标准输出。
- sparkUser()
-
为运行SparkContext 的用户获得SPARK_USER
- startTime
-
当SparkContext被发起,则返回新的时间纪元。
- statusTracker()
-
Return StatusTracker object
返回StatusTracker对象
- stop()
-
关闭SparkContext。
- textFile(name, minPartitions=None, use_unicode=True)
-
从HDFS中读取一个text文件,本地文件系统(所有节点可用),或者任何支持Hadoop的文件系统的URI,然后返回一个字符串类型的RDD。
如果用户use_unicode为False,则strings类型将为str(用utf-8编码),这是一种比unicode更快、更小的编码(Spark1.2以后加入)。
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello world!']
- uiWebUrl
-
返回由SparkContext的SparkUI实例化开启的URL。
- union(rdds)
-
建立RDD列表的联合。
支持不同序列化格式的RDD的unions()方法,需要使用默认的串行器将它们强制序列化(串行化):
>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
[u'Hello', 'World!']
- version
-
应用运行的Spark的版本。
- wholeTextFiles(path, minPartitions=None, use_unicode=True)
-
读取HDFS的文本文件的路径,这是一个本地文件系统(所有节点可用),或者任何支持Hadoop的文件系统的URI。每个文件被当做一个独立记录来读取,然后返回一个键值对,键为每个文件的路径,值为每个文件的内容。
如果用户use_unicode为False,则strings类型将为str(用utf-8编码),这是一种比unicode更快、更小的编码(Spark1.2以后加入)。
举例说明,如果有如下文件:
hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn如果执行 rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”), 那么rdd 包含:
(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)注意
这种情况适合小文件,因为每个文件都会被载入到内存中。消耗很多内存啊!
>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
... _ = file1.write("")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
... _ = file2.write("")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[(u'.../1.txt', u''), (u'.../2.txt', u'')]本篇接少了两个类SparkContext和SparkConf,下一篇将会介绍其余的几个类的内容,这是一篇汇总性质的文章主要便于以后使用时知道具体类中的方法调用为刚刚接触Spark和我差不多人提供参考。还有理解不到位的请多多理解。