文章目录
2021-09-17
说明
- 本博客每周五更新一次。
- 本文为功能性博文:提取 Kafka 所有消费组的相关信息,整理后提供给 Prometheus 采集。
分享
环境
- kafka2.3.0
实现
maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
代码
- Kerberos 认证相关功能此处不再赘述,如有需要,可参照之前的文章。
- topic最大offset需要单独提取,输出数据结构为:时间|消费组|topic|最大offset|已消费offset|lag
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TimerTask;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.kafka.customer.util.CollectionUtil;
import com.kafka.customer.util.ConfigUtil;
import com.kafka.customer.util.FileUtil;
import com.kafka.customer.util.TimeUtil;
/**
 * Timer task that collects, for every Kafka consumer group, the committed offset,
 * the log-end offset and the resulting lag of each partition, and writes the result
 * to a data file.
 *
 * <p>Each output line has the form
 * {@code time|group|topic|endOffset|consumedOffset|lag}; one line is produced per
 * topic partition that the group has a committed offset for.
 *
 * <p>The actual collection runs on a separate {@link KafkaLogThread}; {@link #run()}
 * only waits for it and interrupts it when {@code ConfigUtil.taskTime} milliseconds
 * have elapsed.
 */
public class KafkaLogTimer extends TimerTask {

    private static final Logger log = LoggerFactory.getLogger(KafkaLogTimer.class);

    /**
     * Admin client used to list consumer groups and their committed offsets.
     */
    private AdminClient adminClient;

    /**
     * Consumer client used only to look up partition end offsets.
     */
    private KafkaConsumer<String, String> consumer;

    /**
     * Timestamp written as the first column of every output line.
     */
    private long time;

    /**
     * Output file name (without directory).
     */
    private String fileName;

    /**
     * Column separator of the output lines.
     */
    private String intervalChar = "|";

    /**
     * Execution state: 0 = pending/running, 1 = finished.
     *
     * <p>Written by the worker thread and polled by the timer thread, hence
     * {@code volatile} so the update is guaranteed to become visible.
     */
    private volatile int state;

    /**
     * Starts a worker thread and waits (polling every 500 ms) until it has finished
     * or {@code ConfigUtil.taskTime} milliseconds have passed, in which case the
     * worker is interrupted.
     */
    @Override
    public void run() {
        String timeStr = TimeUtil.getFileTime();
        time = TimeUtil.getUtcMinuteTime();
        fileName = timeStr + ConfigUtil.txt;
        log.info("{} Start To Run", timeStr);

        state = 0;
        KafkaLogThread kafkaLogThread = new KafkaLogThread();
        kafkaLogThread.start();

        long createTime = System.currentTimeMillis();
        while (state == 0) {
            // The worker has not finished yet: give up once the time budget is used.
            if ((System.currentTimeMillis() - createTime) >= ConfigUtil.taskTime) {
                log.error("Time {} Deal Over Time Will To Stop", timeStr);
                kafkaLogThread.interrupt();
                break;
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error("Wanting Thread Run Error", e);
                // Stop waiting: ask the worker to stop too and keep the interrupt
                // flag set so the caller can observe the interruption.
                kafkaLogThread.interrupt();
                Thread.currentThread().interrupt();
                break;
            }
        }
        log.info("{} End To Finish", timeStr);
    }

    /**
     * Worker thread that fetches the Kafka consumer-group information.
     *
     * @author wangzonghui
     * @date 2021-08-27 10:13:47
     */
    class KafkaLogThread extends Thread {

        /**
         * Connects to Kafka, builds the report lines, closes the connections and
         * stores the lines in {@code ConfigUtil.outputPath + fileName}.
         * Always sets {@code state} to 1 when done, whether or not it succeeded.
         */
        @Override
        public void run() {
            try {
                List<String> logdataList;
                try {
                    // Connect.
                    init();
                    // Collect the report lines.
                    logdataList = createLog();
                } finally {
                    // Always release the clients, even when init() failed half-way.
                    close();
                }
                // Store the file.
                if (CollectionUtil.isNotEmptyCollection(logdataList)) {
                    String outputFile = ConfigUtil.outputPath + fileName;
                    FileUtil.createDataFile(logdataList, outputFile);
                } else {
                    log.info("{} Get Log Is Null", TimeUtil.getTime());
                }
            } catch (Exception e) {
                log.error("{} Create Error", fileName, e);
            } finally {
                // Signal completion even if an unexpected Error escapes.
                state = 1;
            }
        }

