Spark练习之wordcount,基于排序机制的wordcount

一、原理及其剖析

Spark练习之wordcount,基于排序机制的wordcount

二、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>spark</groupId>
<artifactId>com.spark</artifactId>
<version>1.0-SNAPSHOT</version> <name>SparkTest</name>
<url>http://maven.apache.org</url> <properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.10.0</scala.version>
<spark.version>1.3.0</spark.version>
<hadoop.version>2.6.4</hadoop.version>
<encoding>UTF-8</encoding>
</properties> <dependencies>
<!-- scala依赖-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- spark依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- hivecontext要用这个依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- hadoop依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency> </dependencies>
<build>
<pluginManagement>
<plugins>
<!-- 编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build> </project>

**备注:此处会遇到坑,一定要将scala和spark的版本对应好,如果版本不对应,一个过高或一个过低,会导致程序无法正常运行。

三、使用Java进行spark的wordcount练习

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2; import java.util.Arrays; /**
* 使用Java开发本地测试的wordcount程序
*/
public class wordCount { public static void main(String[] args) {
//编写Spark应用程序
//本地执行,直接在main方法执行 //第一步:创建SparkConf对象,设置Spark应用的配置信息
//使用setMaster()可以设置Spark应用程序要连接的Spark集群的master节点的url
//但是如果设置为local则代表在本地运行
SparkConf conf = new SparkConf()
.setAppName("workCount")
.setMaster("local"); //第二步:创建JavaSparkContext对象
//在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java,scala,甚至是python编写
//都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,
//包括调度器(ADGSchedule、TaskScheduler),还会去到Spark Master节点上进行注册,等等。 //SparkContext,是Spark应用中,最重要的一个对象
//在spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的,
// 如果使用scala,使用的就是原生的SparkContext对象
//如果使用Java,那么就是JavaSparkContext对象
//如果是开发Spark SQL程序,那么就是SQLContext、HiveContext
//如果是Spark Streaming程序,那么就是它独有的SparkContext
//以此类推 JavaSparkContext sc = new JavaSparkContext(conf); //第三步:要针对输入源(hdfs文件、本地文件,等等),创建一个初始的RDD。
//输入源中的数据会打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集
//这里,进行本地测试,所以针对本地文件
//SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile方法
//在Java中,创建的普通RDD,都叫做JavaRDD
//RDD中,有元素这种概念,如果是hdfs活着本地文件呢,创建的RDD,每一个元素就相当于是文件里的一行
JavaRDD<String> lines = sc.textFile("C://Users//10902//Desktop//spark.txt"); //第四步:对初始RDD进行transformation操作,也就是一些计算操作
//通常操作会通过创建function,并配合RDD的map、flagMap等算子来执行
//function,通常,如果比较简单,则创建指定Function的匿名内部类
//但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类 //先将每一行拆分成单个的单词
//FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型
//对于我本地的测试文件,输入肯定是String,因为是一行一行的文本,输出,也是String,因为是每一行的文本
//flagMap:其实就是,将RDD的一个元素,给拆分成一个或多个元素
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L; @Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
}); //接着,需要将每一个单词,映射为(单词,1)的这种格式
//因为只有这样,才能根据单词作为key,来进行每个单词的出现次数的累加
//mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素
//tuple2是scala类型,包含了两个值
//mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型
//第二个和第三个泛型参数,代表的输入的Tuple2的第一个值和第二个值的类型
//JavaPairRdd的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型
JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
}
); //接着,需要以单词作为key,统计每个单词出现的次数
//这里要使用reduceByKey这个算子,对每个key对应的value,都进行reduce操作
//比如JavaPairRdd中有个元素,分别为(hello,1)(hello,1)(hello,1)(world,1)
//reduce操作,相当于是把第一个值和第二个值进行计算,然后再将结果与第三个值进行计算
//比如这里的hello,那么就相当于是,首先1 + 1 = 2 ,然后再将2 + 1 = 3
//最后返回的JavaPairRDD中的元素,也是tuple,但是第一个值就是每个key,第二个值就是key的value
//reduce之后的结果,相当于就是每个单词出现的次数
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L; @Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}
); //到这里为止,我们通过几个spark算子操作,以及统计出了单词的次数
//但是,之前我们使用的flatMap、mapToPair、reduceByKey这种操作,都叫做transformation操作
//一个spark应用中,光是有transformation操作,是不行的,是不会执行的,必须要有一种叫做action
//最后,使用一种叫做action操作的,比如说,foreach,来触发程序的执行
wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println(wordCount._1 + " 出现了" + wordCount._2 + " 次");
}
}); sc.close();
} }

四、使用scala进行spark的wordcount练习


import org.apache.spark.{SparkConf, SparkContext} object WordCount {
def main(args: Array[String]): Unit = {
/**
* 第一步:创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息,
* 例如说通过设置setMaster来设置程序要链接的Spark集群的Master的URL,
* 如果设置为local,则代表Spark程序在本地运行。
*/
val conf = new SparkConf //创建SparkConf对象
conf.setAppName("WordCount") //设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群 /**
* 第二步:创建SparkContext对象
* SparkContext是Spark程序所有功能的唯一入口,无论是采用scala、java、Python,R等都
* 必须有一个SparkContext。SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括
* DAGScheduler,TaskScheduler、SchedulerBackend同时还会负责Spark程序往Master注册程序等。
* SparkContext是这个Spark程序中最为至关重要的一个对象。
*/
val sc = new SparkContext(conf) /**
* 第三步:根据具体的数据源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext创建RDD。
* RDD的创建方式有三种:根据外部的数据源(HDFS)、根据Scala集合、其他的RDD操作。数据会被RDD划分成一系列的
* Partitions,分配到每个Partition的数据属于一个Task的处理范畴
*/
val lines = sc.textFile("C://Users//10902//Desktop//spark.txt", 1) // /**
* 第四步:对初始化的RDD进行Transformation级别处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算。
*/
/**
* 4.1、对每一行的字符串拆分成单个的单词
*/
val words = lines.flatMap { line => line.split(" ") } //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一 /**
* 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word => (word,1)
*/
val pairs = words.map { word => (word, 1) } /**
* 4.3、在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数
*/
val wordCounts = pairs.reduceByKey(_ + _) //对相同的key,进行value的累计 wordCounts.foreach(map => println(map._1 + ":" + map._2)) sc.stop()
}
}

五、基于排序机制的wordcount

import org.apache.spark.{SparkConf, SparkContext}

object ScalaSortWordCount {
def main(args: Array[String]) {
val conf = new SparkConf //创建SparkConf对象
conf.setAppName("ScalaSortWordCount") //设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群 val sc = new SparkContext(conf)
val lines = sc.textFile("C://Users//xxx//Desktop//spark.txt", 1)
val words = lines.flatMap { line => line.split("") }
val pairs = words.map { word => (word, 1) }
val wordCounts = pairs.reduceByKey(_ + _) val countWords = wordCounts.map { wordCount => (wordCount._2, wordCount._1) }
val sortedCountWords = countWords.sortByKey(false)
val sortedWordCounts = sortedCountWords.map { sortedCountWord => (sortedCountWord._2, sortedCountWord._1) } sortedWordCounts.foreach(sortWordCount => println(sortWordCount._1 + "出现了:" + sortWordCount._2 + "次"))
}
}
上一篇:redis函数总结


下一篇:hdu 1520 Anniversary party(第一道树形dp)