Go/Python/Erlang编程语言对比分析及示例 基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池) 封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil 分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!

Go/Python/Erlang编程语言对比分析及示例

 

本文主要是介绍Go,从语言对比分析的角度切入。之所以选择与Python、Erlang对比,是因为做为高级语言,它们语言特性上有较大的相似性,不过最主要的原因是这几个我比较熟悉。

Go的很多语言特性借鉴与它的三个祖先:C,Pascal和CSP。Go的语法、数据类型、控制流等继承于C,Go的包、面对对象等思想来源于Pascal分支,而Go最大的语言特色,基于管道通信的协程并发模型,则借鉴于CSP分支。

Go/Python/Erlang编程语言对比分析及示例          基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)   封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil   分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!

Go/Python/Erlang语言特性对比

如《编程语言与范式》一文所说,不管语言如何层出不穷,所有语言的设计离不开2个基本面:控制流和数据类型。为了提升语言描述能力,语言一般都提供控制抽象和数据抽象。本小节的语言特性对比也从这4个维度入手,详见下图(点击见大图)。

图中我们可以看出,相比于Python的40个特性,Go只有31个,可以说Go在语言设计上是相当克制的。比如,它没有隐式的数值转换,没有构造函数和析构函数,没有运算符重载,没有默认参数,也没有继承,没有泛型,没有异常,没有宏,没有函数修饰,更没有线程局部存储。

但是Go的特点也很鲜明,比如,它拥有协程、自动垃圾回收、包管理系统、一等公民的函数、栈空间管理等。

Go作为静态类型语言,保证了Go在运行效率、内存用量、类型安全都要强于Python和Erlang。

Go的数据类型也更加丰富,除了支持表、字典等复杂的数据结构,还支持指针和接口类型,这是Python和Erlang所没有的。特别是接口类型特别强大,它提供了管理类型系统的手段。而指针类型提供了管理内存的手段,这让Go进入底层软件开发提供了强有力的支持。

Go在面对对象的特性支持上做了很多反思和取舍,它没有类、虚函数、继承、泛型等特性。Go语言中面向对象编程的核心是组合和方法(function)。组合很类似于C语言的struct结构体的组合方式,方法类似于Java的接口(Interface),但是使用方法上与对象更加解耦,减少了对对象内部的侵入。Erlang则不支持面对对象编程范式,相比而言,Python对面对对象范式的支持最为全面。

在函数式编程的特性支持上,Erlang作为函数式语言,支持最为全面。但是基本的函数式语言特性,如lambda、高阶函数、curry等,三种语言都支持。

控制流的特性支持上,三种语言都差不多。Erlang支持尾递归优化,这给它在函数式编程上带来便利。而Go在通过动态扩展协程栈的方式来支持深度递归调用。Python则在深度递归调用上经常被爆栈。

Go和Erlang的并发模型都来源于CSP,但是Erlang是基于actor和消息传递(mailbox)的并发实现,Go是基于goroutine和管道(channel)的并发实现。不管Erlang的actor还是Go的goroutine,都满足协程的特点:由编程语言实现和调度,切换在用户态完成,创建销毁开销很小。至于Python,其多线程的切换和调度是基于操作系统实现,而且因为GIL的大坑级存在,无法真正做到并行。

而且从笔者的并发编程体验上看,Erlang的函数式编程语法风格和其OTP behavior框架提供的晦涩的回调(callback)使用方法,对大部分的程序员,如C/C++和Java出身的程序员来说,有一定的入门门槛和挑战。而被称为“互联网时代的C”的Go,其类C的语法和控制流,以及面对对象的编程范式,编程体验则好很多。

Go/Python/Erlang编程语言对比分析及示例          基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)   封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil   分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!

Go/Python/Erlang语言语法对比

所有的语言特性都需要有形式化的表示方式,Go、Python、Erlang三种语言语法的详细对比如下(点击见完整大图第一部分第二部分第三部分)。这里(链接)有一个详细的Go 与 C 的语法对比,这也是我没有做Go vs. C对比的一个原因。

正如Go语言的设计者之一Rob Pike所说,“软件的复杂性是乘法级相关的”。这充分体现在语言关键词(keyword)数量的控制上,Go的关键词是最少的,只有25个,而Erlang是27个,Python是31个。从根本上保证了Go语言的简单易学。

Go语言将数据类型分为四类:基础类型、复合类型、引用类型和接口类型。基础类型包括:整型、浮点型、复数、字符串和布尔型。复合数据类型有数组和结构体。引用类型包括指针、切片、字典、函数、通道。其他数据类型,如原子(atom)、比特(binary)、元组(tuple)、集合(set)、记录(record),Go则没有支持。

Go对C语言的很多语法特性做了改良,正如Rob Pike在《Less is Exponentially More》中提到,Go的“起点: C语言,解决一些明显的瑕疵、删除杂质、增加一些缺少的特性。”,比如,switch/case的case子程序段默认break跳出,case语句支持数值范围、条件判断语句;所有类型默认初始化为0,没有未初始化变量;把类型放在变量后面的声明语法(链接),使复杂声明更加清晰易懂;没有头文件,文件的编译以包组织,改善封装能力;用空接口(interface {})代替void *,提高类型系统能力等等。

Go对函数,方法,接口做了清晰的区分。与Erlang类似,Go的函数作为第一公民。函数可以让我们将一个语句序列打包为一个单元,然后可以从程序中其它地方多次调用。函数和方法的区别是指有没有接收器,而不像其他语言那样是指有没有返回值。接口类型具体描述了一系列方法的集合,而空接口interfac{}表示可以接收任意类型。接口的这2中使用方式,用面对对象编程范式来类比的话,可以类比于subtype polymorphism(子类型多态)和ad hoc polymorphism(非参数多态)。

