RTMP数据格式
RTMP消息
RTMP消息由消息头
和载荷
两部分组成
值 |
长度 |
含义 |
message type |
1byte |
表示消息类型 |
payload length |
3byte |
表示荷载的字节数,big-endian格式 |
timestamp |
4byte |
表示消息的时间戳,big-endian格式 |
stream id |
3byte |
表示消息流ID,big-endian格式 |
RTMP块
消息是rtmp协议的基本数据单元,在网络传输时,消息会被重新封装成块进行传输,每个块都有固定的大小,
如果消息大小大于块的大小,消息就会被拆分成几个块发送。
块由头
和载荷
组成
值 |
长度 |
含义 |
basic header |
1byte或2byte或3byte |
块基本头 |
chunk msg header |
11byte或2byte或3byte |
块消息头 |
extended timestamp |
4byte |
扩展时间戳 |
值 |
长度 |
含义 |
fmt |
2byte |
块消息头的格式 |
cs id |
6byte |
块流ID(用于唯一标识一个消息) |
值 |
含义 |
0 |
表示块消息头是类型0 |
1 |
表示块消息头是类型1 |
2 |
表示块消息头是类型2 |
3 |
表示块消息头是类型3 |
值 |
含义 |
0 |
块基本头2个字节,块流ID计算方法(第2个字节+64),范围(64-319) |
1 |
块基本头3个字节,块流ID计算方法(第3个字节*255+第2个字节+64),范围(3-65599) |
2 |
块基本头1个字节,2表示低层协议消息 |
3-64 |
块基本头1个字节,该数表示块流ID,范围(3-64) |
值 |
长度 |
含义 |
timestamp |
3byte |
时间戳 |
message length |
3byte |
荷载长度 |
message type id |
1byte |
消息类型id |
message stream id |
4byte |
消息流id |
类型1
值 |
长度 |
含义 |
timestamp |
3byte |
时间戳 |
message length |
3byte |
荷载长度 |
message type id |
1byte |
消息类型id |
类型2
值 |
长度 |
含义 |
timestamp |
3byte |
时间戳 |
类型3
类型3的块没有头。流ID,消息长度,时间戳都不出现。这种类型的块使用与先前块相同的数据。当一个消息被分成多个块,除了第一块以外,所有的块都应使用这种类型。
ZLMediaKit中解析RTMP块头
const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) {
auto ptr = data;
while (len) {
int offset = 0;
uint8_t flags = ptr[0];
//其中flags >> 6表示基础头中保存的块格式,然后使用HEADER_LENGTH直接求出块消息头的长度
size_t header_len = HEADER_LENGTH[flags >> 6];
//判断基础头是1字节 、2字节还是3字节的版本,以此来获取 块流ID
_now_chunk_id = flags & 0x3f;
switch (_now_chunk_id) {
case 0: {
//0 值表示二字节形式,并且 ID 范围 64 - 319
//(第二个字节 + 64)。
if (len < 2) {
//need more data
return ptr;
}
_now_chunk_id = 64 + (uint8_t) (ptr[1]);
offset = 1;
break;
}
case 1: {
//1 值表示三字节形式,并且 ID 范围为 64 - 65599
//((第三个字节) * 256 + 第二个字节 + 64)。
if (len < 3) {
//need more data
return ptr;
}
_now_chunk_id = 64 + ((uint8_t) (ptr[2]) << 8) + (uint8_t) (ptr[1]);
offset = 2;
break;
}
//带有 2 值的块流 ID 被保留,用于下层协议控制消息和命令。
default : break;
}
if (len < header_len + offset) {
//need more data
return ptr;
}
RtmpHeader &header = *((RtmpHeader *) (ptr + offset));
//根据块流id获取 pair<RtmpPacket::Ptr/*now*/, RtmpPacket::Ptr/*last*/>
//同一个RTMP消息拆成的 chunk 块拥有相同的 cs id(块流id), 用于区分chunk所属的RTMP消息
auto &pr = _map_chunk_data[_now_chunk_id];
auto &now_packet = pr.first;
auto &last_packet = pr.second;
if (!now_packet) {
now_packet = RtmpPacket::create();
if (last_packet) {
//恢复chunk上下文
*now_packet = *last_packet;
}
//绝对时间戳标记复位
now_packet->is_abs_stamp = false;
}
auto &chunk_data = *now_packet;
chunk_data.chunk_id = _now_chunk_id;
switch (header_len) {
case 12:
chunk_data.is_abs_stamp = true;
chunk_data.stream_index = load_le32(header.stream_index);
case 8:
chunk_data.body_size = load_be24(header.body_size);
chunk_data.type_id = header.type_id;
case 4:
chunk_data.ts_field = load_be24(header.time_stamp);
}
//判断是否使用扩展时间戳(hunk_data.ts_field == 0xFFFFFF 表示有使用扩展时间戳)
//如果使用加载正确的时间戳
auto time_stamp = chunk_data.ts_field;
if (chunk_data.ts_field == 0xFFFFFF) {
if (len < header_len + offset + 4) {
//need more data
return ptr;
}
time_stamp = load_be32(ptr + offset + header_len);
offset += 4;
}
//荷载长度应该大于等于buffer的长度
if (chunk_data.body_size < chunk_data.buffer.size()) {
throw std::runtime_error("非法的bodySize");
}
auto more = min(_chunk_size_in, (size_t) (chunk_data.body_size - chunk_data.buffer.size()));
//收到的数据应该是等于之前定义的块大小 或者最后一个块(不足一个块大小)
//否则 数据不足 return
if (len < header_len + offset + more) {
//need more data
return ptr;
}
if (more) {
chunk_data.buffer.append(ptr + header_len + offset, more);
}
ptr += header_len + offset + more;
len -= header_len + offset + more;
//如果已经完整的收到了一个块数据(块数据的荷载接收完成),则调用handle_chunk处理
if (chunk_data.buffer.size() == chunk_data.body_size) {
//frame is ready
_now_stream_index = chunk_data.stream_index;
chunk_data.time_stamp = time_stamp + (chunk_data.is_abs_stamp ? 0 : chunk_data.time_stamp);
//保存chunk上下文
last_packet = now_packet;
if (chunk_data.body_size) {
handle_chunk(std::move(now_packet));
} else {
now_packet = nullptr;
}
}
}
return ptr;
}