会议平台后端优化方案

会议平台后端优化方案

通过RTC的学习,我了解到了端对端技术,就想着做一个节省服务器资源的会议平台
之前做了这个项目,快手二面被问到卡着不知如何介绍,便有了这篇文章

分析当下机制

相对于传统视频平台(SFU,MCU架构)

之前帮学长做压测RTC监控时,10M校园网宽带,标清画质只能跑到22个(MCU架构),

毕竟是监控就只能卡在自己宽带下了

image-20241001001901382

所有资源从服务器拉取,大量的上下传极大的加重了服务器压力,

目前行业内如直播方式更多是使用分布式服务器,

如我在北京开了直播,一部分观众在广东一部分在上海,

此时,视频流将先传到北京服务器转发到广东服务器和上海服务器进行一个服务器的分压处理,当然下面还有更多子服务器去分压,

而这种形式对于我们这种学生开发者是行不通的,我们只能省吃俭用租用一台公有云,

而在之前开发 ESP32 Mesh 组网时来了灵感

ESP-MESH - ESP32 - — ESP-IDF 编程指南 v4.3.1 文档 (espressif.com)

image-20241001002741260

我们如果做一个多人群聊(类似会议平台),只需要使用每个设备进行 P2P 的 RTC 推拉流可以完美减缓服务器压力,

而服务器只需要负责信令汇总交换,还有客户端检测,保活的操作即可

甚至我的轻量级服务器也能跑上(压测时,用到了redis缓冲mysql)

也许好的想法总是相似的,网上看到了很多大佬也这样去做了(MESH架构)

image-20241001003423135

我第一次尝试使用了 golang gin 去写一个信令交互(也是有golang直接能调库使用RTC的原因),

实现的并不是很理想,

做了票据凭证登录(邮箱),

设计了 Mysql数据库(大家平时多练习,面试要手撕),

最后实现了点对点,并不是很理想。

毕竟跑在一个2核1G内存的超级轻量级服务器上的,

配合其他项目。。。。单是跑起来就直接爆了几次内存,(也不太敢和别人聊这个项目最后成果)

更别提对他进行压测,

后来重新捡起来后,打算直接融入到我的博客网站其中一个路由中,

实现了一个保障会议成员隐私,使用完无痕,的会议平台。(仅供学习参考)

image-20241001013250207

会议创建机制

一个post创建会议

会议号类似秘钥,名字可填可不填,不填我爬 ip 用 ip代指,

房间创建后永久保留唯一留痕在这里(不是说无痕吗?创建的人还是要跟踪记录爬取一下的哈哈哈,之前经历了sql注入,还好做了防注入 )

a20f0822cbcb1e32db5ec3580c9d65e

爬取ip以及地点 看是谁创的

image-20241001005056686

image-20241001005121677

这里meeting_id作为索引,指向另一张表参加此id的人群,

来完成一个快速查表工作,

当然此时还没加入会议,

加入会议机制

对于保活,和及时提出会议,像我个人感受

我每次开完会更希望直接关闭浏览器实现一个全自动退出会议的方式,

image-20241001012330563

关闭浏览器后,检测断开直接删除

image-20241001012344316

我之前开发 esp32 AP 模式时(哈哈哈还是esp)

需要通过wifi节点的web直接对esp进行操作,如小车前进后退

使用了websocket,

我便考虑了websocket,他可以实时检测,又是全双工,

完美的事物总是有缺陷的一面,

我需要给他额外开端口,

这可不是什么好方式,又要去申请一张证书,不然万一来个中间人攻击那大家的隐私不是没了吗?

此时我发现了SocketIO 基于websocket,可跑在443端口的神器,

python示例代码:

首先,便是相应库使用,以及对基础连接进行反应

from flask import Flask
from flask_socketio import SocketIO, emit,session

app = Flask(__name__)
socketio = SocketIO(app)

@socketio.on('connect')
def handle_connect():
    print('A client has connected.')