从图中示例可以看出,Go的goroutine就是一个函数,以及在堆上为其分配的一个堆栈。所以其系统开销很小,可以轻松的创建上万个goroutine,并且它们并不是被操作系统所调度执行。goroutine只能使用channel来发送给指定的goroutine请求来查询更新变量。这也就是Go的口头禅“不要使用共享数据来通信,使用通信来共享数据”。channel支持容量限制和range迭代器。

Go/Python/Erlang编程语言对比分析及示例          基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)   封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil   分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!Go/Python/Erlang编程语言对比分析及示例          基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)   封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil   分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!Go/Python/Erlang编程语言对比分析及示例          基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)   封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil   分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!

Go/Python/Erlang语言词法对比

Go、Python、Erlang三种语言词法符号的详细对比如下(点击见完整大图)。Go的词法符号是3个语言中最多的,有41个,而且符号复用的情况也较多。相对来说,Python最少,只有31个。

Go语言在词法和代码格式上采取了很强硬的态度。Go语言只有一种控制可见性的手段:大写首字母的标识符会从定义它们的包中被导出,小写字母的则不会。这种限制包内成员的方式同样适用于struct或者一个类型的方法。

在文件命名上,Go也有一定的规范要求,如以_test.go为后缀名的源文件是测试文件,它们是go test测试的一部分;测试文件中以Test为函数名前缀的函数是测试函数,用于测试程序的一些逻辑行为是否正确;以Benchmark为函数名前缀的函数是基准测试函数,它们用于衡量一些函数的性能。

除了关键字,此外,Go还有大约30多个预定义的名字,比如int和true等,主要对应内建的常量、类型和函数。

Go/Python/Erlang编程语言对比分析及示例          基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)   封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil   分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!

TDD Go编程示例

本小节以TDD方式4次重构开发一个斐波那契算法的方式,来简单展示Go的特性、语法和使用方式,如Go的单元测试技术,并发编程、匿名函数、闭包等。

首先,看一下TDD最终形成的单元测试文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main
 
import (
    "testing"
)
 
func TestFib(t *testing.T) {
    var testdatas = []struct {
        n    int
        want int64
    }{
        {0, 0},
        {1, 1},
        {2, 1},
        {3, 2},
        {4, 3},
        {16, 987},
        {32, 2178309},
        {45, 1134903170},
    }
 
    for _, test := range testdatas {
        n := test.n
        want := test.want
        got := fib(n)
 
        if got != want {
            t.Errorf("fib(%d)=%d, want %d\n", n, got, want)
        }
    }
 
}

基于递归的实现方案:

1
2
3
4
5
6
7
func fib1(n int) int64 {
    if n == 0 || n == 1 {
        return int64(n)
    }
    return fib1(n-1) + fib1(n-2)
 
}

测试结果:

crbsp@fib$ time go test
PASS
ok _/home/crbsp/alex/go/fib 9.705s

real 0m10.045s
user 0m9.968s
sys 0m0.068s

基于goroutine实现的并发方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func fib2(n int) int64 {
    var got int64
    var channel = make(chan int64, 2)
 
    if n == 0 || n == 1 {
        return int64(n)
    }
 
    runtime.GOMAXPROCS(2)
 
    go func() { channel <- fib1(n - 2) }()
    go func() { channel <- fib1(n - 1) }()
 
    got = <-channel
    got += <-channel
    return got
 
}

测试结果:

crbsp@fib$ time go test
PASS
ok _/home/crbsp/alex/go/fib 6.118s

real 0m6.674s
user 0m10.268s
sys 0m0.148s

基于迭代的实现方案:

1
2
3
4
5
6
7
8
9
func fib3(n int) int64 {
    var a, b int64
    a, b = 0, 1
 
    for i := 0; i < n; i++ {
        a, b = b, a+b
    }
    return a
}

测试结果:

crbsp@fib$ time go test
PASS
ok _/home/crbsp/alex/go/fib 0.002s

real 0m0.547s
user 0m0.328s
sys 0m0.172s

基于闭包的实现方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func fibWrapper4() func() int64 {
    var a, b int64
    a, b = 0, 1
 
    return func() int64 {
        a, b = b, a+b
        return a
    }
}
 
func fib4(n int) int64 {
    var got int64
    got = 0
    f := fibWrapper4()
    for i := 0; i < n; i++ {
        got = f()
    }
    return got
}

测试结果:

crbsp@fib$ time go test
PASS
ok _/home/crbsp/alex/go/fib 0.002s

real 0m0.411s
user 0m0.260s
sys 0m0.140s

--完--  

  

  

基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池)

 

一、本文产生原由:  

 之前文章《总结消息队列RabbitMQ的基本用法》已对RabbitMQ的安装、用法都做了详细说明,而本文主要是针对在高并发且单次从RabbitMQ中消费消息时,出现了连接数不足、连接响应较慢、RabbitMQ服务器崩溃等各种性能问题的解方案,之所以会出现我列举的这些问题,究基根源,其实是TCP连接创建与断开太过频繁所致,这与我们使用ADO.NET来访问常规的关系型DB(如:SQL SERVER、MYSQL)有所不同,在访问DB时,我们一般都建议大家使用using包裹,目的是每次创建完DB连接,使用完成后自动释放连接,避免不必要的连接数及资源占用。可能有人会问,为何访问DB,可以每次创建再断开连接,都没有问题,而同样访问MQ(本文所指的MQ均是RabbitMQ),每次创建再断开连接,如果在高并发且创建与断开频率高的时候,会出现性能问题呢?其实如果了解了DB的连接创建与断开以及MQ的连接创建与断开原理就知道其中的区别了。这里我简要说明一下,DB连接与MQ连接 其实底层都是基于TCP连接,创建TCP连接肯定是有资源消耗的,是非常昂贵的,原则上尽可能少的去创建与断开TCP连接,DB创建连接、MQ创建连接可以说是一样的,但在断开销毁连接上就有很大的不同,DB创建连接再断开时,默认情况下是把该连接回收到连接池中,下次如果再有DB连接创建请求,则先判断DB连接池中是否有空闲的连接,若有则直接复用,若没有才创建连接,这样就达到了TCP连接的复用,而MQ创建连接都是新创建的TCP连接,断开时则直接断开TCP连接,简单粗暴,看似资源清理更彻底,但若在高并发高频率每次都重新创建与断开MQ连接,则性能只会越来越差(上面说过TCP连接是非常昂贵的),我在公司项目中就出现了该问题,后面在技术总监的指导下,对MQ的连接创建与断开作了优化,实现了类似DB连接池的概念。

