开始原因
在尝试对高码流的视频进行rtsp推流后,opencv取流的时候出现:
[h264 @ 0x7fd990026040] left block unavailable for requested intra4x4 mode -1
[h264 @ 0x7fd990026040] error while decoding MB 0 18, bytestream 269
而低码流基本没有出现过,视频能顺利分析完,再经过不断优化后,最终能适应4M左右的码流传输,并且至少一小时不会间断。写这篇博客的时候已经过了一个多月了,也是为了总结之前有犯下的问题,还有一些个人经验。关于码流的定义,如下所示:
码流(Data Rate)是指视频文件在单位时间内使用的数据流量,也叫码率或码流率,通俗一点的理解就是取样率,是视频编码中画面质量控制中最重要的部分,一般我们用的单位是kb/s或者Mb/s。一般来说同样分辨率下,视频文件的码流越大,压缩比就越小,画面质量就越高。码流越大,说明单位时间内取样率越大,数据流,精度就越高,处理出来的文件就越接近原始文件,图像质量越好,画质越清晰,要求播放设备的解码能力也越高。
当然,码流越大,文件体积也越大,其计算公式是文件体积=时间X码率/8。例如,网络上常见的一部90分钟1Mbps码流的720P RMVB文件,其体积就=5400秒×1Mb/8=675MB。
通常来说,一个视频文件包括了画面及声音,例如一个RMVB的视频文件,里面包含了视频信息和音频信息,音频及视频都有各自不同的采样方式和比特率,也就是说,同一个视频文件音频和视频的比特率并不是一样的。而我们所说的一个视频文件码流率大小,一般是指视频文件中音频及视频信息码流率的总和。
以以国内最流行,大家最熟悉的RMVB视频文件为例,RMVB中的VB,指的是VBR,即Variable Bit Rate的缩写,中文含义是可变比特率,它表示RMVB采用的是动态编码的方式,把较高的采样率用于复杂的动态画面(歌舞、飞车、战争、动作等),而把较低的采样率用于静态画面,合理利用资源,达到画质与体积可兼得的效果。
码率和取样率最根本的差别就是码率是针对源文件来讲的。
网络流中断的问题分析日志为:
2020-11-19 10:01:41.950 W MediaServer[5503] RtmpSession.cpp:31 one rror | 140630248395504(172.17.0.2:55334) RTMP推流器(__defaultVhost__/live/1200w)断开:end of file,耗时(s):604
2020-11-19 10:01:41.950 D MediaServer[5503] RtmpSession.cpp:25 ~RtmpSession | 140630248395504(172.17.0.2:55334)
2020-11-19 10:01:41.965 I MediaServer[5503] MediaSource.cpp:350 unregist | hls __defaultVhost__ live 1200w
2020-11-19 10:01:41.966 T MediaServer[5503] HttpSession.cpp:30 HttpSession | 140630315608080(127.0.0.1:45347)
2020-11-19 10:01:41.966 I MediaServer[5503] MediaSource.cpp:350 unregist | rtsp __defaultVhost__ live 1200w
2020-11-19 10:01:41.966 I MediaServer[5503] MediaSource.cpp:350 unregist | rtmp __defaultVhost__ live 1200w
2020-11-19 10:01:41.967 T MediaServer[5503] HttpSession.cpp:30 HttpSession | 140630382613488(127.0.0.1:33187)
2020-11-19 10:01:41.967 T MediaServer[5503] HttpSession.cpp:30 HttpSession | 140630181485248(127.0.0.1:33417)
2020-11-19 10:01:41.967 D MediaServer[5503] SSLBox.cpp:153 findCertificate | client does not specify host, select default certificate of host: zlmedaikit
2020-11-19 10:01:41.967 D MediaServer[5503] SSLBox.cpp:153 findCertificate | client does not specify host, select default certificate of host: zlmedaikit
2020-11-19 10:01:41.967 D MediaServer[5503] SSLBox.cpp:153 findCertificate | client does not specify host, select default certificate of host: zlmedaikit
2020-11-19 10:01:41.970 W MediaServer[5503] SSLBox.cpp:215 operator() | 0 unable to get local issuer certificate
2020-11-19 10:01:41.970 W MediaServer[5503] SSLBox.cpp:215 operator() | 0 unable to verify the first certificate
2020-11-19 10:01:41.970 W MediaServer[5503] SSLBox.cpp:215 operator() | 0 unable to get local issuer certificate
2020-11-19 10:01:41.970 W MediaServer[5503] SSLBox.cpp:215 operator() | 0 unable to verify the first certificate
2020-11-19 10:01:41.971 W MediaServer[5503] SSLBox.cpp:215 operator() | 0 unable to get local issuer certificate
2020-11-19 10:01:41.971 W MediaServer[5503] SSLBox.cpp:215 operator() | 0 unable to verify the first certificate
2020-11-19 10:01:41.971 D MediaServer[5503] WebApi.cpp:182 operator() |
# request:
POST /index/hook/on_stream_changed
2020-11-19 10:01:41.971 D MediaServer[5503] WebApi.cpp:182 operator() |
# request:
POST /index/hook/on_stream_changed
# content:
{
"app" : "live",
"regist" : false,
"schema" : "rtsp",
"stream" : "1200w",
"vhost" : "__defaultVhost__"
}
# args:
app : live
regist : false
schema : rtsp
stream : 1200w
vhost : __defaultVhost__
# response:
{
"code" : 0
}
2020-11-19 10:01:41.972 T MediaServer[5503] HttpSession.cpp:118 one rror | 140630382613488(127.0.0.1:33187) end of file
2020-11-19 10:01:41.972 T MediaServer[5503] HttpSession.cpp:38 ~HttpSession | 140630382613488(127.0.0.1:33187)
^C2020-11-19 10:04:33.824 W MediaServer[5500] System.cpp:103 operator() | 收到主动退出信号,关闭父进程与子进程
2020-11-19 10:04:33.824 I MediaServer[5500] logger.cpp:57 ~Logger |
2020-11-19 10:04:33.824 I MediaServer[5503] main.cpp:345 operator() | SIGINT:exit
2020-11-19 10:04:33.824 T MediaServer[5503] TcpServer.h:142 ~TcpServer | start clean tcp server...
2020-11-19 10:04:33.824 T MediaServer[5503] TcpServer.h:147 ~TcpServer | clean tcp server completed!
2020-11-19 10:04:33.824 T MediaServer[5503] TcpServer.h:142 ~TcpServer | start clean tcp server...
2020-11-19 10:04:33.825 T MediaServer[5503] TcpServer.h:147 ~TcpServer | clean tcp server completed!
2020-11-19 10:04:33.825 T MediaServer[5503] TcpServer.h:142 ~TcpServer | start clean tcp server...
2020-11-19 10:04:33.825 T MediaServer[5503] TcpServer.h:147 ~TcpServer | clean tcp server completed!
2020-11-19 10:04:33.825 T MediaServer[5503] TcpServer.h:142 ~TcpServer | start clean tcp server...
2020-11-19 10:04:33.825 T MediaServer[5503] TcpServer.h:147 ~TcpServer | clean tcp server completed!
2020-11-19 10:04:33.825 T MediaServer[5503] TcpServer.h:142 ~TcpServer | start clean tcp server...
2020-11-19 10:04:33.826 T MediaServer[5503] TcpServer.h:147 ~TcpServer | clean tcp server completed!
2020-11-19 10:04:33.826 T MediaServer[5503] TcpServer.h:142 ~TcpServer | start clean tcp server...
2020-11-19 10:04:33.826 T MediaServer[5503] TcpServer.h:147 ~TcpServer | clean tcp server completed!
2020-11-19 10:04:33.826 T MediaServer[5503] TcpServer.h:142 ~TcpServer | start clean tcp server...
2020-11-19 10:04:33.826 T MediaServer[5503] TcpServer.h:147 ~TcpServer | clean tcp server completed!
2020-11-19 10:04:33.826 I MediaServer[5503] main.cpp:358 start_main | 程序退出中,请等待...
优化方案
用队列将同步转为异步
这种是我在找资料的时候看到最多的一种,看到收藏率比较高的帖子为:
https://blog.csdn.net/darkeyers/article/details/84865363
代码为:
import cv2
import queue
import time
import threading
q=queue.Queue()
def Receive():
print("start Reveive")
cap = cv2.VideoCapture("rtsp://admin:admin_123@172.0.0.0")
ret, frame = cap.read()
q.put(frame)
while ret:
ret, frame = cap.read()
q.put(frame)
def Display():
print("Start Displaying")
while True:
if q.empty() !=True:
frame=q.get()
cv2.imshow("frame1", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
if __name__=='__main__':
p1=threading.Thread(target=Receive)
p2 = threading.Thread(target=Display)
p1.start()
p2.start()
但这也仅仅解决了简单的从取流到展示的一个过程,如果说要加上管道以及ffmpeg推流分析,对于本地视频来讲,还要对视频帧数做限制,因为有可能会超过30FPS也不一定。
另外,如果加上算法推断,这个时间将被进一步拉长,那么出现的情况就是队列的积压,消费者消费不过来导致帧数计算错误,那么将需要对取流进程做跳帧处理。
抓取与跳帧
首先,最简单的一个方案既是跳帧,read读了但不传给内部队列,这种业务情况常见于录频,算法并不需要对一个视频流的一秒25帧都做检测分析,但视频如果有少帧,播放效果就会有明显的加速效果,这显然不行,所以我们就能对上面代码做新的逻辑:
import cv2
import queue
import time
import threading
q=queue.Queue()
def Receive():
print("start Reveive")
cap = cv2.VideoCapture("rtsp://admin:admin_123@172.0.0.0")
ret, frame = cap.read()
path = ""
size = (int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
fps = 25
out_video = cv2.VideoWriter(path, cv2.VideoWriter_fourcc('m','j','p','g'), fps, size)
q.put(frame)
fp = 0
while ret:
fp += 1
ret, frame = cap.read()
out_video.write(frame)
if (fp % 2 == 0):
q.put(frame)
def Display():
print("Start Displaying")
while True:
if q.empty() !=True:
frame=q.get()
cv2.imshow("frame1", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
if __name__=='__main__':
p1=threading.Thread(target=Receive)
p2 = threading.Thread(target=Display)
p1.start()
p2.start()
而标题中抓取的意思,即是将read换成grab加快取流进程的读取速度。关于这两者官方的介绍,可以看opencv官网对其讲解:cv::VideoCapture Class Reference
但官网说得并不是很明白,介绍的是多相机环境下可以用:
该功能的主要用途是在多相机环境中,尤其是在相机没有硬件同步的情况下。也就是说,您为每个摄像机调用VideoCapture :: grab(),然后调用较慢的方法VideoCapture :: retrieve()解码并从每个摄像机获取帧。这样,消除了去马赛克或运动jpeg压缩等方面的开销,并且从不同摄像机检索到的帧将在时间上更近。
另外,当连接的摄像机是多头摄像机(例如,立体摄像机或Kinect设备)时,从中检索数据的正确方法是先调用VideoCapture :: grab(),然后再调用VideoCapture :: retrieve()使用不同的channel参数值一次或多次。
反正我是看得有点迷惑,所以当初基本没有花时间在这两个函数上,但后来发现,其实它们的作用并不只有这样。我们可以从字面上去看这两个的区别:
-
抓取:方法/函数从视频文件或摄像机抓取下一帧,并在成功的情况下返回true(非零)。
-
读取:解码并返回下一个视频帧。
那么可以说grab的意思是当前帧还没有进入缓冲区,即还没有交给进程,只是获悉当前状态正常并不会继续下一帧。而retrieve才是建立缓冲区将图片拉给进程。那么可以用一个公式去说明这个问题:
- 视频流==>抓取==>检索==>图片
- 视频流==>读取=抓取+检索+缓冲区==>图片
具体的基准测试可以去跑一下多进程案例,这里引用自https://gist.github.com/yinguobing/7cf76e04c5e7cf45eae6ef296fd84dec
"""
read() and grab() in OpenCV VideoCapture takes different time as there is no
decoding for grab() operation. Let's try to find out how much these two methods
differs.
"""
import cv2
def main():
tm = cv2.TickMeter()
# =========================================================================
# Test 1, grab and decode each frame.
cap = cv2.VideoCapture()
cap.open("/home/robin/Videos/clip1-ssd.mp4", cv2.CAP_FFMPEG)
total_frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
tm.start()
# print("Total frames number: {}".format(total_frame_count))
f_count = 0
while True:
f_count += 1
r, img = cap.read()
if r == False:
break
# for _ in range(24):
# cap.grab()
tm.stop()
print("Read and decode {} frames takes {} seconds.".format(
f_count, tm.getTimeSec()/tm.getCounter()))
# =========================================================================
# Test 2, grab only without decoding.
cap = cv2.VideoCapture()
cap.open("/home/robin/Videos/clip1-ssd.mp4", cv2.CAP_FFMPEG)
total_frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
tm.reset()
tm.start()
while True:
r = cap.grab()
if r == False:
break
tm.stop()
print("Read {} frames takes {} seconds.".format(
total_frame_count, tm.getTimeSec()/tm.getCounter()))
# =========================================================================
# Test 3, reading same file with two decoders. This block failed on my PC.
# Maybe it's not supported by FFMPEG?
from multiprocessing.pool import ThreadPool
thread_num = cv2.getNumberOfCPUs()
pool = ThreadPool(processes=thread_num)
def get_frame_at(src, location):
cap = cv2.VideoCapture()
cap.open(src, cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_FRAME_COUNT, location)
rst, frame = cap.read()
return rst, frame
def get_frame_next(cap):
rst, frame = cap.read()
return rst, frame
video_src = "/home/robin/Videos/clip1-ssd.mp4"
cap1 = cv2.VideoCapture()
cap1.open(video_src, cv2.CAP_FFMPEG)
cap2 = cv2.VideoCapture()
cap2.open(video_src, cv2.CAP_FFMPEG)
tm.reset()
tm.start()
f_count = 0
while True:
f_count += 1
t1 = pool.apply_async(get_frame_next, (cap1, ))
t2 = pool.apply_async(get_frame_next, (cap2, ))
print(t1.get()[0])
if (t1.get()[0] is False) and (t2.get()[0] is False):
break
tm.stop()
print("Decode same file with 2 decoder takes {} seconds.".format(
tm.getTimeSec()/tm.getCounter()))
if __name__ == "__main__":
main()
然后清楚了grab的含义,我们就可以对开始的
fp += 1
success = cap.grab()
if (fp % 2 == 0):
continue
_,image = cap.retrive()
那么还会对性能有所提升。
读写二进制图像
这一部分就是我当初遇到的问题,初步考虑是高码的rtsp流解码的图片很大,队列很难承受或者有大量积压导致帧数计算错误。最上面的推流错误日志说的是我的解码进程因为解码解得太慢被强制socket断开。因为rtsp是一秒25帧,我需要在这个时间内即做解码又做编码,对于10M以上的码流,显然不现实。所以这里可以从两个方向上去修改,第一个是解码:
关于这个可以看:python图像处理opencv笔记(二):视频基本操作
另一个角度从编码考虑,因为存在录制视频的功能,流数据又比较大,不可能做到既同时解码又同时编码,所以可以将它放到后面进程里,或者直接独立出去,那么解码就可以考虑把图片写进某个容器里,具体的选择方案有:
Method | Save Single Image + Meta | Memory |
---|---|---|
Disk | 1.915 ms | 8 K |
LMDB | 1.203 ms | 32 K |
HDF5 | 8.243 ms | 8 K |
如果说不考虑速度,HDF5还是比较不错的:
关于这个的python依赖为:https://docs.h5py.org/en/stable/
我有分别使用过hdf5和二进制存储,这里的二进制存储我使用的是numpy,就像上面说的,如果不考虑速度,两者差距不大,但只要磁盘IO能更得上,二进制的读取永远是线性的,这给从CPU解码的层面上有一个非常稳定的作用,后来确实如此,跑了一个小时,基本也没有队列积压。另外最后一种方法,就是对opencv做编译或者换硬编码。
从软解到硬解
关于这种方案,可以看之前的两篇文章,我就不再做过多叙述了: