2021-07-16

       我使用的瑞芯微板子是RK3399Pro,这个板子自带有人脸识别功能,于是参看Rockchip_Developer_Guide_RockX_SDK_CN.pdf这个文档进行测试,具体操作手册上还算详细,在此不赘述。

        原始的程序可以直接用摄像头来做demo,但是板子没有直接接显示器,没法用imshow显示,无奈之下只能采用网络通讯的方式把结果发送到pc端,再由pc端绘制并显示结果。在此我采用了python3中的socket来通讯并收发json数据,下面直接上程序吧。先说明一下,原始代码写得有些复杂,我改写了好些地方。

1、板子服务端程序:

import os
import sys
import time
import sqlite3
import argparse
import numpy as np

from rockx import RockX
import cv2,json,socket


class FaceDB:
    """SQLite-backed store of enrolled faces.

    Every row of the ``FACE`` table holds a name, a feature version, the raw
    bytes of the feature vector and the raw bytes of the aligned face image.
    """

    def __init__(self, db_file):
        """Open the database at ``db_file`` and create the FACE table if it is missing."""
        self.db_file = db_file
        self.conn = sqlite3.connect(self.db_file)
        self.cursor = self.conn.cursor()
        if self._is_face_table_exist():
            return
        self.cursor.execute("create table FACE (NAME text, VERSION int, FEATURE blob, ALIGN_IMAGE blob)")

    def load_face(self):
        """Read every stored face.

        Returns:
            dict mapping each name to ``{'feature': RockX.FaceFeature, 'image': ndarray}``.
            The feature blob is decoded as float32 and the image blob as a
            112x112x3 uint8 array. Rows sharing a name overwrite each other,
            so the last row read wins.
        """
        library = {}
        for row in self.cursor.execute("select * from FACE"):
            name, version = row[0], row[1]
            feature_vec = np.frombuffer(row[2], dtype='float32')
            aligned = np.frombuffer(row[3], dtype='uint8').reshape((112, 112, 3))
            library[name] = {
                'feature': RockX.FaceFeature(version=version, len=feature_vec.size, feature=feature_vec),
                'image': aligned,
            }
        return library

    def insert_face(self, name, feature, align_img):
        """Store one face and commit immediately.

        Args:
            name: label saved in the NAME column.
            feature: object exposing ``version`` and an array-like ``feature``
                that supports ``tobytes()``.
            align_img: aligned face image supporting ``tobytes()``.
        """
        statement = "INSERT INTO FACE (NAME, VERSION, FEATURE, ALIGN_IMAGE) VALUES (?, ?, ?, ?)"
        values = (name, feature.version, feature.feature.tobytes(), align_img.tobytes())
        self.cursor.execute(statement, values)
        self.conn.commit()

    def _get_tables(self):
        """Return the rows of ``sqlite_master`` naming every table, ordered by name."""
        self.cursor.execute("select name from sqlite_master where type='table' order by name")
        return self.cursor.fetchall()

    def _is_face_table_exist(self):
        """Return True when a table called FACE is present in the database."""
        return any('FACE' in row for row in self._get_tables())


def get_max_face(results):
    """Return the detection whose bounding box covers the largest area.

    Args:
        results: iterable of detection results; each must expose a ``box``
            with ``left``, ``top``, ``right`` and ``bottom`` attributes.

    Returns:
        The result with the largest strictly positive box area, or ``None``
        when ``results`` is empty or no box has a positive area.
    """
    max_area = 0
    max_face = None
    for result in results:
        box = result.box
        # Area is height * width; the width is a difference, not a product.
        area = (box.bottom - box.top) * (box.right - box.left)
        if area > max_area:
            # Remember the running maximum so later, smaller faces cannot win.
            max_area = area
            max_face = result
    return max_face


