kafka_java

使用 Kafka 时，可以在 Eclipse 上编写生产者和消费者
例子一
将本地文件上传到kafka上然后通过设计kafka的消费者取回到本地


上传到kafka上需要
KafkaProducer producer;
Properties; // Kafka 的连接需要初始化配置，这里用 Properties 保存。可以将所需的配置以字符串的形式写在 properties 文件中；在所需配置不多且不会修改的情况下，也可以直接写在类里面。
FileInputStream; // 以字节流的方式读取本地文件，再发送到 Kafka


package com.ocean.kafka;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HomeWork {

    private KafkaProducer<String, byte[]> producer;
    private Properties properties;

    // 初始化数据
    public HomeWork() {
        properties = new Properties();
        properties.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        producer = new KafkaProducer<String, byte[]>(properties);
    }

    // 指定发送到的topic
    public void assignPartitionSend(String key, byte[] value) {

        ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>("home-work_pic",0, key, value);
        producer.send(record);
    }

    // 准备数据
    public void preparLocalData() throws IOException {
        File file = new File("C:\\Users\\Administrator\\Desktop\\psb.jpg");
        FileInputStream fis =new FileInputStream(file);
        
        byte[] context = new byte[1024];
        int a = 0;
        int length=0;
        while ((length=fis.read(context))!=-1) {
//这里要注意得判断读取内容的实际长度 如果不这样设置 回到多出来//很多空格如果是图片的话则取回时无法还原
            byte[] newbyte =new byte[length];
            System.arraycopy(context, 0,   newbyte, 0, length);
            assignPartitionSend("TIMES" + a, newbyte);
            
            a++;
        }
        
        fis.close();
    }

    

    public void close() {
        producer.flush();
        producer.close();
    }

    public static void main(String[] args) throws IOException {
        HomeWork homeWork = new HomeWork();
         try {
         homeWork.preparLocalData();
         } catch (IOException e) {
        
         e.printStackTrace();
         }
    
        homeWork.close();
    }
}

将数据传到 Kafka 上之后，要想查看，可以通过 zookeeper 的客户端，但是什么都看不懂，因为内容是字节流。（需要注意的一点是：文件的所有数据块都要发送到同一个分区，这里是分区 0；Kafka 只保证单个分区内的消息顺序，否则取回时数据块的顺序会对应不上，文件无法还原。）
最后想看的话就是取回来，下面的类就是将 Kafka 上的文件数据传回到本地


package com.ocean.kafka;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Minimal Kafka consumer example: subscribes to the topic written by the
 * producer example and appends every received record value to a local file.
 */
public class HomeWork2 {

    /** Topic the file chunks are read from. */
    private static final String TOPIC = "home-work_pic";

    /** Local file the received chunks are appended to. */
    private static final String OUTPUT_PATH = "C:\\Users\\Administrator\\Desktop\\output.jpg";

    /** Maximum time, in milliseconds, one poll call waits for records. */
    private static final long POLL_TIMEOUT_MS = 1000;

    private final Properties properties;
    private final KafkaConsumer<String, byte[]> consumer;

    /**
     * Builds the consumer configuration (broker, String-key / byte-array-value
     * deserializers, consumer group) and creates the consumer.
     */
    public HomeWork2() {
        properties = new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.setProperty("group.id", "home-work_pic");
        // NOTE(review): "auto.offset.reset" is not set, so where a group without
        // committed offsets starts reading depends on the client default. Confirm
        // whether records produced before this consumer starts must be fetched.
        consumer = new KafkaConsumer<String, byte[]>(properties);
    }

    /**
     * Polls the topic forever and appends each non-null record value to the
     * output file. This method only returns by throwing.
     *
     * <p>The file is opened in append mode, so running the program again adds
     * to an existing output file instead of replacing it.
     *
     * @throws IOException if the output file cannot be opened or written
     */
    public void getfile() throws IOException {
        File file = new File(OUTPUT_PATH);
        List<String> topics = new ArrayList<String>();
        topics.add(TOPIC);

        // try-with-resources closes the file when the loop is left by an exception.
        try (FileOutputStream fileOutputStream = new FileOutputStream(file, true)) {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, byte[]> record : records) {
                    byte[] chunk = record.value();
                    if (chunk != null) {
                        // Printing the array itself would only show its identity
                        // ("[B@..."), so log the key and the size instead.
                        System.out.println("Received " + record.key() + " (" + chunk.length + " bytes)");
                        fileOutputStream.write(chunk);
                    }
                }
                fileOutputStream.flush();
            }
        }
    }

    /** Closes the consumer. */
    public void close() {
        consumer.close();
    }

    /**
     * Runs the download loop and always closes the consumer afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        HomeWork2 homeWork2 = new HomeWork2();
        try {
            homeWork2.getfile();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            homeWork2.close();
        }
    }
}

这就是一个以字节流的方式将文件上传到 Kafka、再从 Kafka 下载回本地的简单示例

上一篇:Flume 之 Interceptors


下一篇:Kafka(Quickstart)