TPL异步并行编程之任务超时

此处参考自阿涛的博文:http://www.cnblogs.com/HelloMyWorld/p/5526914.html

一 自己定义

基本的思路:

net中异步操作由于是交给线程来实现,因此不可能真正想js那样将一个单线程上的任务移除:如

var id=setTimeout(fun,200);

if(id>0){

  clearTimeout(id);//将一个任务从单线程的任务栈中移除,自然就做到了真正的移除任务

}

但是在net中一个任务交给线程执行后,具体什么时候执行完成我们并不确定,就算是我们把线程终止掉,如果任务执行完了,且执行完后与之关联的处理函数关系任然建立,那么其处理函数一样会执行

那么对于net这样的现状,我们只好斩断与之关联的处理函数的关系,来达到取消一个任务的或者认为他超时

同样的原理,比如我们有一个搜索框,或者地图缩放来做一些事情,其实我们在输入框中输入一段文字是非常快的,假如我们以文本发生变化就去发起网络搜索,那么其实会发起n个字符的搜索请求;

但是实际上我们想要只是最后一次发起的请求而已,那么问题就来了,这些回来的数据谁先谁后都是随机的,也就自然的出现了结果乱七八糟,那么怎么办呢?

1 同步搜索

我们可以锁定ui界面让用户不再能够发起请求:当然是一个传统并非很友好的解决

如:点击搜索按钮,调用搜索程序发起搜索,锁住ui,等待结果返回,再解锁ui将操作交给用户;

2 延时处理

比如连续的触发任务执行,那么我们就让他在小于一段时间内的触发不发起真正的请求,这种办法也只是减少无用请求而已

如:我一直点搜索按钮一直点,是的一直点,但是当我到达比如200ms的才会发起请求,之前点的都没用;

3 取消无用的处理

比如我连续发起了10次请求但是,只要最后一次的,那么我把之前的就取消掉,请求其实已经发出,只是对于结果我们丢弃掉了

如:我一直点一直点一直点,发起了10次请求,但是我每次发起请求前就把上一次的丢弃掉,注意这里并不是真正让这个网络请求取消了,本质上是没有办法取消掉的,

我只是让回来的结果丢弃掉而已不做任何处理了,那么就算是我点了n次其实我也只取了我最后一次的结果,所以这样看起来合情合理,但是对于网络流量要求的app来说就很不科学了

4 将2和3结合起来

首先我们做一个延时处理比如200ms,当达到延时处理后再发起请求,但是有个特殊地方比如我在下一个200ms内又发起请求,此时结果并没有回来,那么我们将之前的任务取消掉就好了,这样也就相对友好的

处理了这些矛盾,这里兼顾了2和3的优缺点

以下是我改进的代码,让任务可以手动取消

