备忘录

一、HIVE

over中的partition by与group by

name	orderdate	cost
jack	2017-01-01	10
jack	2017-02-03	23
jack	2017-01-05	46
jack	2017-04-06	42
jack	2017-01-08	55
mart	2017-04-08	62
mart	2017-04-09	68
mart	2017-04-11	75
mart	2017-04-13	94
=======================gruop by=======================
select
    name,
    count(*)
from
    overdemo
where
    date_format(orderdate,'yyyy-MM')='2017-04'
group by
    name;
--------结果---------
name	_c1
jack	1
mart	4
=======================partition by========================
select
    name,
    count(*) over(partition by name)
from
    overdemo
where
    date_format(orderdate,'yyyy-MM')='2017-04';
--------结果---------
name	count_window_0
jack	1
mart	4
mart	4
mart	4
mart	4
  • 差别:partition by 将所有符合过滤条件的列分组选中,故group by 可看成partition by 的去重版
  • 执行顺序:group by -> over(),故group by分组去重,再partition by结合聚合函数(count/sum)分区计算
=====同时有group by name和over(partition by name)=====
select
    name,
    count(*) over(partition by name)
from
    overdemo
where
    date_format(orderdate,'yyyy-MM')='2017-04'
group by 
    name;
---------结果--------
name	count_window_0
jack	1
mart	1
=====同时有group by name和over()=====over()无参数时结合聚合函数对全量数据进行运算
select
    name,
    count(*) over()
from
    overdemo
where
    date_format(orderdate,'yyyy-MM')='2017-04'
group by
    name;
----------结果-------
name	count_window_0
mart	2
jack	2
  • 当over()无参时,结合聚合函数 count(),对得到的数据进行计数,得到的结果=2,并且会在mart和Jack后显示2(因为没有进行partition by name,mart和Jack是在同一个分区的)
  • ②当over(partition by name)之后,group by得到的数据会再次进行分区,分为mart和jack,两个分区中进行count()计算,都只有1个,所以在两个分区中分别显示为1
  • 此时,再解释上面只使用over(partition by name)时,为什么mart会显示4次,而且count()也是4。因为符合过滤条件的mart有4条,都被partition by分到了一个分区,所以count()的结果就是4,而且over(partition by name)并不会去重,所以显示了4次。
  • 综上,over(partition by)可以视为group by的升级版:①会保留所有进入分区的数据,不去重(没有数据损失);②在group by之后执行,可以进行更为复杂个性化的操作

二、spark

Spark为什么比MR快

1、Spark基于内存计算

Spark vs MapReduce ≠ 内存 vs 磁盘

其实Spark和MapReduce的计算都发生在内存中,区别在于:

MapReduce通常需要将计算的中间结果写入磁盘,然后还要读取磁盘,从而导致了频繁的磁盘IO。

Spark则不需要将计算的中间结果写入磁盘,这得益于Spark的RDD(弹性分布式数据集,很强大)和DAG(有向无环图),其中DAG记录了job的stage以及在job执行过程中父RDD和子RDD之间的依赖关系。中间结果能够以RDD的形式存放在内存中,且能够从DAG中恢复,大大减少了磁盘IO。

2、多进程模型 vs 多线程模型的区别

MapReduce采用了多进程模型,而Spark采用了多线程模型。多进程模型的好处是便于细粒度控制每个任务占用的资源,但每次任务的启动都会消耗一定的启动时间。就是说MapReduce的Map Task和Reduce Task是进程级别的,而Spark Task则是基于线程模型的,就是说mapreduce 中的 map 和 reduce 都是 jvm 进程,每次启动都需要重新申请资源,消耗了不必要的时间(假设容器启动时间大概1s,如果有1200个block,那么单独启动map进程事件就需要20分钟)
Spark则是通过复用线程池中的线程来减少启动、关闭task所需要的开销。(多线程模型也有缺点,由于同节点上所有任务运行在一个进程中,因此,会出现严重的资源争用,难以细粒度控制每个任务占用资源)

Yarn Container

mapreduce on yarn :

  • Application Master 向Resource Manager申请 Container , 每个MapTask/ReduceTask 申请一个Container。

每一个Task就是一个进程。也是一个Container 资源

进程的名字叫做:YarnChild

MapTask/ReduceTask 的资源申请是Container资源范围内。

那么:对于 Container 里面的task是单线程在运行的?

spark on yarn :

  • Application Master 向Resource Manager申请 Container , 每个Executor申请一个Container。

每一个Executor是一个JVM进程。也是一个Container 资源

在spark on yarn 中 Executor 的进程 名字叫 CoarseGrainedExecutorBackend 类似于Hadoop MapReduce中的YarnChild, 一个CoarseGrainedExecutorBackend进程 有且仅有一个 executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task

一个Yarn的container就是一个Spark的CoarseGrainedExecutorBackend,也就是常说的Spark的Executor进程,每个executor可以并行执行多个task。CoarseGrainedExecutorBackend是Executor的RPC endpoint服务,具体执行task是CoarseGrainedExecutorBackend持有的Executor对象的launchTask方法启动.

Executor 的资源申请是Container资源范围内。

  • 在 Executor 中并行执行多个 Task

把executor的jvm进程看做task执行池,每个executor最大有 spark.executor.cores / spark.task.cpus 个执行槽( solt ), 一个执行槽可以同时执行一个task。

– spark.task.cpus 是指每个任务需要的 CPU 核数, 大部分应用使用默认值 1
– spark.executor.cores 是指每个executor的核数。

  • Application --> 多个job --> 多个stage --> 多个task

