首先,官网表明了Spark2.0之后,RDD被DataSet替代了,虽然Spark2.0仍然支持RDD的接口,但是官方强烈推荐使用DataSet。
安全
Spark的安全机制默认是被关闭的,所以有可能会受到攻击,但实际生产过程中,Spark集群更多的是搭建在公司内网中,不对外暴露,个人认为不开启安全机制并不影响。
Spark Shell的使用
SparkShell是学习SparkAPI的一种方式,同时是一款交互式的数据分析工具。官网提供了Scala和Python两种方式的例子,这里以Python为主。
官网的实践中涉及了一些概念,这里提前给出:
- DataSet:
Spark的基础数据抽象,可以理解为分布式的数据集合
DataSet有两种创建方式:
(1) 通过Hadoop的InputFormats创建
(2)通过其他DataSet转换得到 - DataFrame:
DataFrame是DataSet的强类型实现,即DataSet[ROW]
DataFrame的创建方式与DataSet相同
# 启动pyspark
./bin/pyspark
# 创建DataFrmae
textFile=spark.read.text("README.txt")
# 一些DataFrmae的操作
# 获取DataFrmae中的行数
textFile.count()
# 获取DataFrame中的第一行
textFile.first()
# 通过转换操作获取新的DataFrame
# filter算子:对DataFrame中的数据执行过滤操作,返回新的DataFrame
linesWithSpark=textFile.filter(textFile.value.contains("Spark"))
lineWithSpark.count()
DataSet的操作
from pyspark.sql.functions import *
# split算子:对数据按分隔符进行分割
# size算子:返回数据集的大小
# name算子:设置别名
# agg算子:对列上的数据执行聚合函数
# collect算子:Action算子,触发SparkContext提交job
textFile.select(size(split(textFile.value,"\s+")).name("numWords")).agg(max(col("numWords"))).collect()
# spark实现wordcount
# explode算子:将一行数据打成一列
wordCounts=textFile.select(explode(split(textFile.vaule,"\s+")).allias("word")).groupby("word").count().collect()
Caching
Spark支持将频繁访问的数据加载到内存缓存中,这对需要频繁访问的大数据集非常友好,避免了每次使用数据集时从数据源依次执行计算到需要访问的数据集
lineWithSpark.cache()
lineWithSpark.count()
lineWithSpark.count()
一个简单的实例
- 在setup.py添加pyspark
install_requires=[
'pyspark=={site.SPARK_VERSION}'
]
- 创建一个简单的spark实例
from pyspark.sql import sparksession
logFile="YOUR_SPARK_HOME/README.md"
spark=Sparksession.builder.appName("SimpleApp").getOrCreate()
logdata=spark.read.text(logFile).cache()
numAs=logdata.filter(logdata.value.contains("a")).count()
numBs=logdata.filter(logdata.value.contains("b")).count()
ptint("Lines with a:%i, Lines with b:%i" % (numAs,numBs))
spark.stop()
- 运行SimpleApp实例
如果你的程序中使用了第三方库,可以通过--py-files参数来指定jar包
spark-submit --master local[4] SimpleApp.py
个人博客:http://datasheep.info
CSDN:https://blog.csdn.net/qq_36369061
个人公众号:【DataSheep杨】