SRS流媒体服务器——源码分析——基本流程简单分析
前言
部门打算用SRS搭建直播源站,由我负责完成,所以借此机会学习SRS相关内容,对我也是提升的机会,最近也有动手在自己的阿里云服务器搭建各个模式的集群和看了一些源码。
我自己是一个记性差的人,所以喜欢总结和整理,才没那么容易忘记,并且做了就要求尽自己最大的努力做好。
虽然很多内容SRS官网wiki都有很详细的说明了,但源码分析这部分好像没怎么看见。
SRS的目标是降低音视频的门槛,我的目标的降低SRS学习门槛,所以我想开一个系列,分为三部分:
SRS环境搭建。
SRS源码分析。
SRS实战过程遇到的问题及解决办法。
自己也还是初学者,所以有很多地方理解不够到位或者说的不对的地方麻烦请指出,或者有什么好的建议也可以说出来,我会去改进,希望能一起进步。
如果流程图无法放大请看语雀文档,那可以将流程图放大 《1. SRS基本流程》
第一篇SRS源码分析先整理个大体流程,后续会详细分析RTMP握手,建联,推拉流内容。
目录
main()、domain()和run_master()
SrsServer::listen()
SrsServer::listen_rtmp()
SrsBufferListener::listen()、SrsTcpListener::listen()
SrsTcpListener::cycle()
SrsBufferListener::on_tcp_client()
SrsServer::accept_client()、SrsServer::fd2conn()
SrsSTCoroutine::start()、SrsConnection::cycle()
SrsRtmpConn::do_cycle()
0. 基本流程图
1. main()、domain()和run_master()
main函数所在文件在main/srs_main_server.cpp中。
定义了一些全局变量: a. _srs_config:全局配置文件 b. _srs_log:全局的log文件
main() 调用 domain() 执行流程,下面的domain()内部分函数作用解析,因为不是主流程,不过过多介绍
_srs_config->parse_options(argc, argv) //解析命令行参数
_srs_config->get_work_dir() //设置工作目录以及当前目录
_srs_config->initialize_cwd()
_srs_log->initialize() //初始化log
重点是会创建SrsServer对象并运行
_srs_server = new SrsServer();
run(_srs_server)
创建SrsServer时还会初始化http_api_mux和http_server
http_api_mux = new SrsHttpServeMux(); // HTTP请求多路复用器,不是http拉流的
http_server = new SrsHttpServer(this); // http服务
run(SrsServer* svr) 会初始化服务器和获取守护进程配置in_daemon(默认为false),如果in_daemon为false,直接执行 run_master(SrsServer* svr)
svr->initialize(NULL) //初始化服务器
_srs_config->get_daemon() //获取守护进行配置,默认false,如果为true, srs将fork子进程,让子进程执行run_master
run_master(svr)
run_master(SrsServer* svr) 函数中,服务器做一些初始化工作并调用listern监听客户端的连接,然后调用do_cycle函数(死循环),做一些监控,更新时间及缓存等。
svr->initialize_st() //初始化st协程库
svr->initialize_signal() //初始化信号
svr->acquire_pid_file() //将pid线程写入文件
svr->listen() //监听客户端请求
svr->register_signal() //注册信号
svr->http_handle() //注册http的处理模块
svr->ingest() //开启流采集
svr->cycle() //消息循环处理
初始化signal时,在使用state-threads时需要将信号转化为IO,并创建一个协程处理信号IO。 a. svr->initialize_signal() 进行初始化信号,里面的signal_manager->initialize() 创建pipe。 b. svr->register_signal() 进行注册信号,里面的signal_manager->start() 注册信号并启动信号监听的协程。 c. SrsSignalManager::cycle() 进行最终的监听,循环执行监听操作,调用 SrsServer::on_signal() d. SrsServer::on_signal() 会对每一种信号检测,如果信号发生,设置相应的bool变量为true e. 最后在SrsServer::do_cycle()中检查信号,并处理
重点关注 SrsServer::listen()
2. SrsServer::listen()
SrsServer::listen() 会监听rtmp/http等客户端请求。
listen_rtmp() //监听rtmp
listen_http_api() //监听http
listen_http_stream() //监听http-stream
listen_stream_caster() //监听转换流
conn_manager->start() //启动连接管理的协程
先分析rtmp推拉流过程,所以重点关注 SrsServer::listen_rtmp(),其他后面用到会介绍,比如listen_http_stream()。
3. SrsServer::listen_rtmp()
SrsServer::listen_rtmp() 函数如下
srs_error_t SrsServer::listen_rtmp()
{
srs_error_t err = srs_success;
// stream service port. 获取配置文件中要监听的端口列表
std::vector<std::string> ip_ports = _srs_config->get_listens();
srs_assert((int)ip_ports.size() > 0);
//关闭SrsListenerRtmpStream类型的监听器,从listeners管理器中删除监听对象。
close_listeners(SrsListenerRtmpStream);
for (int i = 0; i < (int)ip_ports.size(); i++) { //遍历ip_ports列表,父类 SrsListener 的指针listener 指向新构造的子类 SrsBufferListener 的对象
SrsListener* listener = new SrsBufferListener(this, SrsListenerRtmpStream);
listeners.push_back(listener); //加入listeners管理器列表
int port; string ip; //分割 ip 地址(如果有的话)和 port 端口
srs_parse_endpoint(ip_ports[i], ip, port);
//多态:调用子类 SrsBufferListener 的成员函数 listen
if ((err = listener->listen(ip, port)) != srs_success) {
srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port);
}
}
return err;
}
监听的类型有:
// The listener type for server to identify the connection,
// that is, use different type to process the connection.
enum SrsListenerType
{
// RTMP client,
SrsListenerRtmpStream = 0,
// HTTP api,
SrsListenerHttpApi = 1,
// HTTP stream, HDS/HLS/DASH
SrsListenerHttpStream = 2,
// UDP stream, MPEG-TS over udp.
SrsListenerMpegTsOverUdp = 3,
// TCP stream, RTSP stream.
SrsListenerRtsp = 4,
// TCP stream, FLV stream over HTTP.
SrsListenerFlv = 5,
};
close_listeners是将listeners中类型为type的元素移除。
void SrsServer::close_listeners(SrsListenerType type)
{
std::vector<SrsListener*>::iterator it;
for (it = listeners.begin(); it != listeners.end();) {
SrsListener* listener = *it;
if (listener->listen_type() != type) { //不同type,continue
++it;
continue;
}
srs_freep(listener);
it = listeners.erase(it); //从listeners移除(vector)
}
}
4. SrsBufferListener::listen()、SrsTcpListener::listen()
listern_rtmp() 调用调用 SrsBufferListener 的 listen() 监听
srs_error_t SrsBufferListener::listen(string i, int p) //i默认为"0.0.0.0"
{
srs_error_t err = srs_success;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port); //创建TCP监听
if ((err = listener->listen()) != srs_success) { //进入监听
return srs_error_wrap(err, "buffered tcp listen");
}
string v = srs_listener_type2string(type);
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
return err;
}
listener 会调用SrsTcpListener::listen()
srs_error_t SrsTcpListener::listen()
{
srs_error_t err = srs_success;
if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) { //创建监听的fd,并将fd注册到st库上
return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
}
srs_freep(trd);
trd = new SrsSTCoroutine("tcp", this); //创建一个协程
if ((err = trd->start()) != srs_success) { //启动协程,进入SrsSTCoroutine::cycle()
return srs_error_wrap(err, "start coroutine");
}
return err;
}
SrsTcpListener类进行实际的监听,通过socket->bind->listen(在srs_tcp_listen函数中完成)创建监听的fd,并将fd注册到st库上,之后fd上的事件都有st库监听并处理。
创建tcp协程,用于处理连接,协程启动,并进入 SrsSTCoroutine::cycle() 函数。 a. cycle()函数用于处理客户端连接。
部分函数注释如下:
srs_tcp_listen(ip, port, &lfd)
fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol) //创建socket
do_srs_tcp_listen(fd, r, pfd)
srs_fd_closeexec(fd) //设置FD_CLOEXEC
srs_fd_reuseaddr(fd) //重复使用fd
bind(fd, r->ai_addr, r->ai_addrlen) //绑定服务ip和端口
::listen(fd, SERVER_LISTEN_BACKLOG) //在_fd上开启监听
(*pfd = srs_netfd_open_socket(fd)) //将fd注册到st库上,以后这个fd的所有请求都交由库处理
trd = new SrsSTCoroutine("tcp", this) //创建一个协程
trd->start() //启动协程,进入SrsSTCoroutine::cycle()
5. SrsTcpListener::cycle()
SrsSTCoroutine::cycle() 最后会调用到 SrsTcpListener::cycle()
SrsTcpListener::cycle() 监听协程接受连接请求后将执行逻辑交给BufferListener处理。
srs_error_t SrsTcpListener::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}
//接收连接
srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(fd == NULL){
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}
if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
//这个监听协程只是处理连接请求,具体的执行逻辑交给BufferListener处理
if ((err = handler->on_tcp_client(fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}
}
return err;
}
6. SrsBufferListener::on_tcp_client()
SrsBufferListener::on_tcp_client() 代码如下:
srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
srs_error_t err = server->accept_client(type, stfd); //交给SrsServer接受处理
if (err != srs_success) {
srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
srs_freep(err);
}
return srs_success;
}
最终调用SrsServer的accept_client处理
7. SrsServer::accept_client()、SrsServer::fd2conn()
SrsServer::accept_client() 代码如下: a. 先根据type获取连接的SrsConnection b. 将SrsConnection加入conns,conns存放所有的连接 c. 为每一个SrsConnection开启一个连接协程
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
srs_error_t err = srs_success;
SrsConnection* conn = NULL;
if ((err = fd2conn(type, stfd, &conn)) != srs_success) { //根据type获取连接的SrsConnection
if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
srs_close_stfd(stfd); srs_error_reset(err);
return srs_success;
}
return srs_error_wrap(err, "fd2conn");
}
srs_assert(conn);
// directly enqueue, the cycle thread will remove the client.
conns.push_back(conn); // 加入conns,conns存放所有的连接
// cycle will start process thread and when finished remove the client.
// @remark never use the conn, for it maybe destroyed.
if ((err = conn->start()) != srs_success) { //每个连接开启一个连接协程
return srs_error_wrap(err, "start conn coroutine");
}
return err;
}
获取连接的主要代码: a. 因为现在type是SrsListenerRtmpStream,所有conn返回的是SrsRtmpConn。
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
{
srs_error_t err = srs_success;
int fd = srs_netfd_fileno(stfd);
string ip = srs_get_peer_ip(fd);
// for some keep alive application, for example, the keepalived,
// will send some tcp packet which we cann't got the ip,
// we just ignore it.
if (ip.empty()) { //无法获取ip则进行忽略
return srs_error_new(ERROR_SOCKET_GET_PEER_IP, "ignore empty ip, fd=%d", fd);
}
// check connection limitation.
int max_connections = _srs_config->get_max_connections(); //获取最大连接数
if (handler && (err = handler->on_accept_client(max_connections, (int)conns.size())) != srs_success) {
return srs_error_wrap(err, "drop client fd=%d, max=%d, cur=%d for err: %s",
fd, max_connections, (int)conns.size(), srs_error_desc(err).c_str());
}
if ((int)conns.size() >= max_connections) { //如果超过了连接限制,直接拒绝连接
return srs_error_new(ERROR_EXCEED_CONNECTIONS,
"drop fd=%d, max=%d, cur=%d for exceed connection limits",
fd, max_connections, (int)conns.size());
}
// avoid fd leak when fork.
// @see https://github.com/ossrs/srs/issues/518
if (true) {
int val;
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd);
}
val |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, val) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd);
}
}
if (type == SrsListenerRtmpStream) {
*pconn = new SrsRtmpConn(this, stfd, ip); //创建RTMP连接
} else if (type == SrsListenerHttpApi) {
*pconn = new SrsHttpApi(this, stfd, http_api_mux, ip);
} else if (type == SrsListenerHttpStream) {
*pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip);
} else {
srs_warn("close for no service handler. fd=%d, ip=%s", fd, ip.c_str());
srs_close_stfd(stfd);
return err;
}
return err;
}
8. SrsSTCoroutine::start()、SrsConnection::cycle()
SrsConnection::start() 代码如下:
srs_error_t SrsConnection::start()
{
srs_error_t err = srs_success;
if ((err = skt->initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "init socket");
}
//启动conn协程,最后会执行到SrsConnection::cycle()
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}
return err;
}
SrsConnection::cycle() 代码如下:
srs_error_t SrsConnection::cycle()
{
srs_error_t err = do_cycle(); //SrsRtmpConn::do_cycle
// Notify manager to remove it.
manager->remove(this);
// success.
if (err == srs_success) {
srs_trace("client finished.");
return err;
}
// client close peer.
// TODO: FIXME: Only reset the error when client closed it.
if (srs_is_client_gracefully_close(err)) {
srs_warn("client disconnect peer. ret=%d", srs_error_code(err));
} else if (srs_is_server_gracefully_close(err)) {
srs_warn("server disconnect. ret=%d", srs_error_code(err));
} else {
srs_error("serve error %s", srs_error_desc(err).c_str());
}
srs_freep(err);
return srs_success;
}
9. SrsRtmpConn::do_cycle()
如果有推流事件,就会进入SrsRtmpConn::do_cycle(),此函数负责具体执行RTMP流程,包括握手,接收connect请求,发送response connect响应,以及接收音视频流数据等处理。
srs_error_t SrsRtmpConn::do_cycle()
{
srs_error_t err = srs_success;
srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); //设置接收超时时间
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); //设置发送超时时间
if ((err = rtmp->handshake()) != srs_success) { //rtmp握手
return srs_error_wrap(err, "rtmp handshake");
}
uint32_t rip = rtmp->proxy_real_ip();
if (rip > 0) {
srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",
uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
}
SrsRequest* req = info->req;
if ((err = rtmp->connect_app(req)) != srs_success) { //握手成功后,srs会接收并解析客户端发送过来的RTMP消息connect
return srs_error_wrap(err, "rtmp connect tcUrl");
}
// set client ip to request.
req->ip = ip;
srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",
req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
req->schema.c_str(), req->vhost.c_str(), req->port,
req->app.c_str(), (req->args? "(obj)":"null"));
// show client identity
if(req->args) {
std::string srs_version;
std::string srs_server_ip;
int srs_pid = 0;
int srs_id = 0;
SrsAmf0Any* prop = NULL;
if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
srs_version = prop->to_str();
}
if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
srs_server_ip = prop->to_str();
}
if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
srs_pid = (int)prop->to_number();
}
if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
srs_id = (int)prop->to_number();
}
if (srs_pid > 0) {
srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
}
}
//服务循环
if ((err = service_cycle()) != srs_success) {
err = srs_error_wrap(err, "service cycle");
}
srs_error_t r0 = srs_success;
if ((r0 = on_disconnect()) != srs_success) {
err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
srs_freep(r0);
}
// If client is redirect to other servers, we already logged the event.
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
srs_error_reset(err);
}
return err;
}