centos7下Kerberos认证并集成Kafka
-
版本环境:
- (1)centos7.6
- (2)kafka_2.12-0.10.2.2
- (3)kerberos
- (4)flink-1.11.3-bin-scala_2.11
-
(5)jdk1.8
注意:其中“b.kuxiao”是我的hostname
一、kafka安装
1.1.去官网下载kafka
kafka官网:http://kafka.apache.org/downloads.
选择自己需要的版本:(本文我使用的是0.10.2.2版本)
1.2.解压.tgz
tar -zxvf kafka_2.12-0.10.2.2.tgz
1.3.修改配置文件
进入config/server.properties文件修改
broker.id:因为kafka一般是集群部署,所以每个单机有个broker.id,因为这里我只部一个单机测试,所以指定为0就可以了
log.dirs:自行指定kafka的日志目录
listeners:取消注释并修改为“PLAINTEXT://b.kuxiao:9092”,其中“b.kuxiao”是我的hostname(可使用hostnamectl命令查看自己的hostname)
zookeeper.connect:修改为自己的hostname
进入config/producer.properties文件修改
bootstrap.servers:修改为自己的hostname
进入config/consumer.properties文件修改
zookeeper.connect:修改为自己的hostname
命令进入vim /etc/hosts,添加自己的hostname,“192.168.72.130”是我得内网ip
1.4.启动zookeeper(kafka自带的zk)
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
或
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
1.5.启动kafka
./bin/kafka-server-start.sh ./config/server.properties
或
nohup ./bin/kafka-server-start.sh ./config/server.properties &
1.6.使用 ps -ef|grep kafka查看,执行成功
1.7.创建一个测试topic(test)
./bin/kafka-topics.sh --create --zookeeper b.kuxiao:2181 --replication-factor 1 --partitions 2 --topic test
查看已存在topic
./bin/kafka-topics.sh --list --zookeeper b.kuxiao:2181
1.8.启动消费者
./bin/kafka-console-consumer.sh --bootstrap-server b.kuxiao:9092 --topic test --from-beginning
1.9.启动生产者
./bin/kafka-console-producer.sh --broker-list b.kuxiao:9092 --topic test
1.10.测试
二、Kerberos安装
2.1.Kerberos服务端安装
2.1.1.安装
yum install krb5-server
2.1.2.修改配置文件
修改/var/kerberos/krb5kdc/kdc.conf
vim /var/kerberos/krb5kdc/kdc.conf
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
KUXIAO.COM = {
#master_key_type = aes256-cts
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
}
修改/var/kerberos/krb5kdc/kadm5.acl
vim /var/kerberos/krb5kdc/kadm5.acl
*/admin@KUXIAO.COM *
修改/etc/krb5.conf
vim /etc/krb5.conf
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = KUXIAO.COM
dns_lookup_kdc = false
dns_lookup_realm = false
ticket_lifetime = 86400
# renew_lifetime = 604800
forwardable = true
default_tgs_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
default_tkt_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
permitted_enctypes = aes128-cts aes256-cts-hmac-sha1-96 des3-hmac-sha1 arcfour-hmac
udp_preference_limit = 1
kdc_timeout = 60000
[realms]
KUXIAO.COM = {
kdc = b.kuxiao #hostname
admin_server = b.kuxiao
}
# EXAMPLE.COM = {
# kdc = kerberos.example.com
# admin_server = kerberos.example.com
# }
[domain_realm]
# .example.com = EXAMPLE.COM
# example.com = EXAMPLE.COM
2.1.3.初始化KDC数据库
需要输入一个管理KDC服务器的密码,千万别忘记
kdb5_util create -r KUXIAO.COM -s
Loading random data
Initializing database '/var/kerberos/krb5kdc/principal' for realm 'KUXIAO.COM',
master key name 'K/M@KUXIAO.COM'
You will be prompted for the database Master Password.
It is important that you NOT FORGET this password.
Enter KDC database master key:
Re-enter KDC database master key to verify:
2.1.4.启动KDC服务
systemctl start krb5kdc#启动服务
systemctl status krb5kdc.service#查看运行状态
systemctl enable krb5kdc#设置开机自启
● krb5kdc.service - Kerberos 5 KDC
Loaded: loaded (/usr/lib/systemd/system/krb5kdc.service; disabled; vendor preset: disabled)
Active: active (running) since 五 2021-04-16 14:41:38 CST; 7s ago
Process: 77245 ExecStart=/usr/sbin/krb5kdc -P /var/run/krb5kdc.pid $KRB5KDC_ARGS (code=exited, status=0/SUCCESS)
Main PID: 77249 (krb5kdc)
Tasks: 1
CGroup: /system.slice/krb5kdc.service
└─77249 /usr/sbin/krb5kdc -P /var/run/krb5kdc.pid
4月 16 14:41:38 b.kuxiao systemd[1]: Starting Kerberos 5 KDC...
4月 16 14:41:38 b.kuxiao systemd[1]: Started Kerberos 5 KDC.
2.1.5.启动kerberos服务
systemctl start kadmin#启动服务
systemctl status kadmin#查看运行状态
systemctl enable kadmin#设置开机自启
● kadmin.service - Kerberos 5 Password-changing and Administration
Loaded: loaded (/usr/lib/systemd/system/kadmin.service; disabled; vendor preset: disabled)
Active: active (running) since 五 2021-04-16 14:44:12 CST; 22s ago
Process: 77433 ExecStart=/usr/sbin/_kadmind -P /var/run/kadmind.pid $KADMIND_ARGS (code=exited, status=0/SUCCESS)
Main PID: 77438 (kadmind)
Tasks: 1
CGroup: /system.slice/kadmin.service
└─77438 /usr/sbin/kadmind -P /var/run/kadmind.pid
4月 16 14:44:12 b.kuxiao systemd[1]: Starting Kerberos 5 Password-changing and Administration...
4月 16 14:44:12 b.kuxiao systemd[1]: Started Kerberos 5 Password-changing and Administration.
2.1.6.设置管理员
/usr/sbin/kadmin.local -q "addprinc admin/admin"
Authenticating as principal root/admin@KUXIAO.COM with password.
WARNING: no policy specified for admin/admin@KUXIAO.COM; defaulting to no policy
Enter password for principal "admin/admin@KUXIAO.COM":
Re-enter password for principal "admin/admin@KUXIAO.COM":
Principal "admin/admin@KUXIAO.COM" created.
2.1.7.kerberos日常操作
登录到管理员账户: 如果在本机上,可以通过kadmin.local直接登录。其它机器的,先使用kinit进行验证
kadmin.local
或
[root@localhost app]# kinit admin/admin
Password for admin/admin@KUXIAO.COM:
[root@localhost app]# kadmin
Authenticating as principal admin/admin@KUXIAO.COM with password.
Password for admin/admin@KUXIAO.COM:
kadmin:
增删查账户
kadmin.local: addprinc -pw 123456 kafka/b.kuxiao@KUXIAO.COM#创建新账户
WARNING: no policy specified for kafka/b.kuxiao@KUXIAO.COM; defaulting to no policy
Principal "kafka/b.kuxiao@KUXIAO.COM" created.
kadmin.local: listprincs #查看所有账户
K/M@KUXIAO.COM
admin/admin@KUXIAO.COM
kadmin/admin@KUXIAO.COM
kadmin/b.kuxiao@KUXIAO.COM
kadmin/changepw@KUXIAO.COM
kafka/b.kuxiao@KUXIAO.COM
kiprop/b.kuxiao@KUXIAO.COM
krbtgt/KUXIAO.COM@KUXIAO.COM
test@KUXIAO.COM
kadmin.local: delprinc test#删除test账户
Are you sure you want to delete the principal "test@KUXIAO.COM"? (yes/no): yes
Principal "test@KUXIAO.COM" deleted.
Make sure that you have removed this principal from all ACLs before reusing.
生成keytab:使用xst命令或者ktadd命令
kadmin.local: ktadd -k /app/kafka.keytab -norandkey kafka/b.kuxiao
Entry for principal kafka/b.kuxiao with kvno 1, encryption type aes256-cts-hmac-sha1-96 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type aes128-cts-hmac-sha1-96 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des3-cbc-sha1 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type arcfour-hmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type camellia256-cts-cmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type camellia128-cts-cmac added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des-hmac-sha1 added to keytab WRFILE:/app/kafka.keytab.
Entry for principal kafka/b.kuxiao with kvno 1, encryption type des-cbc-md5 added to keytab WRFILE:/app/kafka.keytab.
2.2.Kerberos客户端安装
2.2.1.安装
yum install krb5-workstation krb5-libs krb5-auth-dialog
2.2.2.将之前服务端修改的/etc/krb5.conf 拷贝到客户端(我只有一台服务器,这里就不操作了)
2.2.3.用户操作
认证用户
kinit -kt /app/kafka.keytab kafka/b.kuxiao
[root@localhost app]# kinit -kt /app/kafka.keytab kafka/b.kuxiao
[root@localhost app]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/b.kuxiao@KUXIAO.COM
Valid starting Expires Service principal
2021-04-16T15:25:28 2021-04-17T15:25:28 krbtgt/KUXIAO.COM@KUXIAO.COM
查看当前的认证用户
klist
[root@localhost app]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/b.kuxiao@KUXIAO.COM
Valid starting Expires Service principal
2021-04-16T15:25:28 2021-04-17T15:25:28 krbtgt/KUXIAO.COM@KUXIAO.COM
删除当前的认证的缓存
kdestroy
[root@localhost app]# kdestroy
[root@localhost app]# klist
klist: No credentials cache found (filename: /tmp/krb5cc_0)
三、kafka配置kerberos安全认证
3.1.生成用户keytab
我直接使用2.1.7中生成的
kadmin.local: ktadd -k /app/kafka.keytab -norandkey kafka/b.kuxiao
3.2.创建配置文件
在kafka文件夹下创建jaas/client.properties文件,内容如下:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
在kafka文件夹下创建jaas/kafka_server_jaas.conf文件,内容如下:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/app/kafka.keytab"
principal="kafka/b.kuxiao@KUXIAO.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/app/kafka.keytab"
principal="kafka/b.kuxiao@KUXIAO.COM";
};
3.3.修改配置文件
在config/server.properties修改添加如下配置
listeners=SASL_PLAINTEXT://b.kuxiao:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
super.users=User:kafka
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
在bin/kafka-run-class.sh脚本添加kafka jvm参数
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=/app/kafka_2.12-0.10.2.2/jaas/kafka_server_jaas.conf"
3.4.重启kafka服务测试
zookeeper启动
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
kafka启动
nohup ./bin/kafka-server-start.sh ./config/server.properties &
Kerberos认证后消费者
./bin/kafka-console-consumer.sh --bootstrap-server b.kuxiao:9092 --topic test --from-beginning --consumer.config jaas/client.properties
消费者启动成功如图
Kerberos认证后生产者
./bin/kafka-console-producer.sh --broker-list b.kuxiao:9092 --topic test --producer.config jaas/client.properties
生产者输入数据
消费者拿到数据
至此kafka添加Kerberos认证完成
四、java代码实现flink订阅Kerberos认证的Kafka消息
4.1.flink安装
4.1.1.下载
官网下载:https://flink.apache.org/downloads.html
4.1.2.解压
tar -zxvf flink-1.11.3-bin-scala_2.11.tgz
4.1.3.修改配置文件
修改conf/flink-conf.yaml文件,添加内容:(我这里只有一台服务器,所以使用一个keytab啦)
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /app/kafka.keytab
security.kerberos.login.principal: kafka/b.kuxiao@KUXIAO.COM
security.kerberos.login.contexts: Client,KafkaClient
参考图:
4.2.启动flink
./bin/start-cluster.sh
打开:http://192.168.72.130:8081/
4.3.Java代码实现
依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cn</groupId>
<artifactId>point</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
<!--<flink.version>1.12.0</flink.version>-->
<flink.version>1.11.3</flink.version>
<!-- <flink.version>1.7.2</flink.version>-->
<hadoop.version>2.7.7</hadoop.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
<!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version> <type>pom</type> </dependency> -->
<!-- alibaba fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<!-- 默认的版本为3.8.1,修改为4.x,因为3.x使用的为编程的方式,4.x为注解的形式。 -->
</dependency>
<!-- 需要引入与所安装的kafka对应版本的依赖 -->
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- flink核心API -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-compress</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>scala-parser-combinators_2.11</artifactId>
<groupId>org.scala-lang.modules</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink整合kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>jsr305</artifactId>
<groupId>com.google.code.findbugs</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>commons-cli</artifactId>
<groupId>commons-cli</groupId>
</exclusion>
<exclusion>
<artifactId>commons-codec</artifactId>
<groupId>commons-codec</groupId>
</exclusion>
<exclusion>
<artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
</exclusion>
<exclusion>
<artifactId>commons-lang</artifactId>
<groupId>commons-lang</groupId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-math3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-core-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
flink启动代码
package org.track.manager.data.verify.monitor.verify;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DataVerifyApp {
final static Logger logger = LoggerFactory.getLogger(DataVerifyApp.class);
private static final Long INITIALDELAY = 5L;
private static final Long PERIOD = 5L;
//分流
private static final OutputTag<String> countsTag = new OutputTag<String>("counts") {
};
public static void main(String[] args) throws Exception {
//获取外界参数
ParameterTool pt = ParameterTool.fromArgs(args);
String KAFKABROKER = "b.kuxiao:9092";// Kafka服务
String TRANSACTIONGROUP = "grouptest";// 消费组
String TOPICNAME = "test";// topic
String sasl_kerberos_service_name = "kafka";// 认证的账户
String security_protocol = "SASL_PLAINTEXT";
String sasl_mechanism = "GSSAPI";
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.enableCheckpointing(5000); //设置 flink 自动管理检查点
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置消费语义为EXACTLY_ONCE(只消费一次)
streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 确认 checkpoints 之间的时间会进行 30000 ms
streamEnv.getCheckpointConfig().setCheckpointTimeout(60000); // Checkpoint 必须在60000L内完成,否则就会被抛弃
streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同一时间只允许一个 checkpoint 进行
//当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint 默认情况下不保留
streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKABROKER); //kafka的节点的IP或者hostName
properties.setProperty("group.id", TRANSACTIONGROUP); //flink consumer flink的消费者的group.id
properties.setProperty("security.protocol", security_protocol);
properties.setProperty("sasl.mechanism", sasl_mechanism);
properties.setProperty("sasl.kerberos.service.name",sasl_kerberos_service_name);
//创建一个消费者
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(TOPICNAME, new SimpleStringSchema(), properties);
// 定义kafka流
logger.info("定义kafka流");
SingleOutputStreamOperator<String> kafkaDataStream = streamEnv.addSource(kafkaConsumer)
.filter(new RichFilterFunction<String>() {
@Override
public void open(Configuration parameters) throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
exec.scheduleAtFixedRate(() -> reload(), INITIALDELAY, PERIOD, TimeUnit.MINUTES);
}
//重复读取数据
public void reload() {
}
@Override
public boolean filter(String s) throws Exception {
logger.error("kafka数据读取 : data = {}",s);
return true;
}
});
streamEnv.execute("Data-Verify-APPS");
}
}
4.4.运行flink跑jar
./bin/flink run -c org.track.manager.data.verify.monitor.verify.DataVerifyApp /app/source/track-manager-data-verify-1.0.jar
运行成功
4.5.测试
生产者输入消息
成功读取到数据
五、java实现flink订阅Kerberos认证的Kafka消息demo
其实“4.3.Java代码实现”中已经是源代码了,要是搭建不起来的话,就去下面地址下载得了
https://download.csdn.net/download/weixin_43857712/16713877
参考:
https://blog.csdn.net/qq_41787086/article/details/103434489
https://blog.csdn.net/Mrerlou/article/details/114986255