[关闭]
@a604572782 2016-02-04T15:33:10.000000Z 字数 9678 阅读 2443

实现更简单的异步操作

Async c#


前言

在.net4.0以后异步操作,并行计算变得异常简单,但是由于公司项目开发基于.net3.5所以无法用到4.0的并行计算以及Task等异步编程。因此,为了以后更方便的进行异步方式的开发,我根据.net的使用方式封装了异步编程框架,通过BeginInvoke、EndInvoke的方式实现异步编程。

框架结构

整个框架包括四个部分
1. 基类抽象Opeartor
我把每个异步执行过程称为一个Operate,因此需要一个Opeartor去执行
2. FuncAsync
异步的Func
3. ActionAsync
异步的Action
4. Asynchorus
对ActionAsync和FuncAsync的封装

Operator

Operator是一个抽象类,实现了IOperationAsyncIContinueWithAsync两个接口。
IOperationAsync实现了异步操作,IContinueWithAsync实现了类似于Task的ContinueWith方法,在当前异步操作完成后继续进行的操作

IOperationAsync接口详解

  1. public interface IOperationAsync
  2. {
  3. IAsyncResult Invoke();
  4. void Wait();
  5. void CompletedCallBack(IAsyncResult ar);
  6. void CatchException(Exception exception);
  7. }

IContinueWithAsync接口详情

  1. public interface IContinueWithAsync
  2. {
  3. Operator Previous { get; set; }
  4. Operator Next { get; set; }
  5. Operator ContinueWithAsync(Action action);
  6. Operator ContinueWithAsync<TParameter>(Action<TParameter> action, TParameter parameter);
  7. }
  1. public abstract class Operator : IOperationAsync, IContinueWithAsync
  2. {
  3. public IAsyncResult Middle;
  4. public readonly string Id;
  5. public Exception Exception { get; private set; }
  6. public Operator Previous { get; set; }
  7. public Operator Next { get; set; }
  8. protected Operator()
  9. {
  10. Id = Guid.NewGuid().ToString();
  11. }
  12. public abstract IAsyncResult Invoke();
  13. protected void SetAsyncResult(IAsyncResult result)
  14. {
  15. this.Middle = result;
  16. }
  17. public virtual void Wait()
  18. {
  19. if (!Middle.IsCompleted) Middle.AsyncWaitHandle.WaitOne();
  20. }
  21. public virtual void CompletedCallBack(IAsyncResult ar)
  22. {
  23. }
  24. public void CatchException(Exception exception)
  25. {
  26. this.Exception = exception;
  27. }
  28. protected Operator ContinueAsync()
  29. {
  30. if (Next != null) Next.Invoke();
  31. return Next;
  32. }
  33. public virtual Operator ContinueWithAsync(Action action)
  34. {
  35. Next = new ActionAsync(action);
  36. Next.Previous = this;
  37. return Next;
  38. }
  39. public virtual Operator ContinueWithAsync<TParameter>(Action<TParameter> action, TParameter parameter)
  40. {
  41. Next = new ActionAsync<TParameter>(action, parameter);
  42. Next.Previous = this;
  43. return Next;
  44. }
  45. public virtual Operator ContinueWithAsync<TResult>(Func<TResult> func)
  46. {
  47. Next = new FuncAsync<TResult>();
  48. Next.Previous = this;
  49. return Next;
  50. }
  51. public virtual Operator ContinueWithAsync<TParameter, TResult>(Func<TParameter, TResult> func,
  52. TParameter parameter)
  53. {
  54. Next = new FuncAsync<TParameter, TResult>(func, parameter);
  55. Next.Previous = this;
  56. return Next;
  57. }
  58. }

无返回异步操作

