Kubernetes 搭建kafka_2.12-2.8.0集群

一、zookeeper集群部署

kafka依赖于zk集群,所以在部署kafka之前需要先搭建好zk集群,搭建访问参见:https://www.cnblogs.com/cyleon/p/14675875.html

二、构建kafka镜像

默认情况下搭建的kafka集群只能在k8s内部进行访问,advertised.listeners默认的返回给zk的地址是:kafka-manager-test-0.kafka-manager-test-svc.default.svc.cluster.local,此地址只能在k8s内部进行访问,如果在集群外访问需要修改到容器的IP地址,容器IP地址在集群外可以直接访问。如果是LoadBalancer的IP则需要创建3个,否则IP相同服务只能启动一个。

1. 在官方下载最新的kafka二进制包:http://kafka.apache.org/downloads

2. 先将软件包打入一个busybox镜像中

# cat Dockerfile 
FROM busybox:latest
MAINTAINER 532141928@qq.com
COPY kafka_2.12-2.8.0.tgz /
CMD ["/bin/sh"]

# docker build -t kafka-manager:2.12-2.8.0 .
# docker push .... # 将镜像传入镜像仓库

3. 构建基础运行环境镜像

可在此下载jdk1.8.0的包 链接:https://pan.baidu.com/s/1u2NI6qca227WWwIu3D3zNg
提取码:59t7

# cat Dockerfile
FROM centos:7
MAINTAINER 532141928@qq.com
ENV JAVA_HOME=/usr/local/jdk1.8.0_144
ENV JAVA_BIN=/usr/local/jdk1.8.0_144/bin
ENV PATH=$JAVA_HOME/bin:$PATH
ADD jdk1.8.0_144.tar.gz /usr/local/
COPY kafka_ip.sh /root/
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
&& echo 'Asia/Shanghai' >/etc/timezone \
&& yum install -y wget telnet vim net-tools \
&& yum clean all \
&& echo "set fileencodings=utf-8,ucs-bom,gb18030,gbk,gb2312,cp936" >> /etc/vimrc \
&& echo "set termencoding=utf-8" >> /etc/vimrc \
&& echo "set encoding=utf-8" >> /etc/vimrc

# cat kafka_ip.sh 
#!/bin/bash
/usr/bin/sed -i "s#1.1.1.1#$(ifconfig |awk 'NR==2 {print $2}')#g" /data/app/kafka_2.12-2.8.0/config/server.properties

# docker build -t java1.8.0:144_cl .
# docker push .....  # 将镜像push到dockerhub仓库

 三、创建Service服务

# cat kafka-svc.yaml 
apiVersion: v1
kind: Service
metadata:
  name: kafka-test-svc
  namespace: test
  labels:
    app: kafka-test
  annotations:
    service.kubernetes.io/qcloud-loadbalancer-internal-subnetid: subnet-pqglingd
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka
    port: 9092
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka-test
  type: LoadBalancer

四、创建configmap配置文件

# cat kafka-test-configmap.yaml 
kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-test-configmap
  namespace: test
  labels:
    app: kafka-test-configmap
data:
  server.properties: |
    broker.id=0          # broker.id 每个pod都不相同,在部署过程中进行替换
    delete.topic.enable=true
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/data            # 挂载pvc的位置
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=18000
    group.initial.rebalance.delay.ms=0
    advertised.listeners=PLAINTEXT://1.1.1.1:9092      # ip地址将在部署过程中替换为pod ip
    zookeeper.connect=zk-test-statefulset-0.zk-test-svc:2181,zk-test-statefulset-1.zk-test-svc:2181,zk-test-statefulset-2.zk-test-svc:2181      # zk集群内的地址

四、部署kafka和StateFulSet的服务

# cat kafka-test-statefulset.yaml 
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-test-statefulset
  namespace: test
  labels:
    app: kafka-test