1.Spark的driver端只是用来请求资源获取资源的,executor端是用来执行代码程序,=>application在driver端job 、stage 、task在executor端。

2.在executor端划分job、划分stage、划分task。

**job : ** 程序遇见 action算子 划分一次,

**stage : ** 每个 job遇见 shuffle(或者宽依赖) 划分一次,

task数量 : 每个stage中 最后一个rdd的分区(分片)数 就是。

YarnCluster-Spark 执行流程:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cex8FYXU-1620478211325)(C:\Users\hason\AppData\Roaming\Typora\typora-user-images\image-20210507092846530.png)]

三、Kafka

kafka写数据:

  • 顺序写,往磁盘上写数据时,就是追加数据,没有随机写的操作。经验: 如果一个服务器磁盘达到一定的个数,磁盘也达到一定转数,往磁盘里面顺序写(追加写)数据的速度和写内存的速度差不多生产者生产消息,经过kafka服务先写到os cache 内存中,然后经过sync顺序写到磁盘上

消费者读取数据流程:

  • 普通
  1. 消费者发送请求给kafka服务
  2. kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)
  3. 从磁盘读取了数据到os cache缓存中
  4. os cache复制数据到kafka应用程序中
  5. kafka将数据(复制)发送到socket cache中
  6. socket cache通过网卡传输给消费者
  • 零拷贝(页缓存:os cache,有os管理)

1.消费者发送请求给kafka服务

2.kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)

3.从磁盘读取了数据到os cache缓存中

4.os cache直接将数据发送给网卡

5.通过网卡将数据传输给消费者

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hPoK65so-1620478211327)(C:\Users\hason\AppData\Roaming\Typora\typora-user-images\image-20210507093741624.png)]

Kafka日志分段保存

Kafka中一个主题,一般会设置分区;比如创建了一个topic_a,然后创建的时候指定了这个主题有三个分区。其实在三台服务器上,会创建三个目录。服务器1(kafka1)创建目录topic_a-0:。目录下面是我们文件(存储数据),kafka数据就是message,数据存储在log文件里。.log结尾的就是日志文件,在kafka中把数据文件就叫做日志文件 。一个分区下面默认有n多个日志文件(分段存储),一个日志文件默认1G

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1u0zi4tZ-1620478211328)(C:\Users\hason\AppData\Roaming\Typora\typora-user-images\image-20210507093928761.png)]

Kafka二分查找定位数据

消息体对应offset -> 在对应 index文件(稀疏索引4k一个) -> 定位log文件对应的消息体

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nR7gjSFr-1620478211329)(C:\Users\hason\AppData\Roaming\Typora\typora-user-images\image-20210507094404722.png)]

副本冗余保证高可用

0.8以前没有副本机制

leader:1.读写操作;2.维护ISR列表

​ ISR:{a,b,c}若一follower分区 超过10秒 没向leader partition拉取数据,这个分区就从ISR列表里移除。

follower:向leader同步数据

框架总结

**高可用:**多副本机制

**高并发:**网络架构设计 三层架构:多selector -> 多线程 -> 队列的设计(NIO)

高性能:

A 写数据:

  1. 把数据先写入到OS Cache
  2. 写到磁盘上面是顺序写,性能很高

B 读数据:

  1. 根据稀疏索引,快速定位到要消费的数据
  2. 零拷贝机制 减少数据的拷贝 减少了应用程序与操作系统上下文切换

专有名词

**QPS(Query Per Second):**每秒请求数,就是说服务器在一秒的时间内处理了多少个请求。

**吞吐量(Throughput) (TPS):**吞吐量是指系统在单位时间内处理请求的数量。

**响应时间(RT) :**响应时间是指系统对请求作出响应的时间。

**并发用户数 :**并发用户数是指系统可以同时承载的正常使用系统功能的用户的数量。

生产环境分析

需求分析:

pqs/day:每天10亿,高峰期3h处理6亿请求:qps6亿 / 3h = 5.5万/s pqs

数据量:10亿 * 50kb/条 * 2副本 * 3天 = 276T

注: (50kb/条 )我司因在生产端封装了数据,然后多条数据合并,故一个请求才会有这么大

物理机数:

评估机器: 按照高峰期 * 4倍 = 20万pqs

每台机器 承受 4万请求 -> 5台机器

磁盘选择:

SSD硬盘:性能好,指它 随机读写性能 ** 较好,但顺序写跟SAS盘差不多,适合MySql集群**

SAS盘:某方面性能不是很好,但比较便宜。

kafka的理解:就是用的顺序写。所以我们就用普通的【机械硬盘】就可以了。

每台机器:276T / 5台 = 60T/台

现有配置:11块硬盘/机器 * 7T/块硬盘 = 77 T/机器

**总结:**10亿请求,5台机器,每个机器 11(SAS) * 7T

内存评估:

kafka读写流程 都基于os cache => 尽可能多的内存资源要给 os cache

Kafka中核心代码是scala,客户端java。都是基于jvm => 部分的内存给jvm

​ Kafka的设计,没有把很多数据结构都放在jvm里面 =>jvm不需太大内存

根据经验:>=10G

  • 假设10个请求,拥有100Topic => 总partition数 = 100Topic * 5 partition * 2副本 = 1000partition

    **kafka使用内存/机器:**1000partition * 1G /.log 文件 * 25%常驻内存 / 5机器 = 50G

    机器总内存: kafka(50G) + JVM(10G)+ OS(10G-5G) = 64G (128G更好)

CPU压力评估

评估需要多少个cpu (线程依托cpu运行)服务有多少线程ing

