InfluxDB是一个时间序列数据库,它被设计用于处理高写入和查询负载。
本文简单介绍了如何下载、配置、启动InfluxDB,以及如何使用InfluxDB客户端进行数据操作。开发环境为:Windows10,influxdb-1.8.4,VS2015,Vibrant.InfluxDB.Client 3.5.1。
1、下载安装启动
(1) 下载
InfluxDB官网为:https://www.influxdata.com/,本文使用的是influxdb-1.8.4_windows_amd64,不过官网已经不提供2.0以下版本的下载了,而且2.0版本也不支持Windows了,需要在Docker等环境下安装使用。
InfluxDB 2.0参考https://docs.influxdata.com/influxdb/v2.0/get-started/。
(2) 配置
Windows下的InfluxDB下载完成并解压后是一个绿色软件,仅仅做一下配置就能够运行,配置文件为influxdb.conf。
- 绑定端口
# bind-address = "127.0.0.1:8088" bind-address = ":8088"
- 修改[http]节点
enabled = true bind-address = ":8086" auth-enabled = true realm = "InfluxDB" log-enabled = true
(3) 启动
配置完成后,将InfluxDB的启动写入批处理可以快速点击启动,当然也可以配置成Windows服务。
influxd.exe -config influxdb.conf
此外,InfluxDB Client也可以通过批处理的形式快速启动。
@ECHO OFF SETLOCAL SET HOME=%~dp0 "%~dp0\influx.exe" %* ENDLOCAL
InfluxDB Client需要使用auth命令进行授权,默认用户名、密码是admin、password。授权完成后,就可以使用InfluxDB的常用命令进行操作了。
2、客户端
(1) 安装客户端
使用NuGet搜索Vibrant.InfluxDB.Client并安装(本文使用3.5.1)。
(2) 封装客户端
下面对InfluxClient类进行封装,抽象出初始化数据库、异步添加数据、异步查询个数、异步获取数据等方法。
using System; using System.Collections.Generic; using System.Threading.Tasks; using AccelerateSensor.Database.Tools; using Vibrant.InfluxDB.Client; namespace AccelerateSensor.Database.InfluxDb { public class InfluxDbHelper { private readonly InfluxClient _influxClient; private string _databaseName; public InfluxDbHelper(string influxHost, string username, string password) { _influxClient = new InfluxClient(new Uri(influxHost), username, password); } #region Init public void Init(string databaseName) { InitAsync(databaseName).GetAwaiter(); } private async Task InitAsync(string databaseName) { //创建数据库 _databaseName = databaseName; await _influxClient.CreateDatabaseAsync(_databaseName); } #endregion #region AddData /// <summary> /// 添加数据 /// </summary> /// <typeparam name="TInfluxData">泛型数据类型</typeparam> /// <param name="measurementName">表名称</param> /// <param name="dataArray">数据数组</param> /// <param name="databaseName">数据库名称(可选)</param> public void AddData<TInfluxData>(string measurementName, TInfluxData[] dataArray, string databaseName = "") where TInfluxData : new() { AddDataAsync(databaseName, measurementName, dataArray).GetAwaiter(); } private async Task AddDataAsync<TInfluxData>( string databaseName, string measurementName, TInfluxData[] dataArray) where TInfluxData : new() { try { databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName; await _influxClient.WriteAsync(databaseName, measurementName, dataArray); } catch (Exception e) { LogHelper.AddErrorLog(e.Message); } } #endregion /// <summary> /// 获取数据个数 /// </summary> /// <param name="query">查询条件</param> /// <param name="databaseName">数据库名称(可选)</param> /// <returns>数据个数</returns> public async Task<int> GetDataCountAsync(string query, string databaseName = "") { try { databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName; var resultSet = await _influxClient.ReadAsync<CountInfo>(databaseName, query); // resultSet will contain 1 result in the Results collection (or multiple if you execute multiple queries at once) var result = resultSet.Results[0]; if (!result.Succeeded) { LogHelper.AddErrorLog(result.ErrorMessage); return 0; } // result will contain 1 series in the Series collection (or potentially multiple if you specify a GROUP BY clause) var series = result.Series[0]; return series.Rows[0].Count; } catch (Exception e) { LogHelper.AddErrorLog(e.Message); return 0; } } /// <summary> /// 异步获取数据 /// </summary> /// <typeparam name="TInfluxData">数据类型</typeparam> /// <param name="query">查询条件</param> /// <param name="databaseName">数据库名称(可选)</param> /// <returns>数据集合</returns> public async Task<List<TInfluxData>> GetDataAsync<TInfluxData>(string query, string databaseName = "") where TInfluxData : new() { try { databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName; var resultSet = await _influxClient.ReadAsync<TInfluxData>(databaseName, query); // resultSet will contain 1 result in the Results collection (or multiple if you execute multiple queries at once) var result = resultSet.Results[0]; if (!result.Succeeded) { LogHelper.AddErrorLog(result.ErrorMessage); return new List<TInfluxData>(); } // result will contain 1 series in the Series collection (or potentially multiple if you specify a GROUP BY clause) var series = result.Series[0]; var dataList = new List<TInfluxData>(); dataList.AddRange(series.Rows); return dataList; } catch (Exception e) { LogHelper.AddErrorLog(e.Message); return new List<TInfluxData>(); } } /// <summary> /// 获取表数据个数的辅助类 /// </summary> private class CountInfo { [InfluxTimestamp] // ReSharper disable once UnusedMember.Local public DateTime Timestamp { get; set; } [InfluxField("count")] // ReSharper disable once UnusedAutoPropertyAccessor.Local public int Count { get; set; } } } }
(3) 使用封装类
首先定义Vibrant.InfluxDB.Client数据库表类型。
using System; using Vibrant.InfluxDB.Client; namespace AccelerateSensor.Service.DbProxy.InfluxDb.Models { public class NodeData { [InfluxTimestamp] public DateTime Timestamp { get; set; } /// <summary> /// 节点编号 /// </summary> [InfluxTag("NodeUuid")] public string NodeUuid { get; set; } /// <summary> /// 数据类型 /// </summary> [InfluxField("AcquireDataType")] public int AcquireDataType { get; set; } /// <summary> /// 节点值 /// </summary> [InfluxField("Value")] public int Value { get; set; } /// <summary> /// 更新时间 /// </summary> [InfluxField("UpdateTime")] public DateTime UpdateTime { get; set; } } }
然后通过InfluxDbHelper实现数据库初始化(创建)、添加数据、分页查询数据、条件查询数据等功能。
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using AccelerateSensor.Database.InfluxDb; using AccelerateSensor.Service.Tools; using InfluxNodeData = AccelerateSensor.Service.DbProxy.InfluxDb.Models.NodeData; namespace AccelerateSensor.Service.DbProxy.InfluxDb { internal class InfluxDbProxy { private readonly InfluxDbHelper _influxDbHelper; public InfluxDbProxy() { _influxDbHelper = new InfluxDbHelper(Constants.InfluxHost, Constants.Username, Constants.Password); } public void Init() { _influxDbHelper.Init(Constants.DatabaseName); } public void AddNodeData(InfluxNodeData influxNodeData) { _influxDbHelper.AddData(Constants.MeasurementName.NodeData, new[] { influxNodeData }); } public async Task<int> GetNodeDataCountAsync(string nodeUuid) { var query = $"SELECT count(AcquireDataType) FROM {Constants.MeasurementName.NodeData} " + $"WHERE NodeUuid = '{nodeUuid}'"; return await _influxDbHelper.GetDataCountAsync(query); } public async Task<List<NodeData>> GetPageNodeDataAsync( string nodeUuid, int countPerPage, int curPage, int count) { var offset = countPerPage * (curPage - 1); var query = $"SELECT * FROM {Constants.MeasurementName.NodeData} " + $"WHERE NodeUuid = '{nodeUuid}' " + @"ORDER BY time DESC " + $"LIMIT {count} OFFSET {offset}"; var influxNodeDataList = await _influxDbHelper.GetDataAsync<InfluxNodeData>(query); return influxNodeDataList.Select(GetNodeData).ToList(); } public async Task<List<InfluxNodeData>> GetNodeDataAsync(string nodeUuid, DateTime start, DateTime stop) { var query = $"SELECT * FROM {Constants.MeasurementName.NodeData} " + $"WHERE NodeUuid = '{nodeUuid}' " + $"AND time >= '{start:yyyy-MM-dd hh:mm:ss}' " + $"AND time <= '{stop:yyyy-MM-dd hh:mm:ss}'"; var influxNodeDataList = await _influxDbHelper.GetDataAsync<InfluxNodeData>(query); return influxNodeDataList.Select(GetNodeData).ToList(); } } }
3、注意事项
需要注意的是在Vibrant.InfluxDB.Client数据库表对应的类定义中,对数据类型是有要求的。带有InfluxTimestamp属性的字段是必须的,表示添加数据的时间戳,是InluxDB中measurement(表)的主键。带有InfluxTag属性的字段会被建立索引,只能是string类型。带有InfluxField属性的字段是常规的表字段,可以表示具体行的若干数值。