连接池,故名思义,连接的池子,所有的连接作为一种资源集中存放在池中,需要使用时就可以到池中获取空闲连接资源,用完后再放回池中,以此达到连接资源的有效重用,同时也控制了资源的过度消耗与浪费(资源多少取决于池子的容量)

二、源代码奉献(可直接复制应用到大家的项目中)

下面就先贴出实现MQHelper(含连接池)的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Util;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Web.Caching;
using System.Web;
using System.Configuration;
using System.IO;
using System.Collections.Concurrent;
using System.Threading;
using System.Runtime.CompilerServices;
 
namespace Zuowj.Core
{
    public class MQHelper
    {
        private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";
        private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";
 
        private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;//空闲连接对象队列
        private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;//使用中(忙)连接对象集合
        private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;//连接池使用率
        private readonly static Semaphore MQConnectionPoolSemaphore;
        private readonly static object freeConnLock = new object(), addConnLock = new object();
        private static int connCount = 0;
 
        public const int DefaultMaxConnectionCount = 30;//默认最大保持可用连接数
        public const int DefaultMaxConnectionUsingCount = 10000;//默认最大连接可访问次数
 
 
        private static int MaxConnectionCount
        {
            get
            {
                if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null)
                {
                    return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]);
                }
                else
                {
                    int mqMaxConnectionCount = 0;
                    string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount];
                    if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0)
                    {
                        mqMaxConnectionCount = DefaultMaxConnectionCount;
                    }
 
                    string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
                    HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));
 
                    return mqMaxConnectionCount;
                }
 
            }
        }
 
        /// <summary>
        /// 建立连接
        /// </summary>
        /// <param name="hostName">服务器地址</param>
        /// <param name="userName">登录账号</param>
        /// <param name="passWord">登录密码</param>
        /// <returns></returns>
        private static ConnectionFactory CrateFactory()
        {
            var mqConnectionSetting = GetMQConnectionSetting();
            var connectionfactory = new ConnectionFactory();
            connectionfactory.HostName = mqConnectionSetting[0];
            connectionfactory.UserName = mqConnectionSetting[1];
            connectionfactory.Password = mqConnectionSetting[2];
            if (mqConnectionSetting.Length > 3) //增加端口号
            {
                connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]);
            }
            return connectionfactory;
        }
 
        private static string[] GetMQConnectionSetting()
        {
            string[] mqConnectionSetting = null;
            if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null)
            {
                //MQConnectionSetting=Host IP|;userid;|;password
                string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting];
                if (!string.IsNullOrWhiteSpace(mqConnSettingStr))
                {
                    mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);//解密MQ连接字符串,若项目中无此需求可移除,EncryptUtility是一个AES的加解密工具类,大家网上可自行查找
                    if (mqConnSettingStr.Contains(";|;"))
                    {
                        mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries);
                    }
                }
 
                if (mqConnectionSetting == null || mqConnectionSetting.Length < 3)
                {
                    throw new Exception("MQConnectionSetting未配置或配置不正确");
                }
 
                string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
                HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath));
            }
            else
            {
                mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[];
            }
 
            return mqConnectionSetting;
        }
 
 
 
 
        public static IConnection CreateMQConnection()
        {
            var factory = CrateFactory();
            factory.AutomaticRecoveryEnabled = true;//自动重连
            var connection = factory.CreateConnection();
            connection.AutoClose = false;
            return connection;
        }
 
 
        static MQHelper()
        {
            FreeConnectionQueue = new ConcurrentQueue<IConnection>();
            BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>();
            MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//连接池使用率
            MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信号量,控制同时并发可用线程数
 
        }
 
        public static IConnection CreateMQConnectionInPoolNew()
        {
 
        SelectMQConnectionLine:
 
            MQConnectionPoolSemaphore.WaitOne();//当<MaxConnectionCount时,会直接进入,否则会等待直到空闲连接出现
 
            IConnection mqConnection = null;
            if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有连接数小于最大可用连接数,则直接创建新连接
            {
                lock (addConnLock)
                {
                    if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)
                    {
                        mqConnection = CreateMQConnection();
                        BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中
                        MQConnectionPoolUsingDicNew[mqConnection] = 1;
                        //  BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                        return mqConnection;
                    }
                }
            }
 
 
            if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //如果没有可用空闲连接,则重新进入等待排队
            {
                // BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
                goto SelectMQConnectionLine;
            }
            else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空闲连接,判断是否使用次数是否超过最大限制,超过则释放连接并重新创建
            {
                mqConnection.Close();
                mqConnection.Dispose();
                // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
 
                mqConnection = CreateMQConnection();
                MQConnectionPoolUsingDicNew[mqConnection] = 0;
                // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
            }
 
            BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中
            MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次数加1
 
            // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
 
            return mqConnection;
        }
 
        private static void ResetMQConnectionToFree(IConnection connection)
        {
            lock (freeConnLock)
            {
                bool result = false;
                if (BusyConnectionDic.TryRemove(connection, out result)) //从忙队列中取出
                {
                    //  BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                }
                else
                {
                    // BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                }
 
                if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因为高并发出现极少概率的>MaxConnectionCount,则直接释放该连接
                {
                    connection.Close();
                    connection.Dispose();
                }
                else
                {
                    FreeConnectionQueue.Enqueue(connection);//加入到空闲队列,以便持续提供连接服务
                }
 
                MQConnectionPoolSemaphore.Release();//释放一个空闲连接信号
 
                //Interlocked.Decrement(ref connCount);
                //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount);
            }
        }
 
 
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="connection">消息队列连接对象</param>
        /// <typeparam name="T">消息类型</typeparam>
        /// <param name="queueName">队列名称</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="msg">消息</param>
        /// <returns></returns>
        public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true)
        {
            try
            {
 
                using (var channel = connection.CreateModel())//建立通讯信道
                {
                    // 参数从前面开始分别意思为:队列名称,是否持久化,独占的队列,不使用时是否自动删除,其他参数
                    channel.QueueDeclare(queueName, durable, falsefalsenull);
 
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
 
                    if (!durable)
                        properties = null;
 
                    var body = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish("", queueName, properties, body);
                }
 
 
                return string.Empty;
            }
            catch (Exception ex)
            {
                return ex.ToString();
            }
            finally
            {
                ResetMQConnectionToFree(connection);
            }
        }
 
        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="connection">消息队列连接对象</param>
        /// <param name="queueName">队列名称</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="dealMessage">消息处理函数</param>
        /// <param name="saveLog">保存日志方法,可选</param>
        public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null)
        {
            try
            {
 
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queueName, durable, falsefalsenull); //获取队列
                    channel.BasicQos(0, 1, false); //分发机制为触发式
 
                    var consumer = new QueueingBasicConsumer(channel); //建立消费者
                    // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者
                    channel.BasicConsume(queueName, false, consumer);
 
                    while (true)  //如果队列中有消息
                    {
                        ConsumeAction consumeResult = ConsumeAction.RETRY;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息
                        string message = null;
 
                        try
                        {
                            var body = ea.Body;
                            message = Encoding.UTF8.GetString(body);
                            consumeResult = dealMessage(message);
                        }
                        catch (Exception ex)
                        {
                            if (saveLog != null)
                            {
                                saveLog(message, ex);
                            }
                        }
                        if (consumeResult == ConsumeAction.ACCEPT)
                        {
                            channel.BasicAck(ea.DeliveryTag, false);  //消息从队列中删除
                        }
                        else if (consumeResult == ConsumeAction.RETRY)
                        {
                            channel.BasicNack(ea.DeliveryTag, falsetrue); //消息重回队列
                        }
                        else
                        {
                            channel.BasicNack(ea.DeliveryTag, falsefalse); //消息直接丢弃
                        }
                    }
                }
 
            }
            catch (Exception ex)
            {
                if (saveLog != null)
                {
                    saveLog("QueueName:" + queueName, ex);
                }
 
                throw ex;
            }
            finally
            {
                ResetMQConnectionToFree(connection);
            }
        }
 
 
        /// <summary>
        /// 依次获取单个消息
        /// </summary>
        /// <param name="connection">消息队列连接对象</param>
        /// <param name="QueueName">队列名称</param>
        /// <param name="durable">持久化</param>
        /// <param name="dealMessage">处理消息委托</param>
        public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage)
        {
            try
            {
 
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, durable, falsefalsenull); //获取队列
                    channel.BasicQos(0, 1, false); //分发机制为触发式
 
                    uint msgCount = channel.MessageCount(QueueName);
 
                    if (msgCount > 0)
                    {
                        var consumer = new QueueingBasicConsumer(channel); //建立消费者
                        // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者
                        channel.BasicConsume(QueueName, false, consumer);
 
                        ConsumeAction consumeResult = ConsumeAction.RETRY;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息
                        try
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            consumeResult = dealMessage(message);
                        }
                        catch (Exception ex)
                        {
                            throw ex;
                        }
                        finally
                        {
                            if (consumeResult == ConsumeAction.ACCEPT)
                            {
                                channel.BasicAck(ea.DeliveryTag, false);  //消息从队列中删除
                            }
                            else if (consumeResult == ConsumeAction.RETRY)
                            {
                                channel.BasicNack(ea.DeliveryTag, falsetrue); //消息重回队列
                            }
                            else
                            {
                                channel.BasicNack(ea.DeliveryTag, falsefalse); //消息直接丢弃
                            }
                        }
                    }
                    else
                    {
                        dealMessage(string.Empty);
                    }
                }
 
            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                ResetMQConnectionToFree(connection);
            }
        }
 
 
        /// <summary>
        /// 获取队列消息数
        /// </summary>
        /// <param name="connection"></param>
        /// <param name="QueueName"></param>
        /// <returns></returns>
        public static int GetMessageCount(IConnection connection, string QueueName)
        {
            int msgCount = 0;
            try
            {
 
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, truefalsefalsenull); //获取队列
                    msgCount = (int)channel.MessageCount(QueueName);
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                ResetMQConnectionToFree(connection);
            }
 
            return msgCount;
        }
 
 
    }
 
    public enum ConsumeAction
    {
        ACCEPT,  // 消费成功
        RETRY,   // 消费失败,可以放回队列重新消费
        REJECT,  // 消费失败,直接丢弃
    }
}