线程多,cpu core少,会导致机器负载高性能差

kafka 启动线程数:

  • Acceptor线程 1 ,processor线程 3, 6~9个线程 处理请求线程 8个, 32个线程 定时清理的线程,拉取数据的线程,定时检查ISR列表的机制 等等。

  • 所以大概一个Kafka的服务启动起来以后,会有一百多个线程。

机器cpu 线程数:

  • cpu core = 4个,几十个线程,就肯定把cpu 打满了。
  • cpu core = 8个,轻松支持几十个线程。如是100多,或差不多200个,那么8 个 cpu core是搞不定的。
  • 所以我们这儿建议:CPU core = 16个。如果可以的话,能有32个cpu core 那就最好。
  • 结论:kafka集群,最低也要给16个cpu core,如果能给到32 cpu core那就更好。2cpu * 8 =16 cpu core 4cpu * 8 = 32 cpu core

总结:10亿请求,5台物理机,11(SAS) * 7T ,需64G内存(128G更好),需16cpu core(32更好)

网络需求评估:

千兆的网卡(1G/s),万兆的网卡(10G/s)

网络请求流量:5.5w pqs / 5机器 * 50kb/条 * 2副本 = 976m/s

注: 网卡的带宽是达不到极限的,70%左右

集群规划:

主从式的架构:controller -> 通过zk集群来管理整个集群的元数据。

注:kafka集群 理论上来讲,不应该把kafka的服务与zk的服务安装在一起

运维:

场景一:topic数据量太大,要增加topic数

kafka-topics.sh --alter --zookeeper hdp1:2181,hdp2:2181,hdp3:2181 --partitions 2 --topic test6

场景二:核心topic增加副本因子
  1. 核心业务数据需要增加副本因子 vim test.json脚本,将下面一行json脚本保存
{
	“version”:1,
	“partitions”:[
					{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},
					{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},
					{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}
				]
}
  1. 执行上面json脚本:
kafka-reassign-partitions.sh \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 \
--reassignment-json-file test.json \
--execute
场景三:负载不均衡的topic,手动迁移vi topics-to-move.json
{“topics”: [{“topic”: “test01”}, {“topic”: “test02”}], “version”: 1} // 把你所有的topic都写在这里
  1. 执行:
kafka-reassgin-partitions.sh \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 \
--topics-to-move-json-file topics-to-move.json \
--broker-list “5,6” \
--generate

把你所有的包括新加入的broker机器都写在这里,就会说是把所有的partition均匀的分散在各个broker上,包括新进来的broker此时会生成一个迁移方案,可以保存到一个文件里去:expand-cluster-reassignment.json

kafka-reassign-partitions.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--reassignment-json-file expand-cluster-reassignment.json \
--execute

kafka-reassign-partitions.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--reassignment-json-file expand-cluster-reassignment.json \
--verify

这种数据迁移操作一定要在晚上低峰的时候来做,因为他会在机器之间迁移数据,非常的占用带宽资源

–generate: 根据给予的Topic列表和Broker列表生成迁移计划。generate并不会真正进行消息迁移,而是将消息迁移计划计算出来,供execute命令使用。

–execute: 根据给予的消息迁移计划进行迁移。

–verify: 检查消息是否已经迁移完成。

场景四:如果某个broker leader partition过多

正常情况下,我们的leader partition在服务器之间是负载均衡,现在各个业务方可以自行申请创建topic,分区数量都是自动分配和后续动态调整的, kafka本身会自动把leader partition均匀分散在各个机器上,这样可以保证每台机器的读写吞吐量都是均匀的

  • 如果某些broker宕机,会导致leader partition过于集中在其他少部分几台broker上, 这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低造成集群负载不均衡

    => 有一个参数,auto.leader.rebalance.enable,默认是true, 每隔300秒(leader.imbalance.check.interval.seconds)检查leader负载是否平衡

  • 如果一台broker上的不均衡的leader超过了10%,leader.imbalance.per.broker.percentage(每个broker允许的不平衡的leader的比率)

    => 就会对这个broker进行选举 配置参数:auto.leader.rebalance.enable 默认是true 。

    ​ leader.imbalance.check.interval.seconds( leader不平衡检查间隔时间):默认值300秒

kafka生产者

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Uh0QarFv-1620478211331)(C:\Users\hason\AppData\Roaming\Typora\typora-user-images\image-20210507131016827.png)]

如何提升吞吐量

参数一:buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB

参数二:compression.type:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销

参数三:batch.size:设置batch的大小,

  • 如果batch太小,会导致频繁网络请求,吞吐量下降;
  • 如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里,默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,
  • 一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,如果一个批次设置大了,会有延迟。一般根据一条消息大小来设置。如果我们消息比较少。配合使用的参数linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。
处理异常
  1. LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可;如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException。
  2. NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可。
  3. NetworkException:网络异常 timeout
    • a. 配置retries参数,他会自动重试的
    • b. 但是如果重试几次之后还是不行,就会提供Exception给我们来处理了,我们获取到异常以后,再对这个消息进行单独处理。我们会有备用链路。发送不成功的消息发送到Redis或者写到文件系统中,甚至是丢弃
重试机制
  1. 消息会重复有的时候一些leader切换之类的问题,需要进行重试,设置retries即可,但是消息重试会导致,重复发送的问题,
    • 比如说网络抖动一下导致他以为没成功,就重试了,其实人家都成功了.
  2. 消息乱序消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了。所以可以使用"max.in.flight.requests.per.connection"参数设置为1, 这样可以保证producer同一时间只能发送一条消息。两次重试的间隔默认是100毫秒,用"retry.backoff.ms"来进行设置 基本上在开发过程中,靠重试机制基本就可以搞定95%的异常问题。

