c# – Observable.Zip当zip到序列的数量在运行时之前是未知的

我需要为审批流程建模.之前很简单.两个角色必须批准某些东西,然后我们可以继续下一步:

public class Approved
{
    public string ApproverRole;
}

var approvals = Subscribe<Approved>();

var vpOfFinance = approvals.Where(e => e.ApproverRole == "Finance VP");
var vpOfSales = approvals.Where(e => e.ApproverRole == "Sales VP");

var approvedByAll = vpOfFinance.Zip(vpOfSales, Tuple.Create);

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

但现在有一个新要求:批准某些内容所需的角色数量可能会有所不同:

public class ApprovalRequested
{
    public string[] Roles;
}
var approvalRequest = Subscribe<ApprovalRequested>().Take(1);
var approvals = Subscribe<Approved>();

var approvedByAll = ???;

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

我觉得我错过了一些非常明显的东西……有人能指出我正确的方向吗?

编辑

澄清:批准过程是基于每个项目.批准可以到达的顺序是不确定的.我们不关心一个角色是否多次批准某个项目.

解决方法:

问题基本上可以减少为从值流中创建一个集合,其中值可能是乱序的或本质上很多.

如果N是集合的基数,我们可以简单地假设在至少推出N种类型的值(在这种情况下为角色)之前,该过程将不会继续.

这是Zip运算符的示例解决方案;也许这可以让你开始:

    public static IObservable<IList<T>> Zip<T>(this IList<IObservable<T>> observables)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            List<List<T>> store = new List<List<T>>(Enumerable.Range(1, observables.Count).Select(_ => new List<T>()));

            return new CompositeDisposable(observables.Select((o, i) => 
                o.Subscribe(value =>
                {
                    lock (store)
                    {
                        store[i].Add(value);

                        if (store.All(list => list.Count > 0))
                        {
                            observer.OnNext(store.Select(list => list[0]).ToList());
                            store.ForEach(list => list.RemoveAt(0));
                        }
                    }
                }))
            );
        });
    }

测试:

        Observable.Interval(TimeSpan.FromSeconds(0.5))
                  .GroupBy(i => i % 3)
                  .Select(gr => gr.AsObservable())
                  .Buffer(3)                      
                  .SelectMany(set => set.Zip())
                  .Subscribe(v => Console.WriteLine(String.Join(",", v)));

这里的一个问题是,在组形成时您可能会丢失初始值,因此您可能希望通过将方法重写为IObservable< IList< T>>来合并它. Zip< TKey,T>(此IGroupedObservable< TKey,T> observables).

上一篇:自主移动机器人市场现状研究分析与发展前景预测报告


下一篇:javascript – RxJS 6获取Observable数组的筛选列表