python通过Canal进行数据监控后的数据缓存设计
使用场景
本次做了个人脸识别项目,分为编码阶段和检测阶段。编码阶段将输入人脸进行编码,存到Mysql
数据库中。检测阶段读取数据库全量数据,将输入的检测图像进行人脸编码与数据库中编码好的数据进行比对。
由于编码数据可能有增删改的操作,且批量较大,所以在本地做了数据缓存用于编码对比。并通过Canal
对Mysql
数据库进行行为监测,将本地缓存数据与数据库操作行为数据进行合并,构成新的本地缓存数据。
Canal
客户端
在服务中单独启了一个Canal
客户端线程,每秒捕获Mysql
行为。在根据增删改,构造不同的字典用于做缓存数据合并。
from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from utils.data_buffer import DataBuffer
df_buffer = DataBuffer()
class CanalUtils:
def __init__(self):
# 建立与canal服务端的连接
self.client = Client()
self.client.connect(host='127.0.0.1', port=11111) # canal服务端部署的主机IP与端口
self.client.check_valid(username=b'', password=b'') # 自行填写配置的数据库账户密码
self.client.subscribe(client_id=b'1001', destination=b'example', filter='test.va_face_feature_copy1')
def get_canal_change(self):
while True:
message = self.client.get(100)
# entries是每个循环周期内获取到数据集
entries = message['entries']
for entry in entries:
entry_type = entry.entryType
if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
continue
row_change = EntryProtocol_pb2.RowChange()
row_change.MergeFromString(entry.storeValue)
header = entry.header
event_type = header.eventType
# row是binlog解析出来的行变化记录,一般有三种格式,对应增删改
for row in row_change.rowDatas:
new_data_dict = {}
# 删除行为
if event_type == EntryProtocol_pb2.EventType.DELETE:
for column in row.beforeColumns:
new_data_dict[column.name] = column.value
# 增加行为
elif event_type == EntryProtocol_pb2.EventType.INSERT:
for column in row.afterColumns:
new_data_dict[column.name] = column.value
# 更改行为
else:
for column in row.afterColumns:
new_data_dict[column.name] = column.value
df_buffer.write_data(event_type, new_data_dict)
time.sleep(1)
其中DataBuffer
是数据缓存类,后面再说。
单独写了个线程类,用于启动Canal
客户端。
class CanalThread(threading.Thread):
def run(self) -> None:
canal_util = CanalUtils()
canal_util.get_canal_change()
数据缓存类DataBuffer
DataBuffer
其实就两个行为,一个是拉取Canal
客户端去写数据。一个是读数据。这个服务是个并发读取的服务,可能多个接口同时去读。
在架构的带领下我写了两级缓存。第一级是有个face_feature_change
去记录行为。第二级是face_feature
,就是最大的全量数据。
数据一共分为三个部分:
- 本地落盘的数据:用于服务重启时防止数据丢失。
- 从
Canal
拉取的行为数据:记录数据库操作行为,同时相当于做了一级缓存。 - 全量数据:每次编码对比时用的都是这个数据。
读数据
读数据的过程很简单,以防写数据的时候同时去读,设置了全量数据缓存。
在处理数据后全局缓存的数据指针指到新的全量数据上。
为了防止指针切换时读取数据为None,使用了重试机制。
def get_data(self):
# 重试机制
for i in range(3):
if self.data is None:
print('数据正在写入中,进行重试')
time.sleep(1)
else:
return self.data
return None
写数据
写数据的流程复杂了一些
- 读取
face_feature_change
数据 - 对
face_feature_change
进行抵消处理,减少数据量 - 将抵消后的数据与原数据进行合并处理
- 数据落盘
- 切换
DataBuffer
指针,指向合并后的数据
def write_data(self, event_type, new_data_dict):
# 读取本地缓存数据
df_change = pd.read_pickle(r'utils/face_feature_change.pkl')
new_dict = new_data_dict
new_dict['change_status'] = event_type
# 增加本次数据
df_change = df_change.append(new_data_dict, ignore_index=True)
# 修改数据筛选
df_temp = self.get_change(df_change)
# 修改数据和原数据合并
df = self.get_df_union(df_temp)
# 落盘
if df_change.shape[0] > 1000:
df.to_pickle('utils/face_feature.pkl')
df_change = pd.DataFrame(columns=df.columns)
df_change.to_pickle('utils/face_feature_change.pkl')
# 指针转移
self.data = df
读取face_feature_change
数据
根据Canal
对行为的定义,我也通过变量change_status
定义了三个行为,INSERT=1;UPDATE=2;DELETE=3
。
每次使用的时候,先把行为数据放到face_feature_change
中,face_feature_change
对这些数据进行行为抵消处理。
比如:先增加了一条id=123
的数据,后面又给做了修改。那就可以认为新增了一条内容为修改后的数据。
比如:先增加了一条id=123
的数据,后面又给删了,那就可以不理会这个id
数据的操作。
处理这些抵消行为后,在与全量数据做合并,就能节约很多效率。不然的话每有一条行为就要去动全量数据,修改效率很慢的。
def get_change(self, df_change):
df_new = pd.DataFrame(columns=df_change.columns)
unique_id_list = np.unique(df_change['face_feature_id'])
for id in unique_id_list:
df = df_change[df_change['face_feature_id'] == id]
# 只有一条,该咋干就咋干
if df.shape[0] == 1:
df_new = df_new.append(df.loc[df.index.values[0]])
# 有多条就得判断状态了
else:
start_status = int(df.loc[df.index.values[0]]['change_status'])
end_status = int(df.loc[df.index.values[-1]]['change_status'])
# 如果最后的状态是删除,判断是删又增又删,还是增/改又删
if end_status == 3:
# 1.增/改了又删,就当它没发生过
# 2.先删最后又删,就是删之前的
if start_status == end_status:
df_new = df_new.append(df.loc[df.index.values[-1]])
# 如果最后的状态是修改
elif end_status == 2:
# 1.修改了新增加的数据,相当于增加了修改后的值.
# 2.无论是删了又增又修改还是修改又修改,都是修改之前的数据
if start_status == 1:
df['change_status'].loc[df.index.values[-1]] = start_status
df_new = df_new.append(df.loc[df.index.values[-1]])
# 如果最后的状态是新增
else:
# 1.可能是改/删了增了,相当于修改之前的数据
# 2.可能是增了又删了又增,就是增加之前没有的数据
if start_status != end_status:
df['change_status'].loc[df.index.values[-1]] = 2
df_new = df_new.append(df.loc[df.index.values[-1]])
return df_new
合并face_feature
和face_feature_change
数据
读取本地的face_feature
数据,与face_feature_change
数据合并,构成新的全量数据。
由于face_feature_change
已经对新增加的行为数据进行了抵消处理,最后的行为状态只有三种:INSERT=1;UPDATE=2;DELETE=3
。
那么对于face_feature
,考虑一些效率问题,使用先删除,再修改,最后新增的方式。
def get_df_union(self, df_temp):
df_old = pd.read_pickle(r'utils/face_feature.pkl')
# 删除
if df_temp[df_temp['change_status'] == 3].shape[0] > 0:
df_old = df_old.drop(
df_old[df_old['face_feature_id'].isin(
df_temp[df_temp['change_status'] == 3]['face_feature_id'].values)].index)
# 修改
df_update = df_temp[df_temp['change_status'] == 2]
if df_update.shape[0] > 0:
# df_update = df_update.drop(['change_status'], axis=1)
for face_feature_id in df_update['face_feature_id']:
index = df_old[df_old['face_feature_id'] == face_feature_id].index
for column in df_old.columns:
df_old.loc[index, column] = df_update[df_update['face_feature_id'] == face_feature_id][column]
# 新增
df_insert = df_temp[df_temp['change_status'] == 1]
if df_insert.shape[0] > 0:
df_insert = df_insert.drop(['change_status'], axis=1)
df_old = df_old.append(df_insert, ignore_index=True)
return df_old
pandas
的pickle
数据格式
pickle
是python
的一种序列化本地格式。由于是序列化方式,df.to_pickle()
中是没有index
这个参数的,它必须要带序列化。
序列化的好处主要在于写入效率上,由于有了序列化,可以在落盘写入时只写入改动信息,效率显著提高。
在读取效率方面,pickle
也是优于csv
的。
但是pickle
落盘更占磁盘空间。
在pandas
中使用pickle
好像只能在python3.8
以上版本才可以使用。为此在那个网络有问题的服务器上重新搞了个python3.8
的虚拟环境还废了一些力气。