启动spark-shell客户端
启动集群模式
本机为master节点
export MASTER=spark://`hostname`:7077
bin/spark-shell
或者
bin/spark-shell --master spark://ip:7077
如果没有spark实例启动,直接运行bin/spark-shell则启动的是本地模式,并且线程数为1,该启动方式和下面的效果一样
bin/spark-shell --master local
启动本地模式,线程数为n:
bin/spark-shell --master local[n]
./bin/spark-shell --help 查看更详细的参数信息
spark-shell 默认会创建一个SparkContext对象sc和一个SparkSession对象为spark,利用这些对象我们可以做一些命令行的操纵。
SparkSession可以用来创建SparkContext对象(spark.sparkContext),这个是新版的api。
常用的方法
RDD的方法分为transformations和actions,当对RDD进行transformation时,不会立刻执行,因为transformation是延迟执行的,
此时只会记录RDD的lineage,transformation返回的数据还是RDD。action会立刻执行,返回操作后的结果集
1、textFile: 加载文件,如果文件不是分布式文件,则每个work节点都需要有这个文件,不然运行的时候会提示文件找不到错误
sc.textFile("README.md")
2、addFile:添加文件到spark中
sc.addFile("../test")
可以通过 sc.textFile(SparkFiles.get("test"))进行文件的加载,注意引入import org.apache.spark.SparkFiles
3、count:RDD中有多少条数据
val input = sc.textFile("../README.md")
input.count
4、first:返回RDD中的第一条数据
input.first
5、map:对RDD中的每一条数据做map方法中传递的方法的操作
val spaceSplit = input.map(_.split(" "))
spaceSplit.first
6、flatMap:对RDD的数据进行映射后,合并成一个集合
val fmInput = input.flatMap(_.split(" "))
flatMap:
map:
7、take:取RDD中的前几条数据
val input = sc.textFile("../README.md")
取第一条数据:input.take(1)
取前三条数据:input.take(3)
8、collect:以集合的形式返回RDD中的数据
input.collect
9、reduceByKey:对相同key的值,做给定方法的操作,下面是一个spark实现的map reduce统计文件中的单词个数
input.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect