First, create a Maven project and add the dependencies. The pom file is as follows:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
Operating a Kafka producer through the Java API
Sample code:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyKafkaProducer
{
public static void main(String[] args) throws InterruptedException
{
Properties props = new Properties();
//specify the Kafka broker addresses
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//specify the message acknowledgement mechanism
props.put("acks", "all");
//retry mechanism
props.put("retries", 0);
//batch size for batched sends
props.put("batch.size", 16384);
//how long to wait before sending a batch
props.put("linger.ms", 1);
//size of the message buffer
props.put("buffer.memory", 33554432);
//define the key serializer
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//define the value serializer
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> producer=new KafkaProducer<String,String>(props);
for(int i=0;i<100;i++)
{
// Send the data; this requires a ProducerRecord object, whose minimal parameters are String topic, V value
producer.send(new ProducerRecord<String, String>("test", "record " + i));
//Thread.sleep(100);
}
producer.close();
}
}
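send() is asynchronous and returns a Future<RecordMetadata>. If you want to confirm where each record landed, or fail fast on broker errors, you can block on that future. Below is a minimal sketch assuming the same brokers and test topic as above; the class name MySyncKafkaProducer and the keys are illustrative only:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class MySyncKafkaProducer
{
    public static void main(String[] args) throws Exception
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10; i++)
        {
            //records that share a key always land in the same partition under the default partitioner
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("test", "key-" + i, "record " + i);
            RecordMetadata metadata = producer.send(record).get(); //block until the broker acks
            System.out.printf("partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
        }
        producer.close();
    }
}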
Next, use a custom partitioning strategy in the producer:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyKafkaPartitionProducer
{
public static void main(String[] args) throws InterruptedException
{
Properties props = new Properties();
//specify the Kafka broker addresses
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//specify the message acknowledgement mechanism
props.put("acks", "all");
//retry mechanism
props.put("retries", 0);
//batch size for batched sends
props.put("batch.size", 16384);
//how long to wait before sending a batch
props.put("linger.ms", 1);
//size of the message buffer
props.put("buffer.memory", 33554432);
//define the key serializer
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//define the value serializer
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//use the custom partitioning strategy
props.put("partitioner.class","partitionTest.MyKafkaPartitioner");
KafkaProducer<String,String> producer=new KafkaProducer<String,String>(props);
for(int i=0;i<100;i++)
{
// Send the data; this requires a ProducerRecord object, whose minimal parameters are String topic, V value
producer.send(new ProducerRecord<String, String>("test", "order info " + i));
}
producer.close();
}
}
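A custom partitioner applies to every record the producer sends. If you only need per-record control, ProducerRecord also has a constructor that takes an explicit partition number and bypasses the partitioner entirely. A hedged fragment, reusing the producer above and assuming the test topic has at least three partitions:
//alternative: choose the partition per record via
//ProducerRecord(String topic, Integer partition, K key, V value)
for (int i = 0; i < 100; i++)
{
    int partition = i % 3; //spread records over partitions 0, 1 and 2
    producer.send(new ProducerRecord<String, String>("test", partition, "key-" + i, "order info " + i));
}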
Customizing the partitioning strategy for Kafka producers through the Java API
Sample code:
First, implement the org.apache.kafka.clients.producer.Partitioner interface:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyKafkaPartitioner implements Partitioner
{
//override the partition method to implement the custom strategy;
//the returned int is the number of the partition the record will be sent to
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
{
return 0; //this example sends every record to partition 0
}
@Override
public void close()
{
}
@Override
public void configure(Map<String, ?> map)
{
}
}
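Returning a constant 0 pushes every record into partition 0, which serializes all writes; it is only useful as a demonstration. Below is a hedged sketch of a more realistic strategy that hashes the record key across all partitions of the topic (the class name MyHashPartitioner is illustrative):
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyHashPartitioner implements Partitioner
{
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
    {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
        {
            //no key: fall back to partition 0 (the built-in partitioner would round-robin here)
            return 0;
        }
        //mask off the sign bit so the result is non-negative, then take the modulo
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
    @Override
    public void close()
    {
    }
    @Override
    public void configure(Map<String, ?> map)
    {
    }
}
It is registered the same way as above, e.g. props.put("partitioner.class", "partitionTest.MyHashPartitioner").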
Operating a Kafka consumer through the Java API
Consuming data in arrival order
Sample code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.*;
public class MyKafkaConsumer
{
public static void main(String[] args)
{
Properties props=new Properties();
//specify the Kafka broker addresses
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//define the consumer group (the group name here is arbitrary)
props.put("group.id", "test group");
//let the consumer commit offsets automatically
// props.put("enable.auto.commit", "true");
//interval between automatic offset commits
// props.put("auto.commit.interval.ms", "1000");
//have the consumer commit offsets manually instead
props.put("enable.auto.commit", "false");
//define the key deserializer
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
//define the value deserializer
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
//subscribe to the topic named test
consumer.subscribe(Arrays.asList("test"));
while (true) //loop forever so the consumer keeps polling
{
ConsumerRecords<String, String> records=consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
int partition=record.partition();
String value=record.value();
long offset=record.offset();
String key=record.key();
System.out.printf("partition: %d, value: %s, offset: %d, key: %s%n",
partition,value,offset,key);
}
//after all records in this ConsumerRecords batch have been processed, commit the offsets manually
consumer.commitAsync(); //commitAsync() commits asynchronously and does not block consumption
//consumer.commitSync(); //commitSync() commits synchronously; consumption resumes only after the commit finishes
}
}
}
Note: when using the Java API as a Kafka consumer, one concept matters above all: the offset. Always commit the offset after the data has actually been processed. Automatic commits fire on a timer, not when processing finishes, so they can commit records that have not been handled yet; for that reason offsets are usually committed manually.
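The poll loop above also never exits, so the consumer is never closed. A common shutdown pattern (a hedged sketch, not the only option) is to call wakeup() from a JVM shutdown hook, which makes the blocked poll() throw a WakeupException, and to close the consumer in a finally block:
//hedged sketch: graceful shutdown for the poll loop above
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run()
    {
        consumer.wakeup(); //makes the blocked poll() throw WakeupException
        try { mainThread.join(); } catch (InterruptedException ignored) { }
    }
});
try
{
    while (true)
    {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // ... process records and commit offsets as above ...
    }
}
catch (org.apache.kafka.common.errors.WakeupException e)
{
    //expected on shutdown, nothing to do
}
finally
{
    consumer.close();
}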
Consuming data partition by partition
Sample code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class ConsumerAllPartition
{
public static void main(String[] args)
{
Properties props=new Properties();
//specify the Kafka broker addresses
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//define the consumer group (the group name here is arbitrary)
props.put("group.id", "test group");
//let the consumer commit offsets automatically
// props.put("enable.auto.commit", "true");
//interval between automatic offset commits
// props.put("auto.commit.interval.ms", "1000");
//have the consumer commit offsets manually instead
props.put("enable.auto.commit", "false");
//define the key deserializer
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
//define the value deserializer
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
//subscribe to the topic named test
consumer.subscribe(Arrays.asList("test"));
while (true)
{
ConsumerRecords<String, String> records=consumer.poll(100);
//consume the data partition by partition
Set<TopicPartition> partitions = records.partitions(); //the partitions that have data in this batch
for (TopicPartition partition : partitions)
{
List<ConsumerRecord<String,String>> partitionRecords=records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords)
{
int nowPartition=record.partition();
String value=record.value();
long offset=record.offset();
String key=record.key();
System.out.printf("partition: %d, value: %s, offset: %d, key: %s%n",
nowPartition,value,offset,key);
}
//get the offset of the last record in this partition's batch
long lastOffset=partitionRecords.get(partitionRecords.size()-1).offset();
//commit the offset for this partition; by convention the committed value is
//lastOffset+1, i.e. the offset of the next record to be consumed
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(lastOffset+1)));
}
}
}
}
Consuming data from specified partitions only
Sample code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class ConsumerSomePartition
{
public static void main(String[] args)
{
Properties props=new Properties();
//specify the Kafka broker addresses
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//define the consumer group (the group name here is arbitrary)
props.put("group.id", "test group");
//let the consumer commit offsets automatically
// props.put("enable.auto.commit", "true");
//interval between automatic offset commits
// props.put("auto.commit.interval.ms", "1000");
//have the consumer commit offsets manually instead
props.put("enable.auto.commit", "false");
//define the key deserializer
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
//define the value deserializer
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
//build the list of partitions to consume (assumes the test topic has at least 3 partitions)
TopicPartition p1=new TopicPartition("test",0);
TopicPartition p2=new TopicPartition("test",2);
List<TopicPartition> partitionList=Arrays.asList(p1,p2);
//register with assign() so the consumer reads only these partitions; unlike subscribe(), no group rebalancing is involved
consumer.assign(partitionList);
while (true)
{
ConsumerRecords<String, String> records=consumer.poll(100);
Set<TopicPartition> partitions = records.partitions(); //the partitions that have data in this batch
for (TopicPartition partition : partitions)
{
List<ConsumerRecord<String,String>> partitionRecords=records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords)
{
int nowPartition=record.partition();
String value=record.value();
long offset=record.offset();
String key=record.key();
System.out.printf("partition: %d, value: %s, offset: %d, key: %s%n",
nowPartition,value,offset,key);
}
//get the offset of the last record in this partition's batch
long lastOffset=partitionRecords.get(partitionRecords.size()-1).offset();
//commit the offset for this partition (lastOffset+1 is the next offset to consume)
consumer.commitSync(Collections.singletonMap(partition,
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(lastOffset+1)));
}
}
}
}
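With assign(), the group coordinator does not manage your position for you, and you can also set the starting position yourself with seek(). A hedged fragment, reusing the consumer above (offset 10 is just an example value):
//start consuming partition 0 of "test" from a chosen offset
TopicPartition p0 = new TopicPartition("test", 0);
consumer.assign(Arrays.asList(p0));
consumer.seek(p0, 10L); //the next poll() starts reading at offset 10
//consumer.seekToBeginning(Arrays.asList(p0)); //or rewind to the earliest offset
//consumer.seekToEnd(Arrays.asList(p0)); //or jump to the latest offset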
Operating Kafka Streams through the Java API
Kafka Streams in a nutshell: consume the data in one topic and write the (transformed) data into another topic.
Sample code:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class MyKafkaStream
{
public static void main(String[] args)
{
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"bigger");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //serde (serializer/deserializer) class for keys
properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); //serde class for values
//get the core builder class, KStreamBuilder
KStreamBuilder kStreamBuilder = new KStreamBuilder();
//stream(...) names the topic to read from
//mapValues(...) transforms each value; line is one value read from the topic
//to(...) writes the upper-cased values into the test2 topic
kStreamBuilder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
//build the KafkaStreams instance from the builder and the config
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
kafkaStreams.start(); //start the Kafka Streams job
}
}
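start() returns immediately and the stream threads keep running in the background. A clean shutdown usually closes the application from a JVM shutdown hook; a hedged sketch, added at the end of main:
//close the streams application cleanly when the JVM exits
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run()
    {
        kafkaStreams.close(); //stops the stream threads and leaves the group cleanly
    }
});
To verify the pipeline, produce a few lowercase strings into test and consume test2; the values should come back upper-cased.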