Kafka消费者

偏移量管理
  1. 每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。
  2. 新版本提交offset发给kafka内部topic:__consumer_offsets,提交KV结构
    • key:group.id+topic+分区号,
    • value:当前offset的值,
    • 每隔一段时间,kafka内部会对这个topic进行compact(合并),也就是每个group.id+topic+分区号就保留最新数据
  3. __consumer_offsets可能会接收高并发的请求,所以默认分区50个(leader partitiron -> 50 kafka),这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力.
偏移量监控工具介绍
  1. web页面管理的一个管理软件(kafka Manager) 修改bin/kafka-run-class.sh脚本,第一行增加JMX_PORT=9988 重启kafka进程
  2. 另一个软件:主要监控的consumer的偏移量。就是一个jar包 java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb –offsetStorage kafka \(根据版本:偏移量存在kafka就填kafka,存在zookeeper就填zookeeper) –zk hadoop1:2181 –port 9004 –refresh 15.seconds –retain 2.days。
消费异常感知
  • heartbeat.interval.ms:consumer心跳时间间隔,必须得与coordinator保持心跳才能知道consumer是否故障了, 如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作

  • session.timeout.ms:默认是10秒,kafka多长时间感知不到一个consumer就认为他故障了,

  • max.poll.interval.ms:如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费

一般来说结合业务处理的性能来设置就可以了。

核心参数解释

fetch.max.bytes:获取一条消息最大的字节数,一般建议设置大一些,默认是1M 其实我们在之前多个地方都见到过这个类似的参数,意思就是说一条信息最大能多大?

  1. Producer 发送的数据,一条消息最大多大, -> 10M
  2. Broker 存储数据,一条消息最大能接受多大 -> 10M
  3. Consumer max.poll.records: 一次poll返回消息的最大条数,默认是500条
    • connection.max.idle.ms:consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收
    • enable.auto.commit: 开启自动提交偏移量
    • auto.commit.interval.ms: 每隔多久提交一次偏移量,默认值5000毫秒
    • _consumer_offset auto.offset.reset:
      • earliest
        • 当各分区下有已提交的offset时,从提交的offset开始消费;
        • 无提交的offset时,从头开始消费 topica -> partition0:1000 partitino1:2000
      • latest
        • 当各分区下有已提交的offset时,从提交的offset开始消费;
        • 无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
group coordinator原理
  • 面试题:消费者是如何实现rebalance的?答:根据coordinator实现
  1. 什么是coordinator

    • 每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里各个消费者心跳,以及判断是否宕机,然后开启rebalance
  2. 如何选择coordinator机器

    • 首先对groupId进行hash(数字),
    • 接着对__consumer_offsets的分区数量取模,默认是50,_consumer_offsets的分区数可以通过offsets.topic.num.partitions来设置
    • 找到分区以后,这个分区所在的broker机器就是coordinator机器。
    • eg:比如说:groupId,“myconsumer_group” -> hash值(数字)-> 对50取模 -> 8 __consumer_offsets 这个主题的8号分区在哪台broker上面,那一台就是coordinator 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,
  3. 运行流程

    1)每个consumer都发送JoinGroup请求到Coordinator, 然后Coordinator从一个consumer group中选择一个consumer作为leader,(选择leader)

    2)把consumer group情况发送给leader,接着leader会负责制定消费方案, (leader定制消费方案)

    3)通过SyncGroup发给Coordinator ,Coordinator就把消费方案下发给各个consumer,他们会从指定的分区的 leader broker,进行socket连接以及消费消息**(Coordinator把消费方案下发各个consumer)**

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u41owUSO-1620478211332)(C:\Users\hason\AppData\Roaming\Typora\typora-user-images\image-20210507142400943.png)]

rebalance策略

consumer group靠coordinator实现了Rebalance,rebalance的策略:range、round-robin、sticky

  • **sticky策略:**尽可能保证在rebalance的时候,让原本属于这个consumer 的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略

Broker管理

LEO、HW:

在kafka里面,无论leader partition还是follower partition统一都称作副本(replica)。

LEO:每次partition接收到一条消息,都会更新自己的LEO,也就是log end offset,LEO其实就是最新的offset + 1

HW:高水位 LEO有一个很重要的功能就是更新HW,如果follower和leader的LEO同步了,此时HW就可以更新 HW之前的数据对消费者是可见,消息属于commit状态。HW之后的消息消费者消费不到。

controller如何管理整个集群

  1. 竞争controller的 /controller/id

  2. controller服务监听的目录:

    • /broker/ids/ 用来感知 broker上下线
    • /broker/topics/ 创建主题,我们当时创建主题命令,提供的参数,ZK地址。
    • /admin/reassign_partitions 分区重分

延时任务(扩展知识)

  1. 第一类延时的任务:

    • 比如说producer的acks=-1,必须等待leader和follower都写完才能返回响应。有一个超时时间,默认是30秒(request.timeout.ms)。所以需要在写入一条数据到leader磁盘之后,就必须有一个延时任务,
    • 30秒之后,到期时间是30秒延时任务 放到**DelayedOperationPurgatory(延时管理器)**中。
    • 假如在30秒之前
      • 如果所有follower都写入副本到本地磁盘了,那么这个任务就会被自动触发苏醒,就可以返回响应结果给客户端了, 否则的话,这个延时任务自己指定了最多是30秒到期,
      • 如果到了超时时间,都没等到,就直接超时返回异常。
  2. 第二类延时的任务:

    • follower往leader拉取消息的时候,如果发现是空的,此时会创建一个延时拉取任务 延时时间到了之后(比如到了100ms),就给follower返回一个空的数据,然后follower再次发送请求读取消息
    • 但是如果延时的过程中(还没到100ms),leader写入了消息(不为空),这个任务就会自动苏醒自动执行拉取任务

