LIVE555源代码研究之四:MediaServer (一)
从本篇文章開始我们将从简单server程序作为突破点,深入研究LIVE555源代码。
从前面的文章我们知道。不论什么一个基于LIVE555库实现的程序都须要实现自己的环境类和调度类。
这里。server程序就使用了BasicEnvironment库中实现的简单环境类和简单调度类。
说它简单,是由于该环境类只实现了将错误信息输出到控制台。而调度类只通过select模型实现socket的读写。
以下我们来看下简单环境类BasicEnvironment和简单调度类BasicTaskScheduler是怎样实现的。
打开live555MediaServer.cpp能够看到熟悉的main函数。main函数比較简单。除去错误信息输出代码。有效代码仅十几行而已。
为了便于分析。我们仅列出精简后的代码
int main(int argc, char** argv) { TaskScheduler* scheduler = BasicTaskScheduler::createNew();//创建详细调度类 UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler); RTSPServer* rtspServer; portNumBits rtspServerPortNum = 554;//默认端口554 rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB); if (rtspServer == NULL) { rtspServerPortNum = 8554;//若554被占用。尝试使用8554port rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB); } if(rtspServer->setUpTunnelingOverHTTP(80)//rtsp over http 监听80port ||rtspServer->setUpTunnelingOverHTTP(8000) || rtspServer->setUpTunnelingOverHTTP(8080)) {
}
env->taskScheduler().doEventLoop(); return 0; }
能够看到開始的两行创建了调度器和环境类对象。
CreateNew为static成员变量,内部实现不过new一个对象而已。
此处採用简单工厂模式。第二行在创建详细环境类时。将第一行创建的子调度类指针传入。这是由于环境类内部维护了抽象调度类指针,这样不论什么能够引用到环境类的类都能够输出错误信息同一时候也能够将自己增加到调度中。
一、BasicUsageEnvironment类
类继承关系例如以下图:
UsageEnvironment抽象类,定义了相关接口,比較简单。
UsageEnvironment0类定义一个存储错误信息的缓冲区,同一时候实现了一系列在UsageEnvironment中定义的操作该缓冲区的方法。
BasicUsageEnvironment类重定义了输出操作符。
用于向控制台输出错误信息。
二、BasicTaskScheduler类
类继承关系例如以下图
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvaXRoemhhbmc=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" />
BasicTaskScheduler用于程序的调度,是整个程序的发动机。在TaskScheduler类中定义的doEventLoop虚成员函数。在TaskScheduler0中实现,用于循环读取任务,并进行调度。
void BasicTaskScheduler0::doEventLoop(char* watchVariable) {
while (1)
{
if (watchVariable != NULL && *watchVariable != 0) break;
SingleStep();
}
}
SingleStep在BasicTaskScheduler中实现。每运行一次会调度一个任务运行。
任务调度类中定义了三种类型的任务。分别为:延迟任务、socket任务和事件任务。
在TaskSheduler0中分别定义了DelayQueue(延迟队列)、HandlerSet(存储socket和相应处理函数的映射)、保存事件任务的数组这三种结构。SingleStep函数每运行一次都会从这三种结构搜索满足条件的一个任务。并运行相应处理函数。
接下来我们具体介绍下这三种结构:
1.延迟队列DelayQueue
DelayQueue继承自DelayQueueEntry。
用于管理延迟队列中的每一项,提供增删改查功能。
DelayQueueEntry代表延迟队列中的一项。其成员为fDeltaTimeRemaining。表示该项任务的剩余时间。
AlarmHandler也继承自DelayQueueEntry,是真正的存储在DelayQueue中的延迟项。其成员为TaskFunc* fProc和void* fClientData。当延迟项的剩余时间为0时,fProc会被调用,fClientData为该函数的參数。TaskFunc为处理函数,其定义例如以下:
void TaskFunc(void* clientData);
当该延迟项剩余时间为0时。相应处理函数就会被调用。
调用处理函数的成员函数为在AlarmHandler实现的handleTimeout。事实上现代码例如以下:
virtual void handleTimeout() {
(*fProc)(fClientData); //个人觉得推断一下fProc是否为空会更好 DelayQueueEntry::handleTimeout(); //调用基类handleTimeout
}
DelayQueue的实现并不复杂,但须要注意的是:存储在延迟队列中的每一项是依照剩余时间递增的顺序存储的,同一时候后一项仅保存与前一项的时间差。
举例说明:正常情况下。在我们构建的自己延迟队列时,如果各项剩余时间分别为:1、3、5、7、9。
依照DelayQueue的算法。保存在DelayQueue中保存的每一项的剩余时间为:1、2、2、2、2。
明确了此算法后再看下DelayQueue的各种方法实现,均是对链表的各种处理。是不是感觉非常easy。
DelayQueue中一个非常重要的方法是Synchronize()。
该方法会在运行SingleStep时被调用,递减每一项的剩余时间。然后再检查第一项的剩余时间。当第一项的剩余时间小于等于0时。其相应处理函数就会被调用。同一时候该项会被从延迟队列清除出去。
最后一个须要注意的地方:DelayQueue继承自DelayQueueEntry,并作为整个延迟队列的表头,在DelayQueue的构造函数中,调用了基类DelayQueueEntry的构造函数,同一时候传入參数ETERNITY。
其定义例如以下:
const DelayInterval ETERNITY(INT_MAX, MILLION-1);
宏INT_MAX为int类型的最大值。 而MILLION定义例如以下:
static const int MILLION = 1000000;
因此我们能够知道该项的延迟时间是最大的。不论什么新插入的项都会插入到该项的前面。这导致的结果就是在遍历时不用检查是否到达最后一项。插入的不论什么不论什么剩余时间的延迟项都会被插入到最后一项之前。且延迟队列中总会存在一项。其它类能够调用schedulerDelayedTask向调度器加入一项延迟任务:
TaskToken scheduleDelayedTask(int64_t microseconds, TaskFunc*proc, void* clientData);
值为TaskToken,用于标识每个延迟任务。相似于任务的ID。定义例如以下:
typedef void* TaskToken;
scheduleDelayedTask用于取消某延迟任务,參数为TaskToken:
virtual void unscheduleDelayedTask(TaskToken& prevTask);
scheduleDelayedTask用于使用新的时间又一次调度某延迟项,又一次调度后原来的延迟时间不再起作用:
virtual void rescheduleDelayedTask(TaskToken& task, int64_t microseconds, TaskFunc* proc, void* clientData);
2. Socket任务处理
Socket事件保存在HandlerSet * fHandlers开头的链表中。
HandlerSet内部定义了HandlerDescriptor成员。该成员内部定义了例如以下成员:
int socketNum; int conditionSet; TaskScheduler::BackgroundHandlerProc* handlerProc; void* clientData;
socketNum为要检測的socket,conditionSet为满足条件的socket的状态,HandlerProc为发生相应事件时须要调用的处理函数, clientData为传递给事件处理函数的參数。其定义例如以下:
typedef void BackgroundHandlerProc(void* clientData, int mask);
Socket的任务处理比較简单。相信大伙也能够看懂。其它类能够调用turnOnBackgroundReadHandling向调度器加入socket任务:
void turnOnBackgroundReadHandling(int socketNum, BackgroundHandlerProc* handlerProc, void* clientData);
已经加入调度器的socket任务能够调用turnOffBackgroundReadHandling取消:
void turnOffBackgroundReadHandling(int socketNum)
但上述两个函数已被废弃,仅为了保持向前兼容而加以保留,被以下的函数代替:
//向调度器加入socket任务: virtual void setBackgroundHandling(int socketNum, int conditionSet, BackgroundHandlerProc* handlerProc, void* clientData); //disableBackgroundHandling用于取消对某socket事件的调度: void disableBackgroundHandling(int socketNum);
3. 事件任务处理
为了实现事件任务,定义了下面结构:
EventTriggerId fTriggersAwaitingHandling, fLastUsedTriggerMask; TaskFunc* fTriggeredEventHandlers[MAX_NUM_EVENT_TRIGGERS]; void* fTriggeredEventClientDatas[MAX_NUM_EVENT_TRIGGERS]; unsigned fLastUsedTriggerNum;
宏 MAX_NUM_EVENT_TRIGGERS值为32,能够知道事件任务最大支持32项。
fTriggeredEventHandlers是一个有32个项的数组。
每一个项保存一个TaskFunc类型的任务处理函数:
typedef void TaskFunc(void* clientData);
该任务处理函数与延迟队列中的任务处理函数定义同样。fTriggeredEventClientDatas数组中保存相应任务处理函数的參数。
EventTriggerId fTriggersAwaitingHandling, fLastUsedTriggerMask; typedef u_int32_t EventTriggerId;
EventTriggerId 32位无符号整形。事实上它是一个32bit的位图。
每一位相应fTriggeredEventClientDatas数组中的每一项。当相应位为1时,表示该数组中的相应位置存在一项。使用位图,能够节省存储空间。
但随之而来的问题就是对位图的操作也变的相应复杂。假设让我来搞的话,我宁愿定义一个32项的bool类型数组。
其它类能够调用createEventTrigger向TaskScheduler中加入一项事件任务。
EventTriggerID createEventTrigger(TaskFunc*eventHandlerProc);
使用deleteEventTrigger来删除某事件对象:
virtual void deleteEventTrigger(EventTriggerId eventTriggerId) = 0;
前面我们说过BasicTaskScheduler实现了SingleStep。SingleStep驱动了整个程序的运行。有了前面的铺垫,相信读懂它应该不成问题。对于某些细节问题,此时能够不必深究,可等以后对整个架构有了全局的认识之后,再具体探究。
因为篇幅限制,在此不具体介绍。
总结起来在SingleStep运行了下面动作:
1.调用select检查fReadSet、fWriteSet和fExceptionSet看是否有满足条件加入的socket。然后遍历HandlerSet检查每一个socket的状态。假设状态得到满足即说明在该socket上发生了相应的事件,然后调用与该socket相应的处理函数。
2. 检查事件任务数组是否存在可用项,如存在则调用相应处理函数。
3. 检查延时队列。看是否存在剩余时间为0的项,如找到则运行相应处理函数。然后将该项删除。
SingleStep每次仅仅会指向上述三种类型的事件中的一项。延迟任务运行后即会被从延迟队列删除。
其它两种类型的任务却仍然在任务队列中等待着下次触发。
2014.8.19于浙江杭州