参考链接:
https://github.com/yongqyu/st-gcn-pytorch
https://www.cnblogs.com/shyern/p/11262926.html
https://blog.csdn.net/qq_36893052/article/details/79860328
https://www.zhihu.com/collection/437834930
网络特点:
stgcn(时空图卷积):
1.2018年提出,网络由九层时空图卷积,一共有9个时间卷积核,在每一个ST-GCN使用残差链接
2.通过线性堆叠的 GCN 和 TCN 来间接扩大每个节点的感受野,非常有助于时空信息的提取
3.通过 openpose 从视频中提取 2D 检测点坐标,再将置信度作为 z 轴,从而得到 3D 骨架序列
试验方法:
序号 | Idea | 原理 | 优先级 | 数据状态 | 更新前 | 更新后 | 备注 |
1 | 优化手部特征点检测 | 现在的手部检测点经常会检测飞了,希望能有更稳定的检测器。 | 目前没有找到更优的检测器,有待旷世的接口,暂时挂起来 | 6人(1504条) 1.动作不是很标准,手部检测很多点检测不到; 2.三类数据极其不均衡。 |
1.准确度不高; 2.训练过拟合。 |
目前没有更好的优化器,后续采集尽量动作规范。 | |
2 | 采集更多的数据 | 平衡各个类别的数据 | 已尝试 | 11人(3639条) 北京6人 深圳5人(动作规范) |
1.准确度不高; 2.训练过拟合。 |
准确度有所提升,组内测试集准确度有80%多,但是泛化能力大概只有45%-60%。 | 过拟合严重。 |
3 | 1.weight decay 2.dropout 3.学习率 |
减少过拟合 | 已尝试 | 11人(3639条) 北京6人 深圳5人(动作规范) |
1.准确度不高; 2.训练过拟合。 |
在验证集上的过拟合能很大程度缓解,但没有彻底解决泛化能力。 | 在北京6人上面进行测试,效果极其不佳,怀疑与动作标准有关。 |
4 | 超级人模拟更多的动作 | 一个人模拟各人的多种动作 | 已尝试 | 11人(3639条) 北京6人 深圳5人(动作规范)+1个超级人 |
1.准确度不高; 2.训练过拟合。 |
1.准确度不高; 2.训练过拟合。 |
超级用户没有明显效果。 |
5 | 继续新增数据 | 数据扩增能解决过拟合问题,并且有助于准确度提高 | 已尝试 | 22人(7000条) 北京6人+后采6人(距离不统一) 深圳10人(动作规范) |
1.准确度不高(60%); 2.训练过拟合(轻微)。 |
准确度提升较大,大概能多70%出头,提升了10个点。 | 深圳的距离比较统一,用深圳的做测试集,准确度要比北京的高。 |
6 | 特征点数据归一化 | 用相对坐标除以人脸的长宽,把特征归一化,解决距离问题 | 优先尝试 | 22人(7000条) 北京6人+后采6人(距离不统一) 深圳10人(动作规范) |
1.准确度不高(70%); 2.训练过拟合。 |
效果比原来差 | 效果不佳,有可能是归一化后的数据差异变小 |
7 | 清洗数据 | 增加了两个限制方法: 1.检测不到人脸、脸宽小于50像素的去掉;(帧) 2.手部的7个关键点,若是置信度大于0.1的 不超过4个,也去掉;(帧) 3.一秒30帧,有15帧没通过上面的话,整段视频就去掉。(视频) |
优先尝试 | 22人(清洗后剩4000条) 北京6人+后采6人(距离不统一) 深圳10人(动作规范) |
1.准确度不高(70%); 2.训练过拟合(轻微)。 |
1.准确度提升较大,大概到达80%多,提升了10多个点; 2.过拟合也有减轻。 |
若能让手部检测更准确,准确度还能提升。 |
8 | 延长每段视频的时间(2s/5s) | 视频时间延长,能表达的内容更多 | 优先尝试 | 2/3/4/9批数据(16人) 深圳10+北京6 |
1.在清洗数据上准确度80%多; 2.在原数据上准确度很低。 |
1.在清洗数据上有两三个点的提升(85%左右); 2.在原始数据上没有什么效果(75%左右)。 |
1.还是脏数据太多,影响了预测结果,需要有更优的手部检测器; 2.经过1s/2s/4s的交叉对比,目前最优的是:2s+通过15帧(清洗数据)、2s+通过15帧+全部帧合并(不清洗数据)。 |
目前最优:2s视频,64帧/通过15帧即可,准确度90%。
若是希望更多的数据合格,可以选择通过8帧即可,损失一点精度。
原数据:64帧 | 64帧 | 最优 | |||||||||||||
data | 清洗 | 脏数据 | 清洗 | 脏数据 | 通过15帧 | 数据量 | 清洗前 | 总和 | 清洗后 | 总和 | |||||
2+3-811 | 0.917 | 0.851 | 2+3-811-2 | 0.918 | 0.743 | 0 | 1 | 3 | 0 | 1 | 3 | ||||
2+4-811 | 0.855 | 0.667 | 2+4-811-2 | 0.868 | 0.686 | 2 | 545 | 327 | 93 | 965 | 369 | 193 | 90 | 652 | |
3+4-811 | 0.926 | 0.695 | 3+4-811-2 | 0.92 | 0.83 | 3 | 473 | 300 | 129 | 902 | 375 | 269 | 128 | 772 | |
4 | 475 | 304 | 162 | 941 | 319 | 217 | 162 | 698 | |||||||
9 | 184 | 99 | 48 | 331 | 152 | 82 | 48 | 282 | |||||||
1677 | 1030 | 432 | 3139 | 1215 | 761 | 428 | 2404 |
代码工程:
data.py
数据处理:
1.一段2min的视频,每1s剪裁下来,编号1-n,得到n段视频(n条数据);加上人物编号1-m;每段加动作类型标签:xx
2.每一帧得到关键坐标点,以某个稳定的中心点作为坐标原点,其他的点都减去这个原点
原点坐标改为(0,0),得到整体相对坐标;
3. 把坐标变为一维存储,后面要用再从代码reshape成(n,-1,2)
from torch.utils import data
import torch
import os
import random
import numpy as np
list1=[14,18,23,30,40,48] # 测试人物编号
list2=[13,17,22,29,39,47] # 验证测试编号
forder='2+3+4+5+6'
forder2='2+3+4+5+6_v1'
file_name = 'dataset/test9/src/'+str(forder)+'.txt'
save='dataset/test9/src/'+str(forder2)+"/"
if os.path.exists(save) == False:
os.makedirs(save)
split_label=1
frame_nums=64
f = open(file_name)
lines = f.readlines()
prev_video = int(lines[0].strip().split(' ')[1]) # 视频编号
prev_categ = int(lines[0].strip().split(' ')[2]) # 类别标签
datas=[]
datas_label=[]
frames = []
train = []
valid = []
test = []
train_label = []
valid_label = []
test_label = []
m=0
for line in lines:
line = line.strip().split(' ')
vid = int(line[1]) # 视频编号
aid = int(line[0]) # 任务编号
cid = int(line[2]) # 类别标签
label=list(map(int, line[:3]))
features = list(map(float, line[3:])) # 21个特征点
# 若是视频标签相同,则都放入数组中,作为一条训练数据
if prev_video == vid:
frames.append(np.reshape(np.asarray(features), (-1,3))) # 把一维转换成[15,3]的格式
else:
# 如果一条视频帧数过多,则选取前frame_nums帧,并连接起来,转成torch格式
if len(frames) >= frame_nums:
# frames = random.sample(frames, frame_nums) # 随机取帧
frames = frames[0:frame_nums] # 按顺序取帧
frames = torch.from_numpy(np.stack(frames, 0)) # 把每一帧在0维连接起来,转成torch格式
# 若是视频帧数不够多,则利用线性插值,把数据补充到frame_nums帧
else:
frames = np.stack(frames, 0) # 把每一帧连接起来,如:n帧 n*[1,15,3]=[n,15,3] 作为一条数据
xloc = np.arange(frames.shape[0]) # np.arange:生成n个自然数,即等于frame_nums帧数
new_xloc = np.linspace(0, frames.shape[0], frame_nums) # 生成start和end之间frame_nums个等差间隔的元素,如:1、2、··n
frames = np.reshape(frames, (frames.shape[0], -1)).transpose() # transpose:矩阵转置
# print(frames.shape,xloc.shape,new_xloc.shape)
new_datas = []
for data in frames:
new_datas.append(np.interp(new_xloc, xloc, data)) # interp:进行线性插值, 获得frame_nums帧数据
frames = torch.from_numpy(np.stack(new_datas, 0)).t() # 把n帧数据再次连接起来,转换torch格式
frames = frames.view(frame_nums, -1, 3) # 强制reshape矩阵形状
datas.append(frames) #数据
if split_label==1:
datas_label.append(label) #标签
else:
datas_label.append(prev_categ) #标签
m+=1
# 2.按人物编号分
if aid in list1:
test.append(frames)
test_label.append(prev_categ)
elif aid in list2:
valid.append(frames)
valid_label.append(prev_categ)
else:
train.append(frames)
train_label.append(prev_categ)
frames = [np.reshape(np.asarray(features), (-1,3))] # frames重置,等于每条视频的第一帧的关键点
prev_actor = aid # 人物编号重置
prev_video = vid # 视频编号重置
prev_categ = cid # 标签重置
# # 3.随机划分
# lens=len(datas)
# num=random.sample(range(lens),lens) #获取随机数
# for i in range(lens):
# index=num[i]
# if i <=int(lens*0.7):
# train.append(datas[index])
# train_label.append(datas_label[index])
# elif i <=int(lens*0.9):
# valid.append(datas[index])
# valid_label.append(datas_label[index])
# else:
# test.append(datas[index])
# test_label.append(datas_label[index])
train_label = torch.from_numpy(np.asarray(train_label))
valid_label = torch.from_numpy(np.asarray(valid_label))
test_label = torch.from_numpy(np.asarray(test_label))
print(len(train_label),len(valid_label),len(test_label))
print(train[0].shape)
torch.save((torch.stack(train, 0), train_label), save+'train.pkl')
torch.save((torch.stack(valid, 0), valid_label), save+'valid.pkl')
torch.save((torch.stack(test, 0), test_label), save+'test.pkl')
# 数据处理:
# 1.一段2min的视频,每1s剪裁下来,编号1-n,得到n段视频(n条数据);加上人物编号1-m;每段加动作类型标签:xx
# 2.每一帧得到关键坐标点,以某个稳定的中心点作为坐标原点,其他的点都减去这个原点
# 原点坐标改为(0,0),得到整体相对坐标;
# 3. 把坐标变为一维存储,后面要用再从代码reshape成(n,-1,2)
main.py
import os
import numpy as np
import torch
import torch.optim as optim
import torch.utils.data as data
import time
from model import *
from metric import accuracy
from config import get_args
args = get_args()
# 判断是否有gpu
device = torch.device('cpu' if torch.cuda.is_available() else 'cpu')
# torch的数据加载方法
train_tensor, train_label = torch.load(args.train_path)
valid_tensor, valid_label = torch.load(args.valid_path)
test_tensor , test_label = torch.load(args.test_path)
# 数据加载器,一次性加载所有数据,每次取出batch个数据
train_loader = data.DataLoader(data.TensorDataset(train_tensor.to(device)),
batch_size = args.batch_size, shuffle=False)
valid_loader = data.DataLoader(data.TensorDataset(valid_tensor.to(device)),
batch_size = args.batch_size, shuffle=False)
test_loader = data.DataLoader(data.TensorDataset(test_tensor.to(device)),
batch_size = args.batch_size, shuffle=False)
train_label = train_label.to(device)
valid_label = valid_label.to(device)
test_label = test_label.to(device)
# 权重矩阵
A = [[0,1,0,0,0,0,0,0,0,0,0,0,0,0,0],
[1,0,1,0,0,0,0,0,0,0,0,0,0,0,0],
[0,1,0,1,0,0,1,0,0,1,0,0,0,0,0],
[0,0,1,0,1,0,0,0,0,0,0,0,0,0,0],
[0,0,0,1,0,1,0,0,0,0,0,0,0,0,0],
[0,0,0,0,1,0,0,0,0,0,0,0,0,0,0],
[0,0,1,0,0,0,0,1,0,0,0,0,0,0,0],
[0,0,0,0,0,0,1,0,1,0,0,0,0,0,0],
[0,0,0,0,0,0,0,1,0,0,0,0,0,0,0],
[0,0,1,0,0,0,0,0,0,0,1,0,1,0,0],
[0,0,0,0,0,0,0,0,0,1,0,1,0,0,0],
[0,0,0,0,0,0,0,0,0,0,1,0,0,0,0],
[0,0,0,0,0,0,0,0,0,1,0,0,1,0,0],
[0,0,0,0,0,0,0,0,0,0,0,1,0,1,0],
[0,0,0,0,0,0,0,0,0,0,0,0,1,0,0]]
A = torch.from_numpy(np.asarray(A)).to(device)
# 定义GCN模型
model = GGCN(A, train_tensor.size(3), args.num_classes,
[train_tensor.size(3), train_tensor.size(3)*3], [train_tensor.size(3)*3, 16, 32, 64],
args.feat_dims, args.frame_nums, args.dropout_rate)
# print([train_tensor.size(3), train_tensor.size(3)*3], [train_tensor.size(3)*3, 16, 32, 64])
if device == 'cuda':
model.cuda()
# 查看模型参数
num_params = 0
for p in model.parameters():
num_params += p.numel()
# print(model)
# 定义Loss,优化器,学习率衰减
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = args.learning_rate,
betas=[args.beta1, args.beta2], weight_decay = args.weight_decay)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma = 0.1)
best_epoch = 0
best_acc = 0
def train():
global best_epoch, best_acc
# 恢复模型,训练
if args.start_epoch:
model.load_state_dict(torch.load(os.path.join(args.model_path, 'model-%d.pkl'%(args.start_epoch))))
# Training
for epoch in range(args.start_epoch, args.num_epochs):
train_loss = 0
train_acc = 0
scheduler.step() # 用了scheduler.step(),按epoch更新lr
model.train() # model.train():启用 BatchNormalization、Dropout model.eval():不启用
for i, x in enumerate(train_loader):
logit = model(x[0].float()) # 模型预测结果
target = train_label[i] # 真实标签
loss = criterion(logit, target.view(1).long()) #计算loss
model.zero_grad() # 每个batch清除一次梯度
loss.backward() # 反向传播loss,计算梯度
optimizer.step() # 优化器根据梯度更新网络参数
train_loss += loss.item() # 按epoch统计loss
train_acc += accuracy(logit, target.view(1).long()) # 按epoch统计acc
print('[epoch',epoch+1,'] Train loss:',train_loss/(i+1), 'Train Acc:',train_acc/(i+1))
# 保存模型
if os.path.exists(args.model_path) == False:
os.makedirs(args.model_path)
if (epoch+1) % 20 ==0:
torch.save(model.state_dict(), os.path.join(args.model_path, 'model-%d.pkl'%(epoch+1)))
# 训练中进行验证
if (epoch+1) % args.val_step == 0:
model.eval()
val_loss = 0
val_acc = 0
with torch.no_grad():
for i, x in enumerate(valid_loader):
logit = model(x[0].float())
target = valid_label[i]
val_loss += criterion(logit, target.view(1).long()).item()
val_acc += accuracy(logit, target.view(1).long())
if best_acc >= (val_acc/(i+1)):
best_epoch = epoch+1
torch.save(model.state_dict(), os.path.join(args.model_path, 'best_model-%d.pkl'%(best_epoch)))
best_acc = (val_acc/(i+1))
print('Val loss:',val_loss/(i+1), 'Val Acc:',val_acc/(i+1))
def test():
global best_epoch
model.load_state_dict(torch.load(os.path.join(args.model_path,
'model-%d.pkl'%(best_epoch))))
print("load model from 'model-%d.pkl'"%(best_epoch))
model.eval()
test_loss = 0
test_acc = 0
with torch.no_grad():
for i, x in enumerate(test_loader):
star=time.time()
logit = model(x[0].float())
target = test_label[i]
test_loss += criterion(logit, target.view(1).long()).item()
test_acc += accuracy(logit, target.view(1).long())
end=int((time.time()-star)*1000)
# print('pred:',torch.max(logit, 1)[1].float()
# .cpu().numpy(), 'true:',target.cpu().numpy(),'time:',end, 'index:',i)
print('Test loss:',test_loss/(i+1), 'Test Acc:',test_acc/(i+1))
if __name__ == '__main__':
# if args.mode == 'train':
# train()
# elif args.mode == 'test':
# best_epoch = args.test_epoch
# test()
train()
# best_epoch = 200
# test()
config.py
import argparse
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument('--mode', type=str, default='train')
parser.add_argument('--test_epoch',type=int, default=80)
parser.add_argument('--start_epoch',type=int, default=0)
parser.add_argument('--num_epochs',type=int, default=100)
parser.add_argument('--val_step',type=int, default=20)
model_forder = "2+3+4+5+6_v3"
data_forder = "2+3+4+5+6_v3"
parser.add_argument('--train_path', type=str, default='dataset/test9/clear/'+str(data_forder)+'/train.pkl')
parser.add_argument('--valid_path', type=str, default='dataset/test9/clear/'+str(data_forder)+'/valid.pkl')
parser.add_argument('--test_path', type=str, default='dataset/test9/clear/'+str(data_forder)+'/test.pkl')
parser.add_argument('--model_path', type=str, default='model/test9/clear/'+str(model_forder)+'/')
parser.add_argument('--batch_size', type=int, default=1)
parser.add_argument('--learning_rate',type=int, default=0.01)
parser.add_argument('--beta1',type=int, default=0.5)
parser.add_argument('--beta2',type=int, default=0.99)
parser.add_argument('--dropout_rate',type=int, default=0.5)
parser.add_argument('--weight_decay',type=int, default=0.0)
parser.add_argument('--frame_nums',type=int, default=64) #32-192, 64-960, 128-2496
parser.add_argument('--num_classes',type=int, default=4)
parser.add_argument('--feat_dims',type=int, default=13)
args = parser.parse_args()
return args
test.py
import os
import numpy as np
import torch
import torch.optim as optim
import torch.utils.data as data
import time
from model import *
from metric import accuracy
from test_config import get_args
args = get_args()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_tensor, train_label = torch.load(args.train_path)
valid_tensor, valid_label = torch.load(args.valid_path)
test_tensor , test_label = torch.load(args.test_path)
train_loader = data.DataLoader(data.TensorDataset(train_tensor.to(device)),
batch_size = args.batch_size, shuffle=False)
valid_loader = data.DataLoader(data.TensorDataset(valid_tensor.to(device)),
batch_size = args.batch_size, shuffle=False)
test_loader = data.DataLoader(data.TensorDataset(test_tensor.to(device)),
batch_size = args.batch_size, shuffle=False)
train_label = train_label.to(device)
valid_label = valid_label.to(device)
test_label = test_label.to(device)
A = [[0,1,0,0,0,0,0,0,0,0,0,0,0,0,0],
[1,0,1,0,0,0,0,0,0,0,0,0,0,0,0],
[0,1,0,1,0,0,1,0,0,1,0,0,0,0,0],
[0,0,1,0,1,0,0,0,0,0,0,0,0,0,0],
[0,0,0,1,0,1,0,0,0,0,0,0,0,0,0],
[0,0,0,0,1,0,0,0,0,0,0,0,0,0,0],
[0,0,1,0,0,0,0,1,0,0,0,0,0,0,0],
[0,0,0,0,0,0,1,0,1,0,0,0,0,0,0],
[0,0,0,0,0,0,0,1,0,0,0,0,0,0,0],
[0,0,1,0,0,0,0,0,0,0,1,0,1,0,0],
[0,0,0,0,0,0,0,0,0,1,0,1,0,0,0],
[0,0,0,0,0,0,0,0,0,0,1,0,0,0,0],
[0,0,0,0,0,0,0,0,0,1,0,0,1,0,0],
[0,0,0,0,0,0,0,0,0,0,0,1,0,1,0],
[0,0,0,0,0,0,0,0,0,0,0,0,1,0,0]]
A = torch.from_numpy(np.asarray(A)).to(device)
model = GGCN(A, train_tensor.size(3), args.num_classes,
[train_tensor.size(3), train_tensor.size(3)*3], [train_tensor.size(3)*3, 16, 32, 64],
args.feat_dims, args.frame_nums, args.dropout_rate)
if device == 'cuda':
model.cuda()
num_params = 0
for p in model.parameters():
num_params += p.numel()
# print(model)
# print('The number of parameters: {}'.format(num_params))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = args.learning_rate,
betas=[args.beta1, args.beta2], weight_decay = args.weight_decay)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma = 0.1)
best_epoch = 0
best_acc = 0
def test():
global best_epoch
model.load_state_dict(torch.load(os.path.join(args.model_path,
'model-%d.pkl'%(best_epoch))))
print("load model from 'model-%d.pkl'"%(best_epoch))
model.eval()
test_loss = 0
test_acc = 0
with torch.no_grad():
for i, x in enumerate(test_loader):
star=time.time()
logit = model(x[0].float())
target = test_label[i]
test_loss += criterion(logit, target.view(1).long()).item()
test_acc += accuracy(logit, target.view(1).long())
end=int((time.time()-star)*1000)
# print('pred:',torch.max(logit, 1)[1].float().cpu().numpy(), 'true:',target.cpu().numpy(),'time:',end, 'index:',i)
print('Test loss:',test_loss/(i+1), 'Test Acc:',test_acc/(i+1))
if __name__ == '__main__':
# if args.mode == 'train':
# train()
# elif args.mode == 'test':
# best_epoch = args.test_epoch
# test()
# train()
best_epoch = 100
test()
test_config.py
import argparse
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument('--mode', type=str, default='train')
parser.add_argument('--test_epoch',type=int, default=80)
parser.add_argument('--start_epoch',type=int, default=0)
parser.add_argument('--num_epochs',type=int, default=100)
parser.add_argument('--val_step',type=int, default=20)
model_forder = "2+3+4+5+6_v1"
data_forder = "2+3+4+5+6_v1"
parser.add_argument('--train_path', type=str, default='dataset/test9/src/'+str(data_forder)+'/train.pkl')
parser.add_argument('--valid_path', type=str, default='dataset/test9/src/'+str(data_forder)+'/valid.pkl')
parser.add_argument('--test_path', type=str, default='dataset/test9/src/'+str(data_forder)+'/test.pkl')
parser.add_argument('--model_path', type=str, default='model/test9/clear/'+str(model_forder)+'/')
parser.add_argument('--batch_size', type=int, default=1)
parser.add_argument('--learning_rate',type=int, default=0.01)
parser.add_argument('--beta1',type=int, default=0.5)
parser.add_argument('--beta2',type=int, default=0.99)
parser.add_argument('--dropout_rate',type=int, default=0.5)
parser.add_argument('--weight_decay',type=int, default=0.0)
parser.add_argument('--frame_nums',type=int, default=64) #32-192, 64-960, 128-2496
parser.add_argument('--num_classes',type=int, default=4)
parser.add_argument('--feat_dims',type=int, default=13)
args = parser.parse_args()
return args
model.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch
import numpy as np
from layer import GraphConvolution, StandConvolution, StandRecurrent
class GGCN(nn.Module):
def __init__(self, adj, num_v, num_classes, gc_dims, sc_dims, feat_dims, frame_nums, dropout=0.5):
super(GGCN, self).__init__()
terminal_cnt = 5
actor_cnt = 1
self.frame_nums=frame_nums
# torch.eye:返回一个2维张量,对角线位置全1,其它位置全0。 shape:adj.size(0)*adj.size(0)
# detach():创建一个新的tensor,将其从当前的计算图中分离出来.新的tensor与之前的共享data,但是不具有梯度.(可解决爆显存问题)
adj = adj + torch.eye(adj.size(0)).to(adj).detach() # [15,15]
ident = torch.eye(adj.size(0)).to(adj) # [15,15]
zeros = torch.zeros(adj.size(0), adj.size(1)).to(adj) # [15,15]
a=torch.cat([adj, ident, zeros], 1) # [15,15]*3=[15,45]
b=torch.cat([ident, adj, ident], 1) # [15,15]*3=[15,45]
c=torch.cat([zeros, ident, adj], 1) # [15,15]*3=[15,45]
self.adj = torch.cat([a,b,c], 0).float() # [15,45]*3=[45,45]
# nn.Parameter:把xx转成模型中根据训练可以改动的参数
# randn:返回一个正态分布的随机数的张量,均值为“0”,方差为“1”
self.terminal = nn.Parameter(torch.randn(terminal_cnt, actor_cnt, feat_dims)) #[5,1,13]
self.gcl = GraphConvolution(gc_dims[0]+feat_dims, gc_dims[1], num_v, dropout=dropout) #3+13、9、3
self.conv= StandConvolution(sc_dims, num_classes, dropout=dropout) # 使用标准卷积,自定义模型
# self.conv= StandRecurrent(sc_dims, num_classes, dropout=dropout) # 使用LSTM模型
nn.init.xavier_normal_(self.terminal) # Xavier正态分布初始化
def forward(self, x):
# F.interpolate(input,x,mode='nearest'):
# 采样函数,x为采样倍数(1/2则为下采样,2为上采样); model为采样方法,默认nearest
head_la = F.interpolate(torch.stack([self.terminal[0],self.terminal[1]],2), 6) # [1,13,6]
head_ra = F.interpolate(torch.stack([self.terminal[0],self.terminal[2]],2), 6) # [1,13,6]
lw_ra = F.interpolate(torch.stack([self.terminal[3],self.terminal[4]],2), 6) # [1,13,6]
# print(lw_ra.shape)
# 在2维度合并以下5个矩阵, node_features=[1,13,3]*5--[1,13,15] x=[1, 32, 15, 3]
node_features = torch.cat([
(head_la[:,:,:3] + head_ra[:,:,:3])/2, # [1,13,3] head_la、head_ra的0-3相加
torch.stack((lw_ra[:,:,2], lw_ra[:,:,1], lw_ra[:,:,0]), 2), # [1,13,3] lw_ra的0-2合并
lw_ra[:,:,3:], head_la[:,:,3:], head_ra[:,:,3:]], 2).to(x) # [1,13,3] 三者都是取3-5
# permute:将tensor的维度换位; unsqueeze:指定位置1(或者n),加上一个维数为1的维度; repeat:沿着纵轴(1)方向,重复增加n倍列数
# 数据变化:[1,13,15]-[1,15,13]-[1,1,15,13]-[1,32,15,13]
node_features=node_features.permute(0,2,1).unsqueeze(1).repeat(1,self.frame_nums,1,1)
# [1,32,15,3]+[1,32,15,13]=[1,32,15,16]
x = torch.cat((x, node_features), 3)
# [1,30,15,16]*3=[1,30,45,16]
concat_seq = torch.cat([x[:,:-2], x[:,1:-1], x[:,2:]], 2)
# print(self.adj.shape, concat_seq.shape)
multi_conv = self.gcl(self.adj, concat_seq)
logit = self.conv(multi_conv)
return logit
layer.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 图卷积结构
class GraphConvolution(nn.Module):
def __init__(self, input_dim, output_dim, num_vetex, act=F.relu, dropout=0.5, bias=True):
super(GraphConvolution, self).__init__()
self.alpha = 1.
self.act = act
self.dropout = nn.Dropout(dropout)
self.weight = nn.Parameter(torch.randn(input_dim, output_dim)).to(device) # [16,9]
if bias:
self.bias = nn.Parameter(torch.randn(output_dim)).to(device)
else:
self.bias = None
for w in [self.weight]:
nn.init.xavier_normal_(w)
# gcn的网络结构---计算公式
def normalize(self, m):
rowsum = torch.sum(m, 0)
r_inv = torch.pow(rowsum, -0.5)
r_mat_inv = torch.diag(r_inv).float()
m_norm = torch.mm(r_mat_inv, m)
m_norm = torch.mm(m_norm, r_mat_inv)
return m_norm
def forward(self, adj, x):
x = self.dropout(x) # [1,30,45,16]
adj_norm = self.normalize(adj) # [45,45]
# 如果mat1 是一个n×m张量,mat2 是一个 m×p 张量,将会输出一个 n×p 张量
sqr_norm = self.normalize(torch.mm(adj,adj)) # [45,45]
m_norm = self.alpha*adj_norm + (1.-self.alpha)*sqr_norm # [45,45]
x_tmp = np.einsum('abcd,de->abce', x, self.weight) # # [1,30,45,16]+[16,9]=[1, 30, 45, 9]
x_out = np.einsum('ij,abid->abjd', m_norm, x_tmp) # [1, 30, 45, 9]
# x_tmp = torch.einsum('abcd,de->abce', x, self.weight)
# x_out = torch.einsum('ij,abid->abjd', m_norm, x_tmp)
if self.bias is not None:
x_out += self.bias
x_out=torch.from_numpy(x_out)
# x_out=torch.from_numpy(x_out).cuda()
x_out = self.act(x_out)
return x_out
# 使用标准卷积,自定义模型
class StandConvolution(nn.Module):
def __init__(self, dims, num_classes, dropout):
super(StandConvolution, self).__init__()
# h/w = (h/w - kennel_size + 2padding) / stride + 1
# x = ([10,16,30,32]),其中h=30,w=32,对于卷积核长分别是 h:3,w:2 ;对于步长分别是h:2,w:1;padding默认0;
# h = (30 - 3 + 20)/ 2 +1 = 27/2 +1 = 13+1 =14
# w =(32 - 2 + 2*0)/ 1 +1 = 30/1 +1 = 30+1 =31
# batch = 10, out_channel = 33
# 故: y= ([10, 33, 14, 31])
self.num_classes=num_classes
self.dropout = nn.Dropout(dropout)
self.conv = nn.Sequential(
nn.Conv2d(dims[0], dims[1], kernel_size=5, stride=2),
nn.InstanceNorm2d(dims[1]),
nn.ReLU(inplace=True),
# nn.AvgPool2d(5, stride=1),
nn.Conv2d(dims[1], dims[2], kernel_size=5, stride=2),
nn.InstanceNorm2d(dims[2]),
nn.ReLU(inplace=True),
#nn.AvgPool2d(3, stride=1),
nn.Conv2d(dims[2], dims[3], kernel_size=5, stride=2),
nn.InstanceNorm2d(dims[3]),
nn.ReLU(inplace=True),
#nn.AvgPool2d(3, stride=2)
).to(device)
# self.fc = nn.Linear(dims[3]*3, num_classes).to(device)
self.fc = nn.Linear(960, num_classes).to(device) # 32-192, 64-960, 128-2496
def forward(self, x):
x = self.dropout(x.permute(0,3,1,2)) # [1, 9, 30, 45] [1,9,62,45]
x_tmp = self.conv(x) # [1, 64, 1, 3] [1,64,5,3]
# view:按照行优先的顺序排成一个一维的数据,再重新组成需要的形状 [1, 64, 1, 3]---[1, 192]
x_tmp = x_tmp.view(x.size(0), -1) # [1, 64, 1, 3]---[1, 192]
# print(x_tmp.shape)
x_out = self.fc(x_tmp) # [1,4],返回分类标签
return x_out
# 使用LSTM模型
class StandRecurrent(nn.Module):
def __init__(self, dims, num_classes, dropout):
super(StandRecurrent, self).__init__()
self.lstm = nn.LSTM(dims[0]*45, dims[1], batch_first=True,dropout=0.5).to(device)
self.fc = nn.Linear(dims[1], num_classes).to(device)
def forward(self, x):
x_tmp,_ = self.lstm(x.contiguous().view(x.size(0), x.size(1), -1))
x_out = self.fc(x_tmp[:,-1])
return x_out
metric.py
import torch
from sklearn.metrics import accuracy_score
def accuracy(preds, target):
# torch.max(a,dim):返回torch中的最大值a、以及a对应的下标; dim=0行对比最大值,dim=1则是列对比最大值
preds = torch.max(preds, 1)[1].float()
acc = accuracy_score(preds.cpu().numpy(), target.cpu().numpy()) # 返回准度率
# print(preds.cpu().numpy(), target.cpu().numpy())
return acc