        /**
         * Creates the admin client and the consumer client.
         */
        public void init() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Kerberos authentication.
            // NOTE(review): a value of 0 enables Kerberos here - confirm against ConfigUtil.
            if (ConfigUtil.kerberos == 0) {
                props.put("security.protocol", "SASL_PLAINTEXT");
                props.put("sasl.mechanism", "GSSAPI");
                props.put("sasl.kerberos.service.name", "kafka");
            }
            adminClient = AdminClient.create(props);

            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
        }

        /**
         * Builds the report lines for all consumer groups.
         *
         * @return one line per (group, partition) with a committed offset; never
         *         {@code null}. If an error occurs, the lines collected so far are
         *         returned and the error is logged.
         */
        public List<String> createLog() {
            List<String> logdataList = new ArrayList<>();
            try {
                // Fetch the consumer groups once instead of resolving the future repeatedly.
                ListConsumerGroupsResult listing = adminClient.listConsumerGroups();
                Collection<ConsumerGroupListing> groups = listing.all().get();
                if (groups == null || !CollectionUtil.isNotEmptyCollection(groups)) {
                    log.info("No Found Consumer Gourp");
                    return logdataList;
                }

                for (ConsumerGroupListing group : groups) {
                    String groupId = group.groupId();
                    Map<TopicPartition, OffsetAndMetadata> offsets =
                            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
                    if (offsets == null || !CollectionUtil.isNotEmptyMap(offsets)) {
                        log.error("Get Consumer Group {} Of Null", groupId);
                        continue;
                    }

                    for (Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                        TopicPartition topicPartition = entry.getKey();
                        OffsetAndMetadata committed = entry.getValue();
                        if (committed == null) {
                            // No committed offset for this partition: skip it rather
                            // than aborting the whole report with an NPE.
                            log.warn("Consumer Group {} Has No Committed Offset For {}", groupId, topicPartition);
                            continue;
                        }
                        long endOffset = getLogEndOffset(topicPartition);
                        long consumerOffset = committed.offset();
                        long lag = endOffset - consumerOffset;

                        StringBuilder buf = new StringBuilder();
                        buf.append(time).append(intervalChar)
                                .append(groupId).append(intervalChar)
                                .append(topicPartition.topic()).append(intervalChar)
                                .append(endOffset).append(intervalChar)
                                .append(consumerOffset).append(intervalChar)
                                .append(lag);
                        logdataList.add(buf.toString());
                    }
                }
            } catch (Exception e) {
                log.error("Get Kafka Consumer Error:" + e.toString(), e);
            }
            return logdataList;
        }

        /**
         * Returns the log-end offset of a partition by assigning it to the consumer,
         * seeking to the end and reading the resulting position.
         *
         * @param topicPartition partition to inspect
         * @return position of the consumer after seeking to the end of the partition
         */
        private long getLogEndOffset(TopicPartition topicPartition) {
            List<TopicPartition> single = Arrays.asList(topicPartition);
            consumer.assign(single);
            consumer.seekToEnd(single);
            return consumer.position(topicPartition);
        }

        /**
         * Closes both clients. A failure while closing one client is logged and does
         * not prevent the other one from being closed.
         */
        public void close() {
            AdminClient admin = adminClient;
            adminClient = null;
            if (admin != null) {
                try {
                    admin.close();
                } catch (Exception e) {
                    log.error("Close Admin Client Error", e);
                }
            }

            KafkaConsumer<String, String> kafkaConsumer = consumer;
            consumer = null;
            if (kafkaConsumer != null) {
                try {
                    kafkaConsumer.close();
                } catch (Exception e) {
                    log.error("Close Consumer Error", e);
                }
            }
        }
    }
}
总结
- 人生有岸,知识无涯,未来路上好好奋斗加油,少年。