Zero data loss in real-time big data stream processing
1. Overall pipeline:
a) Kafka: serves as the message source (producer side) for the stream processing program
b) Spark Streaming: serves as the consumer, with a reasonably sized batch interval
c) DB: results are written to Redis/ES
2. Problems:
Avalanche effect: the job gets killed, and data is lost
The Spark Streaming program crashes, which also leads to data loss
Solutions:
1. Use checkpoints. Maintenance is painful: whenever the streaming code changes, the data under the checkpoint directory has to be deleted before restarting, and that in turn causes data loss or duplication.
2. Official recommendation: start the old and the new streaming program side by side, then manually shut down the old one, record its offsets, and have the new program consume from those offsets.
3. Save the offsets yourself to external storage. On every start, the program reads that store first and then decides whether to consume from earliest or from the saved offsets.
4. Configure Kafka rate-limiting parameters and the streaming program's serialization.
5. The offsets can be stored in MySQL, Redis, HBase, Kafka, ZooKeeper, etc.
6. Regarding duplicate data: to achieve exactly-once semantics, combine offset handling with the actual business logic. For example, if the final result is written to HBase, the offset can be stored as a column of the corresponding HBase business table, making the write transactional and idempotent (see the sketch after this list).
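Below is a minimal Scala sketch of the idea in point 6. It assumes a hypothetical HBase table wordcount with column family cf and the hbase-client library on the classpath (neither appears in the pom below); the business value and the Kafka offset that produced it go into a single Put, so replaying the same batch rewrites identical cells and the write stays idempotent.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
object HBaseIdempotentWriteSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical values, standing in for one (word, count) result and its offset range
    val word = "alibaba"
    val count = 42L
    val topic = "saprkProcess"
    val partition = 0
    val untilOffset = 1234L
    val conf = HBaseConfiguration.create() // reads hbase-site.xml from the classpath
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("wordcount"))
    try {
      // one Put carries both the business value and the offset that produced it,
      // so re-processing the same batch writes exactly the same cells again (idempotent)
      val put = new Put(Bytes.toBytes(word))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("offset"),
        Bytes.toBytes(s"$topic:$partition:$untilOffset"))
      table.put(put)
    } finally {
      table.close()
      conn.close()
    }
  }
}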
Code:
1. pom file:
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.3.1</spark.version>
<kafka.version>0.10.0.0</kafka.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
<id>scalikeJDBC</id>
<name>scalikeJDBC</name>
<url>https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--sparksql -kafka-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!--JSON parsing-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
</dependency>
<!--redis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!--config-->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.1</version>
</dependency>
<!--RDBMS access-->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-core_2.11</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
2. Creating the Kafka topic:
a) bin/kafka-topics.sh --create --zookeeper pj-01:2181 --replication-factor 1 --partitions 1 --topic saprkProcess
b) Increase the number of partitions of the topic to raise Spark's parallelism and improve message-processing throughput:
bin/kafka-topics.sh --alter --zookeeper pj-01:2181 --topic saprkProcess --partitions 3
c) Note: with the 0.8 integration, after a topic's partition count is increased the streaming program does not detect the new partitions, which leads to data loss. The data is in fact already sitting under Kafka's logs directory, so the code has to check whether new partitions exist, pick them up, and initialize their offsets to 0 (see the sketch after this note).
With the 0.10 integration this problem no longer exists.
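A minimal sketch of that 0.8-era workaround, reusing the kafkaParams and fromdbOffset values from the streaming demo in section 5: any partition Kafka currently reports that has no saved offset in the DB is started at offset 0. With the 0-10 integration used below this step is not required.
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
object NewPartitionOffsets {
  // merge the offsets saved in the DB with the partitions Kafka reports right now,
  // defaulting any partition the DB has never seen to offset 0
  def withNewPartitions(topic: String,
                        kafkaParams: Map[String, Object],
                        fromdbOffset: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
    try {
      val current = consumer.partitionsFor(topic).asScala
        .map(pi => new TopicPartition(pi.topic(), pi.partition()))
      current.map(tp => tp -> fromdbOffset.getOrElse(tp, 0L)).toMap
    } finally {
      consumer.close()
    }
  }
}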
3. Simulated Kafka producer:
package Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
public class KafkaProducerApp {
public static void main(String[] args) throws InterruptedException {
final Properties props = new Properties();
//Assign topicName to string variable
String topics = "saprkProcess";
// create instance for properties to access producer configs
//Assign localhost id
props.put("bootstrap.servers", "pj-01:9092");
//Set acknowledgements for producer requests.
props.put("acks", "all");
//retries = 0: do not retry failed sends automatically (automatic retries can create duplicates)
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the number of requests by adding a small artificial delay (linger.ms)
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
String value[] = {"alibaba","baidu","tencent","facebook","amazon","apple","google","linkedin","twitter","ant","ICBC","Lakers","cleveland"};
int i = value.length;
final Random random = new Random();
int ret = 100;
while (ret>=0) {
Thread.sleep(10);
final ProducerRecord<String, String> msg = new ProducerRecord<String, String>(
topics
// , (new Random()).nextInt(3)
,0
,System.currentTimeMillis()
, UUID.randomUUID().toString().substring(6, 14).replace("-", "")
, value[random.nextInt(i)]
);
producer.send(msg);
System.out.println("msg = " + "alibaba -A: "+msg);
ret --;
}
producer.close(); // flush any buffered records before exiting
}
}
4. Creating the MySQL offset table:
a) MySQL access here uses scalikeJDBC, a database library for Scala with a simple, clear API. By default it reads the database configuration from application.conf / application.json / application.properties under resources (a sample application.conf follows the DDL below).
b) Offsets are updated with REPLACE INTO.
Note: if the table has no primary key, REPLACE simply inserts and never compares against existing rows.
01: That is why the DDL below uses (topic, groupid, partitions) as a composite primary key to identify one record.
02: Since the streaming program usually runs 7*24 once started, the table is tentatively named streaming_offset_24; the name is entirely up to you.
DDL:
create table streaming_offset_24(topic varchar(50),groupid varchar(20), partitions int, offset bigint, primary key(topic,groupid,partitions));
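For reference, a minimal src/main/resources/application.conf that the DBs.setup() call in the demo below can read (the scalikejdbc-config convention); the host, database name, user, and password are placeholders, not values taken from this article:
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://localhost:3306/streaming?useUnicode=true&characterEncoding=utf-8"
db.default.user="root"
db.default.password="123456"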
5. Streaming program: Spark Streaming processing demo:
package pjnet
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.config.DBs
import scalikejdbc.{DB, _}
object StreamingProApp {
Logger.getLogger("org.apache").setLevel(Level.WARN) //设置日志显示
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("StreamingProApp")
//maximum number of records pulled from each Kafka partition per second
.set("spark.streaming.kafka.maxRatePerPartition", "100")
// switch serialization to Kryo to reduce shuffle volume
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// enable RDD compression
.set("spark.rdd.compress", "true")
// batch interval
val ssc = new StreamingContext(conf, Seconds(10))
// step 1: Kafka connection parameters
val groupId = "1" // could also be read from a config file via Typesafe's ConfigFactory
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "pj-01:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean) //自己维护偏移量。连接kafka的集群。
)
val topics = Array("saprkProcess")
// step 2: read previously saved offsets from MySQL
DBs.setup()
val fromdbOffset: Map[TopicPartition, Long] =
DB.readOnly { implicit session =>
SQL(s"select * from `streaming_offset_24` where groupId = '${groupId}'")
.map(rs => (new TopicPartition(rs.string("topic"), rs.int("partitions")), rs.long("offset")))
.list().apply()
}.toMap
// on startup, create the direct stream from Kafka
val stream = if (fromdbOffset.size == 0) {
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
} else {
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](fromdbOffset.keys, kafkaParams, fromdbOffset)
)
}
stream.foreachRDD({
rdd =>
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//process the data: word count over the message values
val resout: RDD[(String, Int)] = rdd.flatMap(_.value().split(" ")).map((_, 1)).reduceByKey(_ + _)
//the result could be written to Redis, HBase, or another store
resout.foreach(println)
// resout.foreachPartition { it =>
//   val jedis = RedisUtils.getJedis
//   it.foreach { va =>
//     jedis.hincrBy(field, val1, val2)
//   }
//   jedis.close()
// }
//store the offsets in MySQL via scalikeJDBC; either of the two approaches below works
//note: localTx is transactional; if the result update or the offset update fails, neither is committed
DB.localTx { implicit session =>
for (or <- offsetRanges) {
SQL("replace into `streaming_offset_24`(topic,groupId,partitions,offset) values(?,?,?,?)")
.bind(or.topic,groupId, or.partition, or.untilOffset).update().apply()
}
}
// offsetRanges.foreach(osr => {
// DB.autoCommit{ implicit session =>
// sql"REPLACE INTO streaming_offset_24(topic, groupid, partitions, offset) VALUES(?,?,?,?)"
// .bind(osr.topic, groupId, osr.partition, osr.untilOffset).update().apply()
// }
// })
})
stream.count().print()
ssc.start()
ssc.awaitTermination()
}
}
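To try the demo end to end (assuming the streaming_offset_24 table and an application.conf like the sample above are in place): create the topic, run KafkaProducerApp to publish test messages, start StreamingProApp, and after a couple of batches verify that the stored offsets advance, e.g.:
select * from streaming_offset_24 where groupid = '1';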