1 重点类
epoll管理多个socket连接 class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_shared_from_this<EventPoller> { public: using Ptr = std::shared_ptr<EventPoller>; friend class TaskExecutorGetterImp; ~EventPoller(); /** * 获取EventPollerPool单例中的第一个EventPoller实例, * 保留该接口是为了兼容老代码 * @return 单例 */ static EventPoller &Instance(); /** * 添加事件监听 * @param fd 监听的文件描述符 * @param event 事件类型,例如 Event_Read | Event_Write * @param eventCb 事件回调functional * @return -1:失败,0:成功 */ int addEvent(int fd, int event, PollEventCB eventCb); /** * 删除事件监听 * @param fd 监听的文件描述符 * @param delCb 删除成功回调functional * @return -1:失败,0:成功 */ int delEvent(int fd, PollDelCB delCb = nullptr); /** * 修改监听事件类型 * @param fd 监听的文件描述符 * @param event 事件类型,例如 Event_Read | Event_Write * @return -1:失败,0:成功 */ int modifyEvent(int fd, int event); /** * 异步执行任务 * @param task 任务 * @param may_sync 如果调用该函数的线程就是本对象的轮询线程,那么may_sync为true时就是同步执行任务 * @return 是否成功,一定会返回true */ Task::Ptr async(TaskIn task, bool may_sync = true) override ; /** * 同async方法,不过是把任务打入任务列队头,这样任务优先级最高 * @param task 任务 * @param may_sync 如果调用该函数的线程就是本对象的轮询线程,那么may_sync为true时就是同步执行任务 * @return 是否成功,一定会返回true */ Task::Ptr async_first(TaskIn task, bool may_sync = true) override ; /** * 判断执行该接口的线程是否为本对象的轮询线程 * @return 是否为本对象的轮询线程 */ bool isCurrentThread(); /** * 延时执行某个任务 * @param delayMS 延时毫秒数 * @param task 任务,返回值为0时代表不再重复任务,否则为下次执行延时,如果任务中抛异常,那么默认不重复任务 * @return 可取消的任务标签 */ DelayTask::Ptr doDelayTask(uint64_t delayMS, function<uint64_t()> task); /** * 获取当前线程关联的Poller实例 */ static EventPoller::Ptr getCurrentPoller(); /** * 获取当前线程下所有socket共享的读缓存 */ BufferRaw::Ptr getSharedBuffer(); /** * 获取poller线程id */ const thread::id& getThreadId() const; private: /** * 本对象只允许在EventPollerPool中构造 */ EventPoller(ThreadPool::Priority priority = ThreadPool::PRIORITY_HIGHEST); /** * 执行事件轮询 * @param blocked 是否用执行该接口的线程执行轮询 * @param regist_self 是否注册到全局map */ void runLoop(bool blocked , bool regist_self); /** * 内部管道事件,用于唤醒轮询线程用 */ void onPipeEvent(); /** * 切换线程并执行任务 * @param task * @param may_sync * @param first * @return 可取消的任务本体,如果已经同步执行,则返回nullptr */ Task::Ptr async_l(TaskIn task, bool may_sync = true,bool first = false) ; /** * 阻塞当前线程,等待轮询线程退出; * 在执行shutdown接口时本函数会退出 */ void wait() ; /** * 结束事件轮询 * 需要指出的是,一旦结束就不能再次恢复轮询线程 */ void shutdown(); /** * 刷新延时任务 */ uint64_t flushDelayTask(uint64_t now); /** * 获取select或epoll休眠时间 */ uint64_t getMinDelay(); private: class ExitException : public std::exception{ public: ExitException(){} ~ExitException(){} }; private: //标记loop线程是否退出 bool _exit_flag; //当前线程下,所有socket共享的读缓存 weak_ptr<BufferRaw> _shared_buffer; //线程优先级 ThreadPool::Priority _priority; //正在运行事件循环时该锁处于被锁定状态 mutex _mtx_runing; //执行事件循环的线程 thread *_loop_thread = nullptr; //事件循环的线程id thread::id _loop_thread_id; //通知事件循环的线程已启动 semaphore _sem_run_started; //内部事件管道 PipeWrap _pipe; //从其他线程切换过来的任务 mutex _mtx_task; List<Task::Ptr> _list_task; //保持日志可用 Logger::Ptr _logger; #if defined(HAS_EPOLL) //epoll相关 int _epoll_fd = -1; unordered_map<int, std::shared_ptr<PollEventCB> > _event_map; #else //select相关 struct Poll_Record { using Ptr = std::shared_ptr<Poll_Record>; int event; int attach; PollEventCB callBack; }; unordered_map<int, Poll_Record::Ptr> _event_map; #endif //HAS_EPOLL //定时器相关 multimap<uint64_t, DelayTask::Ptr> _delay_task_map; }; |
线程池 class EventPollerPool : public std::enable_shared_from_this<EventPollerPool>, public TaskExecutorGetterImp { public: using Ptr = std::shared_ptr<EventPollerPool>; ~EventPollerPool(){}; /** * 获取单例 * @return */ static EventPollerPool &Instance(); /** * 设置EventPoller个数,在EventPollerPool单例创建前有效 * 在不调用此方法的情况下,默认创建thread::hardware_concurrency()个EventPoller实例 * @param size EventPoller个数,如果为0则为thread::hardware_concurrency() */ static void setPoolSize(size_t size = 0); /** * 获取第一个实例 * @return */ EventPoller::Ptr getFirstPoller(); /** * 根据负载情况获取轻负载的实例 * 如果优先返回当前线程,那么会返回当前线程 * 返回当前线程的目的是为了提高线程安全性 * @param prefer_current_thread 是否优先获取当前线程 */ EventPoller::Ptr getPoller(bool prefer_current_thread = true); /** * 设置 getPoller() 是否优先返回当前线程 * 在批量创建Socket对象时,如果优先返回当前线程, * 那么将导致负载不够均衡,所以可以暂时关闭然后再开启 * @param flag 是否优先返回当前线程 */ void preferCurrentThread(bool flag = true); private: EventPollerPool() ; private: bool _preferCurrentThread = true; }; |
管道,进程通信 class PipeWrap { public: PipeWrap(); ~PipeWrap(); int write(const void *buf, int n); int read(void *buf, int n); int readFD() const { return _pipe_fd[0]; } int writeFD() const { return _pipe_fd[1]; } private: int _pipe_fd[2] = { -1,-1 }; void clearFD(); #if defined(_WIN32) int _listenerFd = -1; #endif // defined(_WIN32) }; |
2 webrtc play堆栈
void EventPoller::runLoop(bool blocked,bool regist_self)
bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) /lambda表达式
ssize_t Socket::onRead(const SockFD::Ptr &sock, bool is_udp) noexcept
void TcpServer::onAcceptConnection(const Socket::Ptr &sock) /lambda表达式
void HttpSession::onRecv(const Buffer::Ptr &pBuf)
void HttpRequestSplitter::input(const char *data,size_t len)
void HttpSession::onRecvContent(const char *data,size_t len)
void HttpSession::Handle_Req_POST(ssize_t &content_len) /lambda表达式
bool HttpSession::emitHttpEvent(bool doInvoke)
int NoticeCenter::emitEvent(const string &strEvent, ArgsType &&...args)
int EventDispatcher::emitEvent(ArgsType &&...args)
static inline void addHttpListener() /lambda表达式
static HttpApi toApi(const function<void(API_ARGS_STRING_ASYNC)> &cb)/lambda表达式
api_regist("/index/api/webrtc",[](API_ARGS_STRING_ASYNC)
void WebRtcPluginManager::getAnswerSdp(Session &sender, const string &type, const string &offer, const WebRtcArgs &args,const onCreateRtc &cb)
void play_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb)/lambda表达式
void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr<Session> &session, const function<void (const Ptr &)> &cb)
static void findAsync_l(const MediaInfo &info, const std::shared_ptr<Session> &session, bool retry,const function<void(const MediaSource::Ptr &src)> &cb)
WebRtcPlayer::Ptr WebRtcPlayer::create(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src,const MediaInfo &info)
void WebRtcTransportImp::onCreate()
WebRtcTransport::onCreate()/lambda表达式
const Session::Ptr& UdpServer::createSession(
static void emitSessionRecv(const Session::Ptr &session, const Buffer::Ptr &buf)
void WebRtcSession::onRecv(const Buffer::Ptr &buffer)
void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tuple)
void DtlsTransport::ProcessDtlsData(const uint8_t* data, size_t len)
inline bool DtlsTransport::CheckStatus(int returnCode)
inline bool DtlsTransport::ProcessHandshake()
inline void DtlsTransport::ExtractSrtpKeys(RTC::SrtpSession::CryptoSuite srtpCryptoSuite)
void WebRtcTransport::OnDtlsTransportConnected(
void WebRtcPlayer::onStartWebRTC()
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx)
void WebRtcTransport::sendRtpPacket(const char *buf, int len, bool flush, void *ctx)
void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::TransportTuple *tuple)
ssize_t SocketHelper::send(Buffer::Ptr buf)
ssize_t Socket::send(Buffer::Ptr buf, struct sockaddr *addr, socklen_t addr_len, bool try_flush)
ssize_t Socket::send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush)