InfluxDB的简单使用

InfluxDB是一个时间序列数据库,它被设计用于处理高写入和查询负载。

本文简单介绍了如何下载、配置、启动InfluxDB,以及如何使用InfluxDB客户端进行数据操作。开发环境为:Windows10,influxdb-1.8.4,VS2015,Vibrant.InfluxDB.Client 3.5.1。

1、下载安装启动

(1) 下载

InfluxDB官网为:https://www.influxdata.com/,本文使用的是influxdb-1.8.4_windows_amd64,不过官网已经不提供2.0以下版本的下载了,而且2.0版本也不支持Windows了,需要在Docker等环境下安装使用。

InfluxDB 2.0参考https://docs.influxdata.com/influxdb/v2.0/get-started/

(2) 配置

Windows下的InfluxDB下载完成并解压后是一个绿色软件,仅仅做一下配置就能够运行,配置文件为influxdb.conf。

- 绑定端口

# bind-address = "127.0.0.1:8088"
bind-address = ":8088"

- 修改[http]节点

enabled = true
bind-address = ":8086"
auth-enabled = true
realm = "InfluxDB"
log-enabled = true

(3) 启动

配置完成后,将InfluxDB的启动写入批处理可以快速点击启动,当然也可以配置成Windows服务。

influxd.exe -config influxdb.conf

此外,InfluxDB Client也可以通过批处理的形式快速启动。

@ECHO OFF
SETLOCAL
SET HOME=%~dp0
"%~dp0\influx.exe" %*
ENDLOCAL

InfluxDB Client需要使用auth命令进行授权,默认用户名、密码是admin、password。授权完成后,就可以使用InfluxDB的常用命令进行操作了。

2、客户端

(1) 安装客户端

使用NuGet搜索Vibrant.InfluxDB.Client并安装(本文使用3.5.1)。

(2) 封装客户端

下面对InfluxClient类进行封装,抽象出初始化数据库、异步添加数据、异步查询个数、异步获取数据等方法。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AccelerateSensor.Database.Tools;
using Vibrant.InfluxDB.Client;

namespace AccelerateSensor.Database.InfluxDb
{
    public class InfluxDbHelper
    {
        private readonly InfluxClient _influxClient;
        private string _databaseName;

        public InfluxDbHelper(string influxHost, string username, string password)
        {
            _influxClient = new InfluxClient(new Uri(influxHost), username, password);
        }

        #region Init

        public void Init(string databaseName)
        {
            InitAsync(databaseName).GetAwaiter();
        }

        private async Task InitAsync(string databaseName)
        {
            //创建数据库
            _databaseName = databaseName;
            await _influxClient.CreateDatabaseAsync(_databaseName);
        }

        #endregion

        #region AddData

        /// <summary>
        /// 添加数据
        /// </summary>
        /// <typeparam name="TInfluxData">泛型数据类型</typeparam>
        /// <param name="measurementName">表名称</param>
        /// <param name="dataArray">数据数组</param>
        /// <param name="databaseName">数据库名称(可选)</param>
        public void AddData<TInfluxData>(string measurementName, TInfluxData[] dataArray, string databaseName = "")
            where TInfluxData : new()
        {
            AddDataAsync(databaseName, measurementName, dataArray).GetAwaiter();
        }

        private async Task AddDataAsync<TInfluxData>(
            string databaseName, string measurementName,
            TInfluxData[] dataArray) where TInfluxData : new()
        {
            try
            {
                databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName;
                await _influxClient.WriteAsync(databaseName, measurementName, dataArray);
            }
            catch (Exception e)
            {
                LogHelper.AddErrorLog(e.Message);
            }
        }

        #endregion

        /// <summary>
        /// 获取数据个数
        /// </summary>
        /// <param name="query">查询条件</param>
        /// <param name="databaseName">数据库名称(可选)</param>
        /// <returns>数据个数</returns>
        public async Task<int> GetDataCountAsync(string query, string databaseName = "")
        {
            try
            {
                databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName;
                var resultSet = await _influxClient.ReadAsync<CountInfo>(databaseName, query);

                // resultSet will contain 1 result in the Results collection (or multiple if you execute multiple queries at once)
                var result = resultSet.Results[0];
                if (!result.Succeeded)
                {
                    LogHelper.AddErrorLog(result.ErrorMessage);
                    return 0;
                }

                // result will contain 1 series in the Series collection (or potentially multiple if you specify a GROUP BY clause)
                var series = result.Series[0];
                return series.Rows[0].Count;
            }
            catch (Exception e)
            {
                LogHelper.AddErrorLog(e.Message);
                return 0;
            }
        }