现在对上述代码的核心点作一个简要的说明:

先说一下静态构造函数:

FreeConnectionQueue 用于存放空闲连接对象队列,为何使用Queue,因为当我从中取出1个空闲连接后,空闲连接数就应该少1个,这个Queue很好满足这个需求,而且这个Queue是并发安全的Queue哦(ConcurrentQueue)

BusyConnectionDic 忙(使用中)连接对象集合,为何这里使用字典对象呢,因为当我用完后,需要能够快速的找出使用中的连接对象,并能快速移出,同时重新放入到空闲队列FreeConnectionQueue ,达到连接复用

MQConnectionPoolUsingDicNew 连接使用次数记录集合,这个只是辅助记录连接使用次数,以便可以计算一个连接的已使用次数,当达到最大使用次数时,则应断开重新创建

MQConnectionPoolSemaphore 这个是信号量,这是控制并发连接的重要手段,连接池的容量等同于这个信号量的最大可并行数,保证同时使用的连接数不超过连接池的容量,若超过则会等待;

具体步骤说明:

1.MaxConnectionCount:最大保持可用连接数(可以理解为连接池的容量),可以通过CONFIG配置,默认为30;

2.DefaultMaxConnectionUsingCount:默认最大连接可访问次数,我这里没有使用配置,而是直接使用常量固定为1000,大家若有需要可以改成从CONFIG配置,参考MaxConnectionCount的属性设置(采取了依赖缓存)

