在启用了 Kerberos 的环境中向 Kafka 主题发送消息时收到错误。我们的集群运行在 HDP 2.3 上。
我是按照这篇文章配置的：http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/
但是要发送消息，我必须先手动执行 kinit，然后才能把消息发送到 Kafka 主题。
我尝试在 Java 类中完成 kinit（通过 keytab 登录），但这也行不通。
请看下面的代码：
package com.ct.test.kafka;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Sends three sample messages to the {@code test_march4} topic through
 * {@code kafka.javaapi.producer.Producer} using the {@code PLAINTEXTSASL}
 * security protocol.
 *
 * <p>Usage: {@code TestProducer <broker-list>}; the argument is passed verbatim as
 * {@code metadata.broker.list}.
 */
public class TestProducer {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: TestProducer <broker-list>");
            return;
        }

        // JVM-wide Kerberos/JAAS settings are installed before anything touches
        // Kerberos: the JDK reads the krb5 config location and the debug flag when its
        // Kerberos classes are first used, so setting them after a login attempt
        // may have no effect.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");

        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        // NOTE(review): CTSecurityUtil is defined elsewhere; presumably it performs a
        // keytab-based login and reports success. The result is only logged; sending
        // is still attempted afterwards, as before.
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);
        if (!authStatus) {
            System.out.println("Authntication fails, try something else " + authStatus);
        } else {
            System.out.println("Authntication successfull " + authStatus);
        }

        Producer<String, String> producer = null;
        try {
            long events = 3L;
            Random rnd = new Random();
            Properties props = new Properties();
            System.out.println("After broker list- " + args[0]);
            props.put("metadata.broker.list", args[0]);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");
            System.out.println("After config prop -1");
            ProducerConfig config = new ProducerConfig(props);
            System.out.println("After config prop -2 config" + config);
            producer = new Producer<String, String>(config);
            System.out.println("After config prop -3");
            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                // The trailing dot matters: without it the last octet is glued onto
                // "2" and the value is not a dotted-quad address.
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                // The generated address doubles as the message key.
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);
                System.out.println("After config prop -1 data" + data);
                producer.send(data);
            }
        } catch (Throwable th) {
            // Top-level boundary of a diagnostic program: report everything, including
            // the SecurityException/LoginException raised while the producer logs in.
            th.printStackTrace();
        } finally {
            // Release the producer's connections even when construction succeeded but
            // a send failed.
            if (producer != null) {
                producer.close();
            }
        }
    }
}
pom.xml：所有依赖项均从 Hortonworks 仓库下载。
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.2.3.4.0-3485</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.2.3.4.0-3485</version>
</dependency>
<dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt-spring31</artifactId>
<version>1.9.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1.2.3.4.0-3485</version>
</dependency>
</dependencies>
错误:
案例 1：当我指定自己的 JAAS 文件（MyUser_Kafka_jass.conf）时
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.ProducerConfig@643293ae
java.lang.SecurityException: Configuration Error:
Line 6: expected [controlFlag]
at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
at kafka.producer.Producer.<init>(Producer.scala:50)
at kafka.producer.Producer.<init>(Producer.scala:73)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
Line 6: expected [controlFlag]
at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)
MyUser_Kafka_jass.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
renewTicket=true
principal="ctadmin/prod-dev1-dn1@PROD.COM";
useKeyTab=true
serviceName="kafka"
keyTab="/etc/security/keytabs/ctadmin.keytab"
client=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/ctadmin.keytab"
storeKey=true
useTicketCache=true
serviceName="zookeeper"
principal="ctadmin/prod-dev1-dn1@PROD.COM";
};
案例 2：当我指定 Kafka 自带的 JAAS 文件时
Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner authentication information from the user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
at kafka.producer.Producer.<init>(Producer.scala:50)
at kafka.producer.Producer.<init>(Producer.scala:73)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
如果我在运行这个应用程序之前先执行 kinit，一切正常；否则就会抛出上面的错误。
但在生产环境中我不能这样做。如果有办法让应用程序自身完成登录，请帮帮我。
如果您需要更多详细信息，请告诉我。
谢谢:)
解决方法:
我不知道第一次是哪里出了错，但按照下面的步骤重新做了一遍之后，一切运行正常。
首先授予该用户对主题的全部访问权限：
bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181
创建 JAAS 文件：
Kafka 的 jaas.conf：
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="ctadmin@HSCALE.COM"
useKeyTab=true
serviceName="kafka"
keyTab="/etc/security/keytabs/ctadmin.keytab"
client=true;
};
Java程序:
package com.ct.test.kafka;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Sends ten timestamped test messages to the topic named by the first
 * command-line argument, using {@code kafka.javaapi.producer.Producer} with the
 * {@code PLAINTEXTSASL} security protocol.
 *
 * <p>Usage: {@code KafkaProducer <topic>}. The JAAS and krb5 configuration are not
 * set here; they are expected to be supplied to the JVM from outside (for example
 * as {@code -D} system properties).
 */
public class KafkaProducer {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: KafkaProducer <topic>");
            System.exit(1);
        }
        String topic = args[0];

        Properties props = new Properties();
        // "{Hostname}" is a placeholder for the broker host.
        props.put("metadata.broker.list", "{Hostname}:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
            }
        } finally {
            // Always release the producer's broker connections, even if a send fails.
            producer.close();
        }
    }
}
运行应用程序：
java -Djava.security.auth.login.config=/home/ctadmin/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.ct.test.kafka.KafkaProducer