《C# 爬虫 破境之道》:第一境 爬虫原理 — 第五节:数据流处理的那些事儿

为什么说到数据流了呢,因为上一节中介绍了一下异步发送请求。同样,在数据流的处理上,C#也为我们提供几个有用的异步处理方法。而且,爬虫这生物,处理数据流是基础本能,比较重要。本着这个原则,就聊一聊吧。

我们经常使用到的流有文件流、内存流、网络流,爬虫与这三种流都有着密不可分的联系,可以联想以下这些场景:

  • 当我们采集的数据,是一个压缩包或者照片,那么要存储它们到硬盘上,就需要使用到文件流了;
  • 当我们采集的数据,是经过GZip等压缩算法压缩过的,那么要解压它,就需要使用到内存流了;
  • 当我们的爬虫运行起来,就需要用到网络了,使用网络流是必然不可缺少的了;

所以,对流的操作,也是一个必要重要的环节;除了上面列举的几个场景之外,还有很多场景会涉及到流的处理,就不一一列举了,数不胜数;但每种流的处理,都对应其相应的I/O操作。所以,在DotNetFramework中,封装了System.IO.Stream这个基础流,在其基础之上,派生出很多有用的流;

我们在这里结合上一节中第一种异步请求方式的案例,来讲述爬虫中的网络流处理,其他类型的流处理,也是触类旁通的,文件流、内存流,在后续章节中,都会有所涉及,只是不会当作专题来讲解了。

在爬虫中,我们主要面临的网络流,有两个:

  • RequestStream:请求流
  • ResponseStream:回复流

当然,这里说的爬虫,还很小,只是基于WebRequest、WebResponse的,等后面我们再继续下沉,让它再成长成长,到Socket层面,我们要处理的网络流主要就是System.Net.Sockets.NetworkStream了,不过先不急,以小见大,也是很好的事情:)

至于为什么要使用流,上一节中已经举例说明了,这里就不再赘述。

 

第一部分:同步方式处理数据流

[Code 5.1.1] 

《C# 爬虫 破境之道》:第一境 爬虫原理 — 第五节:数据流处理的那些事儿
 1 {
 2     Stopwatch watch = new Stopwatch();
 3     Console.WriteLine("/* ********** 异步请求方式 * BeginGetResponse() & EndGetResponse() **********/");
 4     watch.Start();
 5     {
 6         var request = WebRequest.Create(@"https://tool.runoob.com/compile.php");
 7         request.Method = WebRequestMethods.Http.Post;
 8         request.ContentType = @"application/x-www-form-urlencoded; charset=UTF-8";
 9 
10         var requestDataBuilder = new StringBuilder();
11         requestDataBuilder.AppendLine("using System;");
12         requestDataBuilder.AppendLine("namespace HelloWorldApplication");
13         requestDataBuilder.AppendLine("{");
14         requestDataBuilder.AppendLine("    class HelloWorld");
15         requestDataBuilder.AppendLine("    {");
16         requestDataBuilder.AppendLine("        static void Main(string[] args)");
17         requestDataBuilder.AppendLine("        {");
18         requestDataBuilder.AppendLine("            Console.WriteLine(\"《C# 爬虫 破境之道》\");");
19         requestDataBuilder.AppendLine("        }");
20         requestDataBuilder.AppendLine("    }");
21         requestDataBuilder.AppendLine("}");
22 
23         var requestData = Encoding.UTF8.GetBytes(@"code=" + System.Web.HttpUtility.UrlEncode(requestDataBuilder.ToString())
24             + @"&token=4381fe197827ec87cbac9552f14ec62a&language=10&fileext=cs");
25         requestDataBuilder.Clear();
26         request.ContentLength = requestData.Length;
27         var requestStream = request.GetRequestStream();
28         requestStream.Write(requestData, 0, requestData.Length);
29         request.BeginGetResponse(new AsyncCallback(ar =>
30         {
31             using (var response = (ar.AsyncState as WebRequest).EndGetResponse(ar))
32             {
33                 using (var stream = response.GetResponseStream())
34                 {
35                     using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
36                     {
37                         var content = reader.ReadToEnd();
38                         Console.WriteLine(content.Length > 100 ? content.Substring(0, 90) + "..." : content);
39                     }
40                 }
41                 response.Close();
42             }
43 
44             watch.Stop();
45             Console.WriteLine("/* ********************** using {0}ms / request  ******************** */"
46                 + Environment.NewLine + Environment.NewLine, (watch.Elapsed.TotalMilliseconds / 100).ToString("000.00"));
47         }), request);
48     }
49 }
同步方式处理网络流

相对于异步发送的案例,代码的变动主要在第7行到第28行。

首先7、8行,为request的两个属性赋值发生了变化,我们要操作RequestStream,一定要指定合适的Method,POST或PUT等,其他的Method并不支持对流操作,就会出错;另外就是然使用流了,流里的数据到底是个什么,服务器端应该如何解释,可以通过ContentType来指定,有时候服务器端并不是那么严谨,可能稀里糊涂的也就过去了;