3.CreateMQConnectionInPoolNew:从连接池中创建MQ连接对象,这个是核心方法,是实现连接池的地方,代码中已注释了重要的步骤逻辑,这里说一下实现思路:

  3.1 通过MQConnectionPoolSemaphore.WaitOne() 利用信号量的并行等待方法,如果当前并发超过信号量的最大并行度(也就是作为连接池的最大容量),则需要等待空闲连接池,防止连接数超过池的容量,如果并发没有超过池的容量,则可以进入获取连接的逻辑;

  3.2FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount,如果空闲连接队列+忙连接集合的总数小于连接池的容量,则可以直接创建新的MQ连接,否则FreeConnectionQueue.TryDequeue(out mqConnection) 尝试从空闲连接队列中获取一个可用的空闲连接使用,若空闲连接都没有,则需要返回到方法首行,重新等待空闲连接;

  3.3MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen 如果取到空闲连接,则先判断使用次数是否超过最大限制,超过则释放连接或空闲连接已断开连接也需要重新创建,否则该连接可用;

  3.4BusyConnectionDic[mqConnection] = true;加入到忙连接集合中,MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1; 使用次数加1,确保每使用一次连接,连接次数能记录

4.ResetMQConnectionToFree:重置释放连接对象,这个是保证MQ连接用完后能够回收到空闲连接队列中(即:回到连接池中),而不是直接断开连接,这个方法很简单就不作作过多说明。

好了,都说明了如何实现含连接池的MQHelper,现在再来举几个例子来说明如何用:

三、实际应用(简单易上手)

获取并消费一个消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public string GetMessage(string queueName)
{
    string message = null;
    try
    {
        var connection = MQHelper.CreateMQConnectionInPoolNew();
 
        MQHelper.ConsumeMsgSingle(connection, queueName, true, (msg) =>
        {
            message = msg;
            return ConsumeAction.ACCEPT;
        });
    }
    catch (Exception ex)
    {
        BaseUtil.Logger.Error(string.Format("MQHelper.ConsumeMsgSingle Error:{0}", ex.Message), ex);
        message = "ERROR:" + ex.Message;
    }
 
    //BaseUtil.Logger.InfoFormat("第{0}次请求,从消息队列(队列名称:{1})中获取消息值为:{2}", Interlocked.Increment(ref requestCount), queueName, message);
 
 
    return message;
 
 
}

 发送一个消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public string SendMessage(string queueName, string msg)
{
    string result = null;
    try
    {
        var connection = MQHelper.CreateMQConnectionInPoolNew();
 
        result = MQHelper.SendMsg(connection, queueName, msg);
    }
    catch (Exception ex)
    {
        BaseUtil.Logger.Error(string.Format("MQHelper.SendMessage Error:{0}", ex.Message), ex);
        result = ex.Message;
    }
 
    return result;
}

 获取消息队列消息数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public int GetMessageCount(string queueName)
{
    int result = -1;
    try
    {
        var connection = MQHelper.CreateMQConnectionInPoolNew();
 
        result = MQHelper.GetMessageCount(connection, queueName);
    }
    catch (Exception ex)
    {
        BaseUtil.Logger.Error(string.Format("MQHelper.GetMessageCount Error:{0}", ex.Message), ex);
        result = -1;
    }
 
    return result;
}

 这里说一下:BaseUtil.Logger 是Log4Net的实例对象,另外上面没有针对持续订阅消费消息(ConsumeMsg)作说明,因为这个其实可以不用连接池也不会有问题,因为它是一个持久订阅并持久消费的过程,不会出现频繁创建连接对象的情况。

最后要说的是,虽说代码贴出来,大家一看就觉得很简单,好像没有什么技术含量,但如果没有完整的思路也还是需要花费一些时间和精力的,代码中核心是如何简单高效的解决并发及连接复用的的问题,该MQHelper有经过压力测试并顺利在我司项目中使用,完美解决了之前的问题,由于这个方案是我在公司通宵实现的,可能有一些方面的不足,大家可以相互交流或完善后入到自己的项目中。

封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil

 

封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil,代码比较简单,主要是把MongoTarget的配置、FileTarget的配置集成到类中,同时利用缓存依赖来判断是否需要重新创建Logger类,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
using NLog;
using NLog.Config;
using NLog.Mongo;
using NLog.Targets;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Web;
using System.Collections.Concurrent;
using NLog.Targets.Wrappers;
 
/// <summary>
/// 日志工具类(基于NLog.Mongo组件)
/// Author:左文俊
/// Date:2017/12/11
/// </summary>
public class LogUtil
{
    private NLog.Logger _Logger = null;
    private const string cacheKey_NLogConfigFlag = "NLogConfigFlag";
    private const string defaultMongoDbName = "SysLog";
    private static readonly object syncLocker = new object();
    private static readonly ConcurrentDictionary<string, LogUtil> cacheLogUitls = new ConcurrentDictionary<string, LogUtil>();
    private string loggerCacheDependencyFilePath = "";
    private bool needWriteLogToFile = true;
    private string mongoDbName = defaultMongoDbName;
    private string mongoDbCollectionName = "";
    private bool asyncWriteLog = true;
 
