基于.net的分布式系统限流组件
在互联网应用中,流量洪峰是常有的事情。在应对流量洪峰时,通用的处理模式一般有排队、限流,这样可以非常直接有效的保护系统,防止系统被打爆。另外,通过限流技术手段,可以让整个系统的运行更加平稳。今天要与大家分享一下限流算法和C#版本的组件。
一、令牌桶算法:
令牌桶算法的基本过程如下:
- 假如用户配置的平均发送速率为r,则每隔1/r秒速率将一个令牌被加入到桶中;
- 假设桶最多可以存发b个令牌。当桶中的令牌达到上限后,丢弃令牌。
- 当一个有请求到达时,首先去令牌桶获取令牌,能够取到,则处理这个请求
- 如果桶中没有令牌,那么请求排队或者丢弃
工作过程包括3个阶段:产生令牌、消耗令牌和判断数据包是否通过。其中涉及到2个参数:令牌产生的速率和令牌桶的大小,这个过程的具体工作如下。
- 产生令牌:周期性的以固定速率向令牌桶中增加令牌,桶中的令牌不断增多。如果桶中令牌数已到达上限,则丢弃多余令牌。
- 消费 令牌:业务程序根据具体业务情况消耗桶中的令牌。消费一次,令牌桶令牌减少一个。
- 判断是否通过:判断是否已有令牌桶是否存在有效令牌,当桶中的令牌数量可以满足需求时,则继续业务处理,否则将挂起业务,等待令牌。
下面是C#的一个实现方式
class TokenBucketLimitingService: ILimitingService
{
private LimitedQueue<object> limitedQueue = null;
private CancellationTokenSource cancelToken;
private Task task = null;
private int maxTPS;
private int limitSize;
private object lckObj = new object();
public TokenBucketLimitingService(int maxTPS, int limitSize)
{
this.limitSize = limitSize;
this.maxTPS = maxTPS;if (this.limitSize <= 0)
this.limitSize = 100;
if(this.maxTPS <=0)
this.maxTPS = 1;limitedQueue = new LimitedQueue<object>(limitSize);
for (int i = 0; i < limitSize; i++)
{
limitedQueue.Enqueue(new object());
}
cancelToken = new CancellationTokenSource();
task = Task.Factory.StartNew(new Action(TokenProcess), cancelToken.Token);
}/// <summary>
/// 定时消息令牌
/// </summary>
private void TokenProcess()
{
int sleep = 1000 / maxTPS;
if (sleep == 0)
sleep = 1;DateTime start = DateTime.Now;
while (cancelToken.Token.IsCancellationRequested ==false)
{
try
{
lock (lckObj)
{
limitedQueue.Enqueue(new object());
}
}
catch
{
}
finally
{
if (DateTime.Now - start < TimeSpan.FromMilliseconds(sleep))
{
int newSleep = sleep - (int)(DateTime.Now - start).TotalMilliseconds;
if (newSleep > 1)
Thread.Sleep(newSleep - 1); //做一下时间上的补偿
}
start = DateTime.Now;
}
}
}public void Dispose()
{
cancelToken.Cancel();
}/// <summary>
/// 请求令牌
/// </summary>
/// <returns>true:获取成功,false:获取失败</returns>
public bool Request()
{
if (limitedQueue.Count <= 0)
return false;
lock (lckObj)
{
if (limitedQueue.Count <= 0)
return false;object data = limitedQueue.Dequeue();
if (data == null)
return false;
}return true;
}
}
public interface ILimitingService:IDisposable
{
/// <summary>
/// 申请流量处理
/// </summary>
/// <returns>true:获取成功,false:获取失败</returns>
bool Request();
}
public class LimitingFactory
{
/// <summary>
/// 创建限流服务对象
/// </summary>
/// <param name="limitingType">限流模型</param>
/// <param name="maxQPS">最大QPS</param>
/// <param name="limitSize">最大可用票据数</param>
public static ILimitingService Build(LimitingType limitingType = LimitingType.TokenBucket, int maxQPS = 100, int limitSize = 100)
{
switch (limitingType)
{
case LimitingType.TokenBucket:
default:
return new TokenBucketLimitingService(maxQPS, limitSize);
case LimitingType.LeakageBucket:
return new LeakageBucketLimitingService(maxQPS, limitSize);
}
}
}/// <summary>
/// 限流模式
/// </summary>
public enum LimitingType
{
TokenBucket,//令牌桶模式
LeakageBucket//漏桶模式
}public class LimitedQueue<T> : Queue<T>
{
private int limit = 0;
public const string QueueFulled = "TTP-StreamLimiting-1001";public int Limit
{
get { return limit; }
set { limit = value; }
}public LimitedQueue()
: this(0)
{ }public LimitedQueue(int limit)
: base(limit)
{
this.Limit = limit;
}public new bool Enqueue(T item)
{
if (limit > 0 && this.Count >= this.Limit)
{
return false;
}
base.Enqueue(item);
return true;
}
}
调用方法:
var service = LimitingFactory.Build(LimitingType.TokenBucket, 500, 200);
while (true)
{
var result = service.Request();
//如果返回true,说明可以进行业务处理,否则需要继续等待
if (result)
{
//业务处理......
}
else
Thread.Sleep(1);
}
二、漏桶算法
声明一个固定容量的桶,每接受到一个请求向桶中添加一个令牌,当令牌桶达到上线后请求丢弃或等待,具体算法如下:
- 创建一个固定容量的漏桶,请求到达时向漏桶添加一个令牌
- 如果请求添加令牌不成功,请求丢弃或等待
- 另一个线程以固定的速率消费桶里的令牌
工作过程也包括3个阶段:产生令牌、消耗令牌和判断数据包是否通过。其中涉及到2个参数:令牌自动消费的速率和令牌桶的大小,个过程的具体工作如下。
- 产生令牌:业务程序根据具体业务情况申请令牌。申请一次,令牌桶令牌加一。如果桶中令牌数已到达上限,则挂起业务后等待令牌。
- 消费令牌:周期性的以固定速率消费令牌桶中令牌,桶中的令牌不断较少。
- 判断是否通过:判断是否已有令牌桶是否存在有效令牌,当桶中的令牌数量可以满足需求时,则继续业务处理,否则将挂起业务,等待令牌。
C#的一个实现方式:
class LeakageBucketLimitingService: ILimitingService
{
private LimitedQueue<object> limitedQueue = null;
private CancellationTokenSource cancelToken;
private Task task = null;
private int maxTPS;
private int limitSize;
private object lckObj = new object();
public LeakageBucketLimitingService(int maxTPS, int limitSize)
{
this.limitSize = limitSize;
this.maxTPS = maxTPS;if (this.limitSize <= 0)
this.limitSize = 100;
if (this.maxTPS <= 0)
this.maxTPS = 1;limitedQueue = new LimitedQueue<object>(limitSize);
cancelToken = new CancellationTokenSource();
task = Task.Factory.StartNew(new Action(TokenProcess), cancelToken.Token);
}private void TokenProcess()
{
int sleep = 1000 / maxTPS;
if (sleep == 0)
sleep = 1;DateTime start = DateTime.Now;
while (cancelToken.Token.IsCancellationRequested == false)
{
try
{if (limitedQueue.Count > 0)
{
lock (lckObj)
{
if (limitedQueue.Count > 0)
limitedQueue.Dequeue();
}
}
}
catch
{
}
finally
{
if (DateTime.Now - start < TimeSpan.FromMilliseconds(sleep))
{
int newSleep = sleep - (int)(DateTime.Now - start).TotalMilliseconds;
if (newSleep > 1)
Thread.Sleep(newSleep - 1); //做一下时间上的补偿
}
start = DateTime.Now;
}
}
}public void Dispose()
{
cancelToken.Cancel();
}public bool Request()
{
if (limitedQueue.Count >= limitSize)
return false;
lock (lckObj)
{
if (limitedQueue.Count >= limitSize)
return false;return limitedQueue.Enqueue(new object());
}
}
}
调用方法:
var service = LimitingFactory.Build(LimitingType.LeakageBucket, 500, 200);
while (true)
{
var result = service.Request();
//如果返回true,说明可以进行业务处理,否则需要继续等待
if (result)
{
//业务处理......
}
else
Thread.Sleep(1);
}
两类限流算法虽然非常相似,但是还是有些区别的,供大家参考!
- 漏桶算法能够强行限制数据的传输速率。在某些情况下,漏桶算法不能够有效地使用网络资源。因为漏桶的漏出速率是固定的。
- 令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输.
-
C# DataGridView绑定List对象时,利用BindingList来实现增删查改
当DataGridView的DataSource是DataTable的时候,DataTable的数据改变时,DataGridView的数据会随之改变,无需重新绑定到DataGridView。 当DataGridView的DataSource是泛型List,当List的数据改变时,则需要先将DataGridView的DataSource设置为new List<T>(),再将改变后的List<T>赋给DataGridView的DataSource。绑定List时,注意:切莫将DataGridView的DataSource设置为Null,否则会破坏DataGridView的列结构。
如果要对绑定在DataGridView中的List<T>进行数据的添加删除,先要把List<T>转换成BindingList<T>,再进行绑定:DataGridView.DataSource=new BindingList<T>(new List<T>)。否则的话会产生许多意想不到的错误。 如:初始绑定空数据后再添加数据绑定后,却取不到DataGridView.CurrentCell属性。
IList<T> list= new List<T>();
DataGridView.DataSource=list;//DataGridView的行不能添加删除
DataGridView.DataSource=new BindingList<T>(list);//DataGridView的行可以添加删除(只有允许添加行、删除行)
示例代码:
public partial class ucServer : UserControl
{
private List<ServerInfo> serverList;
private BindingList<ServerInfo> dataBindings;public ucServer(List<ServerInfo> serverList)
{
InitializeComponent();
if (serverList == null)
serverList = new List<ServerInfo>();
this.serverList = serverList;
dataBindings = new BindingList<ServerInfo>(this.serverList);
}private void ucChecker_Load(object sender, EventArgs e)
{
this.dgParams.DataSource = dataBindings;
}private void llDownloadUrl_LinkClicked(object sender, LinkLabelLinkClickedEventArgs e)
{
MessageBox.Show("请设置下载地址。");
}private void llHelp_LinkClicked(object sender, LinkLabelLinkClickedEventArgs e)
{
MessageBox.Show("请设置帮助信息。");
}public bool IsValid()
{
return true;//CheckService.Check(this.component);
}private void btnAdd_Click(object sender, EventArgs e)
{
ServerInfo info = new ServerInfo() { OSType="Windows", Ports="8000-9999"};dataBindings.Add(info);
}private void btnDel_Click(object sender, EventArgs e)
{
//允许删除多行DataGridViewSelectedRowCollection rows = this.dgParams.SelectedRows;
foreach (DataGridViewRow row in rows)
{
this.dataBindings.RemoveAt(row.Index);
}
}}
-
.net中ThreadPool与Task的认识总结
线程池和Task是多线程编程中两个经常使用的技术,大家在熟悉不过了。他们有什么关联关系?Task又是怎么工作的呢?估计很多时候会犯糊涂。通过翻阅资料,终于弄明白了,与大家分享一下。
工作线程与I/O线程
在ThreadPool中有这样一个方法:public static bool SetMaxThreads(int workerThreads, int completionPortThreads);
此方法中有两个参数:workerThreads和completionPortThreads。这两个参数引申出了两个概念:辅助线程(也叫工作线程)和异步 I/O 线程。这两个线程有什么区别么?通过查阅资料,我们可以了解到,工作线程其实就是我们编码主动向ThreadPool中创建的线程。而I/O线程是线程池中预先保留出来的部分线程,这部分线程的作用是为了分发从IOCP(I/O completion port) 中的回调。
那么什么是IOCP回调呢?
在CLR内部,系统维护了一个IOCP(I/O completion port),它提供了处理多个异步I/O请求的线程模型。我们可以把IOCP看做是一个消息队列。当一个进程创建了一个IOCP,即创建了一个队列。当异步I/O请 求完成时,设备驱动程序就会生成一个I/O完成包,将它按照FIFO方式排队列入该完成端口。之后,会由I/O线程提取完成I/O请求包,并调用之前的委托。注意:异步调用服务时,回调函数都是运行于CLR线程池的I/O线程当中。
I/O线程是由CLR调用的,通常情况下,我们不会直接用到它 。但是线程池中区分它们的目的是为了避免线程都去处理I/O回调而被耗尽,从而引发死锁。在编程时,开发人员需要关注的是确保I/O线程返回到线程池,I/O回调代码应该做尽量小的工作,并尽快返回到线程池,否则I/O线程会很快消耗光。如果回调代码中的工作很多的话,应该考虑把工作拆分到一个工作者线程中去。否则,I/O线程被耗尽,大量工作线程空闲,可能导致死锁。
再补充一下,当执行I/O操作的时候,无论是同步I/O操作还是异步I/O操作,都会调用的Windows的API方法,比如,当读取文件时,调用ReadFile函数。该方法会将你的当前线程从用户态转变成内核态,会生成一个I/O请求包,并初始化这个请求包。ReadFile会向内核传递,根据这个请求包,windows内核知道需要将这个I/O操作发送给哪个硬件设备。这些I/O操作会进入设备自己的处理队列中,该队列由这个设备的驱动程序维护。
如果此时是同步I/O操作,那么在硬件设备操作I/O的时候,发出I/O请求的线程由于无事可做被windows变成睡眠状态,当硬件设备完成操作后,再唤醒这个线程。这种方式非常直接,但是性能不高,如果请求数很多,那么休眠的线程数也很多,浪费了大量资源。
如果是异步I/O操作(.Net中,异步的I/O操作大部分为BeginXXX的形式 ),该方法在Windows把I/O请求包发送到设备的处理队列后就返回。同时,在调用异步I/O操作时,即调用BeginXXX方法的时候,需要传入一个委托,该委托方法会随着I/O请求包一路传递到设备的驱动程序。在设备处理完I/O请求包后,将该委托再放到CLR线程池队列。
总结来说,IOCP(I/O completion port)中有2个队列,一个是先进先出的队列,存放的是IO完成包,即已经完成的IO操作需要执行回调方法。还有一个队列是线程队列,IOCP会预分配一些线程在这个队列中,这样会比即时创建线程处理I/O请求速度更快。这个队列是后进先出的,好处是下一个请求的到来可能还是用之前的线程来处理,就不需要进行线程上下文切换,提高了性能。
这里有一个IOCP的解释,写的很好。http://gamebabyrocksun.blog.163.com/blog/static/57153463201036104134250/
Task的运行原理分析
Task与ThreadPool什么关系呢?简单来说,Task是基于ThreadPool实现的,当然被标记为LongRunning的Task(单独创建线程实现)除外。Task被创建后,通过TaskScheduler执行工作项的分配。TaskScheduler会把工作项存储到两类队列中: 全局队列与本地队列。全局队列被设计为FIFO的队列。本地队列存储在线程中,被设计为LIFO.
当主程序创建了一个Task后,由于创建这个Task的线程不是线程池中的线程,则TaskScheduler 会把该Task放入全局队列中。
如果这个Task是由线程池中的线程创建,并且未设置TaskCreationOptions.PreferFairness标记(默认情况下未设置),TaskScheduler 会把该Task放入到该线程的本地队列中。如果设置了TaskCreationOptions.PreferFairness标记,则放入全局队列。
官方的解释是: *任务(即不在其他任务的上下文中创建的任务)与任何其他工作项一样放在全局队列上。 但是,嵌套任务或子任务(在其他任务的上下文中创建)的处理方式大不相同。 子任务或嵌套任务放置在特定于执行父任务的线程的本地队列上。 父任务可能是*任务,也可能是其他任务的子任务。
那么任务放入到两类队列中后,是如何被执行的呢?
当线程池中的线程准备好执行更多工作时,首先查看本地队列。 如果工作项在此处等待,直接通过LIFO的模式获取执行。 如果没有,则向全局队列以FIFO的模式获取工作项。如果全局队列也没有工作项,则查看其他线程的本地队列是否有可执行工作项,如果存在可执行工作项,则以FIFO的模式出队执行。
C# 排序技术研究与对比
一、 排序场景
- 一维数组排序
- 多维数组排序
- 集合排序,例如Dictionary ,List<T>, 自定义类集合排序等
- DataTable排序
二、 排序实现和测试
1. 一维数组排序
1.1 一维数组排序特点
元素之间是一维线性的关系,每个元素只有一个下标,在排序场景下,每个元素的数据类型是一致的。例如:
1.2 C# 一维数组排序实现
A:调用Array.Sort方法实现数组排序,不限制元素数据类型,底层基于对IComparable的接口实现
B:使用Linq实现排序
1.2 测试结果对比
1000条数据(GUID)
10000数据(GUID)
100000数据(GUID)
Array.Sort
2ms
35ms
420ms
Linq
4ms
74ms
738ms
可以看出, Array.Sort排序优于Linq的性能(越底层的结构,排序的性能越好)。
2. 多维数组排序
2.1 多维数组排序特点
数组可以具有多个维度,支持多行多列,各个维度的数据类型可以不同。
在此文中,交错数组不在研究范围内,主要研究的是不同数据类型的矩阵数组,这样更加贴近我们在实际场景中的数据。例如:
2.2 C# 多维数组排序实现
Step1:定义一个对象排序类ObjectComparer,实现IComparer接口,主要负责数组中某个列的排序,
如果要排序的列是int类型,进行如下比较即可:其他类推。
ObjectComparer类结构:
Step2:定义维度顺序整形数组:tagObjArray,实例化ObjectComparer对象,将要排序的数组作为参数传递给ObjectComparer的构造函数。
Step3:调用Array.Sort方法排序,参数:维度顺序整形数组:tagObjArray和ObjectComparer对象。
3. 集合排序
3.1.ArrayList 类
使用大小可按需动态增加的数组。
3.2 List 类
可通过索引访问的对象的强类型列表。提供用于对列表进行搜索、排序和操作的方法,在决定使用 List 还是使用 ArrayList 类(两者具有类似的功能)时, List 类在大多数情况下执行得更好并且是类型安全的。例子中的Sort其实调用的是String.Sort方法。
3.3 List和ArrayList性能测试对比
ArrayList
List
100000
498 ms
538ms
3.4 Dictionary类/SortedDictionary类
从数据结构上来说都属于Hashtable,对关键字(键值)进行散列操作,适合键值对的存取,排序可以使用LINQ实现,建议使用SortedDictionary替换。
Dictionary和HashTable内部实现差不多,但前者无需装箱拆箱操作,效率略高一点
3.5 HashTable类
Hashtable 主要用于键值快速查找,却没有提供排序的方法,所以它的排序需要借住数组或其它集合来实现。
HashTable中的key/value均为object类型,由包含集合元素的存储桶组成。存储桶是 HashTable中各元素的虚拟子组,与大多数集合中进行的搜索和检索相比,存储桶可令搜索和检索更为便捷。每一存储桶都与一个哈希代码关联,该哈希代码是使用哈希函数生成的并基于该元素的键。HashTable的优点就在于其索引的方式,速度非常快。如果以任意类型键值访问其中元素会快于其他集合,特别是当数据量特别大的时候,效率差别尤其大。
HashTable的应用场合有:对象缓存,树递归算法的替代,和各种需提升效率的场合。
3.6 Stack类
Stack,栈,表示对象的简单的后进先出非泛型集合。Push方法入栈,Pop方法出栈。
3.7 Queue类
队列,先进先出。enqueue方法入队列,dequeue方法出队列。
3.8 自定义类集合
Step1:定义自定义类:Person
Step2:构造实体类集合:List<Person> persons
Step3:排序方法实现:
1、直接排序
2、Person实现IComparable接口,直接调用Sort方法排序
直接调用Sort方法排序
3、Linq实现排序
测试结果对比:
1.直接排序
2.实现IComparable接口
3.Linq
100000
75ms
99ms
29ms
4. DataTable排序
4.1 DataTable特点
(1)DataTable 对象是按条件区分大小写的。(如果有两个 DataTable对象分别为“mydatatable”和“Mydatatable”,则搜索表的字符串被认为是区分大小写的。如果只有“mydatatable”而不存在“Mydatatable”,则该搜索表的字符串不区分大小写)。
(2)以编程方式创建 DataTable,须先通过将 DataColumn 对象添加到 DataColumnCollection(通过 Columns 属性访问)中来定义其架构。
(3)向 DataTable 添加行,须使用 NewRow 方法返回新的 DataRow 对象。(DataTable 可存储的最大行数是 16,777,216)。
(4)DataTable 也包含可用于确保数据完整性的 Constraint 对象的集合
(5)DataTable 事件(RowChanged、RowChanging、RowDeleting 和 RowDeleted)可用于确定对表进行更改的时间
4.2 DataTable排序实现
Step1: 构造DataTable
Step2:DataView排序
Step2:DataTable.Select排序
4.3 测试结果对比
1.DataView排序
2.DataTable.Select排序
100000
526 ms
368ms
对比下自定义类存储100000条相同数据的排序结果:
测试结果对比:
1.直接排序
2.实现IComparable接口
3.Linq
100000
75ms
99ms
29ms
三、 排序效率总结
1、 在数组排序中,建议使用Array.Sort 方式,优于LINQ方式
2、 在自定义类排序时,推荐使用LINQ方式
3、 DataTable和自定义类存储同类型数据时,自定义类的排序整体优于DataTable方式。
4、 DataTable排序时,推荐使用DataTable.Select排序方式。
5、 Dictionary和HashTable内部实现差不多,但Dictionary无需装箱拆箱操作,效率略高一点。数据量较大时,建议采用HashTable。
6、 ArrayList集合的排序性能优于List集合。
7、 Stack和Queue集合用于栈和队列操作。
基于.net的通用内存缓存模型组件
谈到缓存,我们自然而然就会想到缓存的好处,比如:
- 降低高并发数据读取的系统压力:静态数据访问、动态数据访问
- 存储预处理数据,提升系统响应速度和TPS
- 降低高并发数据写入的系统压力
- 提升系统可用性,后台宕机后,系统还存在可用的机会
缓存技术一直是优化程序性能的一个重要手段,在互联网技术体系中,也不例外。但是在分布式架构下,大家开始更多的使用分布式缓存,比如Redis、MemcacheD等等,对进程内的内存缓存使用的越来越少。其主要原因无外乎几点:
一是,数据不能做到强一致性,程序内存数据缓存同步的周期相对分布缓存更慢一些。
二是,需要对缓存的各种同步策略进行封装,并控制同步时机。进程内缓存的使用比分布式缓存的使用具有更高的技术门槛。没有分布缓存使用简单。
虽然分布式缓存具有非常多很好的特性,但是当完全抛弃了程序内存缓存后,分布式缓存将会被滥用,应用程序甚至过度的依赖分布式缓存。笔者认为,任何一种技术的滥用,都将可能导致系统架构在健壮性上存在缺陷。分布式缓存虽然很好用,性能也不错,但是与进程内存缓存比起来,性能还是差了好多个数量级。要想把系统的性能做到极致,仅仅依赖Redis等分布式缓存还不不够的,还需要充分利用进程内存缓存。
缓存技术,从出现到现在,总结来说,已有四个阶段的发展:本地缓存、分布式缓存、弹性缓存平台,弹性应用平台。本地缓存的特点是数据存储在应用代码所在内存空间,可提供快速的数据访问,纳秒级性能。缺点也很明显,数据无法分布式共享,无容错处理。分布式缓存的特点是数据在固定数目的集群节点间分布存储,缓存容量可扩展(静态扩展),但是扩展过程中需大量配置,无容错机制。弹性缓存平台的特性是数据在集群节点间分布存储,基于冗余机制实现高可用性。其优点是可动态扩展,具有容错能力,但是复制备份会对系统性能造成一定影响。弹性应用平台的特点是弹性缓存与代码执行的组合体,将业务逻辑代码转移到数据所在节点执行,极大地降低数据传输开销,提升系统性能。纵观整个缓存技术的发展,经历了从分散到集中,又到集中并分散的一个过程。弹性应用平台作为最终的缓存解决方案,已经不仅仅停留在缓存技术本身,而是更多的考虑了如何更好的与业务代码无缝集成,并提供进程内存级别的性能。
基于此,我们规划设计了一个通用的内存缓存组件。通过此组件,可以实现各种内存数据的缓存,以及缓存数据同步等,并提供了分布式缓存数据同步到进程内存的方案。此组件与传统的缓存组件有很大的不同,它是对进程内存缓存的使用和同步做了抽象和总结,直接提供了一套模型。上文也提到,使用进程内存缓存最大的挑战在与数据同步,那么,我们先看一下影响进程内存缓存同步的一些因素。通过下图,我们看到在同步策略、同步时机、同步模式上都有很多选择。进程内存缓存的使用,其实就是下面三个维度组合处理的一个结果。
在实际的业务中,同步策略更多的会基于时间、数据版本或者时间来做,然后选择合适的同步时机和模式来执行数据同步。所以,在组件的封装上,我们支持了三种应用模型:
- 对缓存数据标记有效期,过期自动清理
- 缓存数据时,同时缓存数据版本,定时校验或实时校验数据版本,发现版本不一致时清理或重新同步缓存数据
- 缓存数据并订阅数据变化通知,当收到变化通知后,更新缓存数据
模型一:对缓存数据标记有效期,过期自动清理
此模型主要适用于, 比如:字符串资源信息查询、App充电地图数据、App最新动态、高频率日志记录、配置中心数据的缓存等等。
数据实时性要求很低,访问频率很高,变化频率较小的数据查询
访问频率很高,查询参数基本一致的数据查询
访问频率很高,允许丢失的辅助信息写入
代码示例:
//nuget:Teld.Core//引用:Teld.Core.Cache.ServiceEx.dll//创建一个带过期时间的本地内存容器using(var container = LocalCacheService.CreateExpirationContainer("TTP-Cache-CFG")){//向容器中增加项目,3秒钟的有效期container.Add("Name", "张三", new TimeSpan(0, 0, 3));//想容器中增加项目,永久有效container.Add("Address", "鑫盛大厦1号楼12层北特来电");}模型二:缓存数据时,同时缓存数据版本,定时校验或实时校验数据版本,发现版本不一致时清理或重新同步缓存数据
此模型主要适用于, 比如:帮助查询等。
- 数据实时性要求不高,访问频率很高的数据查询
- 访问频率很高,查询参数基本一致的数据查询
代码示例1: 定时同步,每分钟执行一次版本同步
static voidMain(string[] args)
{
//创建缓存数据同步代理
CacheValidateEventHandler<UserInfo> handler = newCacheValidateEventHandler<UserInfo>(SyncUserData);//创建一个带版本校验的本地内存缓存容器,每隔60s,自动进行一次数据全量同步using(varcontainer = LocalCacheService.CreateVersionContainer<UserInfo>("TTP-Cache-User", handler, SyncTime.Timing, SyncModel.All, 60)){
container.SyncFailed += Container_SyncFailed; //数据同步失败,事件通知
container.SyncData(); //立即执行一次数据同步
var user = container.Get("Name"); //从缓存中获取数据
Console.WriteLine(JsonConvert.SerializeObject(user));
}
Console.ReadLine();
}//数据同步,返回全量或者增量数据,并返回数据的最新版本
static Dictionary<string, UserInfo> SyncUserData(CacheValidateEventArgs e,out stringnewVersion)
{
//通过e.Version获取上次同步的数据数据
Dictionary<string, UserInfo> result = new Dictionary<string, Cache.UserInfo>();
Random r = new Random(DateTime.Now.Second);result.Add("Name", new Cache.UserInfo() { Name = "Name", Age =r.Next(1,20) , IsMale= true , LastModifyTime = DateTime.Now});newVersion = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
return result;
}代码示例2: 调用缓存Get方法时,自动同步缓存
static void Main (string[] args)
{ //创建缓存数据同步代理
CacheValidateEventHandler<UserInfo> handler = new CacheValidateEventHandler<UserInfo>(SyncUserData);
//创建一个带版本校验的本地内存缓存容器
varcontainer = LocalCacheService.CreateVersionContainer<UserInfo>("TTP-Cache-User", handler, SyncTime.GetCheck, SyncModel.All);
container.SyncFailed += Container_SyncFailed; //数据同步失败,事件通知
var user = container.Get("Name"); //从缓存数据中获取数据
Console.WriteLine(JsonConvert.SerializeObject(user));
Console.ReadLine();
}
模型三:缓存数据并订阅数据变化通知,当收到变化通知后,更新缓存数据
此模型主要适用于, 比如:电站状态缓存等。
- 数据实时性要求比较高,访问频率很高的数据查询
代码示例1: 带事件更新通知的缓存
public void GetString_Invoke()
{
//创建一个带MQ变化通知的本地内存缓存容器
using (var container = LocalCacheService.CreateEventContainer<string>("TTP-Cache-EventCacheUnitTest",
(CacheValidateEventArgs e, out string newVersion) =>
{
newVersion = Guid.NewGuid().ToString();
return BuildStringData();
}, SyncModel.All, 1))
{
container.SyncData(); //为容器初始化数据
var data = container.Get("lastModifytime"); //获取数据项
Assert.IsNotNull(data);
var data1 = container.Get("lastModifytime");
Assert.AreEqual(data, data1);
//发送数据项的更新通知事件
LocalCacheService.SendDataChangedEvent(container.Name, "lastModifytime");
Thread.Sleep(5000);
var data2 = container.Get("lastModifytime");
Assert.AreNotEqual(data2, data);
}
}
代码示例2:数据删除的事件通知
public void GetString_Delete(){//创建一个带MQ变化通知的本地内存缓存容器using (var container = LocalCacheService.CreateEventContainer<string>("TTP-Cache-EventCacheUnitTest",(CacheValidateEventArgs e, out string newVersion) =>{newVersion = Guid.NewGuid().ToString();return BuildStringData();}, SyncModel.All, 1)){container.SyncData(); //为容器初始化数据var data = container.Get("lastModifytime"); //获取数据项Assert.IsNotNull(data);var data1 = container.Get("lastModifytime");Assert.AreEqual(data, data1);LocalCacheService.SendDataChangedEvent(container.Name,"lastModifytime", EventType.Delete); //发送数据项的删除通知事件Thread.Sleep(5000);var data2 = container.Get("lastModifytime");Assert.IsNull(data2);}}以上是此缓存组件应用的三种模型。三种模型,可通过 LocalCacheService 创建。其代码如下:
public class LocalCacheService
{/// <summary>/// 创建过期自动清理的本地内存缓存容器/// </summary>/// <param name="key">容器标识,三段式命名,全局唯一:TTP-Resource-DataCache</param>/// <param name="limitSize">限制使用的内存大小(M)</param>public static IExpirationCacheContainer CreateExpirationContainer(string key,long? limitSize =null){if (string.IsNullOrEmpty(key)){throw new ArgumentException("Key值不能为空!", nameof(key));}return new InMemoryExpirationContainer(key, limitSize);}/// <summary>/// 创建基于版本比较的本地内存缓存容器/// </summary>/// <typeparam name="T"></typeparam>/// <param name="key">容器标识,三段式命名,全局唯一:TTP-Resource-DataCache</param>/// <param name="handler">基于版本比较的数据处理器</param>/// <param name="syncTime">同步时机</param>/// <param name="model">同步模式</param>/// <param name="syncTimeMin">同步时机为Timing时,同步时间间隔</param>/// <returns></returns>public static IVersionCacheContainer<T> CreateVersionContainer<T>(string key,CacheValidateEventHandler<T> handler, SyncTime syncTime =SyncTime.Invoke,SyncModel model = SyncModel.All,int syncTimeSec=180) where T : class{if (string.IsNullOrEmpty(key)){throw new ArgumentException("Key值不能为空!", nameof(key));}if (handler == null){throw new ArgumentNullException(nameof(handler));}if (syncTimeSec == 0)syncTimeSec = 180;return new InMemoryVersionCacheContainer<T>(key, handler, syncTime, model,TimeSpan.FromSeconds(syncTimeSec));}/// <summary>/// 创建基于事件通知的本地内存缓存容器/// </summary>/// <typeparam name="T"></typeparam>/// <param name="key">容器标识,三段式命名,全局唯一:TTP-Resource-DataCache</param>/// <param name="handler">基于版本比较的数据处理器</param>/// <param name="model">同步模式</param>/// <param name="syncTimeMin">同步时机为Timing时,同步时间间隔</param>/// <returns></returns>public static IVersionCacheContainer<T> CreateEventContainer<T>(stringkey,CacheValidateEventHandler<T> handler, SyncModel model = SyncModel.All, intsyncTimeSec = 180) where T : class{if (string.IsNullOrEmpty(key)){throw new ArgumentException("Key值不能为空!", nameof(key));}if (handler == null){throw new ArgumentNullException(nameof(handler));}if (syncTimeSec == 0)syncTimeSec = 180;return new InMemoryEventCacheContainer<T>(key,model,handler,TimeSpan.FromSeconds(syncTimeSec));}}同步模式和同步时机的定义如下:
/// <summary>
/// 同步模式
/// </summary>
public enum SyncModel : int
{
All, //全量同步,清楚历史缓存信息,重新插入
Increase, //增量同步,同步变化部分,不对历史缓存数据清理
Clear //仅清理历史数据
}/// <summary>
/// 同步时机
/// </summary>
public enum SyncTime : int
{
Invoke, //调用方执行SyncData方法主动同步
Timing, //定时同步
GetCheck //Get方法是自动同步
}以上是我们在进程内存组件的一些实践心得,更多技术细节,欢迎同学们来电沟通。微信号:vveiliang
Scala学习笔记:重要语法特性
1.变量声明
Scala 有两种变量, val 和 var val的值声明后不可变,var可变
val msg: String = "Hello yet again, world!"
或者类型推断
val msg = "Hello, world!"
2.函数定义
如果函数仅由一个句子组成,你可以可选地不写大括号。
def max2(x: Int, y: Int) = if (x > y) x else y
3.for循环
打印每一个命令行参数的方法是:
args.foreach(arg => println(arg))
如果函数文本由带一个参数的一句话组成,
args.foreach(println)
Scala 里只有一个指令式 for的函数式近似。
for (arg <- args)
println(arg)<- 的左边是变量,右边是数组。
再比如带类型的参数化数组
val greetStrings = new Array[String](3)
greetStrings(0) = "Hello"
greetStrings(1) = ", "
greetStrings(2) = "world!\n"
for (i <- 0 to 2)
print(greetStrings(i))注意这里的数组定义,只要new的时候带类型Array[String]就行了,val后面自动推断类型。
注意这里的数组索引用的是()而不是java里面的[]。
因为scala里面根本没有传统意义上的操作符,取而代之的是他们都可以转换为方法。例如greetStrings(i)可以转换成 greetStrings.apply(i),
greetStrings(0) = "Hello" 将被转化为 greetStrings.update(0, "Hello")尽管实例化之后你无法改变 Array 的长度,它的元素值却是可变的。因此,Array 是可变的对象。
4.List对象
创建一个 List 很简单。 List里面元素不可变。
val oneTwoThree = List(1, 2, 3)
List有个叫“ :::”的方法实现叠加功能。
val oneTwo = List(1, 2)
val threeFour = List(3, 4)
val oneTwoThreeFour = oneTwo ::: threeFour//结果是List(1, 2, 3, 4)
Cons 把一个新元素组合到已有 List的最前端,然后返回结果 List。 例如,若执行这个脚本:
val twoThree = list(2, 3)
val oneTwoThree = 1 :: twoThree
println(oneTwoThree)
//你会看到: List(1, 2, 3)一个简单的需记住的规则:如果一个方法被用作操作符标注,如 a* b,那么方法被左操作数调用,就像 a.*(b)——除非方法名以冒号结尾。这种情况下,方法被右操作数调用。因此, 1 :: twoThree 里, ::方法被 twoThree 调用,传入 1,像这样: twoThree.::(1)。
类 List 没有提供 append 操作,因为随着列表变长 append 的耗时将呈线性增长,而使用::做前缀则仅花费常量时间。如果你想通过添加元素来构造列表,你的选择是把它们前缀进去,当你完成之后再调用 reverse;5.元组
与列表一样,元组也是不可变的,但与列表不同,元组可以包含不同类型的元素。
val pair = (99, "Luftballons", 55)
println(pair._1)
println(pair._2)
println(pair._3)注意这里第一个元素是从_1开始而不像List那样从0开始。
6.Set和Map
var jetSet = Set("Boeing", "Airbus")
jetSet += "Lear"默认set或者HashSet可变。即jetSet = jetSet + "Lear"
如果要用不可变的set或者HashSet,要import
import scala.collection.immutable.HashSet
val hashSet = HashSet("Tomatoes", "Chilies")
println(hashSet + "Coriander")//这里就不能再赋值给hashSet了
7.访问级别
Public 是 Scala 的缺省访问级别。C++中struct默认是public,class默认是private。java类中的变量默认是default类型,只允许在同一个包内访问,一般用的时候跟private差不多。所以java用的时候要想用public必须要指出。
class ChecksumAccumulator {
private var sum = 0
...}
8.静态对象:object
Scala 比 Java 更面向对象的一个方面是 Scala 没有静态成员。替代品是, Scala 有单例对象: singleton object。除了用 object 关键字替换了 class 关键字以外,单例对象的定义看上去就像是类定义。
import scala.collection.mutable.Map
object ChecksumAccumulator {
private val cache = Map[String, Int]()
def calculate(s: String): Int =
if (cache.contains(s))
cache(s)
else {
.....
}可以如下方式调用 ChecksumAccumulator单例对象的calculate方法:
ChecksumAccumulator.calculate("Every value is an object.")
也不用在实例化了。
类和单例对象间的一个差别是,单例对象不带参数,而类可以。因为你不能用new关键字实例化一个单例对象, 你没机会传递给它参数。9.伴生对象
当单例对象与某个类共享同一个名称时,他被称作是这个类的伴生对象: companion object。你必须在同一个源文件里定义类和它的伴生对象。类被称为是这个单例对象的伴生类: companion class。类和它的伴生对象可以互相访问其私有成员。
不与伴生类共享名称的单例对象被称为孤立对象: standalone object。 由于很多种原因你会用到它10.Main函数
要执行Scala程序,你一定要提供一个有main方法(仅带一个参数, Array[String],且结果类型为 Unit的孤立单例对象名。比如下面这个例子;
import ChecksumAccumulator.calculate
object Summer {
def main(args: Array[String]) { //对比java里面的public static voidmain(String[] args){}
for (arg <- args)
println(arg + ": " + calculate(arg))
}
}11.另一种Main函数
Application 特质:在单例对象名后面写上“ extends Application” 。然后取代main 方法.
import ChecksumAccumulator.calculate
object FallWinterSpringSummer extends Application {
for (season <- List("fall", "winter", "spring"))
println(season +": "+ calculate(season))
}效果和main函数一样。不过它也有些缺点。首先, 如果想访问命令行参数的话就不能用它,因为args数组不可访问。第二,如果你的程序是多线程的就需要显式的 main 方法。最后,某些JVM的实现没有优化被 Application 特质执行的对象的初始化代码。因此只有当你的程序相对简单和单线程情况下你才可以继承 Application 特质。
12.带参数类声明
Java 类具有可以带参数的构造器,而 Scala 类可以直接带参数。 Scala 的写法更简洁——类参数可以直接在类的主体中使用;没必要定义字段然后写赋值函数把构造器的参数复制到字段里。(不需要构造函数)
例如以下分数构造器:class Rational(n: Int, d: Int) {
require(d != 0)
override def toString = n +"/"+ dval numer: Int = n
val denom: Int = d
def add(that: Rational): Rational =
new Rational(
numer * that.denom + that.numer * denom,
denom * that.denom
}require方法带一个布尔型参数。如果传入的值为真,require将正常返回。反之,require将通过抛出IllegalArgumentException来阻止对象被构造。 这里使用了重载。重载了类自带的toString函数。 这里that也就是随便起了个名字的变量,不是关键字。this是关键字。比如下面这个函数:
def lessThan(that: Rational) =
this.numer * that.denom < that.numer * this.denom这里的this可以省略。但下面这个就不能省略了:
def max(that: Rational) =
if (this.lessThan(that)) that else this13.if返回值
Scala 的 if 可以产生值(是能返回值的表达式)。于是 Scala 持续了这种趋势让 for, try 和 match 也产生值。while不产生值,所以用得少。如果实在想用while,在纯函数式编程的时候可以考虑递归。
指令式风格:var filename = "default.txt"
if (!args.isEmpty)
filename = args(0)函数式风格:
val filename =
if (!args.isEmpty) args(0)
else "default.txt"使用 val 而不是 var 的第二点好处是他能更好地支持等效推论。无论何时都可以用表达式替代变量名。
如要替代 println(filename),你可以这么写:println(if (!args.isEmpty) args(0) else "default.txt")
14.break 和 continue
scala里面也没有break以及continue。如果想要用到他们的功能,可以使用增加布尔变量控制到循环语句判断中,类似:
var foundIt = false while (i < args.length && !foundIt) { }
15.for过滤器
在for里面还支持过滤器:
val filesHere = (new java.io.File(".")).listFiles //路径名的目录中的文件的数组。
for (file <- filesHere if file.getName.endsWith(".scala"))
println(file)甚至多个过滤器:
for (
file <- filesHere
if file.isFile;
if file.getName.endsWith(".scala")
) println(file)16.for循环生成新集合
for {子句} yield {循环体} 制造新集合
例如:def scalaFiles =
for {
file <- filesHere
if file.getName.endsWith(".scala")
} yield file这样每一步就不是打印一个file,而是将file存储起来,最终产生一个Array[File]
17.try catch finally
try {
val f = new FileReader("input.txt")
openFile(file)} catch {
case ex: FileNotFoundException => new FileReader("input.txt")
//注意这里依然有返回值。使用=>符号
case ex: IOException => // Handle other I/O error
}
finally {
file.close() // 确保关闭文件。
}这里catch {
case ex: 。。。
case ex: 。。。
}对比java,catch是这样用的
catch (Exception e) { 。。。。} 而且经常在catch里面throw。scala一般不使用throw。还有,finally里最好只做一些关闭或打印之类的操作,不要有副作用的表达式,这样会有无谓的返值。
18.switch语句
match语句就像java里的switch语句。
val firstArg = if (args.length > 0) args(0) else ""
firstArg match {
case "salt" => println("pepper")
case "chips" => println("salsa")
case "eggs" => println("bacon")
case _ => println("huh?")
}差别:
1、Java 的 case 语句里面的整数类型和枚举常量。而这里可以是任意类型,甚至正则匹配
2、在每个可选项的最后并没有 break。取而代之, break是隐含的。
3、match 表达式也能产生值:例如可以这样:val friend =
firstArg match {
case "salt" => "pepper"
case "chips" => "salsa"
case _ => "huh?"
}
println(friend)19.函数嵌套定义
函数:函数式编程风格的一个重要设计原则:程序应该被解构成若干小的函数, 每个完成一个定义良好的任务。在java中通常这样做:
def processFile(filename: String, width: Int) {
...
for (line <- source.getLines)
processLine(filename, width, line)
}
private def processLine(filename:String, width:Int, line:String) {
....
}Scala 提供了另一种方式:你可以把函数定义在另一个函数中。就好象本地变量那样,这种本地函数仅在包含它的代码块中可见。
def processFile(filename: String, width: Int) {
def processLine(filename:String, width:Int, line:String) {
...
}
val source = Source.fromFile(filename)
for (line <- source.getLines) {
processLine(filename, width, line)
}
}20.Lambda函数
Scala 拥有第一类函数: first-class function。你不仅可以定义函数和调用它们,还可以把函数写
成没有名字的文本: (跟python里面的lambda函数差不多)scala> var increase = (x: Int) => x + 1
scala> increase(10)
res0: Int = 11如果你想在函数文本中包括超过一个语句,用大括号包住函数体,当函数值被调用时,所有的语句将被执行,而函数的返回值就是最后一行产生的那个表达式。
scala> increase = (x: Int) => {
println("We")
println("are")
println("here!")
x + 1
}21.集合通配符:_
Scala 提供了许多方法去除冗余信息并把函数文本写得更简短。比如去除参数类型以及被推断的参数之外的括号:
scala> someNumbers.filter(x => x > 0)
如果想让函数文本更简洁,可以把下划线当做一个或更多参数的占位符,只要每个参数在函数文
本内仅出现一次。 :scala> someNumbers.filter(_ > 0)
还可以使用一个下划线替换整个参数列表。叫偏应用函数
scala> def sum(a: Int, b: Int, c: Int) = a + b + c
一般调用可以这样:
scala> sum(1, 2, 3)
用偏函数取而代之:
scala> val a = sum _ //请记住要在函数名和下划线之间留一个空格
scala> a(1, 2, 3)再比如
someNumbers.foreach(println _)
22.闭包
闭包:是指可以包含*(未绑定到特定对象)变量的代码块
任何带有*变量的函数文本,如(x: Int) => x + more,都是开放术语:由于函数值是关闭这个开放术语(x: Int) => x + more 的行动的最终产物, 得到的函数值将包含一个指向捕获的 more 变量的参考, 因此被称为闭包。scala> var more = 1
scala> val addMore = (x: Int) => x + more
scala> addMore(10)
res19: Int = 11每次函数被调用时都会创建一个新闭包。每个闭包都会访问闭包创建时活跃的 more 变量。
23.重复参数
想要标注一个重复参数,在参数的类型之后放一个星号。例如:
scala> def echo(args: String*) =
for (arg <- args) println(arg)这样定义, echo 可以被零个至多个 String 参数调用:
scala> echo()
scala> echo("one")
scala> echo("hello", "world!")但是如果有一个数组变量
scala> val arr = Array("What's", "up", "doc?"),则不能像scala> echo(arr)这样调用。
你需要在数组参数后添加一个冒号和一个_*符号,像这样:scala> echo(arr: _*)
24.curry化
Scala 允许你创建新的“感觉像是原生语言支持”的控制抽象:scala提供curry 化:
Scala里的Curry化可以把函数从接收多个参数转换成多个参数列表。如果要用同样的一组实参多次调用一个函数,可以用curry化来减少噪音,让代码更有味道。我们要编写的方法不是接收一个参数列表,里面有多个参数,而是有多个参数列表,每个里面可以有一个或多个参数。也就是说,写的不是def foo(a: Int, b: Int, c: Int){},而是 def foo(a: Int)(b: Int)(c: Int){}。可以这样调用这个方法,比如:foo(1)(2)(3)、foo(1){2}{3},甚至这样foo{1}{2}{3}。例如,传统函数如下:
scala> def plainOldSum(x: Int, y: Int) = x + y
scala> plainOldSum(1, 2) //res5: Int = 3
scala允许你使用curry化的新型函数:
scala> def curriedSum(x: Int)(y: Int) = x + y
scala> curriedSum(1)(2) //res5: Int = 3结果一样。
25.带函数参数的函数
高阶函数: higher-order function——带其它函数做参数的函数
def filesMatching(query: String,
matcher: (String, String) => Boolean) = {
for (file <- filesHere; if matcher(file.getName, query))
yield file
}这里matcher其实是一个函数,这里做了filesMatching函数的参数。
(String, String) => Boolean)表示matcher函数的参数是(String, String)类型,而返回值是Boolean 类型
你可以通过让多个搜索方法调用它,并传入合适的函数:def filesEnding(query: String) =
filesMatching(query, _.endsWith(_))这就相当于
def filesEnding(query: String) =
for (file <- filesHere; if file.getName.endsWith(query))
yield file因为上面matcher: (String, String)里面两个参数, matcher(file.getName, query)
所以 _.endsWith(_)里面的第一个_对应于字符串file.getName,第二个_对应于字符串query,
连在一起就是file.getName.endsWith(query)类似的
def filesContaining(query: String) =
filesMatching(query, _.contains(_))就相当于
def filesContaining(query: String) =
for (file <- filesHere; if file.getName.contains(query))
yield file26.传名参数
Scala中允许无参数的函数作为另一函数的参数传递进去,也就是传名参数(call-by-name)
定义一个函数 myAssert ,而其参数则为传名参数。在这里,我们想要实现的是断言,可以将传名参数写成函数文本的格式:(…) => Type ,即参数列表 => 类型。def myAssert(check: () => Boolean) =
if(!check()){
println("OK ...")
throw new AssertionError
}上面的函数定义了一个当客户代码传入的函数值(这里我们用()指明,代表省略了该函数的参数列表。调用方式如下:
scala> myAssert(() => 5 < 3)
客户端代码中的 () => 5 < 3 似乎有点繁琐,如果能够直接传入 5 < 3 之类的布尔表达式就更好了。这是可以实现的。只需要将函数定义 def 中空参数列表即小括号对()去掉,直接用 => 而不是()=> 就可以了。此外,if 判断中的 check 后面的()也要同时去掉。修改后的代码如下:
def myAssert(check: => Boolean) =
if(!check){
println("OK ...")
throw new AssertionError
}
myAssert(5 < 3)
-