Writing JSON data into Kafka

package main.scala.com.web.zhangyong168.cn.spark.java;

import com.alibaba.fastjson.JSONObject;
import com.web.zhangyong168.cn.spark.util.PropertiesUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.sparkproject.guava.collect.Lists;

import java.util.*;

/**

  • @version 1.0.0

  • @Author zhangyong

  • @Description json数据入库kafka

  • @Date 2020/06/05 14:40
    **/
    public class WirteKafka {

    /**

    • 配置文件的路径

    • @param proUrl 配置文件路径

    • @param runModel 运行模式 test dev produce

    • @return properties
      */
      public static Properties getProperties(String proUrl, String runModel) {
      Properties props = PropertiesUtils.loadProps("kafka.properties");
      Properties properties = new Properties();
      properties.put("bootstrap.servers", props.get(runModel + ".bootstrap.servers"));
      properties.put("zookeeper.connect", props.get(runModel + ".zookeeper.connect"));
      properties.put("group.id", props.get(runModel + ".group.id"));
      properties.put("key.serializer", props.get(runModel + ".key.serializer"));
      properties.put("value.serializer", props.get(runModel + ".value.serializer"));

      return properties;
      }

    /**

    • 获得数据结果集
    • @param accessArray 参数
    • @return list
      */
      public static List<Map<String, Object>> getResultList(AccessArray accessArray) {
      List<Map<String, Object>> list = new ArrayList<>();
      int columnNamelengths = accessArray.getColumnNames().length;
      for (Object[] tmpValue : accessArray.getRecordArrayValue()) {
      Map<String, Object> parameters = new LinkedHashMap<>();
      if (columnNamelengths == tmpValue.length) {
      for (int j = 0; j < columnNamelengths; j++) {
      parameters.put(accessArray.getColumnName(j), tmpValue[j].toString());
      }
      }
      list.add(parameters);
      }
      return list;
      }

    /**

    • 添加kafak数据
    • @param data 数据
      */
      public static void insertKafkaDatas(String data) {
      Properties props = getProperties("kafka.properties", "test");
      AdminClient create = KafkaAdminClient.create(props);//创建Topic
      create.createTopics(Lists.newArrayList(new NewTopic("lanhuahua", 1, (short) 1)));
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      //没有key的存入
      //producer.send(new ProducerRecord<String, String>("lanhuahua", data));
      //key 存入
      producer.send(new ProducerRecord<String, String>("lanhuahua", "woaini", data));
      create.close();
      producer.close();
      }

    public static void main(String[] args) {
    String str1 = "{"tableName":"yunduo.tb_role_user","columnNames":[ "name", "age", "birthday" ]," +
    ""columnTypes":[0,0,0]," +
    ""columnValues":[["daniel","20","2020-06-02"]," +
    "["huahua","25","2020-06-03"]]" +
    "}";
    AccessArray accessArray = JSONObject.parseObject(str1, AccessArray.class);
    System.out.println(accessArray);
    List<Map<String, Object>> list = getResultList(accessArray);
    insertKafkaDatas(str1.toString());
    // kafkaProducer("你是我的眼3");
    }

}

Writing JSON data into Kafka

Previous post: [CSS] place-content = align-items + justify-content


Next post: Android Meituan Robust hot-fix — getting started