ActionAsync

  1. public class ActionAsync : Operator
  2. {
  3. private readonly Action _action;
  4. protected ActionAsync()
  5. {
  6. }
  7. public ActionAsync(Action action)
  8. : this()
  9. {
  10. this._action = action;
  11. }
  12. public override IAsyncResult Invoke()
  13. {
  14. var middle = _action.BeginInvoke(CompletedCallBack, null);
  15. SetAsyncResult(middle);
  16. return middle;
  17. }
  18. public override void CompletedCallBack(IAsyncResult ar)
  19. {
  20. try
  21. {
  22. _action.EndInvoke(ar);
  23. }
  24. catch (Exception exception)
  25. {
  26. this.CatchException(exception);
  27. }
  28. ContinueAsync();
  29. }
  30. }
  31. public class ActionAsync<T> : ActionAsync
  32. {
  33. public T Result;
  34. private readonly Action<T> _action1;
  35. protected readonly T Parameter1;
  36. public ActionAsync()
  37. {
  38. }
  39. public ActionAsync(T parameter)
  40. {
  41. this.Parameter1 = parameter;
  42. }
  43. public ActionAsync(Action<T> action, T parameter)
  44. {
  45. this._action1 = action;
  46. this.Parameter1 = parameter;
  47. }
  48. public override IAsyncResult Invoke()
  49. {
  50. var result = _action1.BeginInvoke(Parameter1, CompletedCallBack, null);
  51. SetAsyncResult(result);
  52. return result;
  53. }
  54. public override void CompletedCallBack(IAsyncResult ar)
  55. {
  56. try
  57. {
  58. _action1.EndInvoke(ar);
  59. }
  60. catch (Exception exception)
  61. {
  62. this.CatchException(exception);
  63. }
  64. ContinueAsync();
  65. }
  66. }

有返回异步

FuncAsync实现了IFuncOperationAsync接口

IFuncOperationAsync

  1. public interface IFuncOperationAsync<T>
  2. {
  3. void SetResult(T result);
  4. T GetResult();
  5. }

FuncAsync

  1. public class FuncAsync<TResult> : Operator, IFuncOperationAsync<TResult>
  2. {
  3. private TResult _result;
  4. public TResult Result
  5. {
  6. get
  7. {
  8. if (!Middle.IsCompleted || _result == null)
  9. {
  10. _result = GetResult();
  11. }
  12. return _result;
  13. }
  14. }
  15. private readonly Func<TResult> _func1;
  16. public FuncAsync()
  17. {
  18. }
  19. public FuncAsync(Func<TResult> func)
  20. {
  21. this._func1 = func;
  22. }
  23. public override IAsyncResult Invoke()
  24. {
  25. var result = _func1.BeginInvoke(CompletedCallBack, null);
  26. SetAsyncResult(result);
  27. return result;
  28. }
  29. public override void CompletedCallBack(IAsyncResult ar)
  30. {
  31. try
  32. {
  33. var result = _func1.EndInvoke(ar);
  34. SetResult(result);
  35. }
  36. catch (Exception exception)
  37. {
  38. this.CatchException(exception);
  39. SetResult(default(TResult));
  40. }
  41. ContinueAsync();
  42. }
  43. public virtual TResult GetResult()
  44. {
  45. Wait();
  46. return this._result;
  47. }
  48. public void SetResult(TResult result)
  49. {
  50. _result = result;
  51. }
  52. }
  53. public class FuncAsync<T1, TResult> : FuncAsync<TResult>
  54. {
  55. protected readonly T1 Parameter1;
  56. private readonly Func<T1, TResult> _func2;
  57. public FuncAsync(Func<T1, TResult> action, T1 parameter1)
  58. : this(parameter1)
  59. {
  60. this._func2 = action;
  61. }
  62. protected FuncAsync(T1 parameter1)
  63. : base()
  64. {
  65. this.Parameter1 = parameter1;
  66. }
  67. public override IAsyncResult Invoke()
  68. {
  69. var result = _func2.BeginInvoke(Parameter1, CompletedCallBack, null);
  70. SetAsyncResult(result);
  71. return result;
  72. }
  73. public override void CompletedCallBack(IAsyncResult ar)
  74. {
  75. try
  76. {
  77. var result = _func2.EndInvoke(ar);
  78. SetResult(result);
  79. }
  80. catch (Exception exception)
  81. {
  82. CatchException(exception);
  83. SetResult(default(TResult));
  84. }
  85. ContinueAsync();
  86. }
  87. }