接下来,第10~21行,我构建了一个字符串,作为要提交的主体数据,在第23行,将字符串转换为字节数组;对流操作,字节数组和编码都是跑不掉的,时而绕晕,时而迷糊,也是很正常的:)

第26行,指定填充到数据流的数据长度;说到这个长度,再啰嗦一下HTTP协议,用Wireshark随便抓个包当个栗子

[Code 5.1.2] 

《C# 爬虫 破境之道》:第一境 爬虫原理 — 第五节:数据流处理的那些事儿
 1 HTTP/1.1 200 OK
 2 Date: Fri, 10 Jan 2020 08:10:02 GMT
 3 Server: Apache/2.2.9 (APMServ) PHP/5.2.6
 4 Last-Modified: Sun, 05 Jan 2020 10:29:06 GMT
 5 ETag: "29000000008812-616-59b620334ab88"
 6 Accept-Ranges: bytes
 7 Content-Length: 1558  <-----------这里不对,是因为我把下面xml精简了一下,要不太长。
 8 Content-Type: application/xml
 9 
10 <?xml version="1.0" encoding="gb2312"?>
11 <root>
12 <FileList>
13     <FileName version="20181122">sound/FaceSuccess.wav</FileName>
14 </FileList>
15 </root>
某请求的回复报文

第1行,请求和回复不太一样,具体就不说了,大体就是HTTP协议的版本、URI地址、状态等;

第2~8行,就是协议头,一行一对,对应我们WebRequest和WebResponse里的Headers;

第9行,是一个空行(\r\n),不要以为是我为了美观加的,这也是协议的一部分;它的作用就是来分隔协议头和协议体的;

从第10行到第15行,就是协议体,也就是我们流中的内容了。

再回到我们刚才说的ContentLength属性,这个属性的值,其实就是协议体(报文中第10到15行)的字节长度;

WebRequest和WebResponse都有这个属性,这样,就给我们一个制作进度条的可能性,比如下载一个AV,可以显示已经下载了多少了,占比是多少,之类的。但为什么说是可能性呢,因为这个属性,无论是Request还是Response的时候,都可以不指定,它有默认值:-1;也就是说,当ContentLength==-1的时候,数据的长度将以实际发送或收到的数据长度为准,这就对数据的完整性校验和传送进度的统计产生了困难。所以,我们最好在刚开始学习的时候,就养成为它们赋值的好习惯;就啰嗦这么多吧。

再回到[Code 5.1.1] 中,继续第27行,这里就是以同步的方式来获取请求流了,线程将在这里阻塞。同样,第33行获取回复流,也是同样的道理。

其余就是写流、读流的操作,没什么好说的了~

 

第二部分:异步方式处理数据流

首先,我们定义一个结构体[WebAsyncContext],用来存储上下文中使用的变量;

[Code 5.2.1] 

《C# 爬虫 破境之道》:第一境 爬虫原理 — 第五节:数据流处理的那些事儿
1 public class WebAsyncContext
2 {
3     public System.Net.WebRequest Request { get; set; }
4     public System.Net.WebResponse Response { get; set; }
5     public System.IO.Stream RequestStream { get; set; }
6     public System.IO.Stream ResponseStream { get; set; }
7     public System.IO.MemoryStream Memory { get; set; }
8     public byte[] Buffer { get; set; }
9 }
WebAsyncContext

比较简单,不做解释了,接下来,就是一票子异步操作了,别眨眼~

[Code 5.2.2]

