Custom Receiver
package com.gazikel.streamaing
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
import scala.util.Random
// Custom data receiver
object SparkStreaming01_MyReceiver {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming01_MyReceiver").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Receive data from the custom receiver
    val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    messageDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Custom data receiver:
   * 1. extend Receiver with a result type and a storage level
   * 2. override onStart/onStop
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    private var flag = true

    override def onStart(): Unit = {
      // Generate data on a separate thread and hand it to Spark via store()
      new Thread(new Runnable {
        override def run(): Unit = {
          while (flag) {
            val message = "Collected data: " + new Random().nextInt(10).toString
            store(message)
            Thread.sleep(500)
          }
        }
      }).start()
    }

    override def onStop(): Unit = {
      flag = false
    }
  }
}
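For a receiver that pulls from a real source, the same onStart/onStop pattern applies. Below is a minimal sketch, assuming a line-oriented TCP service on localhost:9999 (the class name, host, and port are illustrative and not part of the original code); it follows the custom-receiver pattern described in the Spark documentation.
package com.gazikel.streamaing

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Receiver that stores every line read from a TCP socket
class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Read on a separate thread so that onStart returns immediately
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to do: the reading thread exits once isStopped() becomes true
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // Ask Spark to restart the receiver when the connection ends
      restart("Trying to connect again")
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }
}
Passing such a receiver to ssc.receiverStream(new SocketLineReceiver("localhost", 9999)) works exactly like MyReceiver above.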
Connecting to Kafka
- Write the code
package com.gazikel.streamaing
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming02_Kafka {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming02_Kafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Kafka consumer configuration
    val kafkaParam = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.10.132:9092",
      // consumer group id: must be non-empty ("atguigu" is just an example name)
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS = KafkaUtils.createDirectStream[String, String](
      ssc,
      // let Spark spread Kafka partitions evenly across executors
      LocationStrategies.PreferConsistent,
      // subscribe to the "atguigu" topic with the parameters above
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaParam)
    )
    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
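Running this example requires the Kafka 0.10 integration artifact on the classpath. A sketch of the sbt dependencies, assuming Scala 2.12 and a Spark 3.x build (the version value is a placeholder to match your cluster; Maven users need the equivalent artifacts):
// build.sbt (sketch)
val sparkVersion = "3.0.0"  // placeholder: use the Spark version of your cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)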
- Create the topic in Kafka
Start ZooKeeper and the Kafka broker first
$ zkServer.sh start
$ kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties
Create a topic named atguigu (Kafka 3.x removed the --zookeeper option from kafka-topics.sh, so point it at the broker with --bootstrap-server)
$ bin/kafka-topics.sh --bootstrap-server spark:9092 --create --topic atguigu --partitions 3 --replication-factor 1
- Produce data
$ ./kafka-console-producer.sh --bootstrap-server spark:9092 --topic atguigu
Graceful Shutdown
package com.gazikel.streamaing
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
/**
 * Graceful shutdown
 */
object SparkStreaming06_Close {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming06_Close")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // ... define the DStream job here: start() requires at least one output operation ...

    ssc.start()

    // Stopping the receivers has to happen on a separate thread.
    // The stop signal itself normally comes from an external store (MySQL, Redis, ZooKeeper, ...).
    new Thread(
      new Runnable {
        override def run(): Unit = {
          // Graceful shutdown: finish processing the data already received, then stop
          while (true) {
            if (true) { // placeholder for the external stop condition
              // Check the current state of the StreamingContext
              val state = ssc.getState()
              if (state == StreamingContextState.ACTIVE) {
                // stop the SparkContext as well, and stop gracefully
                ssc.stop(true, true)
                System.exit(0)
              }
            }
            Thread.sleep(5000)
          }
        }
      }
    ).start()

    ssc.awaitTermination()
  }
}
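The if (true) above stands in for an external stop signal. A minimal sketch of one way to supply it, assuming the signal is the existence of a local marker file /tmp/stop_spark (the object name, path, and file-based approach are illustrative; a production job would typically check MySQL, Redis, ZooKeeper, or an HDFS path instead):
package com.gazikel.streamaing

import java.nio.file.{Files, Paths}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

object StopSignalMonitor {

  // Poll for a marker file and stop the given StreamingContext gracefully once it appears
  def watch(ssc: StreamingContext): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          if (Files.exists(Paths.get("/tmp/stop_spark"))) {
            if (ssc.getState() == StreamingContextState.ACTIVE) {
              // finish the in-flight batches, then stop the SparkContext as well
              ssc.stop(stopSparkContext = true, stopGracefully = true)
            }
            System.exit(0)
          }
          Thread.sleep(5000)
        }
      }
    }).start()
  }
}
Calling StopSignalMonitor.watch(ssc) right after ssc.start() would replace the anonymous thread above; creating /tmp/stop_spark then triggers the graceful stop.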
Recovering Data from a Checkpoint
package com.gazikel.streamaing
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming07_Resume {

  def main(args: Array[String]): Unit = {
    // Recover the StreamingContext from the "check_point" directory if a checkpoint exists,
    // otherwise build a new one with the creating function.
    // The DStream logic should be defined inside this function so it can be rebuilt on recovery.
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("check_point", () => {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming07_Resume")
      val ssc = new StreamingContext(sparkConf, Seconds(3))
      // ... define the DStream job here: start() requires at least one output operation ...
      ssc
    })
    ssc.checkpoint("check_point")

    ssc.start()
    ssc.awaitTermination()
  }
}
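A minimal end-to-end sketch of the same idea, assuming a socket source on localhost:9999 and a simple word count (the object name, host, port, and job logic are illustrative, not part of the original code); note that the checkpoint directory and the DStream graph are both set up inside the creating function so they can be restored after a restart:
package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming07_ResumeSketch {

  // Everything that must survive a restart is defined inside this function
  def createSsc(): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming07_ResumeSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("check_point")

    val wordCount = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCount.print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: createSsc() builds the context; after a crash it is restored from check_point
    val ssc = StreamingContext.getActiveOrCreate("check_point", () => createSsc())
    ssc.start()
    ssc.awaitTermination()
  }
}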