What is Kafka
- Kafka is a distributed messaging system written in Scala, featuring multiple partitions, multiple replicas, and ZooKeeper-based coordination.
- Kafka is a distributed stream-processing platform offering high throughput, persistence, horizontal scalability, and support for streaming data.
- Kafka plays three major roles: messaging system, storage system, and stream-processing platform.
Basic concepts
- Producer: the client that publishes (writes) messages to Kafka topics.
- Consumer: the client that subscribes to topics and reads messages from them.
- Broker: a single Kafka server instance; a cluster consists of one or more brokers, each holding partitions of topics.
Installing Kafka with Docker
- Install docker-compose
sudo curl -L https://github.com/docker/compose/releases/download/1.16.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
# DaoCloud mirror (alternative download source)
#curl -L https://get.daocloud.io/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version
Pull the ZooKeeper and Kafka images
docker pull wurstmeister/kafka:2.12-2.2.0
docker pull wurstmeister/zookeeper:3.4.6
Start the ZooKeeper cluster and the Kafka cluster
The contents of docker-compose.yml:
version: '3.1'
services:
  zoo1:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
      - 2181:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo1/data:/data
      - /usr/local/docker_app/zookeeper/zoo1/datalog:/datalog
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  zoo2:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2182:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo2/data:/data
      - /usr/local/docker_app/zookeeper/zoo2/datalog:/datalog
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  zoo3:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2183:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo3/data:/data
      - /usr/local/docker_app/zookeeper/zoo3/datalog:/datalog
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  kafka1:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 1
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka1
    hostname: kafka1
  kafka2:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9093:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 2
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka2
    hostname: kafka2
  kafka3:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9094:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 3
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka3
    hostname: kafka3
Start the cluster
# Start the cluster
docker-compose -f docker-compose.yml up -d
# Check the startup status
docker-compose -f docker-compose.yml ps
The ps output should show all six containers in the Up state.
Test the cluster
# Open a shell inside broker kafka1
docker exec -it kafka1 /bin/bash
# Create a topic with 2 partitions and a replication factor of 1
kafka-topics.sh --zookeeper zoo1:2181 --create --topic topic-demo --replication-factor 1 --partitions 2
# Describe the topic
kafka-topics.sh --zookeeper zoo1:2181 --describe --topic topic-demo
# Start a console consumer and leave it running
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic-demo
# In a second terminal, open another shell on kafka1 and start a console producer
docker exec -it kafka1 /bin/bash
kafka-console-producer.sh --broker-list kafka1:9092 --topic topic-demo
>hello
>hello kafka
Connecting to Kafka with the Java client
- Spring Boot project, pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
- Java code
package com.example.demo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaApplication {

    public static final String brokerList = "kafka1:9092";
    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApplication.class, args);

        // Minimal producer configuration: bootstrap servers plus String serializers.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, renchenglin");
        try {
            // send() is asynchronous; the returned Future completes once the broker acknowledges.
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close in finally so buffered records are flushed even if send() throws.
            producer.close();
        }
    }
}
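For completeness, the consuming side can be sketched as well. This is a minimal example, assuming the `topic-demo` topic from above and that `kafka1:9092` is resolvable from the client machine; the group id `demo-group` is made up for illustration:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {

    // Builds the minimal configuration a KafkaConsumer needs.
    public static Properties consumerProps(String brokerList, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest offset when the group has no committed offset yet.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(consumerProps("kafka1:9092", "demo-group"))) {
            consumer.subscribe(Collections.singletonList("topic-demo"));
            // One poll for demonstration; real consumers poll in a loop.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + ":" + record.offset() + " " + record.value());
            }
        }
    }
}
```

Running the producer above and then this class should print the "hello, renchenglin" message with its partition and offset.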
Pitfalls
When starting Kafka with the advertised listener configured as
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
testing worked on the VM running Docker (IP 192.168.31.109), but a Java program on the physical host machine (the machine hosting that VM, IP 192.168.31.201) could not connect.
Changing it to
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
still did not work, because the physical machine could not resolve the advertised host names. The fix was to configure name resolution on the physical machine by appending the following entries to the C:\Windows\System32\drivers\etc\hosts file:
192.168.31.109 kafka1
192.168.31.109 kafka2
192.168.31.109 kafka3
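The underlying reason is that a Kafka client uses the bootstrap address only for its first metadata request; all subsequent connections go to whatever addresses the brokers advertise. This can be observed with the AdminClient (a sketch under the setup above; the IP 192.168.31.109 and host names come from this article's environment):

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class ShowAdvertisedAddresses {

    // Minimal AdminClient configuration: only the bootstrap address is required.
    public static Properties adminProps(String bootstrap) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Bootstrap via an address the client can reach directly...
        try (AdminClient admin = AdminClient.create(adminProps("192.168.31.109:9092"))) {
            // ...but the node list below contains the *advertised* host names
            // (kafka1/kafka2/kafka3), which the client must also be able to resolve.
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.println(node.id() + " -> " + node.host() + ":" + node.port());
            }
        }
    }
}
```

If the printed host names are not resolvable from the client machine, every request after the bootstrap step fails, which is exactly the symptom described above.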
Kafka SASL
# Server-side JAAS file
touch kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
# Client-side JAAS file
touch kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
Update the server.properties configuration file:
listeners=SASL_PLAINTEXT://localhost:9092
advertised.host.name=kafka1
advertised.listeners=SASL_PLAINTEXT://192.168.31.109:9092
zookeeper.connect=192.168.31.109:2181
# Protocol used for communication between brokers
security.inter.broker.protocol=SASL_PLAINTEXT
# Enabled SASL mechanisms
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# Class that performs authorization
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Allow any operation when no matching ACL (access control list) is found
allow.everyone.if.no.acl.found=true
super.users=User:admin
Add the following to the producer.properties and consumer.properties files passed to kafka-console-producer.sh and kafka-console-consumer.sh:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
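On the Java client side, the same SASL settings can be supplied programmatically instead of pointing KAFKA_OPTS at a JAAS file. A minimal sketch, using the admin/admin credentials from kafka_client_jaas.conf above:

```java
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SaslClientConfig {

    // Producer configuration for a SASL/PLAIN-secured cluster.
    public static Properties saslProducerProps(String brokerList, String user, String password) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Inline JAAS config -- equivalent to loading kafka_client_jaas.conf via KAFKA_OPTS.
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + user + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration; pass it to new KafkaProducer<>(props) to use it.
        Properties props = saslProducerProps("192.168.31.109:9092", "admin", "admin");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The inline `sasl.jaas.config` property has been supported since Kafka 0.10.2, so it works with the 2.2.0 client used here.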
# JAAS file for ZooKeeper
touch kafka_zoo_jaas.conf
ZKServer{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin";
};
Add to kafka-server-start.sh:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/root/kafka/kafka_2.12-2.2.0/config/kafka_server_jaas.conf"
Add to kafka-console-consumer.sh and kafka-console-producer.sh:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/root/kafka/kafka_2.12-2.2.0/config/kafka_client_jaas.conf"
bin/zookeeper-server-start.sh config/zookeeper.properties &   # the trailing & runs the process in the background
bin/kafka-server-start.sh config/server.properties &
bin/kafka-console-producer.sh --broker-list 192.168.31.109:9092 --topic test --producer.config config/producer.properties
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.109:9092 --topic test --from-beginning --consumer.config config/consumer.properties