RDD概念
RDD是Spark的数据核心抽象,全名弹性分布式数据集(Resilient Distributed Dataset)
定义:分布式元素集合
特点:
- 不是变量,不可改变
- 可分为多个分区,分区可运行在集群中的不同节点上
- 支持Python,Java,Scala中类型对象,支持用户自定义对象
RDD创建
包括两种:
- 读取外部数据创建,例如读取一个文件的内容
- 从已有RDD通过转化操作创建
RDD操作
包括两种:
- 转化操作:由一个 RDD 经过处理,然后生成一个新的 RDD
- 行动操作:对RDD进行计算,返回结果
特点:
- Spark在接收到一个定义新的RDD时,不会立刻生成新的 RDD,而会等到对这个新RDD进行行动操作时才会生成,这样可以防止不必要的中间结果浪费内存
- 默认情况下Spark在执行完行动操作后会释放此RDD(减少内存占用),之后再对此RDD做行动操作时,需要重新生成RDD。因此需要重用RDD需要使用persist(),将其缓存到内存中或磁盘中
总结
Spark程序的工作流程:
- 从外部数据创建初始RDD
- 由初始RDD经过转化操作,得到新RDD
- 对需要重用的RDD使用persist()持久化
- 使用行动操作对RDD进行计算,实现工作目标
编程实践
目标:从log.txt中找出包含“warning”字段的行数
思路:
- 首先由log.txt生成初始RDD
- 通过filter()操作生成包含“warning”字段的RDD
- 最后通过count()得到行数
代码:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("logtest") \
.getOrCreate()
sc = spark.sparkContext
def findaccept(line):
return "warning" in line
def main():
lines = sc.textFile("log.txt")
pythonlines = lines.filter(findaccept)
print(pythonlines.count())
if __name__ == '__main__':
main()