kafka
1 什么是kafka
Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。
2 为什么要使用 kafka,为什么要使用消息队列
缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
3.Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么
ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
4.kafka中的broker 是干什么的
broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。
5.kafka中的 zookeeper 起到什么作用,可以不用zookeeper么
zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,
但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
6.kafka follower如何与leader同步数据
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据,kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。
7.什么情况下一个 broker 会从 isr中踢出去
leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护 ,如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除 。
8.kafka 为什么那么快
Cache Filesystem Cache PageCache缓存
顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。
Zero-copy 零拷技术减少拷贝次数
Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。
Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。
9.kafka producer如何优化打入速度
增加线程
提高 batch.size
增加更多 producer 实例
增加 partition 数
设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。
10.kafka producer 打数据,ack 为 0, 1, -1 的时候代表啥, 设置 -1 的时候,什么情况下,leader 会认为一条消息 commit了
1(默认) 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。
0 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1 producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。
11.kafka unclean 配置代表啥,会对 spark streaming 消费有什么影响
unclean.leader.election.enable 为true的话,意味着非ISR集合的broker 也可以参与选举,这样有可能就会丢数据,spark streaming在消费过程中拿到的 end offset 会突然变小,导致 spark streaming job挂掉。如果unclean.leader.election.enable参数设置为true,就有可能发生数据丢失和数据不一致的情况,Kafka的可靠性就会降低;而如果unclean.leader.election.enable参数设置为false,Kafka的可用性就会降低。
12.如果leader crash时,ISR为空怎么办
kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值:
true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。
false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。
13.kafka的message格式是什么样的
一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成
header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。
当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,
比如是否压缩、压缩格式等等);如果magic的值为0,那么不存在attributes属性
body是由N个字节构成的一个消息体,包含了具体的key/value消息
14.kafka中consumer group 是什么概念
同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。
15.Kafka中的消息是否会丢失和重复消费?
要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。
1、消息发送
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:
0---表示不进行消息接收是否成功的确认;
1---表示当Leader接收成功时确认;
-1---表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:
(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
2、消息消费
Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;
解决办法:
针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。
消息重复消费及解决参考:https://www.javazhiyin.com/22910.html
16.为什么Kafka不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:
(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
(2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
17.Kafka中是怎么体现消息顺序性的?
kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.
18.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
offset+1
19.kafka如何实现延迟队列?
Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。
底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.
Kafka中到底是怎么推进时间的呢?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。
参考:https://blog.csdn.net/u013256816/article/details/80697456
20.Kafka中的事务是怎么实现的?
参考:https://blog.csdn.net/u013256816/article/details/89135417
21.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
https://blog.csdn.net/yanshu2012/article/details/54894629
- 消息列队的特点:
• 生产者消费者模式
• 先进先出(FIFO)顺序保证
• 可靠性保证
– 自己不丢数据
– 消费者不丢数据:“至少一次,严格一次”
• 至少一次就是可能会有两次,会重
• 严格一次机制就会负责一点
4.消息列队常见场景
• 系统之间解耦合
– queue模型
– publish-subscribe模型
• 峰值压力缓冲
• 异步通信
5.kafka 的架构
• producer:消息生产者
• consumer:消息消费者
• broker:kafka集群的server,负责处理消息读、写请求,存储消息
• topic:消息队列/分类
• Queue里面有生产者消费者模型
• broker就是代理,在kafka cluster这一层这里,其实里面是有很多个broker
• topic就相当于queue
• 图里没有画其实还有zookeeper,这个架构里面有些元信息是存在zookeeper上面的,整个集群的管理也和zookeeper有很大的关系
• 一个topic分成多个partition
• 每个partition内部消息强有序,其中的每个消息都有一个序号叫offset
• 一个partition只对应一个broker,一个broker可以管多个partition
• 消息不经过内存缓冲,直接写入文件
• 根据时间策略删除,而不是消费完就删除
• producer自己决定往哪个partition写消息,可以是轮询的负载均衡,或者是基于hash的partition策略
• 接下来我们看kafka是怎么生产消息,消费消息,和怎么存储消息的,来看它精髓的地方
• kafka里面的消息是有topic来组织的,简单的我们可以想象为一个队列,一个队列就是一个topic,然后它把每个topic又分为很多个partition,这个是为了做并行的,在每个partition里面是有序的,相当于有序的队列,其中每个消息都有个序号,比如0到12,从前面读往后面写,
• 一个partition对应一个broker,一个broker可以管多个partition,比如说,topic有6个partition,有两个broker,那每个broker就管3个partition
• 这个partition可以很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,kafka和很多消息系统不一样,很多消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过期这样一个概念,这个模型带来了很多个好处,这个我们后面再讨论一下
•这里producer自己决定往哪个partition里面去写,这里有一些的策略,譬如如果hash就不用多个partition之间去join数据了
kafka 的消息存储和生产消费模型
• consumer自己维护消费到哪个offset
• 每个consumer都有对应的group
• group内是queue消费模型
– 各个consumer消费不同的partition
– 因此一个消息在group内只消费一次
• group间是publish-subscribe消费模型
– 各个group各自独立消费,互不影响
– 因此一个消息只被每个group消费一次
kafka 有哪些特 点
• 消息系统的特点:生产者消费者模型,FIFO
• 高性能:单节点支持上千个客户端,百MB/s吞吐
• 持久性:消息直接持久化在普通磁盘上且性能好
• 分布式:数据副本冗余、流量负载均衡、可扩展
• 很灵活:消息长时间持久化+Client维护消费状态
• 消息系统基本的特点是保证了,有基本的生产者消费者模型,partition内部是FIFO的,partition之间呢不是FIFO的,当然我们可以把topic设为一个partition,这样就是严格的FIFO
• 接近网卡的极限
• 直接写到磁盘里面去,就是直接append到磁盘里面去,这样的好处是直接持久化,数据不会丢,第二个好处是顺序写,然后消费数据也是顺序的读,所以持久化的同时还能保证顺序,比较好,因为磁盘顺序读比较好
• 分布式,数据副本,也就是同一份数据可以到不同的broker上面去,也就是当一份数据,磁盘坏掉的时候,数据不会丢失,比如3个副本,就是在3个机器磁盘都坏掉的情况下数据才会丢,在大量使用情况下看这样是非常好的,负载均衡,可扩展,在线扩展,不需要停服务的
• 消费方式非常灵活,第一原因是消息持久化时间跨度比较长,一天或者一星期等,第二消费状态自己维护消费到哪个地方了,Queue的模型,发布订阅(广播)的模型,还有回滚的模型
• ZeroMQ是一个socket的通信库,它是以库的形式提供的,所以说你需要写程序来实现消息系统,它只管内存和通信那一块,持久化也得自己写,还是那句话它是用来实现消息队列的一个库,其实在storm里面呢,storm0.9之前,那些spout和bolt,bolt和bolt之间那些底层的通信就是由ZeroMQ来通信的,它并不是一个消息队列,就是一个通信库,在0.9之后呢,因为license的原因,ZeroMQ就由Netty取代了,Netty本身就是一个网络通信库嘛,所以说更合适是在通信库这一层,不应该是MessageQueue这一层
• Kafka,的亮点,天生是分布式的,不需要你在上层做分布式的工作,另外有较长时间持久化,前面基本消费就干掉了,另外在长时间持久化下性能还比较高,顺序读和顺序写,另外还通过sendFile这样0拷贝的技术直接从文件拷贝到网络,减少内存的拷贝,还有批量读批量写来提高网络读取文件的性能,最后一点是比较轻量和灵活
• 消费状态谁来维护Client vs.Server
• 有人可能会说kafka写磁盘,会不会是瓶颈,其实不会而且是非常好的,为什么是非常好的,因为kafka写磁盘是顺序的,所以不断的往前产生,不断的往后写,kafka还用了sendFile的0拷贝技术,提高速度,而且还用到了批量读写,一批批往里写,64K为单位,100K为单位,每一次网络传输量不会特别小,RTT(RTT:Round-TripTime往返时间)的开销就会微不足道,对文件的操作不会是很小的IO,也会是比较大块的IO
storm+kafka 有什么好 处
• 满足获取输入。产生输出数据的基本需求
• kafka的分布式、产生输出数据的基本需求
• kafka的分布式、高性能和storm吻合
• pub-sub模型可以让多个storm业务共享输入数据
• kafka灵活消费的模式能配合storm实现不丢不重(exactly-once)的处理模型
• exactly-once,精准一次,这种模型在很多时候也是很有用的
理解零拷 贝
• 从WIKI的定义中,我们看到“零拷贝”是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式。
• Non-Zero Copy方式:
Zero Copy方式:
从上图中可以清楚的看到,Zero Copy的模式中,避免了数据在用户空间和内存空间之间的拷贝,从而提高了系统的整体性能。Linux中的sendfile()以及Java NIO中的FileChannel.transferTo()方法都实现了零拷贝的功能,而在Netty中也通过在FileRegion中包装了NIO的FileChannel.transferTo()方法实现了零拷贝。
Storm简介(一)
Storm是 Twitter开源的一个分布式的实时计算系统,用于数据的实时分析,持续计算,分布式RPC等等。
官网地址http://storm-project.net/
源码地址:https:/github.com/nathanmarz/storm
实时计算需要解决一些什么问题
最显而易见的就是实时推荐系统,比如我们在淘宝等电商购物网站去买东西,我们会在网页旁边或者底端看到与自己所需要商品相关的系列产品。这就是使用类似 storn实时计算去做的,我们非常熟悉的 Hadoop只是做离线的数据分析,无法做到实时分析计算。
比如车流量实时的计算,每天我们北京市的交通情况非常的拥挤,我们可以利用stom为我们实时计算每一个路段的拥挤度等相关路况信息。
再比如我们非常熟悉的股票,那么股票系统也是一种实时计算的机制,利用stom完全可以实现。
Storm简介(二)
实现一个实时计算系统
低延迟:都说了是实时计算系统了,延迟是一定要低的。高性能:可以使用几台普通的服务器建立环境,结余成本
分布式:Stom非常适合于分布式场景,大数据的实时计算;你的数据和计算单机就能搞定,那么不用考虑这些复杂的问题了。我们所说的是单机搞不定的情况。
可扩展:伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。
容错:这是分布式系统中通用问题,一个节点挂了不能影响我的应用,Storm可以轻松做到在节点挂了的时候实现任务转移,并且在节点重启的时候(也就是重新投入生产环境时,自动平衡任务)
可靠性:可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
快速:系统的设计保证了消息能得到快速的处理,使用 ZeroMQ作为其底层消息队列。
本地模式:Storm有一个“本地模式”,可以在处理过程中完全模拟stom集群。这让你可以快速进行开发和单元测试
Storm体系结构(一)
首先我们拿 Hadoop和stom进行一个简单的对比:
storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。Stom支持水平扩展,具有高容错性,保证每个消息都会得到处理,而且处理速度很快(在一个小集群中,每个结点每秒可以处理数以百万计的消息)。Storm的部署和运维都很便捷,而且更为重要的是可以使用任意编程询言来开发应用。
Storm体系结构(二)
Storm架构结构图
Storm体系结构(三)
Nimbus主节点:
主节点通常运行一个后台程序—Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似亍 Hadoop中的 Job Tracker.
Supervisor工作节点:
工作节点同样会运行一个后台程序—supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是 topology中一个子集的实现。而Nimbus和 Supervisor之间的协调则通过 Zookeeper系统戒者集群。
Zookeeper
Zookeeper是完成 Supervisor和 Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装到stom中的“topology”.topology则是一组由 Spouts(数据源)和Bots(数据操作)通过 Stream Groupings运行连接的图。下面对出现的术语进行更深刻的解析。
Topology(拓扑)
storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。一个 topology是 spouts和bos组成的图,通过 stream groupings将图中的 spouts和bots连接起来,如下图:
Storm Hello World (一)
Storm Hello world(二)
我们首先回顾下storm的组件,安装这个流程去写我们的java代码:
Storm Hello world(三)
首先编写我们的数据源类:Spout。可以使用俩种方式:
继承 BaseRichSpout类
实现 IRichSpout接口
重点需要几个方法进行重写或实现:open、nextTuple、declareOutputFields
继续编写我们的数据处理类:Bolt。可以使用俩种方式:
继承 BaseBasicBolt类
实现 IRichBolt接口
重点需要几个方法进行重写或实现:execute、declareOutputFields
最后我们编写主函数(Topology)去进行提交一个任务。
在使用 Topology的时候,Storm框架为我们提供了俩种模式:本地模式和集群模式
本地模式:(无需stom集群,直接在jaa中即可运行,一般用于测试和开发阶段)执行运行main函数即可。
集群模式:(需要Stom集群,把实现的java程序打包,然后 Topology进行提交)需要把应用打成jar,使用stom命令把 Topology提交到集群中去
Storm Hello World(四)
提交topology命令:storm jar storm01.jar bhz.topology.PWTopology1
查看任务命令:storm list
另外俩个 supervisor节点jps显示:
最后我们可以看下俩个工作节点的 usr/local/temp下的文件信息是否有内容
Storm APl
Topology(拓扑)
Stream grouping(流分组、数据的分发方式)
Spout(喷口、消息源)
Bolt(螺栓、处理器)
Worker(工作进程)
Executor(执行器、Task的线程)
Task(具体的执行任务)
Configuration(配置)
Storm拓扑配置(一)
工作进程、并行度、任务数设置:
我们首先设置了2个工作进程(也就是2个jvm)
然后我们设置了 spout的并行度为2(产生2个执行器和2个任务)
第一个bolt的并行度为2并且指定任务数为4(产生2个执行器和4个任务)
第二个bolt的并行度为6(产生6个执行器和6个任务)
因此:该拓扑程序共有俩个工作进程(worker),2+2+6=10个执行器
(executor),2+4+6=12个任务(task)。每个工作进程可以领取到12/2=6个任务。默认情况下一个执行器执行一个任务,但如果指定了任务的数目。则任务会平均分配到执行器中。
Storm什么是拓扑?(二)
我们在使用storm进行流式计算的时候,都必须要在Main函数里面建立所谓的“拓扑”,拓扑是什么?
拓扑是一个有向图的计算。(也就是说在计算的过程中是有流向的去处理业务逻辑,节点之间的连接显示数据该如何进入下一个节点,他们是进行连接传递的)
拓扑运行很简单,只需要使用 storm命令,把一个jar提交给 nimbus节点,numbus就会把任务分配给具体的子节点(supervisor)去工作。
我们创建拓扑非常简单:
第一,构建 TopologyBuilder对象
第二,设置 Spout(喷口)数据源对象(可以设置多个)
第三,设置Bolt(螺栓)数据处理对象(可以设置多个)
第四,构建 Config对象
第五,提交拓扑
Storm流分组(一)
Stream Grouping:为每个bolt指定应该接受哪个流作为输入,流分组定义了如何在bolt的任务直接进行分发。
Storm流分组(二)☆☆☆
Shuffle Grouping随机分组:保证每个bot接收到的 tuple数目相同
Fields Grouping按字段分组:比如按 userid来分组,具有同样 userid的 tuple会被分到相同的Bots,而不同的 userid则会被分配到不同的Bolts。
All Grouping广播发送:对于每一个 tuple,所有的Bots都会收到。
Global Grouping:全局分组:这个 tuple被分配到stom中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
Non Grouping无分组:假设你不关心流式如何分组的煤科院使用这种方式,目前这种分组和随机分组是一样的效果,不同的是Stom会把这个Bolt放到Bolt的订阅者的同一个线程中执行。
Direct Grouping直接分组:这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct stream的消息流可以声明这种分组方法而且这种消息tupe必须使用 emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的 taskid(Outputcollector.emit,方法也会返回 taskid)
本地分组:如果目标bo在同一工作进程存在一个或多个任务,元祖会随机分配给执行任务,否则该分组方式与随机分组方式是一样的。
storm流分组(三)
Storm WorldCount
我们以一个统计单词的小程序来说明问题。(storm02)
上面的示意图中有4个组件,分别为一个 spout和3个bolt,当数据源 spout.取得数据(可以是一个句子,里面包含多个单词)以后,发送给 SolitBolt进行切分,然后由 CountBolt进行统计结果,最终由ReportBolt记录结果
Storm Spout的可靠性(一)
Spout是 Storm数据流的入口,在设计拓扑时,一件很重要的事情就是需要考虑消息的可靠性,如果消息不能被处理而丢失是很严重的问题。
我们继续作实验,以一个传递消息并且实时处理的例子,来说明这个问题。
新建 maven项目(storm03)
通过示例我们知道,如果在第一个bolt处理的时候出现异常,我们可以让整个数据进行重发,但是如果在第二个bolt处理的时候出现了异常,那么我们也会让对应的整个spout里的数据重发,这样就会出现事务的问题,我们就需要进行判断或者是进行记录
如果是数据入库的话,可以与原ID进行比对。
将一批数据定义唯一的ID入库(幂等性判断事物)
如果是事务的话在编写代码时,尽量就不要进行拆分 tuple
或者使用 storm的 Trident框架
Storm Spout的可靠性(三)
下图是 spout处理可靠性的示意图:当 spout发送一个消息时,分配给俩个bolt分别处理,那么在最后一个bolt接受的时候会做异或运算
RPC介绍
调用客户端句柄;执行传送参数
调用本地系统内核发送网络
消息消息传送到远程主机
服务器句柄得到消息并取得参数
执行远程过程
执行的过程将结果返回服务器句柄
服务器句柄返回结果,调用远程系统内核
消息传回本地主机
客户句柄由内核接收消息
客户接收句柄返回的数据
Storm DRPC介绍
分布式RPc(distributed RPc,DRPc)
Storm里面引入DRPC主要是利用 storm的实时计算能力来并行化cPU密集型(CPU intensive)的计算任务。DRPc的 storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为 topology的输出流。
DRPc其实不能算是 storm本身的一个特性,它是通过组合stom的原语stream、spout、bolt、topology而成的一种模式(pattern)。本来应该把DRPc单独打成一个包的,但是DRPC实在是太有用了,所以我们把它和storm捆绑在一起。
Distributed RPC是通过一个”DRPC Server”来实现
DRPC Server的整体工作过程如下:
1接收一个RPC请求
2)发送请求到 storm topology
3)从 storm topology接收结果。
4)把结果发回给等待的客户端。
Storm DRPC配置和示例
Storm提供了一个称作 LinearDRPCTopologyBuilder的 Topology builder,它把实现DRPc的几乎所有步骤都自简化了。
相关代码地址https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/BasicDRPCTopology.java
实现DRPC步骤:(新建 maven项目,storm04)
1需要修改配置文件内容为(分别修改每台机器配置):
vim /usr/local/apache-storm-0.9.2/conf/storm.yaml
drpc.servers:
-"192.168.1.114"
2需要启动stom的drpc:服务,命令:storm drpc&
3把相应的 topology代码上传到stom服务器上
storm jar storm04.jar bhz.drpc1.BasicDRPCTopology exc
4在本地调用远程 topology即可。
Storm DRPC实例场景
我们继续看下一个示例:
主要使用 storm的并行计算能力来进行,我们在微博、论坛进行转发帖子的时候,是对u进行转发,分析给粉丝(关注我的人),那么每一个人的粉丝(关注者可能会有重复的情况),这个例子就是统计一下帖子(ur)的转发人数。
相关代码地址:https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
实现步骤如下:
第一,获取当前转发帖子的人。
第二,获取当前人的粉丝(关注者)。
第三,进行粉丝去重。
第四,统计人数
第五,最后使用drpc远程调用 topology返回执行结果。
Storm Trident介绍
Trident是在stom基础上,一个以实时计算为目标的高度抽象。它在提供处理大吞吐量数据能力(每秒百万次消息)的同时,也提供了低延时分布式查询和有状态流式处理的能力。如果你对Pig和 Cascading这种高级批处理工具很了解的话,那么应该很容易理解 Trident,因为他们之间很多的概念和思想都是类似的。Trident提供了 joins,aggregations,grouping,functions,以及 filters等能力。除此之外,Trident还提供了一些与门的原语,从而在基于数据库戒者其他存储的前提下来应付有状态的递增式处理。Trident也提供致性(consistent)、有且仅有一次(exactly-once)等语义,这使得我们在使用 trident toplogy时变得容易。
我们首先熟悉下 Trident的概念:
"Stream"是 Trident中的核心数据模型,它被当做一系列的 batch来处理。在Storm集群的节点之间,一个 stream被划分成很多 partition(分区),对流的操作(operation)是在每个 partition上并行执行的。
state Query、partition Persist.、poe(filter、partitionAggregate、
对每个 partition的局部操作包括:function
新建 maven工程(storm05)
Storm Trident Function
Storm Trident Filter
Storm Trident projection
Storm Trident operation
Storm Trident aggregate
Batch和 Scout与 Transactiona(一)
Trident提供了下面的语义来实现有且有一次被处理的目标
1、Tuples是被分成小的集合(一组 tuple被称为一个 batch)被批量处理的。
2、每一批 tuples被给定一个唯1D作为事务ID(txid),当这一个 batch被重发时,tid不变。
3、batch和 batch之间的状态更新时严格顺序的。比如说 batch3的状态的更新必须要等到 batch2的状态更新成功之后才可以进行。
有了这些定义,你的状态实现可以检测到当前 batch是否以前处理过,并根据不同的情况进行不同的处理,这个处理取决于你的输入 spout。有三种不同类型的可以容错的 sqout:
1、non-transactional(无事务支持的 spout)
2、transactional(事务支持的 spout)
3、opaque transactional(不透明事务支持的 spout)
Batch和 Scout与 Transactiona(二)
transactional sqout实现
1、重发操作:
2、重发结果:
opaque transactional sqout实现
实现ITridentspout接口
最通用的AP可以支持 transactional or opaque transactional语义
实现IBatchSpou接口:
一个 non-transactional spout
实现IPArtitioned Tridentspout接口:
一个 transactional spout
实现IOpaquePartitioned Tridentspout接口:
一个opaque transactional spout
Storm与 KafKa
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像 Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
Kafka的目的是通过 Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。