一.应用场景
在Spark程序中调用Linux命令,实现一些程序难以实现的功能,例如:发送模拟邮件、文件打包或解压等等
二.代码实现
package big.data.analyse.linux import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession import scala.sys.process._
/**
* Created by zhen on 2019/10/10.
*/
object SparkUseLinux {
/**
* 设置日志级别
*/
Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]) {
/**
* 调用linux命令解压zip包
*/
println("===开始解压包数据===")
val path = "D:\\testData.zip"
val ml = "unzip " + path + " -d D:\\"
s"$ml".!
/**
* 创建入口
*/
val fileUrl = "D:\\testData.txt"
val spark = SparkSession.builder().appName("SparkUseLinux").master("local[2]").getOrCreate()
/**
* 加载解压后的数据,计算wordcount
*/
val rdd = spark.sparkContext.textFile(fileUrl)
.map(row => row.replace("(", " ").replace(")", " ").replace(".", " ").replace("\"", " ").replace(":", " "))//去除文字中的,防止出现歧义
.flatMap(row => row.split(" "))//把字符串转换为字符集合
.map(row => (row, 1))//把每个字符串转换为map,便于计数
.reduceByKey(_+_)//计数
.filter(row => !row._1.isEmpty)
.filter(row => row._2 > 1) println("---结果---")
rdd.foreach(println)
/**
* 关闭入口
*/
spark.stop()
}
}
三.结果
执行前:
执行后:
结果: