JAVA多种向influxDB中插入数据方式

package cn.test.jmeter.util;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;


/**
 * @author weisy
 * @date 2021/5/11
 * @Description
 */
public class InfluxDbUtil {
    private static String openurl = "http://IP:8086";//连接地址
    private static String username = "admin";//用户名
    private static String password = "admin";//密码
    private static String database = "jmeter";//数据库
    private String  measurement;//表名

    private InfluxDB influxDB;


    public InfluxDbUtil(String username, String password, String openurl, String database) {
        this.username = username;
        this.password = password;
        this.openurl = openurl;
        this.database = database;
    }

    public static InfluxDbUtil setUp() {
//创建 连接
        InfluxDbUtil influxDbUtil = new InfluxDbUtil(username, password, openurl, database);

        influxDbUtil.influxDbBuild();

//        influxDbUtil.createRetentionPolicy();

// influxDB.deleteDB(database);
// influxDB.createDB(database);
        return influxDbUtil;
    }

    /**
     * 连接时序数据库;获得InfluxDB
     **/
    public InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(openurl, username, password);
            influxDB.createDatabase(database);
//            influxDB.setRetentionPolicy("one_month");
        }
        return influxDB;
    }

    /**
     *  * 设置数据保存策略
     *  * defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT 表示 设为默认的策略
     *  
     */
//    public void createRetentionPolicy() {
//        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
//                "defalut", database, "30d", 1);
//        this.query(command);
//    }

    /**
     *  * 查询
     *  * @param command 查询语句
     *  * @return
     *  
     */
    public QueryResult query(String command) {
        return influxDB.query(new Query(command, database));
    }

    /**
     *  * 插入
     *  * @param tags 标签
     *  * @param fields 字段
     *  
     */
    /*
     * 单条插入:多个tag多个field
     * */
    public void insert1(Map<String, String> tags, Map<String, Object> fields,String measurement) {
        Builder builder = Point.measurement(measurement);
        builder.tag(tags);
        builder.fields(fields);

        influxDB.write(database, "", builder.build());
    }

    /*
    * 单条插入:多个tag多个field自定义时间戳
    * */
    public void insert2(Map<String, String> tags, Map<String, Object> fields,String measurement,Long timeStamp,TimeUnit timeUnit) {
        Builder builder = Point.measurement(measurement);
        builder.tag(tags);
        builder.fields(fields);
        if (0 != timeStamp) {
            builder.time(timeStamp, timeUnit);
        }
        influxDB.write(database, "", builder.build());
    }

    /*
    * 单条插入:单条tag单条field
    * */
    public void insert3(String tags, Integer fields,String measurement,Long timeStamp) {

        Point point = Point.measurement(measurement)
                .time(timeStamp, TimeUnit.MILLISECONDS)
                .tag("label", tags)
                .addField("value", fields).build();
        influxDB.write("jmeter", "", point);
    }


    /*
    * 批量插入
    * */
    public void insertBatch(ArrayList<Map<String, String>> sqlserverlist, String measurement) {

        BatchPoints batchPoints = BatchPoints
                .database("jmeter")
                .build();
        //遍历sqlserver获取数据
        for(Map<String, String> map : sqlserverlist) {
            //创建单条数据对象——表名
            Point point = Point.measurement(measurement)
                    .time(Long.parseLong(map.get("timeStamp")), TimeUnit.MILLISECONDS)
                    //tag属性——只能存储String类型
                    .tag("label", map.get("label"))
                    //field存储数据
                    .addField("value", 1)
                    .addField("rt", map.get("rt"))
                    .build();
            //将单条数据存储到集合中
            batchPoints.point(point);
        }
        influxDB.write(batchPoints);
    }

    public void insertBatch2(ArrayList<Map<String, String>> sqlserverlist, String measurement) {

        BatchPoints batchPoints = BatchPoints
                .database("jmeter")
                .build();
        //遍历sqlserver获取数据
        for(Map<String, String> map : sqlserverlist) {
            //创建单条数据对象——表名
            Point point = Point.measurement(measurement)
                    .time(Long.parseLong(map.get("timeStamp")), TimeUnit.MILLISECONDS)
                    //tag属性——只能存储String类型
                    .tag("label", map.get("label"))
                    //field存储数据
                    .addField("value", Integer.valueOf(map.get("value")))
                    .build();
            //将单条数据存储到集合中
            batchPoints.point(point);
        }
        influxDB.write(batchPoints);
    }

    /**
     *  * 删除
     *  * @param command 删除语句
     *  * @return 返回错误信息
     *  
     */
    public String deleteMeasurementData(String command) {
        QueryResult result = influxDB.query(new Query(command, database));
        return result.getError();
    }

    /**
     *  * 创建数据库
     *  * @param dbName
     *  
     */
    @SuppressWarnings("deprecation")
    public void createDB(String dbName) {
        influxDB.createDatabase(dbName);
    }



    /**
     *  * 删除数据库
     *  * @param dbName
     *  
     */
    public void deleteDB(String dbName) {
        influxDB.deleteDatabase(dbName);
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getOpenurl() {
        return openurl;
    }

    public void setOpenurl(String openurl) {
        this.openurl = openurl;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public static void main(String[] args) {

        InfluxDbUtil influxDB = InfluxDbUtil.setUp();
        Map<String, String> tags = new HashMap<>();
        Map<String, Object> fields = new HashMap<>();
        tags.put("TAG_NAME","abc");
        fields.put("TAG_VALUE","111");
//        try {
//            fields.put("TIMAMPEST", DateUtil.getCurrentDateStr());
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
        influxDB.insert1(tags, fields,"table2");
        //查询
        QueryResult result=  influxDB.query("select *from table2 order by time");
        System.out.println(result);
    }

}

 

上一篇:InfluxDB支持collectd数据收集的配置


下一篇:1. Centos7 下 InfluxDB 从安装开始到入门