SparkStreaming

自定义采集器

package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

import scala.util.Random

// 自定义数据采集器
object SparkStreaming01_MyReciver {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("SparkStreaming01_MyReceiver").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 接受数据
    val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    messageDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * 自定义数据采集器
   *
   */
  // 1. 继承Receiver,定义泛型,传递参数
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    private var flag = true

    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {

          while(flag) {
            val message = "采集的数据为:" + new Random().nextInt(10).toString

            store(message)

            Thread.sleep(500)
          }


        }
      }).start()
    }

    override def onStop(): Unit = {
      flag = false
    }
  }

}

与Kafka连接

  1. 编写代码
package com.gazikel.streamaing

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming02_Kafka {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming02_Kafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaParam = Map[String, Object] (
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"192.168.10.132:9092",
      ConsumerConfig.GROUP_ID_CONFIG->"",
      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      //
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaParam)
    )

    kafkaDataDS.map(_.value()).print()


    ssc.start()
    ssc.awaitTermination()
  }
}
  1. 在 kafka中创建topics
    在创建topic之前,需要启动zookeeper
$ zkServer.sh start

创建topic话题为atguigu

$ bin/kafka-topics.sh --zookeeper spark:2181 --create --topic atguigu --partitions 3 --replication-factor 1
  1. 生产数据
    启动kafka
$ kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties

生产数据操作

$ ./kafka-console-producer.sh --broker-list spark:9092 --topic atguigu

SparkStreaming

优雅的关闭

package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

/**
 * 优雅的关闭
 */
object SparkStreaming06_Close {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparStreaming06_Close")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    
    ssc.start()

    // 如果想要关闭采集器,那么需要创建新的线程
    // 需要在第三方中添加关闭状态
    new Thread(
      new Runnable {
        override def run(): Unit = {
          // 优雅的关闭
          // 将当前的数据处理完毕后,在关闭进程
          while (true) {
            if(true) {
              // 获取SparkStreaming的状态
              val state = ssc.getState()
              if (state == StreamingContextState.ACTIVE) {
                ssc.stop(true, true)
              }
              System.exit(1)
              
            }
          }
        }
      }
    )
    
    ssc.awaitTermination()

    
  }
}

恢复数据

package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming07_Resume {
  def main(args: Array[String]): Unit = {

    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("check_point", () => {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming07_Resume")
      val ssc = new StreamingContext(sparkConf, Seconds(3))

      ssc
    })

    ssc.checkpoint("check_point")

    ssc.start()
    ssc.awaitTermination()
  }
}
上一篇:selenium 上传文件


下一篇:文件引入方式 link 和 @import