    public static LogUtil GetInstance(string mongoDbCollName, string loggerCacheDependencyFilePath = nullbool needWriteLogToFile = true)
    {
        string key = string.Format("{0}_{1}", defaultMongoDbName, mongoDbCollName);
        return cacheLogUitls.GetOrAdd(key, new LogUtil()
        {
            LoggerCacheDependencyFilePath = string.IsNullOrEmpty(loggerCacheDependencyFilePath) ? HttpContext.Current.Server.MapPath("~/Web.config") : loggerCacheDependencyFilePath,
            NeedWriteLogToFile = needWriteLogToFile,
            MongoDbName = defaultMongoDbName,
            MongoDbCollectionName = mongoDbCollName
        });
    }
    public string LoggerCacheDependencyFilePath
    {
        get
        {
            return loggerCacheDependencyFilePath;
        }
        set
        {
            if (!File.Exists(value))
            {
                throw new FileNotFoundException("日志配置缓存依赖文件不存在:" + value);
            }
            string oldValue = loggerCacheDependencyFilePath;
            loggerCacheDependencyFilePath = value;
            PropertyChanged(oldValue, loggerCacheDependencyFilePath);
        }
    }
 
    public bool NeedWriteLogToFile
    {
        get
        {
            return needWriteLogToFile;
        }
        set
        {
            bool oldValue = needWriteLogToFile;
            needWriteLogToFile = value;
            PropertyChanged(oldValue, needWriteLogToFile);
        }
    }
 
    public string MongoDbCollectionName
    {
        get
        {
            return mongoDbCollectionName;
        }
        set
        {
            string oldValue = mongoDbCollectionName;
            mongoDbCollectionName = value;
            PropertyChanged(oldValue, mongoDbCollectionName);
        }
    }
 
    /// <summary>
    /// 同一个项目只会用一个DB,故不对外公开,取默认DB
    /// </summary>
    private string MongoDbName
    {
        get
        {
            return mongoDbName;
        }
        set
        {
            string oldValue = mongoDbName;
            mongoDbName = value;
            PropertyChanged(oldValue, mongoDbName);
        }
    }
 
    public bool AsyncWriteLog
    {
        get
        {
            return asyncWriteLog;
        }
        set
        {
            bool oldValue = asyncWriteLog;
            asyncWriteLog = value;
            PropertyChanged(oldValue, asyncWriteLog);
        }
    }
 
 
    private void PropertyChanged<T>(T oldValue, T newValue) where T : IEquatable<T>
    {
        if (!oldValue.Equals(newValue) && _Logger != null)
        {
            lock (syncLocker)
            {
                _Logger = null;
            }
        }
    }
 
    private Logger GetLogger()
    {
 
        if (_Logger == null || HttpRuntime.Cache[cacheKey_NLogConfigFlag] == null)
        {
            lock (syncLocker)
            {
                if (_Logger == null || HttpRuntime.Cache[cacheKey_NLogConfigFlag] == null)
                {
                    string mongoDbConnectionSet = ConfigUtil.GetAppSettingValue("MongoDbConnectionSet");
                    if (!string.IsNullOrEmpty(mongoDbConnectionSet))
                    {
                        mongoDbConnectionSet = AESDecrypt(mongoDbConnectionSet);//解密字符串,若未加密则无需解密
                    }
 
                    LoggingConfiguration config = new LoggingConfiguration();
 
                    #region 配置MONGODB的日志输出对象
 
                    try
                    {
                        MongoTarget mongoTarget = new MongoTarget();
                        mongoTarget.ConnectionString = mongoDbConnectionSet;
                        mongoTarget.DatabaseName = mongoDbName;
                        mongoTarget.CollectionName = mongoDbCollectionName;
                        mongoTarget.IncludeDefaults = false;
                        AppendLogMongoFields(mongoTarget.Fields);
 
                        Target mongoTargetNew = mongoTarget;
                        if (AsyncWriteLog)
                        {
                            mongoTargetNew = WrapWithAsyncTargetWrapper(mongoTarget);//包装为异步输出对象,以便实现异步写日志
                        }
 
                        LoggingRule rule1 = new LoggingRule("*", LogLevel.Debug, mongoTargetNew);
                        config.LoggingRules.Add(rule1);
                    }
                    catch
                    { }
 
                    #endregion
 
                    #region 配置File的日志输出对象
 
                    if (NeedWriteLogToFile)
                    {
                        try
                        {
                            FileTarget fileTarget = new FileTarget();
                            fileTarget.Layout = @"[${date}] <${threadid}> - ${level} - ${event-context:item=Source} - ${event-context:item=UserID}: ${message};
                                                  StackTrace:${stacktrace};Other1:${event-context:item=Other1};Other2:${event-context:item=Other2};Other3:${event-context:item=Other3}";
 
                            string procName = System.Diagnostics.Process.GetCurrentProcess().ProcessName;
                            fileTarget.FileName = "${basedir}/Logs/" + procName + ".log";
                            fileTarget.ArchiveFileName = "${basedir}/archives/" + procName + ".{#}.log";
                            fileTarget.ArchiveNumbering = ArchiveNumberingMode.DateAndSequence;
                            fileTarget.ArchiveAboveSize = 1024 * 1024 * 10;
                            fileTarget.ArchiveDateFormat = "yyyyMMdd";
                            fileTarget.ArchiveEvery = FileArchivePeriod.Day;
                            fileTarget.MaxArchiveFiles = 30;
                            fileTarget.ConcurrentWrites = true;
                            fileTarget.KeepFileOpen = false;
                            fileTarget.Encoding = System.Text.Encoding.UTF8;
 
                            Target fileTargetNew = fileTarget;
                            if (AsyncWriteLog)
                            {
                                fileTargetNew = WrapWithAsyncTargetWrapper(fileTarget);//包装为异步输出对象,以便实现异步写日志
                            }
 
                            LoggingRule rule2 = new LoggingRule("*", LogLevel.Debug, fileTargetNew);
                            config.LoggingRules.Add(rule2);
                        }
                        catch
                        { }
                    }
 
                    #endregion
 
 
                    LogManager.Configuration = config;
 
                    _Logger = LogManager.GetCurrentClassLogger();
 
                    HttpRuntime.Cache.Insert(cacheKey_NLogConfigFlag, "Nlog"new System.Web.Caching.CacheDependency(loggerCacheDependencyFilePath));
                }
            }
        }
 
        return _Logger;
 
    }
 
