用RUST写流媒体服务器实战——rtmp chunk 深入解析
最近几个月断更了,把精力放在了新的开源项目上,一个用rust写的流媒体服务xiu。
实现过程中踩了不少坑,今天说下rtmp中的chunk。
RTMP协议确实复杂,在做这个项目之前,看过很多帖子,看过官方文档,但总是感觉不能彻底的理解清楚,在实现过一遍此协议之后,感觉清楚了不少。
目前做的测试还不够多,倒是发现了一些问题。chunk这个东西看了很久可能很多人还是不明白,说明一下,RTMP 协议除了3次握手数据,其它的,包括信令和媒体数据(音视频相关的数据),都会被封装成chunk块。
handshake的残留数据
TCP发送数据不是按照协议信令,一次只发送一个信令,有时候会发送多个,rtmp握手阶段从TCP流中读一次数据,握手结束后,会留下一部分数据,这部分要填到chunk解析缓冲数据中。
chunk size
初始化的chunk size要设置成128。
我的测试和排查过程记录如下:
我一开始的chunk size设置成了4096,用ffplay播放流,发送connect信令的时候,总是会多出一个byte,导致amf解析失败,用wireshark抓包,这个byte是没有的,一开始认为wireshark是不会出错的,以为tokio网络库,于是换成了tcp基础库,这个byte还是存在,想了个笨方法,找到一个开源的rtmp服务器,也打印出此信令,刚收到tcp数据的时候,这个byte也有,但是amf解析却成功了,接下来就是把每一步的数据都打印出来,从解析chunk到解析amf. 看看这个byte究竟是在哪个步骤消失的,最后发现,这个byte是chunk的第一个byte,fmt+csid,初始化的chunk size不对。。
状态保留
解释状态保留之前说一下chunk的各部分组成,按照官方的文档,chunk由四部分组成:
- basic header
- message header
- extended timestamp
- payload
前三部分是都可以压缩的。
basic header
/******************************************************************
* 5.3.1.1. Chunk Basic Header
* The Chunk Basic Header encodes the chunk stream ID and the chunk
* type(represented by fmt field in the figure below). Chunk type
* determines the format of the encoded message header. Chunk Basic
* Header field may be 1, 2, or 3 bytes, depending on the chunk stream
* ID.
*
* The bits 0-5 (least significant) in the chunk basic header represent
* the chunk stream ID.
*
* Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
* field.
* 0 1 2 3 4 5 6 7
* +-+-+-+-+-+-+-+-+
* |fmt| cs id |
* +-+-+-+-+-+-+-+-+
* Figure 6 Chunk basic header 1
*
* Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
* field. ID is computed as (the second byte + 64).
* 0 1
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |fmt| 0 | cs id - 64 |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* Figure 7 Chunk basic header 2
*
* Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
* this field. ID is computed as ((the third byte)*256 + the second byte
* + 64).
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |fmt| 1 | cs id - 64 |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* Figure 8 Chunk basic header 3
*
* cs id: 6 bits
* fmt: 2 bits
* cs id - 64: 8 or 16 bits
*
* Chunk stream IDs with values 64-319 could be represented by both 2-
* byte version and 3-byte version of this field.
***********************************************************************/
第一个byte的前两个bit是format,有0,1,2,3四个值,这个四个值的作用是压缩message header,详细的会在下面说,后6个bit是chunk stream ID, 简称csid(关于这个字段有坑,下面会解释),6个bit的取值范围为[0,63] ,0和1有特殊用途,2到63表示真正的csid,关于特殊值0和1:
- 0 表示csid用 6+ 8个bit表示
- 1 表示csid用 6 + 16个bit表示
解析代码如下:
let mut csid = (byte & 0b00111111) as u32;
match csid {
0 => {
if self.reader.len() < 1 {
return Ok(UnpackResult::NotEnoughBytes);
}
csid = 64;
csid += self.reader.read_u8()? as u32;
}
1 => {
if self.reader.len() < 1 {
return Ok(UnpackResult::NotEnoughBytes);
}
csid = 64;
csid += self.reader.read_u8()? as u32;
csid += self.reader.read_u8()? as u32 * 256;
}
_ => {}
}
message header
下面说下message header, 这部分比较复杂,有四种类型,对应着basic header里面的format字段的0~3。
type 0
/*****************************************************************/
/* 5.3.1.2.1. Type 0 */
/*****************************************************************
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp(3bytes) |message length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (cont)(3bytes) |message type id| msg stream id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message stream id (cont) (4bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*****************************************************************/
任何字段都不省略。
type 1
/*****************************************************************/
/* 5.3.1.2.2. Type 1 */
/*****************************************************************
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp(3bytes) |message length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (cont)(3bytes) |message type id|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*****************************************************************/
省略了message stream id,使用上一个chunk的数据。
type 2
/************************************************/
/* 5.3.1.2.3. Type 2 */
/************************************************
0 1 2
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp(3bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
***************************************************/
更绝了,省略了message stream id、message length和 message type id,这个也从前边的chunk读。
type 3
3 啥都没有,全从前边拿。
extended timestamp
这个字段是可选的,占用4个byte,如果message header里面的timestamp字段大于0xFFFFFF,则读取这个字段。
payload
最后是payload,payload的长度由 message header里面的message length决定。
chunk块的整个读取流程如下,一开始我的实现流程是这样的(有问题)
- 读取一个chunk的第一个byte,解析 format和chunk stream ID。
- 根据format解析message header:
- 如果是0 则每个字段都要从TCP流里面解析出来。
- 如果是1 则使用上一个chunk块的message stream ID。
- 如果是2 则使用上一个chunk块的message stream id、message length和 message type id。
- 如果是3 则使用上一个chunk块的message stream id、message length、message type id以及timestamp。
- 根据timestamp值来决定是否读取4个bytes的extendtimestamp。
- 根据message length读取payload值,这里有种情况比较特殊,有可能一块payload数据被分成了2个或者多个chunk块,在这一步里面就需要将这些分割的payload 数据合成一个完整的chunk数据再返回。也就是说如果读完payload数据后发现message length 不等于payload的长度,要回到步骤1从下一个chunk块里面继续读剩余的payload数据,直到读完为止。
好了,整个流程基本上介绍清楚了。大标题里面的状态保留我这里有两个意思,第一个意思是要说明一下我上面表述的问题。我说的是『从上一个chunk块』拿省略的字段,这里是不对的,因为有下面这种情况存在:
+--------+---------+-----+------------+------- ---+------------+
| | Chunk |Chunk|Header Data |No.of Bytes|Total No.of |
| |Stream ID|Type | | After |Bytes in the|
| | | | |Header |Chunk |
+--------+---------+-----+------------+-----------+------------+
|Chunk#1 | 3 | 0 | delta: 1000| 32 | 44 |
| | | | length: 32,| | |
| | | | type: 8, | | |
| | | | stream ID: | | |
| | | | 12345 (11 | | |
| | | | bytes) | | |
+--------+---------+-----+------------+-----------+------------+
|Chunk#2 | 3 | 2 | 20 (3 | 32 | 36 |
| | | | bytes) | | |
+--------+---------+-----+----+-------+-----------+------------+
|Chunk#3 | 4 | 3 | none (0 | 32 | 33 |
| | | | bytes) | | |
+--------+---------+-----+------------+-----------+------------+
|Chunk#4 | 3 | 3 | none (0 | 32 | 33 |
| | | | bytes) | | |
+--------+---------+-----+------------+-----------+------------+
注意:message header里面的字段复用是针对chunk stream ID的。
因此上面的情况,chunk2 可以复用 chunk1的message header,但是chunk 4不能复用chunk 3的,所以,在代码里面要特殊处理,每个csid的message header都需要保存一份,每解析一个chunk,读完basic header之后,需要把这个csid的上一个message header先恢复出来。
第二种情况也是我写代码时不曾想到的:
tcp数据包可以在任何地方拆分。
也就是说,可能一个chunk还没读完,这次的tcp数据就用完了,需要等下一次的数据,这种情况就要保留读取各个字段的状态了。每一个读取操作就应该设置一个标记,因此写了下面的四个大状态,message header里面有4个小的状态。
#[derive(Copy, Clone)]
enum ChunkReadState {
ReadBasicHeader = 1,
ReadMessageHeader = 2,
ReadExtendedTimestamp = 3,
ReadMessagePayload = 4,
Finish = 5,
}
#[derive(Copy, Clone)]
enum MessageHeaderReadState {x'x
ReadTimeStamp = 1,
ReadMsgLength = 2,
ReadMsgTypeID = 3,
ReadMsgStreamID = 4,
}
例如: ReadExtendedTimestamp占用4个bytes,但是读到这里的时候就还剩下2个bytes,就要保留这个状态,下次从TCP里面读出新数据的时候从这个状态开始。
最后rtmp chunk解析的rust完整实现在这里
最后,欢迎star。