Kafka消费者

    1消费者端配置参数 brokerlist:和生产者一样,kafka服务端地址,至少配置2个; key反序列化器:将字节数组转化成对象,需要个生产者的序列化器对应上; value反序列化器:同上 groupid:消费者组id;   消费者组中消费者和分区的关系是:每个分区只能同时被一个消费者组内的一个消费者消费;   消费者组的分区分配规则,可以有系统参数: partition.assignment.strategyf 来进行设置对应的类: 目前系统提供的有3种:  
range策略(默认) org.apache.kafka.clients.consumer.RangeAssignor 对每个主题,获取所有的分区数,然后除以消费者组内的消费者个数,得到商和余数,商就是每个组内消费者负责的分区,余数,则给排序后的前面几位的消费者 消费者组 有2个 c1 和c2 2个主题 s1,s2,每个主题有3个分区: 所有分区:s1p1,s1p2,s1p3 和 s2p1,s2p2,s2p3 c1 对应的分区s1p1  s1p3 s2p1  ,s2p3 c2 对应的分区 s1p2  s2p2   注意是对每个主题分开的;
roundronbin分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor 1:消费者内的消费订阅的都是相同的主题:则列出所有的主题分区,然后总和再除以消费者数目,商就是每个组内消费者负责的分区,余数,则给排序后的前面几位的消费者 2:如果不是相同的;对于没有订阅指定主题的,消费者获取不到该分区,其余的和上面相同 消费者组 有2个 c1 和c2 订阅相同主题 2个主题 s1,s2,每个主题有3个分区: 所有分区:s1p1,s1p2,s1p3 和 s2p1,s2p2,s2p3 c1 对应的分区s1p1  s1p3   s2p2 c2 对应的分区 s1p2  s2p1 s2p3   消费者组 有2个 c1 和c2 c3 订阅不相同主题 c1 订阅 s1,c2 订阅s1,s2,c3都订阅了 2个主题 s1,s2,s3,主题分别有123个分区: 所有分区:s1p1,s2p1,s2p2 和 s3p1,s3p2,s3p3   c1 对应的分区s1p1   c2 对应的分区 s2p1  c2 对应的分区 s2p2  s3p1,s3p2,s3p3    
sticky   有2个目的: (1)分区要尽可能均匀 (2)尽可能与上一次保持一致 第一个目标大于第二个目标 消费者组 有2个 c1 和c2 c3 订阅相同主题 3个主题 s1,s2,s3,主题分别2个分区: 所有分区:s1p1,s1p2,s2p1,s2p2 和 s3p1,s3p2   c1 对应的分区s1p1  s2p2 c2 对应的分区 s1p2  s3p1 c2 对应的分区 s2p1  s3p2     此时c2 退出了该消费者组 roundronbin上面会分配为 c1 对应的分区s1p1 s2p1  s3p1 c2 对应的分区 s1p2   s2p2 s3p2 而sticky 在保持分区均匀的前提下,会减少分区的改动   sticky c1 对应的分区s1p1  s2p2 s1p2  c2 对应的分区 s2p1  s3p2 s3p1        
  2创建消费者实例 KafkaConsumer consumer = new KafkaConsumer(prop);   3订阅主题 3种订阅方式,订阅会覆盖之前的订阅的主题;  
subscribe(Collection<T> list,均衡器) 直接订阅主题集合  
subscribe(正则表达式,均衡器) 订阅的主题就是,主题名字满足正则表达式;动态的  
assagin(Topicpation) 订阅某个分区  
  4拉取消息 ConsumerRecords recodes = consumer.poll(时间);  
1:拉取消息,只会去leader节点上的leader分区副本读取消息
2:kafka会有一个consumer_offset来记录该主题该分区该消费者组的 消息偏移量,就是该分区的HW 每个分区有多个副本,分布在各个broker上,这些副本的内容相似,不同的就是有些副本消息比较 老了;只有新老之分,主要是看follerower副本同步leader副本的程度; 每个副本分区的最大消息偏移量+1就是这个分区副本的LEO 一个分区的所有副本的最小LEO就是这个分区的Hw,高水位,消费者只能拉到这个偏移量之前的消息  
3:时间,是代表如果拉不到消息,会阻塞这么长时间;依旧没有就返回异常;
 
多线程实现消费高效   (1)实例化多个消费者线程,然后每个消费线程,去消费消息;   class ThreadA extends Thread{     Consumer consumer ;       public ThreadA(Consumer consumer){         this.consumer= consumer; }   @overrider public void run(){    ConsumerRecords recodes = consumer.poll(1000);  Iteator it =    records.iteator();     while(it.hasNext()){       ConsumerRecord recodes =  it.next();         //业务逻辑     } }   }     class Test{ main{     for(int i = 3; i>0;i--){         ThradA a = new ThreadA(new Consumer(pros));         a.start()     }      }   }   每个消费者线程单独维护一个TCP连接; 注意:这里消费者线程的个数不要超过 订阅主题的分区数;   (2)再(1)的基础上,增加线程池,帮助加速业务逻辑处理   class ThreadA extends Thread{     Consumer consumer ;     Exectors exctors = new FixedExectorssesrvice;         public ThreadA(Consumer consumer){         this.consumer= consumer; }     @overrider public void run(){    ConsumerRecords recodes = consumer.poll(1000); Iteator it =    records.iteator();     while(it.hasNext()){       ConsumerRecord recode =  it.next();         //业务逻辑         exctors.submit(new ThreadB(recode));     } }     }       class ThreadB extends Thread{     ConsumerRecord recode;         publicThreadB(ConsumerRecord recode){         this.recode=recode; }     @overrider public void run(){    // 对recode业务处理 }     }        提高了消费性能     5提交消费位移    
通过设置系统参数,auto.commited,flag  
同步提交  
异步提交  
   
上一篇:kafka学习(三)kafka基础理论


下一篇:Kafka面试题总结