我使用python版的 kafka向sparkstream 发送数据
数据格式如下 因为床型存在重复,我会删除第一个床型,如果每段长度为6时。
白山涵月楼酒店 大床 双床 ¥702 2021年-04月-19日21时:27分:23秒
白山涵月楼酒店 大床 大床 ¥847 2021年-04月-19日21时:27分:23秒
白山涵月楼酒店 大床 大床 ¥288 2021年-04月-19日21时:27分:23秒
白山涵月楼酒店 大床 大床 ¥302 2021年-04月-19日21时:27分:23秒
V5连锁酒店(白山轴承店) 大床 大床 ¥94 2021年-04月-19日21时:27分:40秒
V5连锁酒店(白山轴承店) 大床 其他 ¥104 2021年-04月-19日21时:27分:40秒
如家商旅酒店(白山民中街店) 大床 其他 ¥186 2021年-04月-19日21时:27分:56秒
如家商旅酒店(白山民中街店) 大床 双床 ¥193 2021年-04月-19日21时:27分:56秒
如家商旅酒店(白山民中街店) 大床 其他 ¥209 2021年-04月-19日21时:27分:56秒
如家商旅酒店(白山民中街店) 大床 其他 ¥228 2021年-04月-19日21时:27分:56秒
如家商旅酒店(白山民中街店) 大床 其他 ¥228 2021年-04月-19日21时:27分:56秒
星程酒店(白山市*店) 大床 双床 ¥200 2021年-04月-19日21时:28分:12秒
星程酒店(白山市*店) 大床 大床 ¥240 2021年-04月-19日21时:28分:12秒
星程酒店(白山市*店) 大床 大床 ¥307 2021年-04月-19日21时:28分:12秒
from kafka import KafkaProducer
from time import sleep
def start_producer():
#设置主机号
producer = KafkaProducer(bootstrap_servers='192.168.44.131:9092')
#设置文件位置
file = open('data/data.txt', encoding='utf-8')
line = file.readlines()
for info in line:
datalist = info.split("\t")
if len(datalist) == 6:
del datalist[1]
print(datalist, len(datalist))
msg = 'msg is ' + str(datalist)
#设置主题 和编码
producer.send('hoetl_info', msg.encode('utf-8'))
sleep(3)
if __name__ == '__main__':
start_producer()
package sparkstream.demo
import java.util.Random
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.reflect.internal.util.NoPosition.line
object SparkStreaming04_Kafka {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//设置窗口时间
val ssc = new StreamingContext(sparkConf, Seconds(15))
val kafkaPara: Map[String, Object] = Map[String, Object](
//kafka主机
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.44.131:9092",
//kafka主题
ConsumerConfig.GROUP_ID_CONFIG -> "hoetl_info",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("hoetl_info"), kafkaPara)
)
//kafkaDataDS.map(_.value()).print()
val line:DStream[(String,Int)] = kafkaDataDS.map(x => {
//使用map进行转换
val lines = x.value().split(',')
val regex = "([0-9]+)".r;
var moeny=""
for(matchline <- regex.findAllIn(lines(2))){
moeny = matchline
}
(lines(0),moeny.toInt)
})
line.reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}