生产实践Kafka与ELK

项目背景,第三方发送数据到kafka。我方负责消费,解码、存储、入库。

开发环境:集群1.0kafka,springboot开发程序

问题:1.kafka服务端与客户端版本不一致,服务端1.0,客户端0.8,导致程序消费到一部分重启后,为消费的数据丢失直接别为已消费,导致剩余数据未走流程。

2. 数据丢失节点盲目,无法找到,需要大量测试。

3. 线程数太多,分配资源浪费,dfs单节点瓶颈。


解决问题:

1. 服务端与客户端版本不一致,0.8版本中auto.offset.reset=smallest/largest/anything else  与1.0版本auto.offset.reset=earliest/latest/none

参数不一致,导致重启程序后数据丢失,调整版本一致后,解决此问题。

2. 使用springboot 继承metrics做统计

pom中添加

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>


代码中注入


 @Autowired
 CounterService counterService;//计数
    
 @Autowired
 GaugeService gaugeService;//计时



counterService.increment("zhibiaojishu");//计数的指标名称
gaugeService.submit("zhibiaojishi");//计时的指标名称

继承es,将指标存入到es中,

中程序中加入

@SpringBootApplication
public class KafkaApplication {

	@Bean
	@ConfigurationProperties("metrics.export")
	@ExportMetricWriter
	public MetricWriter metricWriter() {
		return new ElasticsearchMetricWriter();
	}
	public static void main(String[] args) {
		SpringApplication.run(KafkaApplication.class, args);
	}
}

具体代码可参考https://github.com/lane-cn/spring-boot-metrics-sample

安装elasticsearrch-head-1.0.0.jar      head工具 直接java -jar elasticsearrch-head-1.0.0.jar启动,配置ip和端口号

 elasticsearrch-5.6.9.tar.gz   更改集群名字和端口号  ./elasticsearrch -d 后台启动,使用内存可使用大一点

安装kibana-5.6.9.tar.gz 配置ip,启动后可收集es中的数据做统计报表,此时各项指标脚本监控形成图标已完成。


3. 线程数根据cpu物理内存做修改一般保持一致,dfs单点瓶颈问题需要搭建集群环境。


到此ELk模拟环境完成。运维人员可直接查看kibana图标监控各个数据指标。








上一篇:SparkStreaming中foreachRDD、foreachPartition和foreach 及序列化问题


下一篇:使用SparkSql 读取ES数据