    private void AppendLogMongoFields(IList<MongoField> mongoFields)
    {
        mongoFields.Clear();
        Type logPropertiesType = typeof(SysLogInfo.LogProperties);
        foreach (var pro in typeof(SysLogInfo).GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            if (pro.PropertyType == logPropertiesType) continue;
 
            string layoutStr = string.Empty; //"${event-context:item=" + pro.Name + "}";
            if (pro.Name.Equals("ThreadID") || pro.Name.Equals("Level") || pro.Name.Equals("MachineName"))
            {
                layoutStr = "${" + pro.Name.ToLower() + "}";
            }
            else if (pro.Name.Equals("LogDT"))
            {
                layoutStr = "${date:format=yyyy-MM-dd HH\\:mm\\:ss}";
            }
            else if (pro.Name.Equals("Msg"))
            {
                layoutStr = "${message}";
            }
 
            if (!string.IsNullOrEmpty(layoutStr))
            {
                mongoFields.Add(new MongoField(pro.Name, layoutStr, pro.PropertyType.Name));
            }
        }
    }
 
    private Target WrapWithAsyncTargetWrapper(Target target)
    {
        var asyncTargetWrapper = new AsyncTargetWrapper();
        asyncTargetWrapper.WrappedTarget = target;
        asyncTargetWrapper.Name = target.Name;
        target.Name = target.Name + "_wrapped";
        target = asyncTargetWrapper;
        return target;
    }
 
 
    private LogEventInfo BuildLogEventInfo(LogLevel level, string msg, string source, string uid, string detailTrace = nullstring other1 = nullstring other2 = nullstring other3 = null)
    {
        var eventInfo = new LogEventInfo();
        eventInfo.Level = level;
        eventInfo.Message = msg;
        eventInfo.Properties["DetailTrace"] = detailTrace;
        eventInfo.Properties["Source"] = source;
        eventInfo.Properties["Other1"] = other1;
        eventInfo.Properties["Other2"] = other2;
        eventInfo.Properties["Other3"] = other3;
 
        eventInfo.Properties["UserID"] = uid;
 
        return eventInfo;
    }
 
    public void Info(string msg, string source, string uid, string detailTrace = nullstring other1 = nullstring other2 = nullstring other3 = null)
    {
        try
        {
            var eventInfo = BuildLogEventInfo(LogLevel.Info, msg, source, uid, detailTrace, other1, other2, other3);
            var logger = GetLogger();
            logger.Log(eventInfo);
        }
        catch
        { }
    }
 
    public void Warn(string msg, string source, string uid, string detailTrace = nullstring other1 = nullstring other2 = nullstring other3 = null)
    {
        try
        {
            var eventInfo = BuildLogEventInfo(LogLevel.Warn, msg, source, uid, detailTrace, other1, other2, other3);
 
            var logger = GetLogger();
            logger.Log(eventInfo);
        }
        catch
        { }
    }
 
 
    public void Error(string msg, string source, string uid, string detailTrace = nullstring other1 = nullstring other2 = nullstring other3 = null)
    {
        try
        {
            var eventInfo = BuildLogEventInfo(LogLevel.Error, msg, source, uid, detailTrace, other1, other2, other3);
 
            var logger = GetLogger();
            logger.Log(eventInfo);
        }
        catch
        { }
    }
 
    public void Error(Exception ex, string source, string uid, string other1 = nullstring other2 = nullstring other3 = null)
    {
        try
        {
            var eventInfo = BuildLogEventInfo(LogLevel.Error, ex.Message, source, uid, ex.StackTrace, other1, other2, other3);
 
            var logger = GetLogger();
            logger.Log(eventInfo);
        }
        catch
        { }
    }
 
    public void Log(LogLevel level, string msg, string source, string uid, string detailTrace = nullstring other1 = nullstring other2 = nullstring other3 = null)
    {
        try
        {
            var eventInfo = BuildLogEventInfo(level, msg, source, uid, detailTrace, other1, other2, other3);
            var logger = GetLogger();
            logger.Log(eventInfo);
        }
        catch
        { }
    }
 
 
    public class SysLogInfo
    {
        public DateTime LogDT { getset; }
 
        public int ThreadID { getset; }
 
        public string Level { getset; }
 
        public string Msg { getset; }
 
        public string MachineName { getset; }
 
        public LogProperties Properties { getset; }
 
        public class LogProperties
        {
            public string Source { getset; }
 
            public string DetailTrace { getset; }
 
            public string UserID { getset; }
 
            public string Other1 { getset; }
 
            public string Other2 { getset; }
 
            public string Other3 { getset; }
        }
    }
 
 
}

封装这个日志工具类的目的就是为了保证日志格式的统一,同时可以快速的复制到各个项目中使用,而省去需要配置文件或因配置文件修改导致日志记录信息不一致的情况。

从代码中可以看出,若一旦属性发生改变,则缓存标识会失效,意味着会重新生成Logger对象,这样保证了Logger时刻与设置的规则相同。

另一点就是异步日志记录功能AsyncWriteLog,如果是基于配置文件,则只需要更改配置文件targets中配置async="true"即为异步。默认或写false都为同步,而代码上如何实现异步网上并没有介绍,我通过分析NLOG源代码找到关键点,即通过AsyncTargetWrapper异步目标包裹器来包装一次即可。

