One Problem a Day, For Work: 2020-04-25, Problem 54

// Spark Streaming + Kafka data transfer: a producer that feeds simulated user logs into Kafka

package com.swust.streaming;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Kafka is a mainstream message queue; integrating it with Spark Streaming is
 * one of the most common data-transfer patterns in practice.
 * @author 雪瞳
 * @Slogan Even the clock keeps moving forward; how can a person stop here!
 * @Function Produce data into Kafka
 */
public class ProduceToKafka {

    private final static Random random = new Random();
    // Candidate values for the channel and action fields of each simulated log line
    private final static String[] ArrayChannel = new String[]{"Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm", "Hive", "Impala", "HBase", "ML"};
    private final static String[] ArrayActionName = new String[]{"View", "Register"};
    private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");


    public static void main(String[] args) {
        // Producer configuration: broker list plus String serializers for key and value
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "data001:9092,data003:9092,data004:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        int count = 0;
        int keyFlag = 0;

        while (true) {
            count++;
            keyFlag++;
            String userLogs = getUserLogs();
            // Send one simulated log line to the wdxll topic; keyFlag makes each key unique
            producer.send(new ProducerRecord<>("wdxll", "key:" + keyFlag, userLogs));
            System.out.println(userLogs);
            // Throttle: pause for 2 seconds after every 200 messages
            if (count % 200 == 0) {
                count = 0;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    // Build one tab-separated log line: date \t timestamp \t userID \t pageID \t channel \t action
    public static String getUserLogs() {
        StringBuilder logs = new StringBuilder();
        long time = System.currentTimeMillis();
        long userID = random.nextInt(2000);
        long pageID = random.nextInt(2000);

        String channel = ArrayChannel[random.nextInt(ArrayChannel.length)];
        String action = ArrayActionName[random.nextInt(ArrayActionName.length)];
        String date = sdf.format(new Date());

        logs.append(date)
                .append("\t")
                .append(time)
                .append("\t")
                .append(userID)
                .append("\t")
                .append(pageID)
                .append("\t")
                .append(channel)
                .append("\t")
                .append(action);

        return logs.toString();
    }
}
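
The producer above is only half of the pattern described in the header comment; the Spark Streaming side pulls these records back out of the wdxll topic. Below is a minimal sketch of that consumer, assuming the spark-streaming-kafka-0-10 integration is on the classpath; the class name ConsumeFromKafka, the group id, and the 5-second batch interval are illustrative choices, not taken from the original code.

// Spark Streaming consumer sketch (assumes the spark-streaming-kafka-0-10 dependency)

package com.swust.streaming;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConsumeFromKafka {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("ConsumeFromKafka");
        // Pull a new micro-batch from Kafka every 5 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Consumer configuration mirrors the producer: same brokers, String (de)serialization
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "data001:9092,data003:9092,data004:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "wdxll-consumer"); // illustrative group id
        kafkaParams.put("auto.offset.reset", "latest");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("wdxll"), kafkaParams));

        // Print the raw tab-separated log lines emitted by ProduceToKafka
        stream.map(ConsumerRecord::value).print();

        jssc.start();
        jssc.awaitTermination();
    }
}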

// Execution result: check that messages arrive on the topic with the Kafka console consumer

./kafka-console-consumer.sh --zookeeper data003:2181,data004:2181,data005:2181 --topic wdxll
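
The --zookeeper flag targets the legacy console consumer; on Kafka 0.10+ distributions the same check can also be run by pointing the consumer directly at the brokers:

./kafka-console-consumer.sh --bootstrap-server data001:9092,data003:9092,data004:9092 --topic wdxll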