海量的延时任务,需要去调度。

时间轮机制

时间轮说白其实就是一个数组。

Kafka内部有很多延时任务,没有基于JDK Timer来实现,那个插入和删除任务的时间复杂度是O(nlogn), 而是基于了自己写的时间轮来实现的,时间复杂度是O(1),依靠时间轮机制,延时任务插入和删除,O(1)

**tickMs:**时间轮间隔 1ms

**wheelSize:**时间轮大小 20

**interval:**timckMS * whellSize,一个时间轮的总的时间跨度。20ms

**currentTime:**当时时间的指针。

​ 1. 因为时间轮是一个数组,所以要获取里面数据的时候,靠的是index,时间复杂度是O(1)

​ 2. 数组某个位置上对应的任务,用的是双向链表存储的,往双向链表里面插入,删除任务,时间复杂度也是O(1)

  • 举例:插入一个8ms以后要执行的任务 19ms

  • 多层级的时间轮 :

    • 比如:要插入一个110毫秒以后运行的任务。
      • tickMs:时间轮间隔 20ms
      • wheelSize:时间轮大小 20
      • interval:timckMS * whellSize,一个时间轮的总的时间跨度。20ms
      • currentTime:当时时间的指针。
      • 第一层时间轮:1ms * 20 ;第二层时间轮:20ms * 20 ;第三层时间轮:400ms * 20

四、HIVE

解析/编译/优化/执行器

  1. 解析器 SQL Parser:将SQL字符串转换为 抽象语法树AST(依赖第三方antlr库),对AST进行语法分析(例如:表、字段是否存在,sql语义是否有误)
  2. 编译器Physical Plan:将AST编译成逻辑执行计划
  3. 优化器Query Optimizer:对逻辑执行计划进行优化
  4. 执行器Executor:把逻辑执行计划转换成可以执行的物理计划(MR程序)

五、sqoop

参数

sqoop
--import
--connect 数据库URL
--username
--passwd
#目标路径及删除
--target-dir
--delete-target-dir
#空值处理
--null-string '\\N'
--null-non-string '\\N'
#指定分割符
--fields-terminated-dir '\t'
#sql
--query  "select .. from table ...and$CONDITIONS"

-----以下可选参数------
#切分字段、map数
--split-by id
--num-mappers 100

-----同步策略--------
全量:sql后加 where 1=1
新增:sql后加 where 创建时间=昨天 or 操作时间=昨天
	 --increment append (用在直接导入hive表的时候)
特殊:只到一次
新增及变化:用户表(拉链)

场景一:导入mysql表数据中,若出现问题(BUG、中断、出错)导致导入数据不正确

  • 临时表,完成后再导入正式表,出问题就清空临时表再重新导入正式表

–staging-table

–clear-staging-table

场景二:sqoop导数据慢

  • 可将表进行指定字段切分

split-by 指定字段【指定字段最好不是String字符串类型】

  • 增加map个数(默认4map)

–mappers 100

场景三:1G 需要多久跑完

几分钟

场景四:ads层的Parquet格式表 导mysql ,报错

方案一: 在hive中将该表 加载到 textfile格式的临时表 再导入mysql

方案二: ads不要建parquet格式表

六、Azkaban

自定义报警onealter睿象云

微信、钉钉、微信、电话等等

电话需要调用电信运营商接口 付费

使用第三方运维平台 onealter睿象云 集成多种组件(ZABBIX、阿里云、restAPI) 付费

  • 购买相应服务后,获取通知API。然后进行MailAlter二次开发

异常处理

优先重跑:1 自动重跑,2 手动重跑 【注】重跑还失败:看日志

yarn logs -applicationId "appId" | less =》 shift+g

  • 【注】 “| less ” 为一页一页翻着看;G:跳到最后一页

七、Hbase

Hbase中Region/Store/StoreFile/Hfile之间的关系

  1. Table: Hbase的数据是以表的形式存在

  2. ColumnFamily列簇: 相当于mysql Table的字段,是Hbase的表结构

    • Column: 列, hbase的列 = 列簇:列限定符
    • Cell: 根据 rowkey+列簇:列限定符+时间戳 确定唯一一个cell
    • Row: 一行数据,主键相同的为一行数据
  3. 列限定符: 放在列簇下,是数据的形式存在

  4. version: 在创建表的时候再列簇上指定的版本号,version代表列簇下的列限定符中最多可以保存多少个历史数据

  5. namespace: 命名空间,相当于Mysql的库

  6. table 在行的方向分割为多个region。region是Hbase中分布式存储负载均衡最小单元

    即不同的 region可以分别在不同 的 regionServer上,但是同一个 region是不会拆分到多个 server上

  7. Store:每一个region有一个或多个store组成,至少是一个store,hbase会把一起访问的数据放在一个store里,即为每个columnFamily建一个store(即有几个columnFamily 就有几个store)。一个store由一个memStore0或多个storeFile组成

    • HBase以store的大小来判断是否需要切分region。
  8. MemStore:内存,KV数据,刷写阈值默认64M -> 一个快照。专属线程负责刷写操作

  9. StoreFile:memStore每次刷写后形成新StoreFile文件(以HFile格式保存)

  10. HFile:KV数据,是hadoop二进制格式文件,一个StoreFile对应着一个HFile。而HFile是存储在HDFS之上的。

    • HFile文件是不定长的,长度固定的只有其中的两块:Trailer和FileInfo。Trailer中又指针指向其他数据块的起始点,FileInfo记录了文件的一些meta信息。

