package cn.test.jmeter.util; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Point.Builder; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; /** * @author weisy * @date 2021/5/11 * @Description */ public class InfluxDbUtil { private static String openurl = "http://IP:8086";//连接地址 private static String username = "admin";//用户名 private static String password = "admin";//密码 private static String database = "jmeter";//数据库 private String measurement;//表名 private InfluxDB influxDB; public InfluxDbUtil(String username, String password, String openurl, String database) { this.username = username; this.password = password; this.openurl = openurl; this.database = database; } public static InfluxDbUtil setUp() { //创建 连接 InfluxDbUtil influxDbUtil = new InfluxDbUtil(username, password, openurl, database); influxDbUtil.influxDbBuild(); // influxDbUtil.createRetentionPolicy(); // influxDB.deleteDB(database); // influxDB.createDB(database); return influxDbUtil; } /** * 连接时序数据库;获得InfluxDB **/ public InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(openurl, username, password); influxDB.createDatabase(database); // influxDB.setRetentionPolicy("one_month"); } return influxDB; } /** * * 设置数据保存策略 * * defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT 表示 设为默认的策略 * */ // public void createRetentionPolicy() { // String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", // "defalut", database, "30d", 1); // this.query(command); // } /** * * 查询 * * @param command 查询语句 * * @return * */ public QueryResult query(String command) { return influxDB.query(new Query(command, database)); } /** * * 插入 * * @param tags 标签 * * @param fields 字段 * */ /* * 单条插入:多个tag多个field * */ public void insert1(Map<String, String> tags, Map<String, Object> fields,String measurement) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); influxDB.write(database, "", builder.build()); } /* * 单条插入:多个tag多个field自定义时间戳 * */ public void insert2(Map<String, String> tags, Map<String, Object> fields,String measurement,Long timeStamp,TimeUnit timeUnit) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); if (0 != timeStamp) { builder.time(timeStamp, timeUnit); } influxDB.write(database, "", builder.build()); } /* * 单条插入:单条tag单条field * */ public void insert3(String tags, Integer fields,String measurement,Long timeStamp) { Point point = Point.measurement(measurement) .time(timeStamp, TimeUnit.MILLISECONDS) .tag("label", tags) .addField("value", fields).build(); influxDB.write("jmeter", "", point); } /* * 批量插入 * */ public void insertBatch(ArrayList<Map<String, String>> sqlserverlist, String measurement) { BatchPoints batchPoints = BatchPoints .database("jmeter") .build(); //遍历sqlserver获取数据 for(Map<String, String> map : sqlserverlist) { //创建单条数据对象——表名 Point point = Point.measurement(measurement) .time(Long.parseLong(map.get("timeStamp")), TimeUnit.MILLISECONDS) //tag属性——只能存储String类型 .tag("label", map.get("label")) //field存储数据 .addField("value", 1) .addField("rt", map.get("rt")) .build(); //将单条数据存储到集合中 batchPoints.point(point); } influxDB.write(batchPoints); } public void insertBatch2(ArrayList<Map<String, String>> sqlserverlist, String measurement) { BatchPoints batchPoints = BatchPoints .database("jmeter") .build(); //遍历sqlserver获取数据 for(Map<String, String> map : sqlserverlist) { //创建单条数据对象——表名 Point point = Point.measurement(measurement) .time(Long.parseLong(map.get("timeStamp")), TimeUnit.MILLISECONDS) //tag属性——只能存储String类型 .tag("label", map.get("label")) //field存储数据 .addField("value", Integer.valueOf(map.get("value"))) .build(); //将单条数据存储到集合中 batchPoints.point(point); } influxDB.write(batchPoints); } /** * * 删除 * * @param command 删除语句 * * @return 返回错误信息 * */ public String deleteMeasurementData(String command) { QueryResult result = influxDB.query(new Query(command, database)); return result.getError(); } /** * * 创建数据库 * * @param dbName * */ @SuppressWarnings("deprecation") public void createDB(String dbName) { influxDB.createDatabase(dbName); } /** * * 删除数据库 * * @param dbName * */ public void deleteDB(String dbName) { influxDB.deleteDatabase(dbName); } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getOpenurl() { return openurl; } public void setOpenurl(String openurl) { this.openurl = openurl; } public void setDatabase(String database) { this.database = database; } public static void main(String[] args) { InfluxDbUtil influxDB = InfluxDbUtil.setUp(); Map<String, String> tags = new HashMap<>(); Map<String, Object> fields = new HashMap<>(); tags.put("TAG_NAME","abc"); fields.put("TAG_VALUE","111"); // try { // fields.put("TIMAMPEST", DateUtil.getCurrentDateStr()); // } catch (Exception e) { // e.printStackTrace(); // } influxDB.insert1(tags, fields,"table2"); //查询 QueryResult result= influxDB.query("select *from table2 order by time"); System.out.println(result); } }