Java code for the health check:
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeClusterOptions; import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.common.config.ConfigResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Status; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.stereotype.Component; import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * localhost:8080/actuator/health * refer docs: * https://github.com/spring-projects/spring-boot/blob/7cd19822c6de99e835bcaff1307f104e863da265/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java * * spring boot github address: * https://github.com/spring-projects/spring-boot/tree/master/spring-boot-project */ @Component("kafka") public class MyKafkaHealthIndicator extends AbstractHealthIndicator { private Logger logger = LoggerFactory.getLogger(MyKafkaHealthIndicator.class); static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor"; @Autowired private KafkaAdmin kafkaAdmin; private Integer TIMEOUT_MS = 1000; private DescribeClusterOptions describeOptions = new DescribeClusterOptions().timeoutMs(TIMEOUT_MS); @Override protected void doHealthCheck(Health.Builder builder) throws Exception { try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfigurationProperties())) { DescribeClusterResult clusterResult = adminClient.describeCluster(this.describeOptions); // print detail of each of the nodes adminClient.describeCluster(this.describeOptions) .nodes() .get() .stream() .forEach(node -> { try { int replicationNum = 
getReplicationNumber(node.id() + "", adminClient); logger.info("遍历kafka节点:node.id={}, host={}, port={}, replicationNum={}", node.id(), node.host(), node.port(), replicationNum); } catch (Exception e) { logger.error(e.getMessage(), e); } }); // kafka 充当控制器的节点 the node as controller node String brokerId = clusterResult.controller().get(TIMEOUT_MS, TimeUnit.MILLISECONDS).idString(); int replicationNum = getReplicationNumber(brokerId, adminClient); int availableNodesNum = clusterResult.nodes().get().size(); Status status = availableNodesNum >= replicationNum ? Status.UP : Status.DOWN; builder.status(status) .withDetail("clusterId", clusterResult.clusterId().get()) .withDetail("controller-node-brokerId", brokerId) .withDetail("available-nodes-number", availableNodesNum); } } private int getReplicationNumber(String brokerId, AdminClient adminClient) throws ExecutionException, InterruptedException { ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId); String replicationNumStr = adminClient .describeConfigs(Collections.singletonList(configResource)) .all() .get() .get(configResource) .get(REPLICATION_PROPERTY) .value(); return Integer.valueOf(replicationNumStr); } }
Register the KafkaAdmin bean in Spring Boot:
/**
 * Registers the {@link KafkaAdmin} bean holding the admin-client configuration.
 *
 * @return a KafkaAdmin configured with the bootstrap server list
 */
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    // Kafka accepts the comma-separated host:port list directly; splitting it and re-joining
    // with commas (as before) produced the same string.
    // NOTE(review): broker addresses are hard-coded; consider externalizing them to configuration.
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.62:9092,192.168.2.63:9092");
    return new KafkaAdmin(configs);
}
end.
Customizing the Kafka health check in a Spring Boot application (在springboot应用中自定义kafka的健康状况检查)