        /// <summary>
        /// 异步获取数据
        /// </summary>
        /// <typeparam name="TInfluxData">数据类型</typeparam>
        /// <param name="query">查询条件</param>
        /// <param name="databaseName">数据库名称(可选)</param>
        /// <returns>数据集合</returns>
        public async Task<List<TInfluxData>> GetDataAsync<TInfluxData>(string query, string databaseName = "")
            where TInfluxData : new()
        {
            try
            {
                databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName;
                var resultSet = await _influxClient.ReadAsync<TInfluxData>(databaseName, query);

                // resultSet will contain 1 result in the Results collection (or multiple if you execute multiple queries at once)
                var result = resultSet.Results[0];
                if (!result.Succeeded)
                {
                    LogHelper.AddErrorLog(result.ErrorMessage);
                    return new List<TInfluxData>();
                }

                // result will contain 1 series in the Series collection (or potentially multiple if you specify a GROUP BY clause)
                var series = result.Series[0];

                var dataList = new List<TInfluxData>();
                dataList.AddRange(series.Rows);
                return dataList;
            }
            catch (Exception e)
            {
                LogHelper.AddErrorLog(e.Message);
                return new List<TInfluxData>();
            }
        }

        /// <summary>
        /// 获取表数据个数的辅助类
        /// </summary>
        private class CountInfo
        {
            [InfluxTimestamp]
            // ReSharper disable once UnusedMember.Local
            public DateTime Timestamp { get; set; }

            [InfluxField("count")]
            // ReSharper disable once UnusedAutoPropertyAccessor.Local
            public int Count { get; set; }
        }
    }
}

(3) 使用封装类

首先定义Vibrant.InfluxDB.Client数据库表类型。

using System;
using Vibrant.InfluxDB.Client;

namespace AccelerateSensor.Service.DbProxy.InfluxDb.Models
{
    public class NodeData
    {
        [InfluxTimestamp]
        public DateTime Timestamp { get; set; }

        /// <summary>
        /// 节点编号
        /// </summary>
        [InfluxTag("NodeUuid")]
        public string NodeUuid { get; set; }
        
        /// <summary>
        /// 数据类型
        /// </summary>
        [InfluxField("AcquireDataType")]
        public int AcquireDataType { get; set; }

        /// <summary>
        /// 节点值
        /// </summary>
        [InfluxField("Value")]
        public int Value { get; set; }

        /// <summary>
        /// 更新时间
        /// </summary>
        [InfluxField("UpdateTime")]
        public DateTime UpdateTime { get; set; }
    }
}

然后通过InfluxDbHelper实现数据库初始化(创建)、添加数据、分页查询数据、条件查询数据等功能。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using AccelerateSensor.Database.InfluxDb;
using AccelerateSensor.Service.Tools;
using InfluxNodeData = AccelerateSensor.Service.DbProxy.InfluxDb.Models.NodeData;

namespace AccelerateSensor.Service.DbProxy.InfluxDb
{
    internal class InfluxDbProxy
    {
        private readonly InfluxDbHelper _influxDbHelper;

        public InfluxDbProxy()
        {
            _influxDbHelper = new InfluxDbHelper(Constants.InfluxHost, Constants.Username, Constants.Password);
        }

        public void Init()
        {
            _influxDbHelper.Init(Constants.DatabaseName);
        }

        public void AddNodeData(InfluxNodeData influxNodeData)
        {
            _influxDbHelper.AddData(Constants.MeasurementName.NodeData, new[] { influxNodeData });
        }

        public async Task<int> GetNodeDataCountAsync(string nodeUuid)
        {
            var query = $"SELECT count(AcquireDataType) FROM {Constants.MeasurementName.NodeData} " +
                        $"WHERE NodeUuid = '{nodeUuid}'";
            return await _influxDbHelper.GetDataCountAsync(query);
        }

        public async Task<List<NodeData>> GetPageNodeDataAsync(
            string nodeUuid, int countPerPage, int curPage, int count)
        {
            var offset = countPerPage * (curPage - 1);
            var query = $"SELECT * FROM {Constants.MeasurementName.NodeData} " +
                        $"WHERE NodeUuid = '{nodeUuid}' " +
                        @"ORDER BY time DESC " +
                        $"LIMIT {count} OFFSET {offset}";
            var influxNodeDataList = await _influxDbHelper.GetDataAsync<InfluxNodeData>(query);
            return influxNodeDataList.Select(GetNodeData).ToList();
        }

        public async Task<List<InfluxNodeData>> GetNodeDataAsync(string nodeUuid, DateTime start, DateTime stop)
        {
            var query = $"SELECT * FROM {Constants.MeasurementName.NodeData} " +
                        $"WHERE NodeUuid = '{nodeUuid}' " +
                        $"AND time >= '{start:yyyy-MM-dd hh:mm:ss}' " +
                        $"AND time <= '{stop:yyyy-MM-dd hh:mm:ss}'";
            var influxNodeDataList = await _influxDbHelper.GetDataAsync<InfluxNodeData>(query);
            return influxNodeDataList.Select(GetNodeData).ToList();
        }
    }
}

3、注意事项

需要注意的是在Vibrant.InfluxDB.Client数据库表对应的类定义中,对数据类型是有要求的。带有InfluxTimestamp属性的字段是必须的,表示添加数据的时间戳,是InluxDB中measurement(表)的主键。带有InfluxTag属性的字段会被建立索引,只能是string类型。带有InfluxField属性的字段是常规的表字段,可以表示具体行的若干数值。

上一篇:Mysql的基本使用


下一篇:平面中判断点在三角形内算法(重心法)