def get_face_feature(image_path):
    """Extract the feature of the largest face found in an image file.

    Relies on the module-level RockX handles ``face_det_handle``,
    ``face_landmark5_handle`` and ``face_recog_handle``.

    Args:
        image_path: path of the image to load with ``cv2.imread``.

    Returns:
        ``(face_feature, align_img)`` on success, or ``(None, None)`` when the
        image cannot be read, no face is found, or any RockX step fails.
    """
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread returns None (it does not raise) for a missing file or a
        # file that is not a decodable image; treat it as "no face".
        return None, None

    img_h, img_w = img.shape[:2]
    ret, results = face_det_handle.rockx_face_detect(img, img_w, img_h, RockX.ROCKX_PIXEL_FORMAT_BGR888)
    if ret != RockX.ROCKX_RET_SUCCESS:
        return None, None

    max_face = get_max_face(results)
    if max_face is None:
        return None, None

    ret, align_img = face_landmark5_handle.rockx_face_align(img, img_w, img_h,
                                                            RockX.ROCKX_PIXEL_FORMAT_BGR888,
                                                            max_face.box, None)
    if ret != RockX.ROCKX_RET_SUCCESS or align_img is None:
        return None, None

    ret, face_feature = face_recog_handle.rockx_face_recognize(align_img)
    if ret != RockX.ROCKX_RET_SUCCESS:
        return None, None
    return face_feature, align_img


def get_all_image(image_path):
    """Collect every file found below ``image_path``.

    Returns:
        dict mapping each file name without its extension to the file's full
        path. When two files share the same stem, the one visited later by
        ``os.walk`` replaces the earlier entry.
    """
    return {
        os.path.splitext(entry)[0]: os.path.join(root, entry)
        for root, _subdirs, entries in os.walk(image_path)
        for entry in entries
        if not os.path.isdir(os.path.join(root, entry))
    }


def import_face(face_db, images_dir):
    """Enroll every image found under ``images_dir`` into ``face_db``.

    The file name without extension is used as the face name. One progress
    line is printed per file, reporting success or failure.

    Args:
        face_db: database object providing ``insert_face(name, feature, align_img)``.
        images_dir: directory scanned recursively by ``get_all_image``.
    """
    image_files = get_all_image(images_dir)
    total = len(image_files)
    # enumerate gives the 1-based position directly, avoiding a linear
    # list.index() lookup on every iteration.
    for position, (name, image_path) in enumerate(image_files.items(), start=1):
        feature, align_img = get_face_feature(image_path)
        if feature is not None:
            face_db.insert_face(name, feature, align_img)
            print('[%d/%d] success import %s ' % (position, total, image_path))
        else:
            print('[%d/%d] fail import %s' % (position, total, image_path))


def search_face(face_library, cur_feature):
    """Find the enrolled face closest to ``cur_feature``.

    The value returned by ``rockx_face_similarity`` is treated as a distance:
    the entry with the smallest value wins, and it only counts as a match
    when that value is below 1.0. Uses the module-level ``face_recog_handle``.

    Args:
        face_library: dict mapping names to dicts that contain a ``'feature'`` entry.
        cur_feature: feature of the face being looked up.

    Returns:
        ``(name, similarity, face)`` for the best match, or ``(None, -1, None)``
        when the library is empty or nothing is close enough.
    """
    min_similarity = 10.0
    target_name = None
    target_face = None
    for name, face in face_library.items():
        ret, similarity = face_recog_handle.rockx_face_similarity(cur_feature, face['feature'])
        if ret != RockX.ROCKX_RET_SUCCESS:
            # A failed comparison carries no usable score; skip this entry
            # rather than comparing against an undefined value.
            continue
        if similarity < min_similarity:
            target_name = name
            min_similarity = similarity
            target_face = face
    if min_similarity < 1.0:
        return target_name, min_similarity, target_face
    return None, -1, None


