kafka练习

package com.ocean.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.swing.plaf.multi.MultiButtonUI;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MennuCommitConsumer {
    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    public MennuCommitConsumer() {

        properties.setProperty("bootstrap.servers", "master:9092");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "java_group");
        // properties.setProperty("auto.offset.reset", "null");
        properties.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<String, String>(properties);
    }

    public void subscirbleTopc() {
        List<String> topics = new ArrayList<String>();
        topics.add("b");
        topics.add("from-java");
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "key:"
                        + record.key() + "value:" + record.value());
            }
            // consumer.commitSync();
            // 这句话是为了提交数据 如果不写 则会在下次启动时 还会出现

        }
    }

    public void getOffset() {
        OffsetAndMetadata offsets = consumer.committed(new TopicPartition("b", 0));
        System.out.println("offsets:" + offsets.offset());
    }
    // 制定分区消费 指定从offset的值出开始消费
    // 对消费着topic的消费指定有两种方式
    // 1.consumer.subscribe(topics);

    // 2.consumer.assign(topicPartitions);
    public void sonsumerAssigned() {
        // List<String>topics= new ArrayList<String>();
        // topics.add("b");
        // consumer.subscribe(topics);
        // 指定分区
        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        topicPartitions.add(new TopicPartition("from-java", 0));
        consumer.assign(topicPartitions);
        // 指定分区的offset分区的位置
        consumer.seek(new TopicPartition("from-java", 0), 21);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(
                        "partition:" + record.partition() + "offset:" + record.offset() + "value:" + record.value());
            }
        }

    }

    public void setCommentOffset() {

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("from_java", 0), new OffsetAndMetadata(0));
        List<String> topics = new ArrayList<String>();
        
        topics.add("from_java");
        consumer.subscribe(topics);
        // 指定位置提交某个分区的offsets的值 这会在下一次拉取数据前生效
        consumer.commitSync(offsets);

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {

                if (record.partition() == 0) {
                    System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "value:"
                            + record.value());
                }
            }
        }

    }

    public void exactlyOnceConSumer(){
        //1.配置上参数
        properties.setProperty("enable.auto.commmit", "false");
        //2.订阅主题或者分区
        //consumer.subscribe(topics);
        //重设offset (offset)的值需要从mysql中获取
        //3.从mysql中获取
        //4.1 consumer.commitSync(offsets);
        //提交到kafka服务器中
        //或者使用
        //4.2 consumer.seek(new TopicPartition("from-java",0),0);
        //来指定要从kafka中高消费数据的初始值位置
        
        //订阅主题或分区
        //consumer.subscribe(topics);
        
        //5. poll数据
//        recordes =consumer.pool(1000)
        
        //6. 遍历参数值分析计算
        
        //7.计算结束之后使用consumer.committed(new TopicPartition("from-java",1))
        //获取当前消费的offset值
        
        //8.把计算结果和offset值 以原子操作(事物)的形式保存到mysql数据库
        
        //9.重新调到第五步循环执行 进行下一次pool和下一次计算
    }
    
    
    public static void main(String[] args) {
        MennuCommitConsumer mennuCommitConsumer = new MennuCommitConsumer();
        // mennuCommitConsumer.subscirbleTopc();
        // mennuCommitConsumer.getOffset();
        mennuCommitConsumer.sonsumerAssigned();
        mennuCommitConsumer.setCommentOffset();

    }

}

package com.ocean.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.pattern.PropertiesPatternConverter;

public class ProducerConsumer {

    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    public ProducerConsumer() {

        properties = new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        properties.setProperty("group.id", "java-group");
        consumer = new KafkaConsumer<String, String>(properties);

    }

    public void subscribeTopic() {
        List<String> topics = new ArrayList<String>();
        topics.add("home-work_pic");
        consumer.subscribe(topics);
        // 循环从kafka中拉取数据
        while (true) {
            // 从kafka中拉取数据
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收信息:partition" + record.partition() + "offset:" + record.offset() + "key:"
                        + record.key() + "value:" + record.value());
            }
        }
    }

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        producerConsumer.subscribeTopic();

    }
}


package com.ocean.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerKafka {

    private KafkaProducer<String, String> producer;
    private Properties properties;

    public ProducerKafka() {
    
        properties=new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
//        properties.put("acks",  "all");
//        properties.put("retries", 0);
//        
        producer=new KafkaProducer<String, String>(properties);
    
    }
    public void assignPartitionSend(String key,String value){
        
        ProducerRecord<String, String>record =new ProducerRecord<String, String>("from-java", 0,key,value);
        producer.send(record);
            
    }
    public void sendRecorder(String key,String value){
        Logger logger=LoggerFactory.getLogger(ProducerKafka.class);
        ProducerRecord<String, String> record =new ProducerRecord<String, String>("from-java", key,value);
        producer.send(record);
    }
    public void getTopicPartitions(String topic){
        
        Logger logger=LoggerFactory.getLogger(ProducerKafka.class);
//        ProducerRecord<String, String> record =new ProducerRecord<String, String>("from-java", key,value);
        
        List<PartitionInfo> partitionInfos =producer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            System.out.println(partitionInfo);
        }
        
    }
    
    public void getMetrics(){
        @SuppressWarnings("unchecked")
        Map<MetricName, Metric> metrics =(Map<MetricName, Metric>) producer.metrics();
        for (MetricName name : metrics.keySet()) {
            System.out.println(name.name()+":"+metrics.get(name).value());
        }
        
    }
    
    public void sendRecorderWithCallback(String key,String value){
        final Logger logger=LoggerFactory.getLogger(ProducerKafka.class);
        ProducerRecord<String, String>record =new ProducerRecord<String, String>("from-java",key,value);
        Callback callback=new Callback() {
            //回掉方法
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                
                if(exception==null){
                    logger.info("存储位置:partition:"+metadata.partition()+",offset:"+metadata.offset()+",ts:"+metadata.timestamp());
                }else{
                    logger.warn("服务端出现异常");
                    exception.printStackTrace();
                }
            
            }
        };
        producer.send(record,callback);
    }
    public void close(){
        producer.flush();
        producer.close();
    }
    

    
    public static void main(String[] args) {
        
        ProducerKafka client =new ProducerKafka();
        for(int i=0;i<100;i++){
            client.sendRecorderWithCallback("Ckey"+i, "Cvalue"+i);
        }
//        client.getMetrics();
        client.close();
    }
}

上一篇:Flume sink=avro rpc connection error


下一篇:Spark..........WordCount