前言
接下来我们要分析的是拦截器链上的最后一个拦截器CallServerInterceptor
,它用于写入请求与获取响应,这里不需要再调用拦截器责任链的proceed
方法,CallServerInterceptor
在intercept
方法中将自己的工作做完后,就直接将响应返回给上一拦截器。
CallServerInterceptor::intercept
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
// 写入请求头
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
// 如果请求方法允许携带请求体(非GET请求,非HEAD请求),并且请求体不为空
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 当客户端需要发送一个体积可能很大的消息体时,就会将Expect: 100-continue写入请求头,然后发送
// 不带请求体的请求给服务端,然后服务端开始检查请求消息头,如果服务端满足条件就会返回一个状态码
// 100的回复告知客户端继续发送消息体,否则返回一个状态码417(Expectation Failed)的回复来告知
// 客户端不满足期待条件,别发送请求体。
// 若客户端包含Expect: 100-continue请求头,则进入下面的If语句。
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
// 构建Response.Builder,当response状态码为100时,返回null
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
// 客户端不包含"Expect: 100-continue",或者,客户端包含"Expect: 100-continue"并且服务端
// 满足期待条件
if (responseBuilder == null) {
// 请求体是否为双工的,默认情况下返回false
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
// 如果满足Expect: 100-continue,并且请求体不是双工的,则写入请求体
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
// 服务器未达到"Except: 100-continue"期望,不发送请求体
exchange.noRequestBody()
// 若连接不是多路复用的
if (!exchange.connection.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
} else {
// 不发送请求体
exchange.noRequestBody()
}
// 默认下,requestBody.isDuplex()返回false,进入If语句
if (requestBody == null || !requestBody.isDuplex()) {
// 结束请求处理
exchange.finishRequest()
}
if (responseBuilder == null) {
// 读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
// 构建响应,包含:原始请求、握手信息、发送时间、接收时间
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
// 即使我们没有请求,服务端也返回了状态码为100的响应,再次尝试去读实际的响应状态
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
// forWebSocket默认为false,code为101表示切换协议
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
// 读取响应体
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
// 是否要关闭TCP连接
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
// 将响应返回给上一个拦截器
return response
}
该方法的大致流程如下:
- 写入请求头(
Exchange.writeRequestHeaders
) - 写入请求体(
Exchange.createRequestBody & requestBody.writeTo
) - 读取响应头(
Exchange.readResponseHeaders
) - 读取响应体(
Exchange.openResponseBody
)
可以看出,该拦截器主要是利用了Exchange
的各种方法,完成请求和响应的写与读,在Exchange
里面,实际的I/O
操作交由ExchangeCodec
处理,ExchangeCodec
是一个接口,它有Http1ExchangeCodec
和Http2ExchangeCodec
两个实现类,分别对应HTTP/1.x
、HTTP/2
协议版本。
ExchangeCodec
ExchangeCodec
是一个接口,负责对Http
请求进行编码和对Http
响应进行解码。有如下的变量与方法
Exchange::writeRequestHeaders
调用了ExchangeCodec
的writeRequestHeaders
方法;
@Throws(IOException::class)
fun writeRequestHeaders(request: Request) {
try {
eventListener.requestHeadersStart(call)
codec.writeRequestHeaders(request)
eventListener.requestHeadersEnd(call, request)
} catch (e: IOException) {
eventListener.requestFailed(call, e)
trackFailure(e)
throw e
}
}
Exchange::createRequestBody
调用了ExchangeCodec
的createRequestBody
方法;
@Throws(IOException::class)
fun createRequestBody(request: Request, duplex: Boolean): Sink {
this.isDuplex = duplex
val contentLength = request.body!!.contentLength()
eventListener.requestBodyStart(call)
val rawRequestBody = codec.createRequestBody(request, contentLength)
return RequestBodySink(rawRequestBody, contentLength)
}
Exchange::readResponseHeaders
调用了ExchangeCodec
的readResponseHeaders
方法
@Throws(IOException::class)
fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
try {
val result = codec.readResponseHeaders(expectContinue)
result?.initExchange(this)
return result
} catch (e: IOException) {
eventListener.responseFailed(call, e)
trackFailure(e)
throw e
}
}
Exchange::openResponseBody
调用了ExchangeCodec
的openResponseBodySource
方法
@Throws(IOException::class)
fun openResponseBody(response: Response): ResponseBody {
try {
val contentType = response.header("Content-Type")
val contentLength = codec.reportedContentLength(response)
val rawSource = codec.openResponseBodySource(response)
val source = ResponseBodySource(rawSource, contentLength)
return RealResponseBody(contentType, contentLength, source.buffer())
} catch (e: IOException) {
eventListener.responseFailed(call, e)
trackFailure(e)
throw e
}
}
Http1ExchangeCodec
writeRequestHeaders
override fun writeRequestHeaders(request: Request) {
// 拼接请求行,包含 method、url、HTTP/1.1
val requestLine = RequestLine.get(request, connection.route().proxy.type())
// 写入请求Header和请求行
writeRequest(request.headers, requestLine)
}
该方法先是拼接请求行,接着写入请求Header
和请求行。
Http1ExchangeCodec::writeRequest
如下
fun writeRequest(headers: Headers, requestLine: String) {
check(state == STATE_IDLE) { "state: $state" }
// 写入请求行
sink.writeUtf8(requestLine).writeUtf8("\r\n")
// 写入请求Header
for (i in 0 until headers.size) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n")
}
sink.writeUtf8("\r\n")
state = STATE_OPEN_REQUEST_BODY
}
createRequestBody
Http1ExchangeCodec::createRequestBody
:
override fun createRequestBody(request: Request, contentLength: Long): Sink {
return when {
request.body != null && request.body.isDuplex() -> throw ProtocolException(
"Duplex connections are not supported for HTTP/1")
// 分块传输
request.isChunked -> newChunkedSink() // Stream a request body of unknown length.
// 请求体的长度已确定
contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.
else -> // Stream a request body of a known length.
throw IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!")
}
}
若要分块传输,则调用newChunkedSink
方法获取输出流;
若不使用分块传输,请求体的长度已确定,则调用newKnownLengthSink
方法获取输出流。
newChunkedSink
Http1ExchangeCodec::newChunkedSink
:
private fun newChunkedSink(): Sink {
// 检查状态
check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
state = STATE_WRITING_REQUEST_BODY
return ChunkedSink()
}
这里创建了一个ChunkedSink
并返回,ChunkedSink
是Http1ExchangeCodec
的普通内部类,且实现了Sink
接口
/**
* An HTTP body with alternating chunk sizes and chunk bodies. It is the caller's
* responsibility to buffer chunks; typically by using a buffered sink with this sink.
*/
// ChunkedSink含有两个信息:分块的数据长度 和 分块的具体数据
private inner class ChunkedSink : Sink {
...
override fun write(source: Buffer, byteCount: Long) {
check(!closed) { "closed" }
if (byteCount == 0L) return
sink.writeHexadecimalUnsignedLong(byteCount)
sink.writeUtf8("\r\n")
sink.write(source, byteCount)
sink.writeUtf8("\r\n")
}
...
}
在write
方法中,首先写入了十六进制的数据大小,接着写入了数据。
newKnownLengthSink
Http1ExchangeCodec::newKnownLengthSink
:
private fun newKnownLengthSink(): Sink {
// 检查状态
check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
state = STATE_WRITING_REQUEST_BODY
return KnownLengthSink()
}
这里创建了一个KnownLengthSink
并返回,KnownLengthSink
是Http1ExchangeCodec
的普通内部类,且实现了Sink
接口
/** An HTTP request body. */
private inner class KnownLengthSink : Sink {
...
override fun write(source: Buffer, byteCount: Long) {
check(!closed) { "closed" }
checkOffsetAndCount(source.size, 0, byteCount)
sink.write(source, byteCount)
}
...
}
在KnownLengthSink
的write
方法中,完成了对数据的写入。
readResponseHeaders
Http1ExchangeCodec::readResponseHeaders
如下:
override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) {
"state: $state"
}
try {
// 读取并解析响应行内容
val statusLine = StatusLine.parse(headersReader.readLine())
// 根据响应行和响应头,构建Response.Builder
val responseBuilder = Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(headersReader.readHeaders())
return when {
// 若expectContinue为true且响应码为100,返回null
// (若用户的请求头包含Expect: 100-continue,则返回的响应码有可能为100)
expectContinue && statusLine.code == HTTP_CONTINUE -> {
null
}
statusLine.code == HTTP_CONTINUE -> {
state = STATE_READ_RESPONSE_HEADERS
responseBuilder
}
else -> {
state = STATE_OPEN_RESPONSE_BODY
responseBuilder
}
}
} catch (e: EOFException) {
// Provide more context if the server ends the stream before sending a response.
val address = connection.route().address.url.redact()
throw IOException("unexpected end of stream on $address", e)
}
}
该方法主要做的事情
- 调用
readLine
方法读取响应行信息,并且调用StatusLine.parse
方法将其解析为StatusLine
对象 - 根据响应行和响应头,构建
Response.Builder
openResponseBodySource
Http1ExchangeCodec::openResponseBodySource
如下
override fun openResponseBodySource(response: Response): Source {
return when {
// 根据响应行和响应头,判断是否有响应体
!response.promisesBody() -> newFixedLengthSource(0)
// 判断是否使用分块传输
response.isChunked -> newChunkedSource(response.request.url)
else -> {
// 响应体长度
val contentLength = response.headersContentLength()
if (contentLength != -1L) {
// 长度已知
newFixedLengthSource(contentLength)
} else {
// 长度未知
newUnknownLengthSource()
}
}
}
}
这里根据不同的情况构造不同的输入流,分别有newFixedLengthSource
、newChunkedSource
、newUnknownLengthSource
。
newChunkedSource
private fun newChunkedSource(url: HttpUrl): Source {
check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
state = STATE_READING_RESPONSE_BODY
return ChunkedSource(url)
}
返回了一个ChunkedSource
对象,该对象如下
/** An HTTP body with alternating chunk sizes and chunk bodies. */
private inner class ChunkedSource(private val url: HttpUrl) :
AbstractSource() {
...
override fun read(sink: Buffer, byteCount: Long): Long {
...
if (bytesRemainingInChunk == 0L || bytesRemainingInChunk == NO_CHUNK_YET) {
// 读取分块数据大小
readChunkSize()
if (!hasMoreChunks) return -1
}
// 调用父类的read方法,读取分块数据内容
val read = super.read(sink, minOf(byteCount, bytesRemainingInChunk))
...
return read
}
private fun readChunkSize() {
...
}
...
}
newFixedLengthSource
private fun newFixedLengthSource(length: Long): Source {
check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
state = STATE_READING_RESPONSE_BODY
return FixedLengthSource(length)
}
返回了一个FixedLengthSource
对象,该对象如下
/** An HTTP body with a fixed length specified in advance. */
private inner class FixedLengthSource(private var bytesRemaining: Long) :
AbstractSource() {
...
override fun read(sink: Buffer, byteCount: Long): Long {
...
// 调用父类的read方法读取数据
val read = super.read(sink, minOf(bytesRemaining, byteCount))
...
return read
}
...
}
HTTP/2 流量控制
HTTP/2利用流来实现多路复用,这引入了对TCP连接的使用争夺,会造成流被阻塞。流量控制方案确保在同一连接上的多个流之间不会造成破坏性的干扰。流量控制会用于各个独立的流,也会用于整个连接。
HTTP/2"流"的流量控制的目标是:在不改变协议的情况下允许使用多种流量控制算法。
HTTP/2的流量控制具有以下特征:
- 流量控制是特定于一个连接的。每种类型的流量控制都是在单独的一跳的两个端点之间的,并不是在整个端到端的路径上的。(这里的一跳指的是HTTP连接的一跳,而不是IP路由的一跳)
- 流量控制是基于WINDOW_UPDATE帧的。接收方公布自己打算在每个流以及整个连接上分别接收多少字节。这是一个以信用为基础的方案。
- 流量控制是有方向的,由接收者全面控制。接收方可以为每个流和整个连接设置任意的窗口大小。发送方必须尊重接收方设置的流量控制限制。客户方、服务端和中间代理作为接收方时都独立地公布各自的流量控制窗口,作为发送方时都遵守对端的流量控制设置。
- 无论是新流还是整个连接,流量控制窗口的初始值是65535字节。
- 帧的类型决定了流量控制是否适用于帧。目前,只有DATA帧服从流量控制,所有其它类型的帧并不消耗流量控制窗口的空间。这保证了重要的控制帧不会被流量控制阻塞。
- 流量控制不能被禁用。
- HTTP/2只定义了WINDOW_UPDATE帧的格式和语义,并没有规定接收方如何决定何时发送帧、发送什么样的值,也没有规定发送方如何选择发送包。具体实现可以选择任何满足需求的算法。
发送端持有一个流量控制窗口(window),初始值为65536,发送端每发送一个DATA帧,就会把window递减,递减量为DATA帧的大小,如果window为0则不能发送任何帧。接收端可以发送WINDOW_UPDATE帧给发送端,发送端以帧内的Window Size Increment作为增量,加到window上。
在TCP中已经有流量控制了,为什么在HTTP/2中还需要进行流量控制?
答:在HTTP/2中利用流来实现多路复用,在一个TCP连接中有多条流用于传输数据,在TCP中的流量控制是针对于整个连接的,而HTTP/2中的流量控制既针对各个独立的流,也针对整个TCP连接。HTTP/2对每条流都进行了流量控制,确保某个流不会阻塞其他流。
Http2ExchangeCodec
writeRequestHeaders
override fun writeRequestHeaders(request: Request) {
if (stream != null) return
val hasRequestBody = request.body != null
// 获取存放Header的List
val requestHeaders = http2HeadersList(request)
// 获取Http2Stream对象
stream = http2Connection.newStream(requestHeaders, hasRequestBody)
// We may have been asked to cancel while creating the new stream and sending the request
// headers, but there was still no stream to close.
if (canceled) {
stream!!.closeLater(ErrorCode.CANCEL)
throw IOException("Canceled")
}
stream!!.readTimeout().timeout(chain.readTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
stream!!.writeTimeout().timeout(chain.writeTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
}
该方法主要做了两件事情:
- 将请求头放入
List
集合中 - 通过刚刚创建的请求头集合初始化并获取一个
Http2Stream
对象
http2HeadersList
方法如下
fun http2HeadersList(request: Request): List<Header> {
val headers = request.headers
val result = ArrayList<Header>(headers.size + 4)
result.add(Header(TARGET_METHOD, request.method))
result.add(Header(TARGET_PATH, RequestLine.requestPath(request.url)))
val host = request.header("Host")
if (host != null) {
result.add(Header(TARGET_AUTHORITY, host)) // Optional.
}
result.add(Header(TARGET_SCHEME, request.url.scheme))
for (i in 0 until headers.size) {
// header names must be lowercase.
val name = headers.name(i).toLowerCase(Locale.US)
if (name !in HTTP_2_SKIPPED_REQUEST_HEADERS ||
name == TE && headers.value(i) == "trailers") {
result.add(Header(name, headers.value(i)))
}
}
return result
}
该方法将request.headers
转化为了ArrayList<Header>
,并且添加了method
、path
、authority
、scheme
的Header
。
Http2Connection::newStream
如下
/**
* Returns a new locally-initiated stream.
*
* @param out true to create an output stream that we can use to send data to the
* remote peer.Corresponds to `FLAG_FIN`.
*/
// 返回一个本地初始化的流
@Throws(IOException::class)
fun newStream(
requestHeaders: List<Header>,
out: Boolean
): Http2Stream {
return newStream(0, requestHeaders, out)
}
@Throws(IOException::class)
private fun newStream(
associatedStreamId: Int,
requestHeaders: List<Header>,
out: Boolean
): Http2Stream {
val outFinished = !out
val inFinished = false
val flushHeaders: Boolean
val stream: Http2Stream
val streamId: Int
synchronized(writer) {
synchronized(this) {
...
// streamId 以2递增
streamId = nextStreamId
nextStreamId += 2
// 初始化一个新的Http2Stream
stream = Http2Stream(streamId, this, outFinished, inFinished, null)
...
}
if (associatedStreamId == 0) {
// 写入请求头
writer.headers(outFinished, streamId, requestHeaders)
} else {
require(!client) { "client streams shouldn't have associated stream IDs" }
// HTTP/2 has a PUSH_PROMISE frame.
// 发送PUSH_PROMISE帧
writer.pushPromise(associatedStreamId, streamId, requestHeaders)
}
}
...
return stream
}
在该方法中,首先计算当前请求对应的streamId
,然后初始化一个Http2Stream
对象。因为传入的associatedStreamId
为0,所以会调用writer.header
写入请求头(否则会调用writer.pushPromise
方法发送PUSH_PROMISE
帧)
写入请求头的Http2Writer::headers
方法如下:
@Synchronized @Throws(IOException::class)
fun headers(
outFinished: Boolean,
streamId: Int,
headerBlock: List<Header>
) {
if (closed) throw IOException("closed")
hpackWriter.writeHeaders(headerBlock)
val byteCount = hpackBuffer.size
val length = minOf(maxFrameSize.toLong(), byteCount)
var flags = if (byteCount == length) FLAG_END_HEADERS else 0
if (outFinished) flags = flags or FLAG_END_STREAM
frameHeader(
streamId = streamId,
length = length.toInt(),
type = TYPE_HEADERS,
flags = flags
)
sink.write(hpackBuffer, length)
if (byteCount > length) writeContinuationFrames(streamId, byteCount - length)
}
这里调用了 hpackWriter.writeHeaders
对 Header
进行了 HPACK
压缩编码,然后调用 frameHeader
方法写入帧头,再将压缩编码后的 Header
数据写入输出流sink
中。(在 HTTP/2 中会对 Header 的信息进行 HPACK 压缩编码)
createRequestBody
Http2ExchangeCodec::createRequestBody
:
override fun createRequestBody(request: Request, contentLength: Long): Sink {
return stream!!.getSink()
}
stream
在前面「写入请求头writeRequestHeaders
」中被创建,这里得到了stream
的输出流sink
。
Http2Stream::getSink
:
fun getSink(): Sink {
synchronized(this) {
check(hasResponseHeaders || isLocallyInitiated) {
"reply before requesting the sink"
}
}
return sink
}
sink
变量:
internal val sink = FramingSink(
finished = outFinished
)
FramingSink
是Http2Stream
的一个普通内部类,实现了Sink
接口:
/*A sink that writes outgoing data frames of a stream. This class is not thread safe.*/
internal inner class FramingSink(
/** True if either side has cleanly shut down this stream. We shall send no more bytes. */
var finished: Boolean = false
) : Sink {
// 数据缓冲区
private val sendBuffer = Buffer()
/** Trailers to send at the end of the stream. */
var trailers: Headers? = null
var closed: Boolean = false
@Throws(IOException::class)
override fun write(source: Buffer, byteCount: Long) {
this@Http2Stream.assertThreadDoesntHoldLock()
// 将数据写入缓冲区
sendBuffer.write(source, byteCount)
while (sendBuffer.size >= EMIT_BUFFER_SIZE) {
// 发出数据帧
emitFrame(false)
}
}
/**
* Emit a single data frame to the connection. The frame's size be limited by this stream's
* write window. This method will block until the write window is nonempty.
*/
// 向连接发出单个数据帧。帧的大小受此流的写窗口的限制。此方法将阻塞,直到写窗口非空为止
@Throws(IOException::class)
private fun emitFrame(outFinishedOnLastFrame: Boolean) {
val toWrite: Long
val outFinished: Boolean
synchronized(this@Http2Stream) {
writeTimeout.enter()
try {
// 如果应用程序已发送的字节数 >= 允许发送的字节总数,阻塞,等待WINDOW_UPDATE帧
// 更新流量窗口大小
while (writeBytesTotal >= writeBytesMaximum &&
!finished &&
!closed &&
errorCode == null) {
waitForIo() // Wait until we receive a WINDOW_UPDATE for this stream.
}
} finally {
writeTimeout.exitAndThrowIfTimedOut()
}
checkOutNotClosed() // Kick out if the stream was reset or closed while waiting.
// 计算能够写入的字节数
toWrite = minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)
writeBytesTotal += toWrite
outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size && errorCode == null
}
writeTimeout.enter()
try {
// 写入数据
connection.writeData(id, outFinished, sendBuffer, toWrite)
} finally {
writeTimeout.exitAndThrowIfTimedOut()
}
}
...
}
FramingSink
的write
方法首先将数据写入缓冲区中,如果缓冲区的数据大小大于等于EMIT_BUFFER_SIZE
,就会调用emitFrame
方法发出数据帧。
在emitFrame
方法中,出现了两个变量:
-
writeBytesTotal
: 应用程序已发送的字节数 -
writeBytesMaximum
:stream
流量窗口的大小
在emitFrame
方法中,如果应用程序已发送的字节数超过了流量窗口的大小,就会阻塞,等待接收方(此时接收方是服务端)发送WINDOW_UPDATE
帧,且writeBytesTotal < writeBytesMaximum
才会发送数据帧。发送数据帧的时候,会先调用minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)
计算能够发送的数据大小,它会取「写窗口的剩余空间」和「写缓冲区大小」的最小值,然后调用connection.writeData
写入数据,其中connection
是一个Http2Connection
对象。
Http2Connection::writeData
如下:
/**
* ...(省略)
* Zero [byteCount] writes are not subject to flow control and will not block.
* The only use case for zero [byteCount] is closing a flushed output stream.
*/
// ...(省略)零字节计数写入不受流控制,也不会阻塞,byteCount为0的唯一用例是关闭一个刷新的输出流
@Throws(IOException::class)
fun writeData(
streamId: Int,
outFinished: Boolean,
buffer: Buffer?,
byteCount: Long
) {
// Empty data frames are not flow-controlled.
// 空数据帧不受流控制
if (byteCount == 0L) {
writer.data(outFinished, streamId, buffer, 0)
return
}
var byteCount = byteCount
while (byteCount > 0L) {
var toWrite: Int
synchronized(this@Http2Connection) {
try {
// 当前连接不允许发送更多数据,进入阻塞状态,直到连接的流量控制窗口更新
while (writeBytesTotal >= writeBytesMaximum) {
// Before blocking, confirm that the stream we're writing is still open. It's possible
// that the stream has since been closed (such as if this write timed out.)
if (!streams.containsKey(streamId)) {
throw IOException("stream closed")
}
this@Http2Connection.wait() // Wait until we receive a WINDOW_UPDATE.
}
} catch (e: InterruptedException) {
Thread.currentThread().interrupt() // Retain interrupted status.
throw InterruptedIOException()
}
// 计算能够写入的数据大小
toWrite = minOf(byteCount, writeBytesMaximum - writeBytesTotal).toInt()
toWrite = minOf(toWrite, writer.maxDataLength())
// 更新写入的总数据大小
writeBytesTotal += toWrite.toLong()
}
// 减去将要写入的数据大小
byteCount -= toWrite.toLong()
// 写入数据
writer.data(outFinished && byteCount == 0L, streamId, buffer, toWrite)
}
}
在writeData
方法中,主要工作如下:
- 判断当前连接是否允许发送数据(
writeBytesTotal < writeBytesMaximum
),若不允许则进入阻塞状态直到收到WINDOW_UPDATE
帧更新连接的流量窗口。 - 计算可发送数据大小(不超过剩余窗口大小以及每个帧的限制大小),调用
writer.data
方法写入数据。 - 若当前还有需要发送的数据,重复1、2步骤。
Http2Writer::data
方法如下
/**
* ...
*
* @param source the buffer to draw bytes from. May be null if byteCount is 0.
* @param byteCount must be between 0 and the minimum of `source.length` and [maxDataLength].
*/
// source:缓存的字节,如果byteCount为0,source可能为null
// byteCount:值必须在[0,min(source.length,maxDataLength)]之间
@Synchronized @Throws(IOException::class)
fun data(outFinished: Boolean, streamId: Int, source: Buffer?, byteCount: Int) {
if (closed) throw IOException("closed")
var flags = FLAG_NONE
if (outFinished) flags = flags or FLAG_END_STREAM
dataFrame(streamId, flags, source, byteCount)
}
data
方法主要是调用了Http2Writer::dataFrame
方法:
@Throws(IOException::class)
fun dataFrame(streamId: Int, flags: Int, buffer: Buffer?, byteCount: Int) {
frameHeader(
streamId = streamId,
length = byteCount,
type = TYPE_DATA,
flags = flags
)
if (byteCount > 0) {
sink.write(buffer!!, byteCount.toLong())
}
}
先调用frameHeader
方法写入帧头,然后再写入相应的数据。
readResponseHeaders
Http2ExchangeCodec::readResponseHeaders
如下
override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
// 获取响应的Header
val headers = stream!!.takeHeaders()
// 构建一个Response.Builder对象
val responseBuilder = readHttp2HeadersList(headers, protocol)
// 若expectContinue为true,且响应码为100,返回null,否则返回responseBuilder
return if (expectContinue && responseBuilder.code == HTTP_CONTINUE) {
null
} else {
responseBuilder
}
}
该方法首先获取了响应的Header
,然后构建了一个Response.Builder
对象,接着根据情况,返回null
,或者responseBuilder
。我们在CallServerInterceptor::intercept
提到过,若用户的请求头包含Expect: 100-continue
,则返回的响应码有可能为100
。
Http2Stream::takeHeaders
如下
@Synchronized @Throws(IOException::class)
fun takeHeaders(): Headers {
readTimeout.enter()
try {
// 若当前接收响应Header的队列是空的,则阻塞
// headersQueue的类型是ArrayDeque<Headers>
while (headersQueue.isEmpty() && errorCode == null) {
waitForIo()
}
} finally {
readTimeout.exitAndThrowIfTimedOut()
}
// 取队列的第一个Headers信息返回
if (headersQueue.isNotEmpty()) {
return headersQueue.removeFirst()
}
throw errorException ?: StreamResetException(errorCode!!)
}
headersQueue
的类型是ArrayDeque<Headers>
,表示响应Header
。如果headersQueue
里面没有元素,就阻塞,否则取其第一个元素返回。
构建Response.Builder
对象的Http2ExchangeCodec::readHttp2HeadersList
方法如下:
/** Returns headers for a name value block containing an HTTP/2 response. */
fun readHttp2HeadersList(headerBlock: Headers, protocol: Protocol): Response.Builder {
var statusLine: StatusLine? = null
val headersBuilder = Headers.Builder()
// 遍历Headers的信息
for (i in 0 until headerBlock.size) {
val name = headerBlock.name(i)
val value = headerBlock.value(i)
// 提取状态行信息
if (name == RESPONSE_STATUS_UTF8) {
statusLine = StatusLine.parse("HTTP/1.1 $value")
// 过滤响应头,HTTP_2_SKIPPED_RESPONSE_HEADERS包括:CONNECTION,HOST,KEEP_ALIVE,
// PROXY_CONNECTION,TE,TRANSFER_ENCODING,ENCODING,UPGRADE
} else if (name !in HTTP_2_SKIPPED_RESPONSE_HEADERS) {
headersBuilder.addLenient(name, value)
}
}
if (statusLine == null) throw ProtocolException("Expected ':status' header not present")
// 构建Response.Builder对象
return Response.Builder()
.protocol(protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(headersBuilder.build())
}
该方法主要是对Headers
对象进行遍历,遍历的过程中,会提取状态行信息,并且符合要求的响应头记录下来,最后构建一个Response.Builder
对象。
openResponseBodySource
Http2ExchangeCodec::openResponseBodySource
如下:
override fun openResponseBodySource(response: Response): Source {
return stream!!.source
}
这里返回了Http2Stream
的source
对象,Http2Stream
的source
对象定义如下:
internal val source = FramingSource(
maxByteCount = connection.okHttpSettings.initialWindowSize.toLong(),
finished = inFinished
)
这里返回了一个FramingSource
对象,FramingSource
是Http2Stream
的普通内部类,实现了okio
的Source
接口:
// 该Source用于读取流上即将到来的数据帧
inner class FramingSource internal constructor(
/** Maximum number of bytes to buffer before reporting a flow control error. */
private val maxByteCount: Long,
internal var finished: Boolean
) : Source {
// 接收数据缓冲区
val receiveBuffer = Buffer()
// 读取数据缓冲区
val readBuffer = Buffer()
var trailers: Headers? = null
/** True if the caller has closed this stream. */
internal var closed: Boolean = false
@Throws(IOException::class)
override fun read(sink: Buffer, byteCount: Long): Long {
require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
while (true) {
var tryAgain = false
var readBytesDelivered = -1L
var errorExceptionToDeliver: IOException? = null
// 1. Decide what to do in a synchronized block.
synchronized(this@Http2Stream) {
readTimeout.enter()
try {
if (errorCode != null) {
// Prepare to deliver an error.
errorExceptionToDeliver = errorException ?: StreamResetException(errorCode!!)
}
if (closed) {
throw IOException("stream closed")
} else if (readBuffer.size > 0L) {
// Prepare to read bytes. Start by moving them to the caller's buffer.
// 读取数据
readBytesDelivered = readBuffer.read(sink, minOf(byteCount, readBuffer.size))
readBytesTotal += readBytesDelivered
// 未被WINDOW_UPDATE帧确认的字节总数
val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged
if (errorExceptionToDeliver == null &&
unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) {
// Flow control: notify the peer that we're ready for more data! Only
// send a WINDOW_UPDATE if the stream isn't in error.
// 流控制:通知对方我们已经准备好接收更多数据!只在流没有错误的情况下发送
// WINDOW_UPDATE帧。
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead)
readBytesAcknowledged = readBytesTotal
}
} else if (!finished && errorExceptionToDeliver == null) {
// Nothing to do. Wait until that changes then try again.
// 没有数据可读,阻塞,被唤醒之后再次尝试读取数据
waitForIo()
tryAgain = true
}
} finally {
readTimeout.exitAndThrowIfTimedOut()
}
}
// 2. Do it outside of the synchronized block and timeout.
// 是否要再次尝试
if (tryAgain) {
continue
}
if (readBytesDelivered != -1L) {
// Update connection.unacknowledgedBytesRead outside the synchronized block.
updateConnectionFlowControl(readBytesDelivered)
return readBytesDelivered
}
if (errorExceptionToDeliver != null) {.
throw errorExceptionToDeliver!!
}
return -1L // This source is exhausted.
}
}
private fun updateConnectionFlowControl(read: Long) {
...
}
@Throws(IOException::class)
internal fun receive(source: BufferedSource, byteCount: Long) {
...
}
...
}
在FramingSource
的read
方法中,若有数据可读,则调用readBuffer.read
方法读取数据,若没有数据可读,则阻塞,被唤醒之后再次尝试读取数据。
当接收方(此时接收方是客户端)接收的数据大小超过流量控制窗口大小,就调用connection.writeWindowUpdateLater
方法通知对方增加窗口大小,增加的大小为"未被WINDOW_UPDATE
帧确认的字节总数"。
Http2Connection::writeWindowUpdateLater
如下
internal fun writeWindowUpdateLater(
streamId: Int,
unacknowledgedBytesRead: Long
) {
writerQueue.execute("$connectionName[$streamId] windowUpdate") {
try {
writer.windowUpdate(streamId, unacknowledgedBytesRead)
} catch (e: IOException) {
failConnection(e)
}
}
}
这里异步发送一个WINDOW_UPDATE
帧,Http2Writer::windowUpdate
如下:
/**
* Inform peer that an additional `windowSizeIncrement` bytes can be sent on
* `streamId`, or the connection if `streamId` is zero.
*/
@Synchronized @Throws(IOException::class)
fun windowUpdate(streamId: Int, windowSizeIncrement: Long) {
if (closed) throw IOException("closed")
require(windowSizeIncrement != 0L && windowSizeIncrement <= 0x7fffffffL) {
"windowSizeIncrement == 0 || windowSizeIncrement > 0x7fffffffL: $windowSizeIncrement"
}
frameHeader(
streamId = streamId,
length = 4,
type = TYPE_WINDOW_UPDATE,
flags = FLAG_NONE
)
sink.writeInt(windowSizeIncrement.toInt())
sink.flush()
}
先写入WINDOW_UPDATE
的帧头,然后再写入窗口增大数值。
总结
在 HTTP/1.x
的请求写入和响应读取中,主要是将普通请求、响应及 chunked
特性下的请求、响应进行了不同的处理。
在 HTTP/2
的请求写入和响应读取中,对流量控制进行了很好的支持,每个stream
以及connection
都会记录当前发送数据大小以及流量窗口大小,通过窗口大小对写入的数据大小进行了限制,客户端和服务端在HTTP/2
的流量控制中都可以作为接收方,当请求写入的时候,服务端是接收方,当读取响应的时候,客户端是接收方。此外,在传输Header
的时候,也使用了HTTP/2
的特性,利用HPACK
对 Header
进行压缩编码。