if __name__ == '__main__':
    # Board-side server.
    #
    # With -i/--image_dir it enrolls the images into the face database and
    # exits. Otherwise it grabs frames from the camera and, for every client
    # connection, answers with one JSON list describing the faces found in
    # the current frame, then closes that connection.
    #
    # Usage: python3 rockx_face_recog_server.py -b face.db

    parser = argparse.ArgumentParser(description="RockX Face Recognition Demo")
    parser.add_argument('-c', '--camera', help="camera index", type=str,
                        default='rtsp://admin:abcd.1234@192.168.3.198:664/Streaming/Channels/101?transportmode=unicast&profile=Profile_101')  # network address of the Dahua camera
    parser.add_argument('-b', '--db_file', help="face database path", required=True)
    parser.add_argument('-i', '--image_dir', help="import image dir")
    parser.add_argument('-d', '--device', help="target device id", type=str)
    # The address used to be hard-coded as '192.168.3.266', which is not a
    # valid IPv4 address (last octet > 255) and made bind() fail. Listening on
    # all interfaces works regardless of the board's actual address.
    parser.add_argument('--host', help="address to listen on", type=str, default='0.0.0.0')
    parser.add_argument('--port', help="TCP port to listen on", type=int, default=8686)
    args = parser.parse_args()

    print("camera=", args.camera)
    print("db_file=", args.db_file)
    print("image_dir=", args.image_dir)

    face_det_handle = RockX(RockX.ROCKX_MODULE_FACE_DETECTION, target_device=args.device)
    face_landmark5_handle = RockX(RockX.ROCKX_MODULE_FACE_LANDMARK_5, target_device=args.device)
    face_recog_handle = RockX(RockX.ROCKX_MODULE_FACE_RECOGNIZE, target_device=args.device)
    face_track_handle = RockX(RockX.ROCKX_MODULE_OBJECT_TRACK, target_device=args.device)

    cap = None
    serversocket = None
    try:
        face_db = FaceDB(args.db_file)

        if args.image_dir is not None:
            # Enrollment mode: import the images, then stop.
            import_face(face_db, args.image_dir)
            sys.exit(0)

        # Load the enrolled faces from the database.
        face_library = face_db.load_face()
        print("load %d face" % len(face_library))

        cap = cv2.VideoCapture(args.camera)
        # Presumably (height, width); it is reversed below because cv2.resize
        # expects (width, height).
        dstimhw = (480, 640)

        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allow a quick restart without waiting for the old socket to time out.
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        serversocket.bind((args.host, args.port))
        # Maximum number of queued connections; further clients wait.
        serversocket.listen(2)

        while True:
            # Capture frame-by-frame.
            ret, frame = cap.read()
            if frame is None:
                # The stream dropped: reopen it and try once more.
                cap.release()
                cap = cv2.VideoCapture(args.camera)
                ret, frame = cap.read()
                if frame is None:
                    # Still nothing; pause briefly instead of crashing in
                    # cv2.resize or spinning at full speed.
                    time.sleep(0.1)
                    continue

            starttime = time.time()
            frame = cv2.resize(frame, dstimhw[::-1], None, fx=0, fy=0)

            # Wait for the next client request.
            clientsocket, addr = serversocket.accept()
            # The context manager closes the connection even if processing fails.
            with clientsocket:
                in_img_h, in_img_w = frame.shape[:2]

                ret, results = face_det_handle.rockx_face_detect(frame, in_img_w, in_img_h,
                                                                 RockX.ROCKX_PIXEL_FORMAT_BGR888)
                if ret == RockX.ROCKX_RET_SUCCESS:
                    ret, results = face_track_handle.rockx_object_track(in_img_w, in_img_h, 3, results)
                if ret != RockX.ROCKX_RET_SUCCESS or results is None:
                    # Detection or tracking failed: report no faces for this frame.
                    results = []

                itemresults = []
                for result in results:
                    target_name = None

                    # Face alignment.
                    ret, align_img = face_landmark5_handle.rockx_face_align(frame, in_img_w, in_img_h,
                                                                             RockX.ROCKX_PIXEL_FORMAT_BGR888,
                                                                             result.box, None)
                    if ret == RockX.ROCKX_RET_SUCCESS and align_img is not None:
                        # Feature extraction; the feature is scoped to this
                        # face so a failed step can never reuse an earlier one.
                        ret, face_feature = face_recog_handle.rockx_face_recognize(align_img)
                        if ret == RockX.ROCKX_RET_SUCCESS and face_feature is not None:
                            # Look the face up in the library.
                            target_name, _diff, _target_face = search_face(face_library, face_feature)

                    itemresults.append({"result": result, "target_name": target_name})

                if itemresults:
                    costtime = time.time() - starttime
                    print('costtime={}'.format(costtime))

                sendmsg = json.dumps(itemresults).encode('utf-8')
                # sendall keeps writing until the whole payload is sent;
                # send() may stop after a partial write.
                clientsocket.sendall(sendmsg)
    finally:
        # Reached on exit, Ctrl-C or error: release everything that was opened.
        if cap is not None:
            cap.release()
        if serversocket is not None:
            serversocket.close()
        for handle in (face_det_handle, face_landmark5_handle, face_recog_handle, face_track_handle):
            handle.release()