spec:
  serviceName: kafka-test-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-test
  template:
    metadata:
      labels:
        app: kafka-test
    spec:
      nodeSelector:
        message: enable
      imagePullSecrets:
      - name: dockerhub-registry
      volumes:
      - name: server-properties-volume
        configMap:
          name: kafka-test-configmap
      
      initContainers:
      - name: init
        imagePullPolicy: IfNotPresent
        image: kafka-manager:2.12-2.8.0
        securityContext:
          privileged: true
        command: ["sh", "-c", "mkdir -p /data/app; mkdir -p /data/kafka/data; tar xf kafka_2.12-2.8.0.tgz -C /data/app/; ln -sf /data/app/kafka_2.12-2.8.0 /data/app/kafka && cat /data/config/server.properties >/data/app/kafka_2.12-2.8.0/config/server.properties"]
        volumeMounts:
        - name: kafka-test-data
          mountPath: "/data/"
        - name: server-properties-volume
          mountPath: /data/config/server.properties
          subPath: "server.properties"

      containers:
      - name: kafka
        imagePullPolicy: Always
        image: java1.8.0:144_cl
        command: ["sh", "-c", 'sed -i "s#broker.id=0#broker.id=$(($(hostname | sed s/.*-//) + 1))#g" /data/app/kafka_2.12-2.8.0/config/server.properties;chmod +x /root/kafka_ip.sh; /root/kafka_ip.sh; /data/app/kafka_2.12-2.8.0/bin/kafka-server-start.sh /data/app/kafka_2.12-2.8.0/config/server.properties']
        workingDir: "/data/app/kafka_2.12-2.8.0"
        resources:
          requests:
            cpu: 200m
            memory: 1024Mi
          limits:
            cpu: 500m
            memory: 2043Mi 
        ports:
        - containerPort: 9092
          name: client
          protocol: TCP     
        env: 
        - name: KAFKA_HEAP_OPTS
          value: "-Xms2G -Xms1G"
         # value: "-server -Xms2G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"   # 可优化启动参数
        - name: JMX_PORT
          value: "9999"
        volumeMounts:
        - name: kafka-test-data
          mountPath: "/data/"
        readinessProbe:
          tcpSocket:
            port: 9092
          initialDelaySeconds: 30
          periodSeconds: 5
        livenessProbe:
          tcpSocket:
            port: 9092
          initialDelaySeconds: 30
          periodSeconds: 10
  volumeClaimTemplates:
  - metadata:
      name: kafka-test-data
    spec:
      storageClassName: cbs
      accessModes:
       - ReadWriteOnce
      resources:
        requests:
          storage: 50Gi

五、部署kafka-eagle监控服务

 1. 构建镜像,先下载kafka-eagle-bin-1.4.7.tar.gz软件包,并解压出kafka-eagle-web-1.4.7-bin.tar.gz

下载地址:http://www.kafka-eagle.org/articles/docs/changelog/changelog.html

# cat Dockerfile
FROM centos:7
MAINTAINER 532141928@qq.com
ENV JAVA_HOME=/usr/local/jdk1.8.0_144
ENV JAVA_BIN=/usr/local/jdk1.8.0_144/bin
ENV KE_HOME=/usr/local/kafka-eagle-web-1.4.7-bin
ENV PATH=$JAVA_HOME/bin:$KE_HOME/bin:$PATH
ADD jdk1.8.0_144.tar.gz kafka-eagle-web-1.4.7-bin.tar.gz /usr/local/
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
&& echo 'Asia/Shanghai' >/etc/timezone \
&& yum install -y wget telnet vim \
&& yum clean all \
&& echo "set fileencodings=utf-8,ucs-bom,gb18030,gbk,gb2312,cp936" >> /etc/vimrc \
&& echo "set termencoding=utf-8" >> /etc/vimrc \
&& echo "set encoding=utf-8" >> /etc/vimrc

CMD ["/bin/bash"]
# docker build -t kafka-eagle-web:1.4.7 .

2. 创建service服务

# cat kafka-eagle-svc.yaml 
apiVersion: v1
kind: Service
metadata:
  name: kafka-eagle-147-svc
  namespace: test
  labels:
    app: kafka-eagle-1.4.7
  annotations:
    service.kubernetes.io/qcloud-loadbalancer-internal-subnetid: subnet-pqglingv
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka-eagle
    port: 8048
    targetPort: 8048
    protocol: TCP
  selector:
    app: kafka-eagle-1.4.7
  type: LoadBalancer

3. 创建configmap配置文件

# cat kafka-eagle-configmap.yaml 
kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-eagle-1.4.7-configmap
  namespace: test
  labels:
    app: kafka-eagle-1.4.7-configmap
