我需要为审批流程建模.之前很简单.两个角色必须批准某些东西,然后我们可以继续下一步:
public class Approved
{
public string ApproverRole;
}
var approvals = Subscribe<Approved>();
var vpOfFinance = approvals.Where(e => e.ApproverRole == "Finance VP");
var vpOfSales = approvals.Where(e => e.ApproverRole == "Sales VP");
var approvedByAll = vpOfFinance.Zip(vpOfSales, Tuple.Create);
approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());
但现在有一个新要求:批准某些内容所需的角色数量可能会有所不同:
public class ApprovalRequested
{
public string[] Roles;
}
var approvalRequest = Subscribe<ApprovalRequested>().Take(1);
var approvals = Subscribe<Approved>();
var approvedByAll = ???;
approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());
我觉得我错过了一些非常明显的东西……有人能指出我正确的方向吗?
编辑
澄清:批准过程是基于每个项目.批准可以到达的顺序是不确定的.我们不关心一个角色是否多次批准某个项目.
解决方法:
问题基本上可以减少为从值流中创建一个集合,其中值可能是乱序的或本质上很多.
如果N是集合的基数,我们可以简单地假设在至少推出N种类型的值(在这种情况下为角色)之前,该过程将不会继续.
这是Zip运算符的示例解决方案;也许这可以让你开始:
public static IObservable<IList<T>> Zip<T>(this IList<IObservable<T>> observables)
{
return Observable.Create<IList<T>>(observer =>
{
List<List<T>> store = new List<List<T>>(Enumerable.Range(1, observables.Count).Select(_ => new List<T>()));
return new CompositeDisposable(observables.Select((o, i) =>
o.Subscribe(value =>
{
lock (store)
{
store[i].Add(value);
if (store.All(list => list.Count > 0))
{
observer.OnNext(store.Select(list => list[0]).ToList());
store.ForEach(list => list.RemoveAt(0));
}
}
}))
);
});
}
测试:
Observable.Interval(TimeSpan.FromSeconds(0.5))
.GroupBy(i => i % 3)
.Select(gr => gr.AsObservable())
.Buffer(3)
.SelectMany(set => set.Zip())
.Subscribe(v => Console.WriteLine(String.Join(",", v)));
这里的一个问题是,在组形成时您可能会丢失初始值,因此您可能希望通过将方法重写为IObservable< IList< T>>来合并它. Zip< TKey,T>(此IGroupedObservable< TKey,T> observables).