Kafka

What is Kafka

  • Kafka is a multi-partition, multi-replica distributed messaging system, written in Scala and coordinated through ZooKeeper.
  • Kafka is a distributed stream-processing platform offering high throughput, durable persistence, horizontal scalability, and support for streaming data.
  • Kafka plays three roles: messaging system, storage system, and stream-processing platform.

Basic concepts

  • Producer: the client that publishes messages to Kafka topics.
  • Consumer: the client that subscribes to topics and consumes the published messages.
  • Broker: a single Kafka server node; a cluster consists of one or more brokers.

Installing Kafka with Docker

  • Install docker-compose
sudo curl -L https://github.com/docker/compose/releases/download/1.16.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose

#daocloud mirror (alternative download source)
#curl -L https://get.daocloud.io/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

chmod +x /usr/local/bin/docker-compose
docker-compose --version

Pull the ZooKeeper and Kafka images

docker pull wurstmeister/kafka:2.12-2.2.0
docker pull wurstmeister/zookeeper:3.4.6

Start the ZooKeeper and Kafka clusters

The contents of docker-compose.yml are as follows:

version: '3.1'

services:
  zoo1:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo1
    container_name: zoo1
    #domainname: 
    ports:
      - 2181:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo1/data:/data
      - /usr/local/docker_app/zookeeper/zoo1/datalog:/datalog
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo2:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2182:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo2/data:/data
      - /usr/local/docker_app/zookeeper/zoo2/datalog:/datalog
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo3:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2183:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo3/data:/data
      - /usr/local/docker_app/zookeeper/zoo3/datalog:/datalog
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888


  kafka1:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 1
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka1
    hostname: kafka1

  kafka2:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9093:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 2
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka2
    hostname: kafka2


  kafka3:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9094:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 3
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka3
    hostname: kafka3

Start the cluster

#start the cluster
docker-compose -f docker-compose.yml up -d

#check container status
docker-compose -f docker-compose.yml ps

All three zoo* containers and all three kafka* containers should be listed as Up.

Test the cluster

# open a shell on broker kafka1, create a topic and start a console consumer
docker exec -it kafka1 /bin/bash
kafka-topics.sh --zookeeper zoo1:2181 --create --topic topic-demo --replication-factor 1 --partitions 2
kafka-topics.sh --zookeeper zoo1:2181 --describe --topic topic-demo
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic-demo

# in a second terminal, start a console producer on the same broker
docker exec -it kafka1 /bin/bash
kafka-console-producer.sh --broker-list kafka1:9092 --topic topic-demo
>hello
>hello kafka

The messages typed into the producer should appear in the consumer terminal.

Connecting to Kafka from a Java client

  • Spring Boot project, pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  • Java code
package com.example.demo;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaApplication {
    public static final String brokerList = "kafka1:9092";

    public static final String topic = "topic-demo";
    
    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApplication.class, args);
        
        // producer client configuration; kafka1 must be resolvable from this machine
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        
        // the message to send; send() is asynchronous and returns a Future
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "hello, renchenglin");
        
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        producer.close();
    }
    

}
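The producer above has a matching consumer. Below is a minimal consumer sketch against the same cluster, using the kafka-clients library that spring-kafka pulls in; the group id demo-group is an assumed value, not from the original setup:

```java
package com.example.demo;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // kafka1 must be resolvable, same as for the producer
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // start from the earliest offset when the group has no committed position
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-demo"));
            while (true) {
                // poll blocks up to 1 s waiting for records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

This requires a live broker, so it can only be run against the cluster started above.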

Pitfalls

When starting Kafka with

KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092

changed to

KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

the CLI tests worked on the VM running Docker (IP 192.168.31.109), but a Java program on the physical host machine (IP 192.168.31.201) could not connect.

Switching back to

KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092

still failed, because the advertised hostnames could not be resolved on the host. The fix was to add host-name mappings on the physical machine by appending the following to C:\Windows\System32\drivers\etc\hosts:

192.168.31.109 kafka1
192.168.31.109 kafka2
192.168.31.109 kafka3
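The hosts-file workaround can be avoided by advertising two listeners per broker: an internal one on the container hostname for broker-to-broker traffic, and an external one on the VM's IP for outside clients. Below is a sketch for kafka1, following the wurstmeister image's convention of mapping KAFKA_* environment variables onto server.properties keys; the listener names INSIDE/OUTSIDE and the internal port 9094 are arbitrary choices, not from the original setup:

```yaml
  kafka1:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9092:9094"   # publish only the external listener on the VM's port 9092
    environment:
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka1:9092,OUTSIDE://192.168.31.109:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 1
```

External clients then bootstrap against 192.168.31.109:9092 and get that same address back as the advertised endpoint, so no name resolution is needed on the host.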

Kafka SASL


Create kafka_server_jaas.conf (broker-side JAAS) with the following content:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice";
};



Create kafka_client_jaas.conf (client-side JAAS) with the following content:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};



Update the server.properties configuration file:
listeners=SASL_PLAINTEXT://localhost:9092
advertised.host.name=kafka1
advertised.listeners=SASL_PLAINTEXT://192.168.31.109:9092
zookeeper.connect=192.168.31.109:2181
# authentication protocol used between brokers
security.inter.broker.protocol=SASL_PLAINTEXT
# enabled SASL mechanisms
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# authorizer class that performs ACL checks
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# allow any operation when no matching ACL is found
allow.everyone.if.no.acl.found=true
# superusers bypass ACL checks
super.users=User:admin


Add the following to config/producer.properties and config/consumer.properties (these files are passed to kafka-console-producer.sh and kafka-console-consumer.sh via --producer.config and --consumer.config):
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
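For the Java client from earlier, the same SASL settings can be supplied programmatically; since Kafka 0.10.2 the sasl.jaas.config property can replace the external kafka_client_jaas.conf file plus KAFKA_OPTS. A self-contained sketch using plain string keys (broker address and admin credentials taken from the setup above):

```java
import java.util.Properties;

public class SaslClientConfig {
    // Build client properties for SASL/PLAIN; these can be passed
    // straight to a KafkaProducer or KafkaConsumer constructor.
    public static Properties saslProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.109:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline JAAS config: same credentials as kafka_client_jaas.conf
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"admin\" password=\"admin\";");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(saslProps().getProperty("sasl.mechanism")); // prints "PLAIN"
    }
}
```

Serializer/deserializer settings are added on top of this, exactly as in the producer example above.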


Create kafka_zoo_jaas.conf (JAAS file for ZooKeeper) with the following content:
ZKServer{
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin";
};


In kafka-server-start.sh, point the broker JVM at the server JAAS file:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/root/kafka/kafka_2.12-2.2.0/config/kafka_server_jaas.conf"


In kafka-console-consumer.sh and kafka-console-producer.sh, point the client JVM at the client JAAS file:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/root/kafka/kafka_2.12-2.2.0/config/kafka_client_jaas.conf"



bin/zookeeper-server-start.sh config/zookeeper.properties &   # & runs the process in the background

bin/kafka-server-start.sh config/server.properties &


bin/kafka-console-producer.sh --broker-list 192.168.31.109:9092 --topic test --producer.config config/producer.properties
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.109:9092 --topic test --from-beginning --consumer.config config/consumer.properties