《C# 爬虫 破境之道》:第一境 爬虫原理 — 第五节:数据流处理的那些事儿
 1 {
 2     Stopwatch watch = new Stopwatch();
 3     Console.WriteLine("/* ********** 异步请求方式 * 异步方式处理数据流 **********/");
 4     watch.Start();
 5     {
 6         var requestDataBuilder = new StringBuilder();
 7         requestDataBuilder.AppendLine("using System;");
 8         requestDataBuilder.AppendLine("namespace HelloWorldApplication");
 9         requestDataBuilder.AppendLine("{");
10         requestDataBuilder.AppendLine("    class HelloWorld");
11         requestDataBuilder.AppendLine("    {");
12         requestDataBuilder.AppendLine("        static void Main(string[] args)");
13         requestDataBuilder.AppendLine("        {");
14         requestDataBuilder.AppendLine("            Console.WriteLine(\"《C# 爬虫 破境之道》\");");
15         requestDataBuilder.AppendLine("        }");
16         requestDataBuilder.AppendLine("    }");
17         requestDataBuilder.AppendLine("}");
18 
19         var requestData = Encoding.UTF8.GetBytes(@"code=" + System.Web.HttpUtility.UrlEncode(requestDataBuilder.ToString())
20             + @"&token=4381fe197827ec87cbac9552f14ec62a&language=10&fileext=cs");
21 
22         var context = new WebAsyncContext { Request = WebRequest.Create(@"https://tool.runoob.com/compile.php"), Buffer = requestData };
23 
24         requestData = null;
25         requestDataBuilder.Clear();
26 
27         context.Request.ContentLength = context.Buffer.Length;
28         context.Request.Method = WebRequestMethods.Http.Post;
29         context.Request.ContentType = @"application/x-www-form-urlencoded; charset=UTF-8";
30         context.Request.BeginGetRequestStream(acGetRequestStream =>
31         {
32             var contextGetRequestStream = acGetRequestStream.AsyncState as WebAsyncContext;
33             contextGetRequestStream.RequestStream = contextGetRequestStream.Request.EndGetRequestStream(acGetRequestStream);
34             contextGetRequestStream.RequestStream.BeginWrite(contextGetRequestStream.Buffer, 0, contextGetRequestStream.Buffer.Length, acWriteStream =>
35             {
36                 var contextWriteRequestStream = acWriteStream.AsyncState as WebAsyncContext;
37                 contextWriteRequestStream.RequestStream.EndWrite(acWriteStream);
38                 contextWriteRequestStream.Request.BeginGetResponse(new AsyncCallback(acGetResponse =>
39                 {
40                     var contextGetResponse = acGetResponse.AsyncState as WebAsyncContext;
41                     using (contextGetResponse.Response = contextGetResponse.Request.EndGetResponse(acGetResponse))
42                     using (contextGetResponse.ResponseStream = contextGetResponse.Response.GetResponseStream())
43                     using (contextGetResponse.Memory = new MemoryStream())
44                     {
45                         contextGetResponse.Buffer = new Byte[512];
46                         var readCount = 0;
47                         do
48                         {
49                             var acReadStreamResult = contextGetResponse.ResponseStream.BeginRead(contextGetResponse.Buffer, 0, contextGetResponse.Buffer.Length, acReadStream =>
50                             {
51                                 readCount = (acReadStream.AsyncState as WebAsyncContext).ResponseStream.EndRead(acReadStream);
52                                 contextGetResponse.Memory.Write(contextGetResponse.Buffer, 0, readCount);
53                             }, contextGetResponse);
54 
55                             acReadStreamResult.AsyncWaitHandle.WaitOne();
56                         } while (readCount > 0);
57 
58                         contextGetResponse.RequestStream.Close();
59                         contextGetResponse.Request.Abort();
60                         contextGetResponse.Response.Close();
61                         contextGetResponse.Buffer = null;
62 
63                         var content = new UTF8Encoding(false).GetString(contextGetResponse.Memory.ToArray());
64                         Console.WriteLine(content.Length > 100 ? content.Substring(0, 90) + "..." : content);
65 
66                         watch.Stop();
67                         Console.WriteLine("/* ********************** using {0}ms / request  ******************** */"
68                             + Environment.NewLine + Environment.NewLine, (watch.Elapsed.TotalMilliseconds / 100).ToString("000.00"));
69                     }
70                 }), contextWriteRequestStream);
71             }, contextGetRequestStream);
72         }, context);
73     }
74 }
精彩的部分来了~

代码眨一看,挺吓人,而且网页显示出来,也不那么美观,还是拷贝到VS中看吧;

个人还是比较喜欢这种风格,比较符合人的阅读习惯,从上往下看,就是正常的逻辑处理流程,感觉总比在一个又一个方法之间来回跳跃阅读要好得多;

所以,耐心一点看,还是可以看得明白的:)

归纳一下,基本上,异步操作就是以BeginXXX开始(不阻塞线程),以EndXXX结束(阻塞线程);

这里的特例就是MemoryStream的读写,没有使用异步方法,因为在其内部,异步方法和同步方法是一样的实现,所以,就没有必要搞那么麻烦了。

另外就是,我们看到用了BeginGetRequestStream(),却没有提供对应的BeginGetResponseStream()方法,这是为什么呢,我猜测是因为在EndGetResponse()的时候,就已经拿到了ResponseStream的句柄,所以没有必要再异步拿一次了。

网上还有同学问,既然已经BenginGetResponse()了,还要使用BeginRead()来异步读取呢,有这个必要吗?其实还是有必要的,如果传输的数据量很大,或者网络状态不好,Read()可能可能会阻塞很久,完全可以通过BeginRead()来解放CPU,多干点儿其他的事情。

 

本来还想写一写使用XXXAsync()的范例,不过实在太困了,以后有机会再写吧:(

《C# 爬虫 破境之道》:第一境 爬虫原理 — 第五节:数据流处理的那些事儿

上一篇:C#反射与特性(六):设计一个仿ASP.NETCore依赖注入Web


下一篇:03【掌握】dubbo环境搭建windows