Hbase架构

  • Master:
    • 1、负责表的创建、删除、修改
    • 2、监控regionserver的状态,一旦regionserver宕机,会将宕机的regionserver中的region进行故障转移
    • 3、负责region的分配
  • Regionserver:
    • 1、负责客户端的数据的增删改查
    • 2、负责region split、storeFile compact等操作
      • region: region保存在regionserver中
      • Store: store的个数 = 列簇的个数
      • memstore: 是一块内存区域,后续数据会先写入memstore中,等达到一定的条件之后,memstore会flush,flush的时候会生成storeFile
      • storeFile: memstore flush生成,每 flush一次会生成一个storeFile,最终以HFile这种文件格式保存在HDFS
  • HLog: 预写日志,因为memstore是内存区域m,所以数据如果直接写到memstore中,可能因为regionserver造成数据丢失,所以后续写入的时候会将数据写入日志中,等到memstore数据丢失之后,可以从日志中回复数据【一个regionserver只有一个Hlog】
  • Zookeeper: master监控regionserver状态需要依赖zk

Hbase原理

1、Hbase元数据: hbase:meta

​ hbase元数据表中数据的rowkey = 表名,region起始rowkey,时间戳.region名称
​ hbase元数据中保存的有region的起始rowky,结束rowkey,所处于哪个regionserver
​ hbase元数据表只有一个region

2、写入流程

​ 1、client会向zookeeper发起请求,获取元数据处于哪个regionserver
​ 2、zookeeper会将元数据所在regionserver返回给client
​ 3、client会向元数据所在的regionserver发起请求,获取元数据
​ 4、元数据所在的regionserver返回元数据信息给client,client会缓存元数据信息
​ 5、client会根据元数据得知数据应该写到哪个region,region处于哪个regionserver,会region所在的regionserver发起数据写入请求
​ 6、数据首先写入HLog,然后再写入region的store中的memstore中
​ 7、返回信息告知client写入完成

3、读取流程:

​ 1、client会向zookeeper发起请求,获取元数据处于哪个regionserver
​ 2、zookeeper会将元数据所在regionserver返回给client
​ 3、client会向元数据所在的regionserver发起请求,获取元数据
​ 4、元数据所在的regionserver返回元数据信息给client,client会缓存元数据信息
​ 5、client会根据元数据得知数据应该写到哪个region,region处于哪个regionserver,会region所在的regionserver发起数据读取请求
​ 6、首先会从block cache中获取数据,如果block cache中没有或者只有一部分数据,会再从memstore和storeFile中获取数据
​ 从storeFile中读取数据的时候会根据布隆过滤器判断数据可能存在与哪些storeFile中,然后再根据HFile文件格式中的数据索引获取数据。
​ 布隆过滤器特性: 如果判断存在不一定存在,如果判断不存在则一定不存在
​ 7、获取到最终数据之后,如果需要缓存结果的话,会将当前查询结果缓存到block cache中,返回数据给客户端

blockCache 块缓存 内存存储 的 数据淘汰机制 基于 LRU算法

算法认为最近使用的数据是热门数据,下一次很大概率将会再次被使用。而最近很少被使用的数据,很大概率下一次不再用到。当缓存容量的满时候,优先淘汰最近很少使用的数据。

4、memstore flush的触发条件

memstore flush的最小单位是region,不会只单独flush某个memstore,所以如果region中有多个store的话,可能在flush的时候会生成大量的小文件,所以工作中创建表的时候列簇的个数一般设置为1个,最多不超过两个
​ 1、region中某个memstore的大小达到128M,会触发flush
​ 2、region中所有的memstore的总大小达到128*4,会触发flush
​ 3、regionserver中所有的region中的所有的memstore的总大小达到javaHeap * 0.4 *0.95,会根据region中所有的memstore占用的内存大小排序,优先flush占用空间大的region
​ 4、当写入特别频繁的时候,第三个触发条件可能会适当的延迟,延迟到regionserver中所有的region中的所有的memstore的总大小达到javaHeap * 0.4,会阻塞client的写入,会根据region中所有的memstore占用的内存大小排序,优先flush占用空间大的region,flush到占用总空间<=javaHeap * 0.4 *0.95的时候会停止flush,允许client写入
​ 5、regionserver对应WAL日志文件数量达到32的时候,会flush
​ 6、region距离上一次flush的时间超过一个小时,会自动flush
​ 7、手动flush: flush ‘表名’

memStore级别: 某个memStore 达到 xxxx.flush.size = 128M
Region级别: region内部的所有memStore总和达到了 128M * 4 , 阻塞写
RegionServer级别: 公式 =》 堆内存 * 0.4 * 0.95 ,超过这个阈值,开始刷写, 由大到小依次刷写
​ 刷写到什么时候: 小于 堆内存 * 0.4 * 0.95
HLog数量: 现在不用手动配置,上限 32个
定期刷写: 默认1小时,最后一次编辑时间
手动刷写: 执行命令

5、storeFile compact