data:
  system-config.properties: |-
    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=10.3.2.56:2181          # zookeeper集群的service IP地址
    cluster1.kafka.eagle.broker.size=20
    kafka.zk.limit.size=25
    kafka.eagle.webui.port=8048
    cluster1.kafka.eagle.offset.storage=kafka
    cluster2.kafka.eagle.offset.storage=zk
    kafka.eagle.metrics.charts=false
    kafka.eagle.metrics.retain=15
    kafka.eagle.sql.topic.records.max=5000
    kafka.eagle.sql.fix.error=false
    kafka.eagle.topic.token=keadmin
    cluster1.kafka.eagle.sasl.enable=false
    cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
    cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
    cluster1.kafka.eagle.sasl.client.id=
    cluster1.kafka.eagle.sasl.cgroup.enable=false
    cluster1.kafka.eagle.sasl.cgroup.topics=
    cluster2.kafka.eagle.sasl.enable=false
    cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    cluster2.kafka.eagle.sasl.mechanism=PLAIN
    cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
    cluster2.kafka.eagle.sasl.client.id=
    cluster2.kafka.eagle.sasl.cgroup.enable=false
    cluster2.kafka.eagle.sasl.cgroup.topics=
    cluster3.kafka.eagle.ssl.enable=false
    cluster3.kafka.eagle.ssl.protocol=SSL
    cluster3.kafka.eagle.ssl.truststore.location=
    cluster3.kafka.eagle.ssl.truststore.password=
    cluster3.kafka.eagle.ssl.keystore.location=
    cluster3.kafka.eagle.ssl.keystore.password=
    cluster3.kafka.eagle.ssl.key.password=
    cluster3.kafka.eagle.ssl.cgroup.enable=false
    cluster3.kafka.eagle.ssl.cgroup.topics=
    kafka.eagle.driver=com.mysql.jdbc.Driver
    kafka.eagle.url=jdbc:mysql://192.168.0.18:3306/ka?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    kafka.eagle.username=root
    kafka.eagle.password=hddfdfvxvi

4. 创建deployment服务

# cat kafka-eagle-deployment.yaml 
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: test
  name: kafka-eagle-1.4.7-dp
  labels:
    app: kafka-eagle-1.4.7
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-eagle-1.4.7
  template:
    metadata:
      labels:
        app: kafka-eagle-1.4.7
    spec:
      nodeSelector:
        zt: enable
      volumes:
      - name: data-vol
        emptyDir: {}
      - name: system-config-properties
        configMap:
          name: kafka-eagle-1.4.7-configmap

      imagePullSecrets:
      - name: dockerhub-registry

      containers:
      - name: kafka-eagle
        imagePullPolicy: IfNotPresent
        image: kafka-eagle-web:1.4.7
        workingDir: "/usr/local/kafka-eagle-web-1.4.7"
        command: ["/bin/sh", "-c", "chmod +x /usr/local/kafka-eagle-web-1.4.7/bin/ke.sh; /usr/local/kafka-eagle-web-1.4.7/bin/ke.sh start && tail -f /usr/local/kafka-eagle-web-1.4.7/logs/ke.out"]
        ports:
        - containerPort: 8048
        volumeMounts:
        - name: data-vol
          mountPath: "/data"
        - name: system-config-properties
          mountPath: "/usr/local/kafka-eagle-web-1.4.7/conf/system-config.properties"
          subPath: "system-config.properties"
        resources:
          requests:
            cpu: 100m
            memory: 1024Mi
          limits:
            cpu: 1000m
            memory: 2048Mi 

5. 使用kafka-eagle查看kafka服务

访问地址:LoadBalancerIP:8848/ke/

Kubernetes 搭建kafka_2.12-2.8.0集群

 6. 通过console端进行操作

查看topic
./kafka-topics.sh --zookeeper 10.3.2.56:2181 --list

创建topic
./kafka-topics.sh --zookeeper 10.3.2.56:2181 --create --replication-factor 3 --partitions 3 --topic childe

生产消息
./kafka-console-producer.sh --broker-list 10.3.2.144:9092 --topic childe
>hello

消费消息
./kafka-console-consumer.sh --bootstrap-server 10.3.2.144:9092 --from-beginning --topic childe
hello

Kubernetes 搭建kafka_2.12-2.8.0集群

 

上一篇:吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你!


下一篇:物联网Kafka理论+实战+Zookeeper+深入+错误集合