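I. Deploy the ZooKeeper cluster
Kafka depends on a ZooKeeper cluster, so the ZooKeeper cluster must be up before Kafka is deployed. For the setup procedure, see: https://www.cnblogs.com/cyleon/p/14675875.html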
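II. Build the Kafka image
By default, a Kafka cluster deployed this way can only be reached from inside Kubernetes. The address that advertised.listeners registers in ZooKeeper defaults to the pod's DNS name, for example kafka-manager-test-0.kafka-manager-test-svc.default.svc.cluster.local, and that name is only reachable inside the cluster. To allow access from outside the cluster, advertised.listeners has to be changed to the container (pod) IP address, which in this environment can be reached directly from outside the cluster. If LoadBalancer IPs are advertised instead, three LoadBalancers have to be created, one per broker; otherwise every broker advertises the same IP and only one broker can start.
1. Download the latest Kafka binary package from the official site: http://kafka.apache.org/downloads
2. First pack the archive into a busybox image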
# cat Dockerfile
FROM busybox:latest
MAINTAINER 532141928@qq.com
COPY kafka_2.12-2.8.0.tgz /
CMD ["/bin/sh"]

# docker build -t kafka-manager:2.12-2.8.0 .
# docker push ....    # push the image to the image registry
3. Build the base runtime image
The JDK 1.8.0 package can be downloaded here: https://pan.baidu.com/s/1u2NI6qca227WWwIu3D3zNg
Extraction code: 59t7
# cat Dockerfile
FROM centos:7
MAINTAINER 532141928@qq.com
ENV JAVA_HOME=/usr/local/jdk1.8.0_144
ENV JAVA_BIN=/usr/local/jdk1.8.0_144/bin
ENV PATH=$JAVA_HOME/bin:$PATH
ADD jdk1.8.0_144.tar.gz /usr/local/
COPY kafka_ip.sh /root/
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
    && echo 'Asia/Shanghai' >/etc/timezone \
    && yum install -y wget telnet vim net-tools \
    && yum clean all \
    && echo "set fileencodings=utf-8,ucs-bom,gb18030,gbk,gb2312,cp936" >> /etc/vimrc \
    && echo "set termencoding=utf-8" >> /etc/vimrc \
    && echo "set encoding=utf-8" >> /etc/vimrc

# cat kafka_ip.sh
#!/bin/bash
/usr/bin/sed -i "s#1.1.1.1#$(ifconfig |awk 'NR==2 {print $2}')#g" /data/app/kafka_2.12-2.8.0/config/server.properties

# docker build -t java1.8.0:144_cl .
# docker push .....    # push the image to the Docker Hub registry
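The kafka_ip.sh script takes the pod IP from the second line of the ifconfig output. As a rough illustration of what that awk expression picks up, the sketch below runs it against a captured sample instead of a live interface. The sample text, the interface name eth0, and the address 10.3.2.144 are assumptions for illustration, and the helper names sample_ifconfig and extract_ip are hypothetical.

```shell
#!/bin/bash
# Hypothetical sample of `ifconfig` output inside a CentOS 7 pod (illustrative only).
sample_ifconfig() {
  cat <<'EOF'
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 10.3.2.144  netmask 255.255.255.0  broadcast 10.3.2.255
        ether 02:42:0a:03:02:90  txqueuelen 0  (Ethernet)
EOF
}

# Same expression as kafka_ip.sh: second field of the second line.
extract_ip() {
  awk 'NR==2 {print $2}'
}

sample_ifconfig | extract_ip
```

The expression depends on the pod's primary interface being listed first, so it is worth running `ifconfig | awk 'NR==2 {print $2}'` once inside your own image to confirm it returns the pod IP.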
III. Create the Service
# cat kafka-svc.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-test-svc
  namespace: test
  labels:
    app: kafka-test
  annotations:
    service.kubernetes.io/qcloud-loadbalancer-internal-subnetid: subnet-pqglingd
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka
    port: 9092
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka-test
  type: LoadBalancer
IV. Create the ConfigMap
# cat kafka-test-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-test-configmap
  namespace: test
  labels:
    app: kafka-test-configmap
data:
  server.properties: |
    # broker.id differs for every pod; it is replaced during deployment
    broker.id=0
    delete.topic.enable=true
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    # location where the PVC is mounted
    log.dirs=/data/kafka/data
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connection.timeout.ms=18000
    group.initial.rebalance.delay.ms=0
    # the IP address is replaced with the pod IP during deployment
    advertised.listeners=PLAINTEXT://1.1.1.1:9092
    # addresses of the ZooKeeper cluster inside Kubernetes
    zookeeper.connect=zk-test-statefulset-0.zk-test-svc:2181,zk-test-statefulset-1.zk-test-svc:2181,zk-test-statefulset-2.zk-test-svc:2181
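The 1.1.1.1 value in advertised.listeners is only a placeholder that kafka_ip.sh rewrites at container start. A minimal sketch of that substitution, run against a throwaway file rather than the real server.properties, could look like the following. The helper name set_advertised_ip and the IP 10.3.2.144 are hypothetical, and unlike the original script this version escapes the dots in the pattern and avoids `sed -i` so that it also runs on non-GNU sed.

```shell
#!/bin/bash
# Replace the 1.1.1.1 placeholder in a properties file with a given IP.
# Arguments: $1 = properties file, $2 = IP address to advertise.
set_advertised_ip() {
  sed "s#1\.1\.1\.1#$2#g" "$1" > "$1.tmp" && mv "$1.tmp" "$1"
}

# Demo on a temporary file; the IP below is a made-up example value.
demo_file=$(mktemp)
printf '%s\n' 'broker.id=0' 'advertised.listeners=PLAINTEXT://1.1.1.1:9092' > "$demo_file"
set_advertised_ip "$demo_file" "10.3.2.144"
grep '^advertised.listeners=' "$demo_file"
```

Running the same grep against /data/app/kafka_2.12-2.8.0/config/server.properties inside a broker pod is a quick way to confirm that the placeholder was actually replaced.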
V. Deploy Kafka as a StatefulSet
# cat kafka-test-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-test-statefulset
  namespace: test
  labels:
    app: kafka-test
spec:
  serviceName: kafka-test-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka-test
  template:
    metadata:
      labels:
        app: kafka-test
    spec:
      nodeSelector:
        message: enable
      imagePullSecrets:
      - name: dockerhub-registry
      volumes:
      - name: server-properties-volume
        configMap:
          name: kafka-test-configmap
      initContainers:
      # unpacks Kafka onto the data volume and copies in server.properties
      - name: init
        imagePullPolicy: IfNotPresent
        image: kafka-manager:2.12-2.8.0
        securityContext:
          privileged: true
        command: ["sh", "-c", "mkdir -p /data/app; mkdir -p /data/kafka/data; tar xf kafka_2.12-2.8.0.tgz -C /data/app/; ln -sf /data/app/kafka_2.12-2.8.0 /data/app/kafka && cat /data/config/server.properties >/data/app/kafka_2.12-2.8.0/config/server.properties"]
        volumeMounts:
        - name: kafka-test-data
          mountPath: "/data/"
        - name: server-properties-volume
          mountPath: /data/config/server.properties
          subPath: "server.properties"
      containers:
      # sets broker.id from the pod ordinal, fills in the pod IP, then starts the broker
      - name: kafka
        imagePullPolicy: Always
        image: java1.8.0:144_cl
        command: ["sh", "-c", 'sed -i "s#broker.id=0#broker.id=$(($(hostname | sed s/.*-//) + 1))#g" /data/app/kafka_2.12-2.8.0/config/server.properties;chmod +x /root/kafka_ip.sh; /root/kafka_ip.sh; /data/app/kafka_2.12-2.8.0/bin/kafka-server-start.sh /data/app/kafka_2.12-2.8.0/config/server.properties']
        workingDir: "/data/app/kafka_2.12-2.8.0"
        resources:
          requests:
            cpu: 200m
            memory: 1024Mi
          limits:
            cpu: 500m
            memory: 2043Mi
        ports:
        - containerPort: 9092
          name: client
          protocol: TCP
        env:
        - name: KAFKA_HEAP_OPTS
          # the maximum heap (-Xmx) should stay below the container memory limit; adjust one or the other
          value: "-Xmx2G -Xms1G"
          # value: "-server -Xmx2G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"    # tunable startup parameters
        - name: JMX_PORT
          value: "9999"
        volumeMounts:
        - name: kafka-test-data
          mountPath: "/data/"
        readinessProbe:
          tcpSocket:
            port: 9092
          initialDelaySeconds: 30
          periodSeconds: 5
        livenessProbe:
          tcpSocket:
            port: 9092
          initialDelaySeconds: 30
          periodSeconds: 10
  volumeClaimTemplates:
  - metadata:
      name: kafka-test-data
    spec:
      storageClassName: cbs
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 50Gi
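The kafka container derives broker.id from the pod hostname: it strips everything up to the last hyphen to get the StatefulSet ordinal and adds 1. The sketch below isolates that calculation so it can be tried without a cluster. The function name broker_id_for is hypothetical; the pod names follow from the StatefulSet name and replica count in the manifest.

```shell
#!/bin/bash
# Derive a broker id from a StatefulSet pod name: ordinal suffix + 1.
# Argument: $1 = pod hostname, e.g. kafka-test-statefulset-0.
broker_id_for() {
  local ordinal
  ordinal=$(printf '%s\n' "$1" | sed 's/.*-//')
  echo $((ordinal + 1))
}

# The three pod names produced by replicas: 3.
for pod in kafka-test-statefulset-0 kafka-test-statefulset-1 kafka-test-statefulset-2; do
  echo "$pod -> broker.id=$(broker_id_for "$pod")"
done
```

Because StatefulSet ordinals are stable across restarts, each pod keeps the same broker.id together with the same PVC, which is what lets a restarted broker pick up its existing log directory.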
VI. Deploy the kafka-eagle monitoring service
1. Build the image. First download the kafka-eagle-bin-1.4.7.tar.gz package and extract kafka-eagle-web-1.4.7-bin.tar.gz from it
Download page: http://www.kafka-eagle.org/articles/docs/changelog/changelog.html
# cat Dockerfile
FROM centos:7
MAINTAINER 532141928@qq.com
ENV JAVA_HOME=/usr/local/jdk1.8.0_144
ENV JAVA_BIN=/usr/local/jdk1.8.0_144/bin
# KE_HOME matches the directory used by the Deployment below
ENV KE_HOME=/usr/local/kafka-eagle-web-1.4.7
ENV PATH=$JAVA_HOME/bin:$KE_HOME/bin:$PATH
ADD jdk1.8.0_144.tar.gz kafka-eagle-web-1.4.7-bin.tar.gz /usr/local/
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
    && echo 'Asia/Shanghai' >/etc/timezone \
    && yum install -y wget telnet vim \
    && yum clean all \
    && echo "set fileencodings=utf-8,ucs-bom,gb18030,gbk,gb2312,cp936" >> /etc/vimrc \
    && echo "set termencoding=utf-8" >> /etc/vimrc \
    && echo "set encoding=utf-8" >> /etc/vimrc
CMD ["/bin/bash"]

# docker build -t kafka-eagle-web:1.4.7 .
2. Create the Service
# cat kafka-eagle-svc.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-eagle-147-svc
  namespace: test
  labels:
    app: kafka-eagle-1.4.7
  annotations:
    service.kubernetes.io/qcloud-loadbalancer-internal-subnetid: subnet-pqglingv
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka-eagle
    port: 8048
    targetPort: 8048
    protocol: TCP
  selector:
    app: kafka-eagle-1.4.7
  type: LoadBalancer
3. Create the ConfigMap
# cat kafka-eagle-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-eagle-1.4.7-configmap
  namespace: test
  labels:
    app: kafka-eagle-1.4.7-configmap
data:
  system-config.properties: |-
    kafka.eagle.zk.cluster.alias=cluster1
    # Service IP address of the ZooKeeper cluster
    cluster1.zk.list=10.3.2.56:2181
    cluster1.kafka.eagle.broker.size=20
    kafka.zk.limit.size=25
    kafka.eagle.webui.port=8048
    cluster1.kafka.eagle.offset.storage=kafka
    cluster2.kafka.eagle.offset.storage=zk
    kafka.eagle.metrics.charts=false
    kafka.eagle.metrics.retain=15
    kafka.eagle.sql.topic.records.max=5000
    kafka.eagle.sql.fix.error=false
    kafka.eagle.topic.token=keadmin
    cluster1.kafka.eagle.sasl.enable=false
    cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
    cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
    cluster1.kafka.eagle.sasl.client.id=
    cluster1.kafka.eagle.sasl.cgroup.enable=false
    cluster1.kafka.eagle.sasl.cgroup.topics=
    cluster2.kafka.eagle.sasl.enable=false
    cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    cluster2.kafka.eagle.sasl.mechanism=PLAIN
    cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
    cluster2.kafka.eagle.sasl.client.id=
    cluster2.kafka.eagle.sasl.cgroup.enable=false
    cluster2.kafka.eagle.sasl.cgroup.topics=
    cluster3.kafka.eagle.ssl.enable=false
    cluster3.kafka.eagle.ssl.protocol=SSL
    cluster3.kafka.eagle.ssl.truststore.location=
    cluster3.kafka.eagle.ssl.truststore.password=
    cluster3.kafka.eagle.ssl.keystore.location=
    cluster3.kafka.eagle.ssl.keystore.password=
    cluster3.kafka.eagle.ssl.key.password=
    cluster3.kafka.eagle.ssl.cgroup.enable=false
    cluster3.kafka.eagle.ssl.cgroup.topics=
    kafka.eagle.driver=com.mysql.jdbc.Driver
    kafka.eagle.url=jdbc:mysql://192.168.0.18:3306/ka?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    kafka.eagle.username=root
    kafka.eagle.password=hddfdfvxvi
4. Create the Deployment
# cat kafka-eagle-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: test
  name: kafka-eagle-1.4.7-dp
  labels:
    app: kafka-eagle-1.4.7
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-eagle-1.4.7
  template:
    metadata:
      labels:
        app: kafka-eagle-1.4.7
    spec:
      nodeSelector:
        zt: enable
      volumes:
      - name: data-vol
        emptyDir: {}
      - name: system-config-properties
        configMap:
          name: kafka-eagle-1.4.7-configmap
      imagePullSecrets:
      - name: dockerhub-registry
      containers:
      - name: kafka-eagle
        imagePullPolicy: IfNotPresent
        image: kafka-eagle-web:1.4.7
        workingDir: "/usr/local/kafka-eagle-web-1.4.7"
        command: ["/bin/sh", "-c", "chmod +x /usr/local/kafka-eagle-web-1.4.7/bin/ke.sh; /usr/local/kafka-eagle-web-1.4.7/bin/ke.sh start && tail -f /usr/local/kafka-eagle-web-1.4.7/logs/ke.out"]
        ports:
        - containerPort: 8048
        volumeMounts:
        - name: data-vol
          mountPath: "/data"
        - name: system-config-properties
          mountPath: "/usr/local/kafka-eagle-web-1.4.7/conf/system-config.properties"
          subPath: "system-config.properties"
        resources:
          requests:
            cpu: 100m
            memory: 1024Mi
          limits:
            cpu: 1000m
            memory: 2048Mi
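5. Use kafka-eagle to inspect the Kafka cluster
Access URL: LoadBalancerIP:8048/ke/ (8048 is the port set by kafka.eagle.webui.port and exposed by the Service)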
6. Work with the cluster from the console
List topics
./kafka-topics.sh --zookeeper 10.3.2.56:2181 --list

Create a topic
./kafka-topics.sh --zookeeper 10.3.2.56:2181 --create --replication-factor 3 --partitions 3 --topic childe

Produce messages
./kafka-console-producer.sh --broker-list 10.3.2.144:9092 --topic childe
>hello

Consume messages
./kafka-console-consumer.sh --bootstrap-server 10.3.2.144:9092 --from-beginning --topic childe
hello
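Kafka 2.8 also accepts --bootstrap-server for kafka-topics.sh and kafka-console-producer.sh, which talks to the brokers directly instead of going through ZooKeeper. The sketch below only prints the broker-based equivalents of the console commands rather than running them, so it can be reviewed without a cluster. The broker address is reused from the consumer command, and the helper name topic_cmds and the BOOTSTRAP variable are hypothetical.

```shell
#!/bin/bash
# Broker address reused from the console commands; override via the environment if needed.
BOOTSTRAP="${BOOTSTRAP:-10.3.2.144:9092}"

# Print (not execute) the broker-based commands for a given topic name.
# Argument: $1 = topic name.
topic_cmds() {
  local topic="$1"
  echo "./kafka-topics.sh --bootstrap-server $BOOTSTRAP --list"
  echo "./kafka-topics.sh --bootstrap-server $BOOTSTRAP --create --replication-factor 3 --partitions 3 --topic $topic"
  echo "./kafka-console-producer.sh --bootstrap-server $BOOTSTRAP --topic $topic"
  echo "./kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP --from-beginning --topic $topic"
}

topic_cmds childe
```

Copying the printed lines into a shell in the Kafka bin directory runs them against the cluster. Using the broker address for every command also exercises the advertised.listeners setting, since the client has to reconnect to whatever address each broker advertises.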