Asynchronous 异步操作封装

ActionAsync和FuncAsync为异步操作打下了基础,接下来最重要的工作就是通过这两个类执行我们的异步操作,为此我封装了一个异步操作类
主要封装了以下几个部分:
1. WaitAll(IEnumerable<Operator> operations):等待所有操作执行完毕
2. WaitAny(IEnumerable<Operator> operations):等待任意操作执行完毕
3. ActionAsync
4. FuncAsync
5. ContinueWithAction
6. ContinueWithFunc

后面四个包含若干个重载,这里只是笼统的代表一个类型的方法

WaitAll

  1. public static void WaitAll(IEnumerable<Operator> operations)
  2. {
  3. foreach (var @operator in operations)
  4. {
  5. @operator.Wait();
  6. }
  7. }

WaitAny

  1. public static void WaitAny(IEnumerable<Operator> operations)
  2. {
  3. while (operations.All(o => !o.Middle.IsCompleted))
  4. Thread.Sleep(100);
  5. }

等待时间可以自定义

ActionInvoke

  1. public static Operator Invoke(Action action)
  2. {
  3. Operator operation = new ActionAsync(action);
  4. operation.Invoke();
  5. return operation;
  6. }
  7. public static Operator Invoke<T>(Action<T> action, T parameter)
  8. {
  9. Operator operation = new ActionAsync<T>(action, parameter);
  10. operation.Invoke();
  11. return operation;
  12. }
  13. public static Operator Invoke<T1, T2>(Action<T1, T2> action, T1 parameter1, T2 parameter2)
  14. {
  15. Operator operation = new ActionAsync<T1, T2>(action, parameter1, parameter2);
  16. operation.Invoke();
  17. return operation;
  18. }

FuncInvoke

  1. public static Operator Invoke<TResult>(Func<TResult> func)
  2. {
  3. Operator operation = new FuncAsync<TResult>(func);
  4. operation.Invoke();
  5. return operation;
  6. }
  7. public static Operator Invoke<TParameter, TResult>(Func<TParameter, TResult> func, TParameter parameter)
  8. {
  9. TParameter param = parameter;
  10. Operator operation = new FuncAsync<TParameter, TResult>(func, param);
  11. operation.Invoke();
  12. return operation;
  13. }
  14. public static Operator Invoke<T1, T2, TResult>(Func<T1, T2, TResult> func, T1 parameter1, T2 parameter2)
  15. {
  16. Operator operation = new FuncAsync<T1, T2, TResult>(func, parameter1, parameter2);
  17. operation.Invoke();
  18. return operation;
  19. }

ContinueWithAction

  1. public static Operator ContinueWithAsync(IEnumerable<Operator>operators, Action action)
  2. {
  3. return Invoke(WaitAll, operators)
  4. .ContinueWithAsync(action);
  5. }
  6. public static Operator ContinueWithAsync<TParameter>(IEnumerable<Operator> operators, Action<TParameter> action, TParameter parameter)
  7. {
  8. return Invoke(WaitAll, operators)
  9. .ContinueWithAsync(action, parameter);
  10. }

ContinueWithFunc

  1. public static Operator ContinueWithAsync<TResult>(IEnumerable<Operator> operators,Func<TResult> func)
  2. {
  3. return Invoke(WaitAll, operators)
  4. .ContinueWithAsync(func);
  5. }
  6. public static Operator ContinueWithAsync<TParameter, TResult>(IEnumerable<Operator> operators,
  7. Func<TParameter, TResult> func, TParameter parameter)
  8. {
  9. return Invoke(WaitAll, operators)
  10. .ContinueWithAsync(func, parameter);
  11. }

这里有个bug当调用ContinueWithAsync后无法调用Wait等待,本来Wait需要从前往后等待每个异步操作,但是测试了下不符合预期结果。不过理论上来说应该无需这样操作,ContinueWithAsync只是为了当上一个异步操作执行完毕时继续执行的异步操作,若要等待,那不如两个操作放到一起,最后再等待依然可以实现。

