Flink tableapi数据写入ES

tabEnv.connect(new Elasticsearch()
      .version("6")
      .host("localhost",9092,"http")
      .index("sensor")
      .documentType("test")
    )
      .inUpsertMode()
      .withFormat(new Json())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("cnt",DataTypes.DOUBLE())
        .field("temp",DataTypes.DOUBLE())
      ).createTemporaryTable("es_output_table")

上一篇:Flink java作为消费者连接虚拟机中的kafka/或本地的kafka,并解决java.net.UnknownHostException报错


下一篇:kafka集群搭建