封装的超时Task类

 public class TimeoutTask
{
#region 字段
private Action _action;
private CancellationToken _token;
private event AsyncCompletedEventHandler _asyncCompletedEvent;
private TaskCompletionSource<AsyncCompletedEventArgs> _tcs;
#endregion #region 静态方法
public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, CancellationToken token)
{
return await TimeoutTask.StartNewTask(action, token, Timeout.Infinite);
} public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, int timeout)
{
return await TimeoutTask.StartNewTask(action, CancellationToken.None, timeout);
} public static async Task<AsyncCompletedEventArgs> StartNewTask(Action action, CancellationToken token,
int timeout = Timeout.Infinite)
{
var task = new TimeoutTask(action, token, timeout); return await task.Run();
}
#endregion #region 构造 public TimeoutTask(Action action, int timeout) : this(action, CancellationToken.None, timeout)
{ } public TimeoutTask(Action action, CancellationToken token) : this(action, token, Timeout.Infinite)
{ } public TimeoutTask(Action action, CancellationToken token, int timeout = Timeout.Infinite)
{
_action = action; _tcs = new TaskCompletionSource<AsyncCompletedEventArgs>(); if (timeout != Timeout.Infinite)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
cts.CancelAfter(timeout);
_token = cts.Token;
}
else
{
_token = token;
}
}
#endregion #region 公用方法 /// <summary>
/// 运行
/// </summary>
/// <returns></returns>
public async Task<AsyncCompletedEventArgs> Run()
{
_asyncCompletedEvent += AsyncCompletedEventHandler; try
{
using (_token.Register(() => _tcs.TrySetCanceled()))
{
ExecuteAction();
return await _tcs.Task.ConfigureAwait(false);
} }
finally
{
_asyncCompletedEvent -= AsyncCompletedEventHandler;
} } public void Cancel()
{
if (!_token.CanBeCanceled)
{
_tcs.TrySetCanceled();
}
}
#endregion #region 私有方法 /// <summary>
/// 执行Action
/// </summary>
private void ExecuteAction()
{
Task.Factory.StartNew(() =>
{
_action.Invoke(); OnAsyncCompleteEvent(null);
});
} /// <summary>
/// 异步完成事件处理
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void AsyncCompletedEventHandler(object sender, AsyncCompletedEventArgs e)
{
if (e.Cancelled)
{
_tcs.TrySetCanceled();
}
else if (e.Error != null)
{
_tcs.TrySetException(e.Error);
}
else
{
_tcs.TrySetResult(e);
}
} /// <summary>
/// 触发异步完成事件
/// </summary>
/// <param name="userState"></param>
private void OnAsyncCompleteEvent(object userState)
{
if (_asyncCompletedEvent != null)
{
_asyncCompletedEvent(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: userState));
}
}
#endregion
} /// <summary>
/// 有返回值,可超时,可取消的Task
/// </summary>
/// <typeparam name="T"></typeparam>
public class TimeoutTask<T>
{
#region 字段
private Func<T> _func;
private CancellationToken _token;
private event AsyncCompletedEventHandler _asyncCompletedEvent;
private TaskCompletionSource<AsyncCompletedEventArgs> _tcs;
#endregion #region 静态方法
public static async Task<T> StartNewTask(Func<T> func, CancellationToken token,
int timeout = Timeout.Infinite)
{
var task = new TimeoutTask<T>(func, token, timeout); return await task.Run();
} public static async Task<T> StartNewTask(Func<T> func, int timeout)
{
return await TimeoutTask<T>.StartNewTask(func, CancellationToken.None, timeout);
} public static async Task<T> StartNewTask(Func<T> func, CancellationToken token)
{
return await TimeoutTask<T>.StartNewTask(func, token, Timeout.Infinite);
} #endregion #region 公用方法
/// <summary>
/// 运行Task
/// </summary>
/// <returns></returns>
public async Task<T> Run()
{
_asyncCompletedEvent += AsyncCompletedEventHandler; try
{
using (_token.Register(() => _tcs.TrySetCanceled()))
{
ExecuteFunc();
var args = await _tcs.Task.ConfigureAwait(false);
return (T)args.UserState;
} }
finally
{
_asyncCompletedEvent -= AsyncCompletedEventHandler;
} } public bool CanBeCanceled
{
get { return _token.CanBeCanceled; }
} public void Cancel()
{
if (!_token.CanBeCanceled)
{
_tcs.SetCanceled();
}
}
#endregion #region 构造
public TimeoutTask(Func<T> func, CancellationToken token) : this(func, token, Timeout.Infinite)
{ } public TimeoutTask(Func<T> func, int timeout = Timeout.Infinite) : this(func, CancellationToken.None, timeout)
{ } public TimeoutTask(Func<T> func, CancellationToken token, int timeout = Timeout.Infinite)
{
_func = func; _tcs = new TaskCompletionSource<AsyncCompletedEventArgs>(); if (timeout != Timeout.Infinite)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(token); cts.CancelAfter(timeout);
_token = cts.Token;
}
else
{
_token = token;
}
}
#endregion #region 私有方法 /// <summary>
/// 执行
/// </summary>
private void ExecuteFunc()
{
ThreadPool.QueueUserWorkItem(s =>
{
var result = _func.Invoke(); OnAsyncCompleteEvent(result);
});
} /// <summary>
/// 异步完成事件处理
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void AsyncCompletedEventHandler(object sender, AsyncCompletedEventArgs e)
{
if (e.Cancelled)
{
_tcs.TrySetCanceled();
}
else if (e.Error != null)
{
_tcs.TrySetException(e.Error);
}
else
{
_tcs.TrySetResult(e);
}
} /// <summary>
/// 触发异步完成事件
/// </summary>
/// <param name="userState"></param>
private void OnAsyncCompleteEvent(object userState)
{
if (_asyncCompletedEvent != null)
{
_asyncCompletedEvent(this, new AsyncCompletedEventArgs(error: null, cancelled: false, userState: userState));
}
}
#endregion
}

demo

 class Program
{
private static TimeoutTask<string> result; static void Main(string[] args)
{ ThreadMethod(); Console.WriteLine("启动完成");
Console.ReadLine();
} private async static void ThreadMethod()
{
// await TimeoutTask.StartNewTask(LongTimeWork, 6000);
// await TimeoutTask<string>.StartNewTask(LongTimeWork2, 2000);
try
{
for (int i = ; i < ; i++)
{ //我手动取消掉上一次的
if (result!=null)
{
try
{
//取消掉
result.Cancel();
}
catch (Exception er)
{
}
} result = new TimeoutTask<string>(LongTimeWork2); try
{
result.Run();
}
catch (Exception ee)
{ }
} Console.WriteLine(result);
}
catch (Exception ex)
{ }
} private static void LongTimeWork()
{
Thread.Sleep();
} private static string LongTimeWork2()
{
Thread.Sleep();
return "XXD";
} }

二 Task天生超时

什么是尝试超时,比如说连接数据库就有TryConnect尝试,在一些访问资源的时候经常用到,且Task本身也天生支持超时处理

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var task=Task.Factory.StartNew(() =>
{
Thread.Sleep(3 * 1000);

}, token);

var timeout = task.Wait(4*1000,token);

if (timeout)
{

}

Wait会等待给定的时间,如果在给定的时间内已经完成那么,将返回true,意思是在指定的时间内完成了一个task,反之就认为超时了,这个也不乏一种超时处理

上一篇:NumPy 学习(3): 通用函数


下一篇:java socket编程开发简单例子 与 nio非阻塞通道