在实际的开发中,可能会有若干个子系统或者若干个客户端同时进行flume日志采集,能够承受这种采集任务量的通常只有kafka,因此需要将flume中的sink设置为kafka。需要注意的是,现在的kafka启用了SASL安全认证(本文的jaas配置使用PLAIN机制),所以要想在flume之中使用kafka,就需要考虑kafka客户端开发包以及jaas配置问题。
1、将kafka的客户端的程序jar文件拷贝到flume的lib目录之中:
mv kafka-clients-0.10.2.1.jar D:\dev\apache-flume-1.7.0-bin\lib
2、在"D:\"目录下建立jaas配置文件
vim D:\kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice-pwd";
};
3、修改flume.conf文件,追加kafka相关配置
vim D:\dev\apache-flume-1.7.0-bin\conf\flume.conf
# Flume agent "a1": one Avro source (r1), one Kafka sink (k1) and one
# Kafka-backed channel (c1) connecting them.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# The source listens for Avro events on 192.168.0.106:44444
# (the netcat type below is left commented out).
#a1.sources.r1.type = netcat
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.106
a1.sources.r1.port = 44444
# Describe the sink
# The logger sink is commented out in favour of the Kafka sink.
# NOTE(review): no kafka.bootstrap.servers / kafka.topic is set for sink k1 in
# this file - confirm whether the sink starts, or whether it is needed at all
# given that the Kafka channel below already writes events to the topic.
# a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Use a channel backed by a Kafka topic instead of the in-memory channel
# a1.channels.c1.type = memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
# Broker address and topic used by the channel
a1.channels.c1.kafka.bootstrap.servers = 203.195.205.63:9092
a1.channels.c1.kafka.topic = mldn-topic
# The channel's Kafka producer authenticates with SASL/PLAIN over SASL_PLAINTEXT.
# NOTE(review): only kafka.producer.* security settings are given; presumably the
# channel's consumer side needs matching kafka.consumer.* settings - verify.
a1.channels.c1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.c1.kafka.producer.sasl.mechanism = PLAIN
# NOTE(review): capacity / transactionCapacity look carried over from the memory
# channel example - confirm that KafkaChannel honours them.
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、在Windows下启动flume
cd D:\dev\apache-flume-1.7.0-bin\bin
flume-ng.cmd agent --conf D:/dev/apache-flume-1.7.0-bin/conf --conf-file D:/dev/apache-flume-1.7.0-bin/conf/flume.conf --name a1 -property "flume.root.logger=INFO,console;java.security.auth.login.config=D:/kafka_client_jaas.conf"
5、启动kafka消费端——FlumeReceiveMessageConsumer.java
package cn.mldn.mykafka.consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
 * Flume/Kafka integration demo: the Kafka consumer side.
 *
 * <p>Subscribes to {@link #TOPIC} on {@link #SERVERS} using the PLAIN SASL
 * mechanism over {@code SASL_PLAINTEXT} and prints the key and value of every
 * record it polls. The process runs until it is terminated or polling fails.
 *
 * @author hp
 */
public class FlumeReceiveMessageConsumer {
    /** Kafka bootstrap server address the consumer connects to. */
    public static final String SERVERS = "203.195.205.63:9092";
    /** Name of the topic the consumer subscribes to. */
    public static final String TOPIC = "mldn-topic";

    static {
        // Set as a JVM system property so the login configuration file is
        // in place before the consumer is created.
        System.setProperty("java.security.auth.login.config",
                "d:/kafka_client_jaas.conf");
    }

    /**
     * Creates the consumer, subscribes to {@link #TOPIC} and prints every
     * received record to standard output in an endless polling loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // The consumer handle used for all subscribe/poll operations
        Consumer<String, String> consumer =
                new KafkaConsumer<String, String>(consumerProperties());
        try {
            consumer.subscribe(Arrays.asList(TOPIC)); // topic to read from
            // The consumer keeps reading for the whole life of the process
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                            "flume.key = " + record.key() + ",flume.value = " + record.value());
                }
            }
        } finally {
            // The loop only ends by exception; always release the consumer
            // instead of relying on a close() call placed after the loop.
            consumer.close();
        }
    }

    /** Builds the connection, security and deserialization settings for the consumer. */
    private static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // Address of the server the consumer connects to
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        // A consumer must configure deserializers that match the producer's serializers
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        return props;
    }
}
6、启动业务程序,模拟打印消息——TestFlumeDemo.java
package cn.mldn.myflume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo business program that writes ten INFO-level log messages
 * ({@code lynch.cn0} through {@code lynch.cn9}) via SLF4J.
 */
public class TestFlumeDemo {
    private static final Logger LOG = LoggerFactory.getLogger(TestFlumeDemo.class);

    /**
     * Emits the ten demo log messages.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        int counter = 0;
        while (counter < 10) {
            String message = "lynch.cn" + counter;
            LOG.info(message);
            counter++;
        }
    }
}
7、FlumeReceiveMessageConsumer.java消费端会接收到flume采集的日志数据
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705707577<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705716934<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705717194<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8