CsvHelper-异步读取流

我有一项服务,该服务接受包含需要大量插入到数据库中的CSV数据的输入流,并且我的应用程序尽可能使用async / await.

过程是:使用CsvHelper的CsvParser解析流,将每一行添加到DataTable,使用SqlBulkCopy将DataTable复制到数据库.

数据可以是任何大小,因此我想避免一次将整个内容读入内存-显然,无论如何我最终都会在DataTable中拥有所有这些数据,因此在内存中实际上将有2个副本.

我想尽可能异步地完成所有这些操作,但是CsvHelper没有任何异步方法,因此我想出了以下解决方法:

using (var inputStreamReader = new StreamReader(inputStream))
{
    while (!inputStreamReader.EndOfStream)
    {
        // Read line from the input stream
        string line = await inputStreamReader.ReadLineAsync();

        using (var memoryStream = new MemoryStream())
        using (var streamWriter = new StreamWriter(memoryStream))
        using (var memoryStreamReader = new StreamReader(memoryStream))
        using (var csvParser = new CsvParser(memoryStreamReader))
        {
            await streamWriter.WriteLineAsync(line);
            await streamWriter.FlushAsync();

            memoryStream.Position = 0;

            // Loop through all the rows (should only be one as we only read a single line...)
            while (true)
            {
                var row = csvParser.Read();

                // No more rows to process
                if (row == null)
                {
                    break;
                }

                // Add row to DataTable
            }
        }
    }
}

这个解决方案有什么问题吗?甚至有必要吗?我已经看到CsvHelper开发人员没有专门添加异步功能(https://github.com/JoshClose/CsvHelper/issues/202),但我并没有真正遵循不这样做的原因.

编辑:我刚刚意识到,该解决方案对于列包含换行符的实例是行不通的:(猜猜我只需要将整个输入流复制到MemoryStream或其他内容中

EDIT2:更多信息.

这是在库中的一种异步方法中,在该方法中,我一直尝试进行异步操作.它很可能会被MVC控制器使用(如果我只是想从UI线程中卸载它,那么我将使用Task.Run()).通常,该方法将在数据库/ DFS等外部源上等待,我希望在线程释放时释放该线程.

即使正在阻止的内容正在读取Stream,CsvParser.Read()也会被阻止(例如,如果我尝试读取的数据驻留在世界另一端的服务器上),而如果CsvHelper要实现异步方法使用TextReader.ReadAsync()的对象,那么我将不会被阻止等待我的数据从迪拜到达.据我所知,我并没有要求围绕同步方法进行异步包装.

解决方法:

埃里克·利珀特(Eric lippert)解释了使用a metaphor of cooking a meal in a restaurant进行异步等待的有用性.根据他的解释,如果您的线程没有其他事情要做,则异步执行某些操作是没有用的.

另外,请注意,当线程正在执行某项操作时,它将无法执行其他操作.只有线程正在等待某件事时,它才能做其他事情.您在过程中等待的事情之一就是读取文件.线程读取文件时,它必须等待几次才能读取文件部分.在读取过程中,它还可以做其他事情,例如解析读取的CSV数据并将解析的数据发送到目的地.

解析数据不是您的线程必须等待其他进程完成的过程,就像读取文件或将数据发送到数据库时一样.这就是为什么没有异步版本的解析过程的原因.正常的async-await并不能使您的线程忙,因为在解析过程中没有什么可等待的,因此在解析过程中,您的线程将没有时间做其他事情.

您当然可以使用Task.Run(()=> ParseReadData(…))将解析过程转换为可等待的任务,并等待此任务完成,但是类似于Eric Lippert的餐厅,这将为解冻一个厨师做这项工作,而你坐在柜台后面什么也没做.

但是,如果您的线程有有意义的事情要做,而在解析读取的CSV数据时(例如响应用户输入),则在单独的任务中开始解析可能会很有用.

如果您完整的阅读-解析-更新数据库过程不需要与用户交互,但是您需要线程在执行过程时可以*地做其他事情,请考虑将整个过程放在单独的任务中,然后开始执行任务没有等待它.在这种情况下,您仅使用接口线程来启动其他任务,并且接口线程可以*地执行其他操作.与您的过程总时间相比,开始这项新任务的成本相对较低.

再说一遍:如果您的线程无事可做,请让该线程进行处理,不要启动其他任务来进行处理.

上一篇:使用Csvhelper读取CSV文件某行某列数据


下一篇:使用CSVhelper C#合并具有不同标题的CSV文件