Kafka (Part 2): Operating Kafka via the Java API

First, create a Maven project and add the dependencies. The pom file is as follows:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.0</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>

Operating a Kafka producer via the Java API

Example code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyKafkaProducer
{
    public static void main(String[] args) throws InterruptedException
    {
        Properties props = new Properties();

        // Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Acknowledgement mode for sent messages
        props.put("acks", "all");
        // Number of retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // How long to wait before sending a batch
        props.put("linger.ms", 1);
        // Size of the producer's buffer memory
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");


        KafkaProducer<String,String> producer=new KafkaProducer<String,String>(props);

        for(int i=0;i<100;i++)
        {
            // Sending data requires a ProducerRecord; the minimal constructor takes a topic and a value
            producer.send(new ProducerRecord<String, String>("test", "record number " + i));
            //Thread.sleep(100);
        }

        producer.close();
    }
}
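
A ProducerRecord can also carry a key, and send() accepts a Callback that is invoked once the broker acknowledges (or rejects) the record. Below is a minimal sketch of this variant, assuming the same props and topic as above; the key values ("key-" + i) are just for illustration.

        // Sketch: keyed records with a delivery callback
        // (requires the extra imports org.apache.kafka.clients.producer.Callback and RecordMetadata)
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++)
        {
            // ProducerRecord constructor with a key: (String topic, K key, V value)
            producer.send(new ProducerRecord<String, String>("test", "key-" + i, "record number " + i),
                    new Callback()
                    {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception)
                        {
                            if (exception != null)
                                exception.printStackTrace(); // the send failed
                            else
                                System.out.printf("sent to partition %d at offset %d%n",
                                        metadata.partition(), metadata.offset());
                        }
                    });
        }
        producer.close();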

Next, use a custom partitioning strategy in the producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyKafkaProducer
{
    public static void main(String[] args) throws InterruptedException
    {
        Properties props = new Properties();

        // Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Acknowledgement mode for sent messages
        props.put("acks", "all");
        // Number of retries
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // How long to wait before sending a batch
        props.put("linger.ms", 1);
        // Size of the producer's buffer memory
        props.put("buffer.memory", 33554432);
        // Key serializer
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Use the custom partitioning strategy
        props.put("partitioner.class", "partitionTest.MyKafakaPartitioner");


        KafkaProducer<String,String> producer=new KafkaProducer<String,String>(props);

        for(int i=0;i<100;i++)
        {
            // Sending data requires a ProducerRecord; the minimal constructor takes a topic and a value
            producer.send(new ProducerRecord<String, String>("test", "Order info! " + i));
        }

        producer.close();
    }
}
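
If only certain records need to go to a fixed partition, a custom partitioner is not strictly necessary: ProducerRecord also has a constructor that takes an explicit partition number. A minimal sketch, usable as a drop-in replacement for the send() call in the loop above (partition 1 is just an assumption that topic test has at least two partitions):

            // Constructor with an explicit partition: (String topic, Integer partition, K key, V value)
            producer.send(new ProducerRecord<String, String>("test", 1, "order-" + i, "Order info! " + i));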

Customizing the partitioning strategy for produced data via the Java API

Example code:
First, implement the org.apache.kafka.clients.producer.Partitioner interface.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyKafakaPartitioner implements Partitioner
{
    // Override the partition method to implement the custom strategy; the returned int is the partition number the record will be sent to
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) 
    {
        return 0;
    }

    @Override
    public void close()
    {

    }

    @Override
    public void configure(Map<String, ?> map)
    {

    }
}
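
The implementation above sends every record to partition 0, which is only useful as a demonstration. A slightly more realistic sketch (an illustration, not from the original) hashes the record key across all partitions of the topic and falls back to partition 0 when there is no key:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Arrays;
import java.util.Map;

public class KeyHashPartitioner implements Partitioner
{
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
    {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null)
        {
            return 0; // records without a key all go to partition 0
        }
        // Hash the key bytes, keep the result non-negative, and map it onto the partition range
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close()
    {
    }

    @Override
    public void configure(Map<String, ?> configs)
    {
    }
}

Register it the same way as the original class, e.g. props.put("partitioner.class", "partitionTest.KeyHashPartitioner"), assuming it lives in the same package.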

Operating a Kafka consumer via the Java API

Consuming data sequentially

Example code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.*;

public class MyKafkaConsumer
{
    public static void main(String[] args)
    {
        Properties props=new Properties();

        // Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group id; the name can be chosen freely
        props.put("group.id", "test group");

        // Let the consumer commit offsets automatically
        // props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        // props.put("auto.commit.interval.ms", "1000");
        // Commit offsets manually instead
        props.put("enable.auto.commit", "false");
        // Key deserializer
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");


        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
        // Subscribe to the topic named test
        consumer.subscribe(Arrays.asList("test"));

        while (true) // loop forever so the consumer keeps polling
        {
            ConsumerRecords<String, String> records=consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
            {
                int partition=record.partition();
                String value=record.value();
                long offset=record.offset();
                String key=record.key();
                System.out.printf("partition: %d, value: %s, offset: %d, key: %s%n",
                        partition, value, offset, key);
            }


            // After all records in this ConsumerRecords batch have been processed, commit the offsets manually
            consumer.commitAsync(); // commitAsync() commits asynchronously and does not block consumption
            //consumer.commitSync(); // commitSync() commits synchronously; consumption continues only after the commit completes
        }
    }
}

Note: when using the Java API as a Kafka consumer, one concept is especially important: the offset. Always commit the offset after the data has been consumed. Automatic commits happen on a timer rather than right after the records are processed, so manual commits are generally preferred.
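
When commitAsync() is used, it can be worth passing an OffsetCommitCallback so that failed commits are at least logged. A minimal sketch of the commit call inside the poll loop, assuming the consumer from the example above (the extra imports are OffsetCommitCallback, OffsetAndMetadata, TopicPartition and java.util.Map):

            // Asynchronous commit with a callback that logs failed commits
            consumer.commitAsync(new OffsetCommitCallback()
            {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception)
                {
                    if (exception != null)
                    {
                        System.err.println("offset commit failed for " + offsets + ": " + exception);
                    }
                }
            });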

Consuming data partition by partition

Example code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class ConsumerAllPartition
{
    public static void main(String[] args)
    {
        Properties props=new Properties();
        // Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group id; the name can be chosen freely
        props.put("group.id", "test group");

        // Let the consumer commit offsets automatically
        // props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        // props.put("auto.commit.interval.ms", "1000");
        // Commit offsets manually instead
        props.put("enable.auto.commit", "false");
        // Key deserializer
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");


        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
        // Subscribe to the topic named test
        consumer.subscribe(Arrays.asList("test"));

        while (true)
        {
            ConsumerRecords<String, String> records=consumer.poll(100);
            // Consume partition by partition
            Set<TopicPartition> partitions = records.partitions(); // partitions that have data in this poll
            for (TopicPartition partition : partitions)
            {
                List<ConsumerRecord<String,String>> partitionRecords=records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords)
                {
                    int nowPartition=record.partition();
                    String value=record.value();
                    long offset=record.offset();
                    String key=record.key();
                    System.out.printf("partition: %d, value: %s, offset: %d, key: %s%n",
                            nowPartition, value, offset, key);
                }
                // Offset of the last record in the current partition
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // Commit the offset for this partition
                consumer.commitSync(Collections.singletonMap(partition,
                        new OffsetAndMetadata(lastOffset+1)));
            }
        }
    }
}

Consuming data from specific partitions only

Example code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class ConsumerSomePartition
{
    public static void main(String[] args)
    {
        Properties props=new Properties();

        // Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group id; the name can be chosen freely
        props.put("group.id", "test group");

        // Let the consumer commit offsets automatically
        // props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        // props.put("auto.commit.interval.ms", "1000");
        // Commit offsets manually instead
        props.put("enable.auto.commit", "false");
        // Key deserializer
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");


        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);

        // Build the list of partitions to consume
        TopicPartition p1 = new TopicPartition("test", 0);
        TopicPartition p2 = new TopicPartition("test", 2);
        List<TopicPartition> partitionList = Arrays.asList(p1, p2);
        // Use assign() to consume only the specified partitions
        consumer.assign(partitionList);

        while (true)
        {
            ConsumerRecords<String, String> records=consumer.poll(100);

            Set<TopicPartition> partitions = records.partitions(); // partitions that have data in this poll
            for (TopicPartition partition : partitions)
            {
                List<ConsumerRecord<String,String>> partitionRecords=records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords)
                {
                    int nowPartition=record.partition();
                    String value=record.value();
                    long offset=record.offset();
                    String key=record.key();
                    System.out.printf("partition: %d, value: %s, offset: %d, key: %s%n",
                            nowPartition, value, offset, key);
                }
                // Offset of the last record in the current partition
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // Commit the offset for this partition
                consumer.commitSync(Collections.singletonMap(partition,
                        new OffsetAndMetadata(lastOffset+1)));
            }
        }
    }
}
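
When partitions are assigned manually with assign(), the starting position can also be set explicitly with seek() instead of relying on the committed offset. A minimal sketch, assuming the consumer and the partition p1 from the example above (the offset 100 is arbitrary):

        // After assign(), start reading partition p1 from offset 100
        consumer.assign(partitionList);
        consumer.seek(p1, 100L);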

Operating Kafka Streams via the Java API

A simple way to understand Kafka Streams: consume data from one topic, transform it, and write the result into another topic.
Example code:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class MyKafkaStream
{
    public static void main(String[] args)
    {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"bigger");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
        properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serde (serializer/deserializer) class for keys
        properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serde class for values
        // KStreamBuilder is the core class used to build the processing topology
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        // stream() specifies the topic to read from
        // mapValues() transforms the value of each record; line is one record's value
        // to() writes the upper-cased data into the topic test2
        kStreamBuilder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
        // Build a KafkaStreams instance from the builder and the configuration, then start the streaming application
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
        kafkaStreams.start(); // start the Kafka Streams application
    }
}
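
The example above never stops the streams application. A common pattern is to register a JVM shutdown hook that calls close() so the application exits cleanly; a minimal sketch:

        // Close the Kafka Streams application when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                kafkaStreams.close();
            }
        }));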