// Spark Streaming + Kafka data transfer
package com.swust.streaming;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Kafka is a mainstream message queue, and pairing it with Spark is one of the
 * most common data-transfer patterns in practice.
 * @author 雪瞳
 * @Slogan The clock keeps moving forward; how could one stop here!
 * @Function Produce data into Kafka
 */
public class ProduceToKafka {

    private final static Random random = new Random();
    private final static String[] ArrayChannel = new String[]{
            "Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm", "Hive", "Impala", "HBase", "ML"};
    private final static String[] ArrayActionName = new String[]{"View", "Register"};
    private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "data001:9092,data003:9092,data004:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        int count = 0;
        int keyFlag = 0;
        while (true) {
            count++;
            keyFlag++;
            String userLogs = getUserLogs();
            producer.send(new ProducerRecord<>("wdxll", "key:" + keyFlag, userLogs));
            System.out.println(userLogs);
            // Throttle: pause 2 seconds after every 200 records
            if (count % 200 == 0) {
                count = 0;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Build one tab-separated log line: date \t timestamp \t userID \t pageID \t channel \t action
    public static String getUserLogs() {
        long time = System.currentTimeMillis();
        long userID = random.nextInt(2000);
        long pageID = random.nextInt(2000);
        String channel = ArrayChannel[random.nextInt(ArrayChannel.length)];
        String action = ArrayActionName[random.nextInt(ArrayActionName.length)];
        String date = sdf.format(new Date());
        StringBuilder logs = new StringBuilder();
        logs.append(date).append("\t")
            .append(time).append("\t")
            .append(userID).append("\t")
            .append(pageID).append("\t")
            .append(channel).append("\t")
            .append(action);
        return logs.toString();
    }
}
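// Spark Streaming consumer side (sketch)

The producer above covers only the Kafka half of the pipeline. Below is a minimal sketch of the Spark Streaming side that reads the "wdxll" topic, assuming the spark-streaming-kafka-0-10 integration is on the classpath; the class name ConsumeFromKafka, the group.id "wdxll-demo", and the local[2] master are illustrative choices, not part of the original code.

package com.swust.streaming;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch only: assumes the spark-streaming-kafka-0-10 dependency; class and group names are illustrative
public class ConsumeFromKafka {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("ConsumeFromKafka");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Same brokers as the producer; group.id "wdxll-demo" is an assumed example value
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "data001:9092,data003:9092,data004:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "wdxll-demo");
        kafkaParams.put("auto.offset.reset", "latest");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("wdxll"), kafkaParams));

        // Print the tab-separated log lines emitted by ProduceToKafka, one batch every 5 seconds
        stream.map(record -> record.value()).print();

        jssc.start();
        jssc.awaitTermination();
    }
}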
// Execution result
./kafka-console-consumer.sh --zookeeper data003:2181,data004:2181,data005:2181 --topic wdxll
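The --zookeeper flag belongs to the old console consumer and was removed in later Kafka releases; on a newer broker the same check would be:

./kafka-console-consumer.sh --bootstrap-server data001:9092 --topic wdxll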