​ 原因: memstore每flush一次会生成一个storeFile,storeFile文件越来越多会影响查询性能,所以需要将storeFile文件合并
​ minor compact:
​ 触发条件: 符合文件数>=3 的时候会自动合并
​ 符合文件数: 判断当前文件是否需要合并, 当前文件大小 <= sum(小于当前文件大小的所有文件) * 比例,如果满足该条件,则代表当前文件需要合并
​ 合并的过程: 单纯的将小文件合并成大文件,在合并的过程中不会删除无效数据【无效版本数据、标记删除数据】
​ 合并结果: 会有多个大文件
​ major compact:
​ 触发条件: 7天一次
​ 合并过程: 在合并的过程中会删除无效数据【无效版本数据、标记删除数据】
​ 合并结果: 合并之后,store中只有一个文件

  为什么要合并: 每次刷写都生成一个新的 HFile ,时间久了很多小文件
  小合并:相邻的几个HFile,合并成一个新的大的HFile,原先的小HFile删除掉
  		不会删除 被标记为删除、过期的数据
  大合并:所有的HFile,合并成一个大的HFile
  		删除 被标记为删除、过期的数据
  		注意:大合并会影响正常使用
6、region split

​ 原因: 表在创建的时候默认只有一个region,后续所有的读写请求,都会落在该region上,随着时间流逝,region中保存的数据越来越多,导致后续region所在的regionserver可能出现读写负载的不均衡,所以需要将region切分,分配到不同的regionserver,从而分担负载压力
​ 切分规则:
​ 0.9版本之前: 当region中某个store中storeFile总大小达到10G的时候,该region会切分为两个
​ 0.9版本-2.0版本:
​ 当region中某个store中storeFile总大小达到 [N == 0 || N>100 ? 10G: min(10G,2128MN^3) ]的时候,该region会切分为两个
​ N代表region所属表在当前regionserver上的region个数
​ 2.0版本之后:
​ 当region中某个store中storeFile总大小达到 [N == 1 ? 2 *128M : 10G ]的时候,该region会切分为两个
​ N代表region所属表在当前regionserver上的region个数

Hbase优化

1、预分区:

​ 原因:Hbase在创建表的时候默认只有一个region,客户端如果读取/写入的并发量比较大,会造成regionserver负载压力,所以需要偶在创建表的时候多指定几个region,从而分担负载压力
​ 如何预分区?
​ 1、create ‘表名’,‘列簇名’,…,SPLITS=>[‘rowkey1’,‘rowkey2’,…]
​ 2、create ‘表名’,‘列簇名’,…,{NUMREGIONS=>region个数,SPLITLGO=>‘HexStringSplit’}
​ 3、create ‘表名’,‘列簇名’,SPLIT_FILE=>‘分区文件’
​ 4、通过Api建表预分区
​ byte[][] splitKeys = {“rowkey”.getBytes(),…}
​ 1、admin.createTable(table,splitKeys)
​ byte[] startRow;
​ byte[] stopRow;
​ numRegions指表的region个数
​ 2、admin.createTable(table,startRow,stopRow,numRegions)

2、rowkey设计

​ 原则:
​ 1、唯一性原则: 两条数据的rowkey不能相同
​ 2、长度原则: 一般保持在16字节以下
​ hbase rowkey不能太长,太长会导致rowkey占用过多的存储空间,client会缓存元数据,如果rowkey过长,而clint缓存空间不太足,就会导致缓存的元数据相对比较少
​ 3、hash原则: 保证数据均衡分配在不同的region中
​ 如果rowkey设计不合理可能会导致数据全部聚在一个region中,从而出现数据热点问题
​ 如何解决数据热点问题?
​ 1、在原来的rowkey基础上添加随机数
​ 2、将rowkey反转
​ 3、将原来rowkey hashcode值作为新rowkey

3、内存优化:

​ 一般会给hbase节点分配16-48G的内存

4、基础优化

1、允许HDFS追加内容
​ Hbase写入数据到HDFS的时候是以追加的形式写入,HDFS默认就是允许的
2、调整datanode最大文件打开数
​ HBase读取数据与写入数据都需要打开HDFS文件,如果同一时间有大量的读写请求,就可能导致HDFS文件打开数不够造成请求需要排队的情况
3、调整HDFS数据写入的超时时间
​ Hbase写入数据的时候,如果网络不是太好,可能造成超时导致数据写入失败
4、调整HDFS数据写入效率
​ 如果数据量相对比较大,此时可以将数据压缩之后写入HDFS,能够提高效率
5、调整RPC监听数
​ Hbase内部有一个线程池专门用来处理client的请求,如果同一时间client有大量的请求,此时肯能造成线程池中的线程不够用,从而出现请求排队的情况
6、调整HStore的文件大小
​ HStore文件的大小如果过小,可能导致频繁的split,如果过大,可能导致region里面存放大量数据,负载不均衡
7、调整Client缓存
​ client在获取元数据之后,会缓存在客户端,后续在获取元数据的时候可以直接从缓存中获取,如果缓存的大小比较小,会导致client只能缓存一部分元数据,后续在获取元数据的时候可能就不能从缓存中直接获取到,需要从regionserver中获取
8、flush、compact、split参数值调整
​ hbase.regionserver.global.memstore.size.lower.limit一般需要调整,调整为0.7/0.8
9、scan扫描的数据条数
​ scan是全表扫描,如果扫描的数据条数比较大会占用过多的内存空间

Phoenix

