Problems encountered
(1) Thread safety: since this runs on a single node, taking a lock in the code is enough; a sketch of that lock follows below.
(2) The consumer writes to HDFS as a stream, and deciding when to close that stream is a real problem. fsDataOutputStream.hsync() is introduced here: hsync() ensures that data written to HDFS becomes visible to new readers and has been persisted by the DataNodes.
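For (1), a minimal sketch of what the single-node locking might look like. The class and method names are made up for illustration, and it reuses the HDFS URI and path from the consumer below; it is a sketch, not the final implementation.

package com.xuliugen.kafka.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class SyncedAppendSketch {

    // Single-node thread safety: all writers go through this lock
    private static final Object LOCK = new Object();

    public static void appendLine(FSDataOutputStream out, String line) throws Exception {
        synchronized (LOCK) {
            out.write((line + "\n").getBytes("UTF-8"));
            out.hsync(); // flush to the DataNodes and make the data visible to new readers
        }
    }

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.15.140:9000/"), new Configuration());
        try (FSDataOutputStream out = fs.append(new Path("/d1/tangsonghuai"))) {
            appendLine(out, "hello hsync");
        }
    }
}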
Producer
package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {

    // Topic
    private static final String topic = "tangsonghuai";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("acks", "0");
        props.put("retries", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer instance ("group.id" is a consumer property and has been dropped here)
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Send business messages; the source could also be a file, an in-memory database, or a socket
        int i = 1;
        while (i < 50) {
            Thread.sleep(100);
            producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
            System.out.println("key:" + i + " " + "value:" + i);
            i++;
        }
        producer.close();
    }
}
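Because acks is set to "0", the producer does not wait for any acknowledgment from the broker. If delivery feedback is wanted, one option is to raise acks (to "1" or "all") and pass a Callback to send(). A sketch of that variant; it reuses producer, topic and i from the loop above and additionally needs the org.apache.kafka.clients.producer.Callback and RecordMetadata imports:

producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i),
        new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace(); // delivery failed
                } else {
                    System.out.println("sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            }
        });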
Consumer
package com.xuliugen.kafka.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.*;

public class ConsumerDemo {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    private static final String topic = "tangsonghuai";

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("group.id", "1111");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));

        String uri = "hdfs://192.168.15.140:9000/";
        Configuration configuration = new Configuration();
        // Needed for append() on a small cluster: do not try to replace a DataNode on pipeline failure
        configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        FileSystem fs = FileSystem.get(URI.create(uri), configuration);

        final String pathString = "/d1/tangsonghuai";
        final FSDataOutputStream fsDataOutputStream = fs.append(new Path(pathString));

        int i = 0;
        boolean running = true;
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                // HashMap<String, String> hmap = new HashMap<String, String>();
                // hmap.put(record.key(), record.value());
                fsDataOutputStream.write((record.offset() + "," + record.key() + "," + record.value() + "\n").getBytes());
                // hsync() makes the data visible to new readers and persists it on the DataNodes
                fsDataOutputStream.hsync();
                i++;
                if (i == 70) {
                    fsDataOutputStream.close();
                    consumer.close();
                    running = false; // stop polling once the stream and consumer are closed
                    break;
                }
                // IOUtils.copyBytes(new ByteArrayInputStream(record.value().getBytes()),
                //         fsDataOutputStream, configuration, true);
            }
        }
    }
}
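To check that the hsync()'d records are actually visible to a new reader, the file can be read back from a separate process. A minimal sketch, assuming the same HDFS URI and path as above (the class name is made up for illustration):

package com.xuliugen.kafka.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;

public class ReadBackSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.15.140:9000/"), new Configuration());
        try (FSDataInputStream in = fs.open(new Path("/d1/tangsonghuai"))) {
            IOUtils.copyBytes(in, System.out, 4096, false); // dump the appended "offset,key,value" lines
        }
    }
}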
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuliugen.kafka</groupId>
    <artifactId>kafka.demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>
</project>