Spark Streaming
Spark Streaming is a stream-processing engine built on top of Spark that works in micro-batches: input data is collected and processed in batches at a fixed time interval. When the batch interval is shortened to the second level, it can be used to process real-time data streams.
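In code, the batch interval is fixed once, when the StreamingContext is created; every record that arrives within the same interval is grouped into one batch. Here is a minimal sketch of that idea (the object and app names are illustrative, not part of the demo below):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchIntervalSketch {
  def main(args: Array[String]): Unit = {
    // Seconds(1): records arriving within the same 1-second window
    // are processed together as one micro-batch
    val conf = new SparkConf().setMaster("local[2]").setAppName("BatchIntervalSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    // ... define DStream transformations here, then:
    // ssc.start(); ssc.awaitTermination()
  }
}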
A first Spark Streaming program
Step 1:
On Linux, run the following command to install nc (netcat), which we will use to simulate a data source: yum -y install nc
Run the following command to start nc listening on port 9999 (-l listens, -k keeps the listener open across connections); the Spark Streaming receiver will later connect to this port as a client: nc -lk 9999
The cursor will sit blinking; type data directly into the session, for example: hello world
Step 2:
Create a Maven project in IDEA named Spark_Streaming.
Create a scala folder under both java and test and mark each with the corresponding source attribute (Sources Root / Test Sources Root), then open pom.xml and add the following configuration:
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
In the java directory, create a new package named Streaming. In that package, create a Scala class of kind object named Streaming_Dome and add the following code:
package Streaming

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Streaming_Dome {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(
      new SparkConf().setMaster("local[6]").setAppName("StreamingContext"))
    sparkContext.setLogLevel("WARN")

    // Create the StreamingContext; the second argument sets a 3-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(3))

    // Receive text data from the socket ("master" is the hostname of the machine running nc)
    val textStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 9999)

    // Print the contents of each batch
    textStream.print()

    // Start the streaming job
    streamingContext.start()

    // Keep the application running until it is stopped manually
    streamingContext.awaitTermination()
  }
}
Run the Streaming_Dome program.
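With the nc session still open, type a few lines into it. Assuming everything is wired up, each 3-second batch prints a block roughly like the following (the timestamp will differ):

-------------------------------------------
Time: 1496313660000 ms
-------------------------------------------
hello world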
Success!
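Once the basic demo works, the natural next step is to transform the stream instead of just printing it. The following is a minimal sketch, not part of the original walkthrough: the object name Streaming_WordCount is illustrative, and it assumes the same "master":9999 socket source and library versions as above. It counts word occurrences within each 3-second batch:

package Streaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Streaming_WordCount {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(
      new SparkConf().setMaster("local[6]").setAppName("StreamingWordCount"))
    sparkContext.setLogLevel("WARN")

    val streamingContext = new StreamingContext(sparkContext, Seconds(3))

    // Same socket source as the demo above
    val lines = streamingContext.socketTextStream("master", 9999)

    // Split each line into words and count occurrences within each batch;
    // counts are per batch, not accumulated across batches
    val wordCounts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    wordCounts.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Because each batch is an independent micro-batch, typing "hello world" twice within one 3-second window prints (hello,2) and (world,2); typing it in two different windows prints (hello,1) and (world,1) twice.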