1、shell使用
		1、查询所有表: !tables
		2、创建表
			1、hbase有表[在phoenix中创建表与hbase的表建立映射关系]
				1、通过create table建表与hbase的表建立映射关系
					create table创建的表在删除的时候会同步删除hbase的表
					create table创建的表可以增删改查数据
					CREATE TABLE hbase表名(
						字段名 类型 primary key, --映射hbase的rowkey
						列簇名.列限定符 类型, --映射hbase表中column
						...
					)COLUM_ENCODED_BYTES=0			
			2、通过create view建视图与hbase的表建立映射关系
				create view创建的视图在删除的时候不会删除hbase的表
				create view创建的视图只能查询数据
				CREATE VIEW hbase表名(
					字段名 类型 primary key, --映射hbase的rowkey
					列簇名.列限定符 类型, --映射hbase表中column
					...
				)COLUM_ENCODED_BYTES=0
			注意: 如果列簇名与列限定符以及表名是小写需要用""括起来
		2、hbase没有表[在phoenix中创建表的同时会在hbase中创建一个同名表]
			单主键:
			CREATE TABLE 表名(
				字段名 字段类型 primary key,
				...
			)COLUM_ENCODED_BYTES=0
			复合主键:
			CREATE TABLE 表名(
				字段名 字段类型,
				...
				CONSTRAINT 主键名 primary key(字段名,..)
			)COLUM_ENCODED_BYTES=0
			
			注意事项:
				1、COLUM_ENCODED_BYTES=0 是指在创建hbase表的时候列限定符不进行编码
				2、在创建表的时候phoenix会默认将小写转成大写[不管是字段还是表名],如果想要保持表名/字段小写需要将表名/字段名用""括起来
	3、插入/修改数据: upsert into 表名(字段名,..) values(值,..) [如果表名/字段名是小写需要用""括起来]
	4、查询: select .. from .. where .. [如果表名/字段名是小写需要用""括起来]
	5、如果想要映射命名空间的表
		1、在hbase/conf/hbase-site.xml中配置两个参数
		<property>
				<name>phoenix.schema.isNamespaceMappingEnabled</name>
				<value>true</value>
		</property>
		<property>
				<name>phoenix.schema.mapSystemTablesToNamespace</name>
				<value>true</value>
		</property>
		2、在client端配置两个参数
		<property>
				<name>phoenix.schema.isNamespaceMappingEnabled</name>
				<value>true</value>
		</property>
		<property>
				<name>phoenix.schema.mapSystemTablesToNamespace</name>
				<value>true</value>
		</property>
		3、重启hbase
		4、创建一个schema,schema的名称必须与命名空间的名称一致
		5、在phoenix中创建表与命名空间的表建立映射关系
				CREATE TABLE schema名称.hbase表名(
					字段名 类型 primary key, --映射hbase的rowkey
					列簇名.列限定符 类型, --映射hbase表中column
					...
				)COLUM_ENCODED_BYTES=0
3、hbase二级索引
	原因: hbase使用get查询的是可以通过rowkey+元数据知道数据处于哪个region,该region处于哪个regionserver,所以能够很快定位到数据。但是如果是根据value值来查询数据的时候使用的scan查询,scan是扫描全表,所以查询速度比较慢
	1、全局二级索引
		1、语法: create index 索引名 on schema名称.表名([列簇名.]字段名,..) [include([列簇名.]字段名,..)]
		2、原理: 
			给某个/某几个字段建索引之后,会在hbase中新创建一个索引表,后续根据索引字段查询数据的时候,此时是从索引表中查询数据
			新创建的索引表: 
				rowkey = 索引字段值1_索引字段值2_.._原来的rowkey
		3、注意事项:
			在查询数据的时候,查询条件中必须包含第一个索引字段,select与from中的查询字段必须在索引表中能够全部找到,不然就是全部扫描
	2、本地二级索引
		1、语法:create local index 索引名 on schema名称.表名([列簇名.]字段名,..)
		2、原理: 给某个/某几个字段建索引之后,会在原表中插入对应的数据[数据的rowkey=__索引字段值1_索引字段值2_.._原来的rowkey],后续在根据索引字段查询数据的时候会首先扫描索引对应的region获取原来的rowkey,再通过原来的rowkey查询原始数据
	3、协处理器
		协处理器相当于是一个触发器,会监控表,一旦client做出对应的操作,会触发对应的动作
		步骤:
			1、需要继承两个接口
			2、重写一个对应的方法
			3、重写一个对应的动作方法
			4、打包,上传到hdfs
			5、禁用表
			6、加载协处理器
			7、启动表

与hive集成

1、内部表
在hive创建表的时候会同步在hbase中创建表,如果hbase的表已经存在则报错
删除hive内部表的时候会同步删除Hbase的表
CREATE TABLE student_hive_2(id string,name string,age string)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’
WITH SERDEPROPERTIES (“hbase.columns.mapping” = “:key,f1:name,f1:age”)
TBLPROPERTIES (“hbase.table.name” = “student_hbase”);
:key 指hbase的主键
hbase.columns.mapping: 指定hive的字段与hbase的column的映射
hbase.table.name: 指定创建的hbase的表名
2、外部表[与hbase的表建立映射关系]
在hive创建表的时候要求hbase的表必须已经存在,如果hbase的表不存在则报错
删除hive外部表的时候不会删除hbase的表
CREATE EXTERNAL TABLE student_hive_2(id string,name string,age string)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’
WITH SERDEPROPERTIES (“hbase.columns.mapping” = “:key,f1:name,f1:age”)
TBLPROPERTIES (“hbase.table.name” = “student_hbase”);
:key 指hbase的主键
hbase.columns.mapping: 指定hive的字段与hbase的column的映射
hbase.table.name: 指定hbase的表名

上一篇:【路径规划】基于matlab动态多群粒子群算法局部搜索路径规划【含Matlab源码 448期】


下一篇:HBase集群间的主从复制原理