1、版本说明
-
在spark2.0版本以前,spakr编程接口是RDD(Resilient Distributed Dataset,弹性分布式数据集),spark2.0版本即以上,RDD被Dataset取代,Dataset比RDD更为强大,在底层得到了许多优化了。当然2.0+版本仍然支持RDD,但官方建议使用Dataset。
2、安全
-
spark的安全模式默认是关闭的,这意味着你可能收到攻击。
3、利用Spark Shell进行交互式数据分析
- Spark的shell提供了一种学习API的简单方法,以及一种以交互方式分析数据的强大工具。
- 可以通过使用scala或者python进行编程。
-
在spark的安装根目录下启动。
3.1、Scala方式
启动
./bin/spark-shell
读取一个文件用来创建一个新的数据集Dataset
对数据集进行操作
textFile.count()
textFile.first()
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
textFile.filter(line => line.contains("Spark")).count()
3.2、python方式
启动
./bin/pyspark
textFile = spark.read.text("README.md")
textFile.count()
textFile.first()
linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
textFile.filter(textFile.value.contains("Spark")).count()
4、Dataset的更多操作
1.查找文件中长度最大的字符串,并返回长度
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
2.实现wordcounts
val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts.collect()
![image.png](https://upload-images.jianshu.io/upload_images/4045682-f386bbbf70242ca6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
5、缓存Caching
- Spark还支持将数据集提取到群集范围的内存缓存中。这在重复访问数据时非常有用,例如查询小的“热”数据集或运行像PageRank这样的迭代算法时。举个简单的例子,让我们标记linesWithSpark要缓存的数据集:
linesWithSpark.cache()
linesWithSpark.count()
通过文件运行
- 新建一个SimpleApp.scala
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
- 运行结果