E-MapReduce Kafka Kerberos集群授权

Kafka授权

如果没有开启Kafka认证(如Kerberos认证或者简单的用户名密码),即使开启了Kafka授权,用户也可以伪造身份访问服务。所以建议创建高安全模式(即支持Kerberos)的Kafka集群,详见Kerberos安全文档

备注:

本文的权限配置只针对E-MapReduce的高安全模式集群,即Kafka以Kerberos的方式启动。

1. 添加配置

在Kafka集群的配置管理->Kafka->配置->server.properties->自定义配置

添加如下几个参数:

key value 备注
authorizer.class.name kafka.security.auth.SimpleAclAuthorizer
super.users User:kafka User:kafka是必须的,可添加其它用户用分号(;)隔开

备注:

zookeeper.set.acl用来设置kafka在zookeeper中数据的权限,E-MapReduce集群中已经设置为true,所以上述不需要再添加该配置。该配置打开后,在Kerberos环境中,只有用户名称为kafka且通过Kerberos认证后才能执行kafka-topics.sh命令(kafka-topics.sh会直接读写/修改zookeeper中的数据)。

2. 重启Kafka集群

在Kafka集群的配置管理->HBase->操作->RESTART All Components

3. 授权(ACL)

3.1 基本概念

Kafka官方文档定义:

Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R"

即ACL过程涉及Principal Allowed/Denied Operation Host Resource

  • Principal:用户名
安全协议 value
PLAINTEXT ANONYMOUS
SSL ANONYMOUS
SASL_PLAINTEXT mechanism为PLAIN时,用户名是client_jaas.conf指定的用户名,mechanism为GSSAPI时,用户名为client_jaas.conf指定的principal
SASL_SSL
  • Allowed/Denied: 允许/拒绝
  • Operation: 操作

Read,Write,Create,DeleteAlter,Describe,ClusterAction,AlterConfigs,DescribeConfigs,IdempotentWrite,All

  • Host: 针对的机器
  • Resource: 权限作用的资源对象

Topic, Group, Cluster, TransactionalId

Operation/Resource的一些详细对应关系,如哪些Resource支持哪些Operation的授权,详见KIP-11 - Authorization Interface

3.2 授权命令

-使用脚本 kafka-acls.sh (/usr/lib/kafka-current/bin/kafka-acls.sh) 进行授权
具体是使用方式可以直接执行 kafka-acls.sh --help进行查看。

4. 操作示例

在已经创建的E-MapReduce高安全Kafka集群的master节点上进行相关示例操作。

4.1 新建用户test

useradd test

4.2 创建topic

第一节添加配置的备注中提到zookeeper.set.acl=true,kafka-topics.sh需要在kafka账号执行,而且kafka账号下要通过Kerberos认证。

#kafka_client_jaas.conf中已经设置了kafka的Kerberos认证相关信息
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
# zookeeper地址改成自己集群的对应地址(执行hostnamed的输出)
kafka-topics.sh --create --zookeeper emr-header-1:2181/kafka-1.0.0 --replication-factor 3 --partitions 1 --topic test

4.3 test用户执行kafka-console-producer.sh

  • 创建test用户的keytab文件,用户zookeeper/kafka的认证

    su root
    sh /usr/lib/has-current/bin/hadmin-local.sh /etc/ecm/has-conf -k /etc/ecm/has-conf/admin.keytab
    HadminLocalTool.local: #直接按回车可以看到一些命令的用法
    HadminLocalTool.local: addprinc #输入命令按回车可以看到具体命令的用法
    HadminLocalTool.local: addprinc -pw 123456 test #添加test的princippal,密码设置为123456
    HadminLocalTool.local: ktadd -k /home/test/test.keytab test #导出keytab文件,后续可使用该文件
  • 添加kafka_client_test.conf

    如文件放到/home/test/kafka_client_test.conf,内容如下:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/home/test/test.keytab"
    principal="test";
       };
    
       // Zookeeper client authentication
       Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=false
    serviceName="zookeeper"
    keyTab="/home/test/test.keytab"
    principal="test";
     };
  • 添加producer.conf

    如文件放到/home/test/producer.conf,内容如下:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
  • 执行kafka-console-producer.sh

    su test
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

    由于没有设置ACL,所以上述会报错:

    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

  • 设置ACL

    同样kafka-acls.sh也需要kafka账号执行

     su kafka
     export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
     kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Write --topic test
  • 再执行kafka-console-producer.sh

    su test
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
     kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

    正常:

    [2018-02-28 22:25:36,178] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
    >alibaba
    >E-MapReduce
    >

4.4 test用户执行kafka-console-consumer.sh

上面成功执行kafka-console-producer.sh,并往topic里面写入一些数据后,就可以执行kafka-console-consumer.sh进行消费测试.

  • 添加consumer.conf

    如文件放到/home/test/consumer.conf,内容如下:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
  • 执行kafka-console-consumer.sh

    su test
    #kafka_client_test.conf跟上面producer使用的是一样的
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

    由于未设置权限,会报错:

    org.apache.kafka.common.errors.GroupAuthorizationException: 
    Not authorized to access group: test-group
  • 设置ACL

    su kafka
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
    #test-group权限
    kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --group test-group
    # topic权限
     kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --topic test
  • 再执行kafka-console-consumer.sh

    su test
    #kafka_client_test.conf跟上面producer使用的是一样的
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
    kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

    正常输出:

     alibaba
     E-MapReduce
上一篇:SparkSQL自适应执行


下一篇:统计单词个数