分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Caching;
using System.Text;
using System.Threading.Tasks;
 
namespace AutoLogisticsPH.Common.Utils
{
    /// <summary>
    /// 基于MemoryCache(内存缓存)的缓存工具类
    /// Author:左文俊
    /// Date:2017/12/11
    /// </summary>
    public static class MemoryCacheUtil
    {
        private static readonly Object _locker = new object(), _locker2 = new object();
 
        /// <summary>
        /// 取缓存项,如果不存在则返回空
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <returns></returns>
        public static T GetCacheItem<T>(String key)
        {
            try
            {
                return (T)MemoryCache.Default[key];
            }
            catch
            {
                return default(T);
            }
        }
 
        /// <summary>
        /// 是否包含指定键的缓存项
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static bool Contains(string key)
        {
            return MemoryCache.Default.Contains(key);
        }
 
        /// <summary>
        /// 取缓存项,如果不存在则新增缓存项
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="cachePopulate"></param>
        /// <param name="slidingExpiration"></param>
        /// <param name="absoluteExpiration"></param>
        /// <returns></returns>
        public static T GetOrAddCacheItem<T>(String key, Func<T> cachePopulate, TimeSpan? slidingExpiration = null, DateTime? absoluteExpiration = null)
        {
            if (String.IsNullOrWhiteSpace(key)) throw new ArgumentException("Invalid cache key");
            if (cachePopulate == nullthrow new ArgumentNullException("cachePopulate");
            if (slidingExpiration == null && absoluteExpiration == nullthrow new ArgumentException("Either a sliding expiration or absolute must be provided");
 
            if (MemoryCache.Default[key] == null)
            {
                lock (_locker)
                {
                    if (MemoryCache.Default[key] == null)
                    {
                        T cacheValue = cachePopulate();
                        if (!typeof(T).IsValueType && ((object)cacheValue) == null//如果是引用类型且为NULL则不存缓存
                        {
                            return cacheValue;
                        }
 
                        var item = new CacheItem(key, cacheValue);
                        var policy = CreatePolicy(slidingExpiration, absoluteExpiration);
 
                        MemoryCache.Default.Add(item, policy);
                    }
                }
            }
 
            return (T)MemoryCache.Default[key];
        }
 
        /// <summary>
        /// 取缓存项,如果不存在则新增缓存项
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="cachePopulate"></param>
        /// <param name="dependencyFilePath"></param>
        /// <returns></returns>
        public static T GetOrAddCacheItem<T>(String key, Func<T> cachePopulate, string dependencyFilePath)
        {
            if (String.IsNullOrWhiteSpace(key)) throw new ArgumentException("Invalid cache key");
            if (cachePopulate == nullthrow new ArgumentNullException("cachePopulate");
 
            if (MemoryCache.Default[key] == null)
            {
                lock (_locker2)
                {
                    if (MemoryCache.Default[key] == null)
                    {
                        T cacheValue = cachePopulate();
                        if (!typeof(T).IsValueType && ((object)cacheValue) == null//如果是引用类型且为NULL则不存缓存
                        {
                            return cacheValue;
                        }
 
                        var item = new CacheItem(key, cacheValue);
                        var policy = CreatePolicy(dependencyFilePath);
 
                        MemoryCache.Default.Add(item, policy);
                    }
                }
            }
 
            return (T)MemoryCache.Default[key];
        }
 
        /// <summary>
        /// 移除指定键的缓存项
        /// </summary>
        /// <param name="key"></param>
        public static void RemoveCacheItem(string key)
        {
            try
            {
                MemoryCache.Default.Remove(key);
            }
            catch
            { }
        }
 
        private static CacheItemPolicy CreatePolicy(TimeSpan? slidingExpiration, DateTime? absoluteExpiration)
        {
            var policy = new CacheItemPolicy();
 
            if (absoluteExpiration.HasValue)
            {
                policy.AbsoluteExpiration = absoluteExpiration.Value;
            }
            else if (slidingExpiration.HasValue)
            {
                policy.SlidingExpiration = slidingExpiration.Value;
            }
 
            policy.Priority = CacheItemPriority.Default;
 
            return policy;
        }
 
        private static CacheItemPolicy CreatePolicy(string filePath)
        {
            CacheItemPolicy policy = new CacheItemPolicy();
            policy.ChangeMonitors.Add(new HostFileChangeMonitor(new List<string>() { filePath }));
            policy.Priority = CacheItemPriority.Default;
            return policy;
        }
    }
}

支持:可指定绝对过期时间、滑动过期明间、文件依赖  三种缓存方式,目前已在公司各种生产业务项目中有使用。优点是可以根据数据的使用频率设置缓存有效期,特别是文件依赖缓存,比如:连接字符串读取一次后,若CONFIG文件没有改变,则缓存永久有效,一旦CONFIG更改,则缓存失效需重新读取,保证数据缓存的最大可用性,减少不必要的多次重复读取CONFIG。

使用示例很简单:(如下:会在第一次读取连接字符串并解密后返回给connstr变量,后续直接通过缓存KEY dbConnName直接返回连接字符串的结果,若修改了连接字符串的CONFIG文件,则缓存的项会失效,会重新读取连接字符串并重新加入到缓存中)
            string connstr= MemoryCacheUtil.GetOrAddCacheItem(dbConnName, () =>
            {
                var connStrSettings = ConfigUtil.GetConnectionString(dbConnName,dbConnectionStringConfigPath);
                string dbProdName = connStrSettings.ProviderName;
                string dbConnStr = connStrSettings.ConnectionString;
                return EncryptUtil.Decrypt(dbConnStr);
            }, "缓存依赖文件路路,如:c:\app\app.config");

上一篇:硬盘安装Win7、CentOS7双系统


下一篇:asp.net学习之SqlDataSource