接下来我们就要处理加入事件

拿到消息后,存入到session中,以便后续对断开操作

@socketio.on('join')
def handle_join(data):
    
    meeting_id = data['meeting_id']
    user_id = data['user_id']
    SDP = data['SDP']
    CANDIDATE = data['CANDIDATE']
    session['user_id'] = user_id
    session['meeting_id'] = meeting_id
    print(f"User {user_id} try to join meeting {meeting_id}")

连接数据库

把这个用户归类到相应room中

以便后续处理

connection = connector.connect(**db_config)
        cursor = connection.cursor()
        
        cursor.execute("SELECT * FROM meetings WHERE meeting_id = %s", (meeting_id,))
        meeting = cursor.fetchone()
        if not meeting:
            socketio.emit('join_response', {"error": "会议不存在"}, to=request.sid)
            return
        

        print(f'a client {user_id} has joined the meeting {meeting_id}')
        # 将用户加入房间
        join_room(meeting_id)
        # 插入参与者到数据库
        cursor.execute("INSERT INTO participants (meeting_id, user_id,SDP,CANDIDATE) VALUES (%s, %s,%s,%s)", (meeting_id, user_id,SDP,CANDIDATE))
        connection.commit()

拉取room的所有人进行广播,更新参与者,把拉取的数据整理成字典,传至前端

#返回此会议室的所有人的信令
        cursor.execute("SELECT user_id, SDP, CANDIDATE FROM participants WHERE meeting_id = %s",(meeting_id,))
        room_participants = cursor.fetchall()
        dist={}
        for i in room_participants:
            dist[i[0]]={"SDP":i[1],"CANDIDATE":i[2]}
        
        # 成功后发送确认消息
        socketio.emit('join_response', {"status": "success", "message": f"User {user_id} joined meeting {meeting_id}.","room_participants":dist}, to=request.sid)


    # 通知所有房间内的客户端更新参与者列表
    socketio.emit('update_participants', participants_list[meeting_id], room=meeting_id)

image-20241001012453533

{
	王宇:{
		Can:“ssss”,
		Sdp:“sss”
	},
	王:{
		Can:“ssss”,
		Sdp:“sss”
	},。。。。。
}

检测断联,从session读取操作对象信息

@socketio.on('disconnect')
def handle_disconnect():
    
    user_id = session.get('user_id')
    meeting_id = session.get('meeting_id')
    print(f'A client {user_id} has disconnected.')
    leave_room(meeting_id)

        # 从数据库中删除参与者
           connection = connector.connect(**db_config)
           cursor = connection.cursor()
            
           cursor.execute("DELETE FROM participants WHERE meeting_id = %s AND user_id = %s", (meeting_id, user_id))
           connection.commit()


        # 通知所有客户端更新参与者列表
        socketio.emit('update_participants', participants_list[meeting_id], room=meeting_id)


通过以上能简单完成我们想要的效果.

测试并进行二次开发

我先对上面板块进行单元测试

发现没有任何问题,

将其与博客网站进行整体测试,

最终没有问题直接上传代码更新了服务器(2c 1g 30MB),

我们对其进行压测,

在无连接时,内存占用到了700mb

哈哈哈至少还有250mb给我们用

(我这里把 ip 限制关闭和 uwsgi 限制客户端个数关闭了 ,本来只能一个ip五个坑位,这里放开以便测试)

通过python自动构建了客户端与其连接,

当产生到320+客户端的时候,成功崩了

但通过查询内存占用的时候(如下图)

发现还不到20Mb,零头都没到啊,看来不是内存原因

image-20241001014730406

30/8大概在3.8mb/s

大概问题出现在宽带,

但相对传统架构(MCU)真的节省太多昂贵的服务器资源了

上一篇:宁夏众智科技OA办公系统存在SQL注入漏洞


下一篇:【TypeScript学习】TypeScript基础学习总结二