前面的都是单步异步操作的调用,若需要对某集合进行某个方法的异步操作,可以foreach遍历

  1. public void ForeachAsync(IEnumerbale<string> parameters)
  2. {
  3. foreach(string p in parameters)
  4. {
  5. Asynchronous.Invoke(Tast,p);
  6. }
  7. }
  8. public void Test(string parameter)
  9. {
  10. //TODO:做一些事
  11. }

每次都需要去手写foreach,比较麻烦,因此实现类似于PLinq的并行计算方法实在有必要,不过有一点差别,PLinq是采用多核CPU进行并行计算,而我封装的仅仅遍历集合进行异步操作而已

ForeachAction

  1. public static IEnumerable<Operator> Foreach<TParameter>(IEnumerable<TParameter> items, Action<TParameter> action)
  2. {
  3. return items.Select(t => Invoke(action, t)).ToList();
  4. }

ForeachFunc

  1. public static IEnumerable<Operator> Foreach<TParameter, TResult>(IEnumerable<TParameter> items, Func<TParameter, TResult> func)
  2. {
  3. return items.Select(parameter => Invoke(func, parameter)).ToList();
  4. }

如何使用

  1. public void DoSomeThing()
  2. {
  3. //TODO:
  4. }

通过Asynchronous.Invoke(DoSomeThing) 执行

  1. public void DoSomeThing(string parameter)
  2. {
  3. //TODO:
  4. }

通过Asynchronous.Invoke(DoSomeThing,parameter) 执行

  1. public string DoSomeThing()
  2. {
  3. //TODO:
  4. }

通过Asynchronous.Invoke(()=>DoSomeThing())执行

  1. public string DoSomeThing(string parameter)
  2. {
  3. //TODO:
  4. }

通过Asynchronous.Invoke(()=>DoSomeThing(parameter))执行,或者也可以传入参数通过Asynchronous.Invoke(p=>DoSomeThing(p),parameter)

  1. public void Test
  2. {
  3. int[] parameters = {1,2,3,4,5};
  4. Asynchronous.Foreach(parameters,Console.WriteLine);
  5. }
  1. public void Test
  2. {
  3. int[] parameters = {1,2,3,4,5};
  4. var operators = Asynchronous.Foreach(parameters,p=> p*2);
  5. Asynchrous.WaitAll(operators);
  6. Asynchronous.Foreach(operators.Cast<FuncAsync<int,int>>(),
  7. p=> Console.WriteLine(p.Result));
  8. }

首先将集合每个值扩大2倍,然后输出

  1. public void Test
  2. {
  3. int[] parameters = {1,2,3,4,5};
  4. var operators = Asynchronous.Foreach(parameters,p=> p*2);
  5. Asynchrous.ContinueWithAsync(operators,Console.WriteLine,"执行完成");
  6. }
  1. public void Test
  2. {
  3. int[] parameters = {1,2,3,4,5};
  4. var operators = Asynchronous.Foreach(parameters,p=> p*2);
  5. Asynchronous.Foreach(operators,o=>{
  6. o.ContinueWithAsync(()={
  7. //每个元素执行完时执行
  8. if(o.Exception != null)
  9. {
  10. //之前执行时产生未处理的异常,这里可以捕获到
  11. }
  12. });
  13. });
  14. }
  1. public void Chain()
  2. {
  3. Asynchronous.Invoke(Console.WriteLine,1)
  4. .ContinueWithAsync(Console.WriteLine,2)
  5. .ContinueWithAsync(Console.WriteLine,3)
  6. }

这样会按步骤输出1,2,3

结束语

以上只是列出了部分重载方法,其他重载方法无非就是加参数,本质实际是一样的,具体可以下载源码。我还封装了for方法,但是感觉没什么用,而且也从没有用到过,这里不再提
通过以上的封装,已经能完成日常大部分的操作,调用还是比较方便的。

本文发布至作业部落

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注