c# – 嵌套的Observable在Wait()上挂起

在C#控制台应用程序中,使用System.Reactive.Linq,我正在尝试创建一个observable,其中每个项目是由另一个observable进行某些处理的字符串结果.
我用字符串和字符创建了一个简单的repro.
警告,此示例完全是CONTRIVED,重点是嵌套的.Wait()挂起.

class Program
{
    static void Main(string[] args)
    {
        string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
        IObservable<string> files = fileNames.ToObservable();
        string[] extensions = files.Select(fn =>
        {
            var extension = fn.ToObservable()
            .TakeLast(4)
            .ToArray()
            .Wait(); // <<<<<<<<<<<<< HANG HERE
            return new string(extension);
        })
        .ToArray()
        .Wait();
    }
}

同样,这不是我如何找到许多文件名的后缀.
问题是我如何生成一个Observable of strings,其中字符串是从一个完整的observable计算出来的.

如果我拿出这个代码并单独运行它就可以了.

     var extension = fn.ToObservable()
        .TakeLast(4)
        .ToArray()
        .Wait();

有一些关于异步方法的嵌套Wait(),我不明白.

我如何编写嵌套的异步observable,所以我可以生成一个简单的字符串数组?

谢谢

-约翰

解决方法:

您的代码阻塞的原因是您在不指定调度程序的情况下使用ToObservable().在这种情况下,它将使用CurrentThreadScheduler.

因此,文件observable使用当前线程发出它的第一个OnNext()[A](发送“file1.doxc”).在OnNext()返回之前,它无法继续迭代.但是,内部fn observable也使用ToObservable()和Wait()块直到fn完成 – 它会将第一个OnNext()(发送“f”)排队到当前线程调度程序,但它永远无法发送它,因为现在第一个OnNext()[A]将永远不会返回.

两个简单的修复:

要么像这样改变文件可观察性:

IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default);

或者,避免使用带有SelectMany的内部Wait()(这肯定是更惯用的Rx):

string[] extensions = files.SelectMany(fn =>
{
    return fn.ToObservable()
             .TakeLast(4)
             .ToArray()
             .Select(x => new string(x));
})
.ToArray()
.Wait();

// display results etc.

每种方法都有完全不同的执行语义 – 第一种方法将像嵌套循环一样运行,每个内部observable在下一次外部迭代之前完成.第二个将更加交错,因为删除了Wait()的阻塞行为.如果你使用我编写的Spy方法并在ToObservable()调用之后附加它,你会很清楚地看到这种行为.

上一篇:javascript – 在保持订阅的同时更改可观察流


下一篇:rxjava2的使用(1)