2、pc客户端程序:

# 导入 socket、sys 模块
import socket,cv2
import sys,json,time


def drawfacerecogresult(im, itemresults):
    """Draw the recognition results onto ``im`` in place.

    Args:
        im: image to draw on.
        itemresults: list of dicts with a ``'result'`` entry (a 4-item
            sequence whose third item is the box ``[x1, y1, x2, y2]``) and a
            ``'target_name'`` entry (the matched name, or None if unknown).
    """
    for itemresult in itemresults:
        _, _, detbox, _score = itemresult['result']
        target_name = itemresult['target_name']
        # OpenCV drawing calls need integer pixel coordinates.
        x1, y1, x2, y2 = (int(v) for v in detbox[:4])
        cv2.rectangle(im, (x1, y1), (x2, y2), (0, 255, 0), 2)
        if target_name is not None:
            # Unknown faces arrive with a null name; cv2.putText rejects a
            # non-string text, so only the box is drawn for them.
            cv2.putText(im, target_name, (x1, y1 - 12), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0))


if __name__ == "__main__":
    # PC-side client.
    #
    # It reads the same camera stream as the board, asks the board for the
    # recognition results of the current frame over TCP, draws them on the
    # local frame and shows it. Press 'q' in the window to quit.
    camera = 'rtsp://admin:abcd.1234@192.168.3.198:664/Streaming/Channels/101?transportmode=unicast&profile=Profile_101'  # network address of the Dahua camera

    # NOTE(review): '192.168.3.266' is not a valid IPv4 address (last octet >
    # 255), so connect() cannot succeed with it. Replace it with the board's
    # real address, or pass that address as the first command-line argument.
    server_host = sys.argv[1] if len(sys.argv) > 1 else '192.168.3.266'
    ip_port = (server_host, 8686)
    # Presumably (height, width); reversed below because cv2.resize expects
    # (width, height).
    dstimhw = (480, 640)

    cap = cv2.VideoCapture(camera)
    cv2.namedWindow('myim', cv2.WINDOW_NORMAL)
    try:
        while True:
            ret, im = cap.read()
            if im is None:
                # The stream dropped: reopen it and try once more.
                cap.release()
                cap = cv2.VideoCapture(camera)
                ret, im = cap.read()
                if im is None:
                    # Still nothing; pause briefly instead of crashing in cv2.resize.
                    time.sleep(0.1)
                    continue
            im = cv2.resize(im, dstimhw[::-1], None, fx=0, fy=0)

            # One short-lived connection per frame; the context manager
            # closes it so sockets do not pile up.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect(ip_port)
                # Read until the server closes the connection: one recv()
                # may return only part of the JSON payload.
                chunks = []
                while True:
                    chunk = s.recv(4096)
                    if not chunk:
                        break
                    chunks.append(chunk)

            # Start from "no faces" so a bad reply neither raises NameError
            # nor redraws the boxes of an older frame.
            itemresults = []
            try:
                itemresults = json.loads(b"".join(chunks).decode('utf-8'))
            except ValueError as err:
                # UnicodeDecodeError and json.JSONDecodeError both derive
                # from ValueError.
                print('invalid reply from server: {}'.format(err))

            if itemresults:
                drawfacerecogresult(im, itemresults)
            cv2.imshow('myim', im)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Reached on quit, Ctrl-C or error.
        cap.release()
        cv2.destroyAllWindows()

先启动服务端,然后执行客户端,就可以显示结果了。但是画面比较卡顿,也有些花屏的情况,可能是视频流解码速度跟不上导致的,具体原因还要进一步查找。

上一篇:关于卷积神网络,以及模型训练自己的一些理解


下一篇:使用CNN提取图像中的feature