手部姿态检测(按视频段)_v2(stgcn)

参考链接:

https://github.com/yongqyu/st-gcn-pytorch

https://www.cnblogs.com/shyern/p/11262926.html

https://blog.csdn.net/qq_36893052/article/details/79860328

https://www.zhihu.com/collection/437834930

网络特点:
stgcn(时空图卷积):
1.2018年提出,网络由九层时空图卷积,一共有9个时间卷积核,在每一个ST-GCN使用残差链接
2.通过线性堆叠的 GCN 和 TCN 来间接扩大每个节点的感受野,非常有助于时空信息的提取
3.通过 openpose 从视频中提取 2D 检测点坐标,再将置信度作为 z 轴,从而得到 3D 骨架序列

试验方法:

序号 Idea 原理 优先级 数据状态 更新前 更新后 备注
1 优化手部特征点检测 现在的手部检测点经常会检测飞了,希望能有更稳定的检测器。 目前没有找到更优的检测器,有待旷世的接口,暂时挂起来 6人(1504条)
1.动作不是很标准,手部检测很多点检测不到;
2.三类数据极其不均衡。
1.准确度不高;
2.训练过拟合。
  目前没有更好的优化器,后续采集尽量动作规范。
2 采集更多的数据 平衡各个类别的数据 已尝试 11人(3639条)
北京6人
深圳5人(动作规范)
1.准确度不高;
2.训练过拟合。
准确度有所提升,组内测试集准确度有80%多,但是泛化能力大概只有45%-60%。 过拟合严重。
3 1.weight decay
2.dropout
3.学习率
减少过拟合 已尝试 11人(3639条)
北京6人
深圳5人(动作规范)
1.准确度不高;
2.训练过拟合。
在验证集上的过拟合能很大程度缓解,但没有彻底解决泛化能力。 在北京6人上面进行测试,效果极其不佳,怀疑与动作标准有关。
4 超级人模拟更多的动作 一个人模拟各人的多种动作 已尝试 11人(3639条)
北京6人
深圳5人(动作规范)+1个超级人
1.准确度不高;
2.训练过拟合。
1.准确度不高;
2.训练过拟合。
超级用户没有明显效果。
5 继续新增数据 数据扩增能解决过拟合问题,并且有助于准确度提高 已尝试 22人(7000条)
北京6人+后采6人(距离不统一)
深圳10人(动作规范)
1.准确度不高(60%);
2.训练过拟合(轻微)。
准确度提升较大,大概能多70%出头,提升了10个点。 深圳的距离比较统一,用深圳的做测试集,准确度要比北京的高。
6 特征点数据归一化 用相对坐标除以人脸的长宽,把特征归一化,解决距离问题 优先尝试 22人(7000条)
北京6人+后采6人(距离不统一)
深圳10人(动作规范)
1.准确度不高(70%);
2.训练过拟合。
效果比原来差 效果不佳,有可能是归一化后的数据差异变小
7 清洗数据 增加了两个限制方法:
1.检测不到人脸、脸宽小于50像素的去掉;(帧)
2.手部的7个关键点,若是置信度大于0.1的 不超过4个,也去掉;(帧)
3.一秒30帧,有15帧没通过上面的话,整段视频就去掉。(视频)
优先尝试 22人(清洗后剩4000条)
北京6人+后采6人(距离不统一)
深圳10人(动作规范)
1.准确度不高(70%);
2.训练过拟合(轻微)。
1.准确度提升较大,大概到达80%多,提升了10多个点;
2.过拟合也有减轻。
若能让手部检测更准确,准确度还能提升。
8 延长每段视频的时间(2s/5s) 视频时间延长,能表达的内容更多 优先尝试 2/3/4/9批数据(16人)
深圳10+北京6
1.在清洗数据上准确度80%多;
2.在原数据上准确度很低。
1.在清洗数据上有两三个点的提升(85%左右);
2.在原始数据上没有什么效果(75%左右)。
1.还是脏数据太多,影响了预测结果,需要有更优的手部检测器;
2.经过1s/2s/4s的交叉对比,目前最优的是:2s+通过15帧(清洗数据)、2s+通过15帧+全部帧合并(不清洗数据)。

目前最优:2s视频,64帧/通过15帧即可,准确度90%。

若是希望更多的数据合格,可以选择通过8帧即可,损失一点精度。

原数据:64帧 64帧 最优
data 清洗 脏数据 清洗 脏数据 通过15帧 数据量 清洗前 总和 清洗后 总和
2+3-811 0.917 0.851 2+3-811-2 0.918 0.743 0 1 3 0 1 3
2+4-811 0.855 0.667 2+4-811-2 0.868 0.686 2 545 327 93 965 369 193 90 652
3+4-811 0.926 0.695 3+4-811-2 0.92 0.83 3 473 300 129 902 375 269 128 772
4 475 304 162 941 319 217 162 698
9 184 99 48 331 152 82 48 282
1677 1030 432 3139 1215 761 428 2404

代码工程:

data.py    

数据处理:
1.一段2min的视频,每1s剪裁下来,编号1-n,得到n段视频(n条数据);加上人物编号1-m;每段加动作类型标签:xx
2.每一帧得到关键坐标点,以某个稳定的中心点作为坐标原点,其他的点都减去这个原点
原点坐标改为(0,0),得到整体相对坐标;
3. 把坐标变为一维存储,后面要用再从代码reshape成(n,-1,2)

from torch.utils import data
import torch
import os
import random
import numpy as np


list1=[14,18,23,30,40,48]   # 测试人物编号
list2=[13,17,22,29,39,47]   # 验证测试编号

forder='2+3+4+5+6'
forder2='2+3+4+5+6_v1'
file_name = 'dataset/test9/src/'+str(forder)+'.txt'

save='dataset/test9/src/'+str(forder2)+"/"
if os.path.exists(save) == False:
	os.makedirs(save)

split_label=1
frame_nums=64
f = open(file_name)
lines = f.readlines()

prev_video = int(lines[0].strip().split(' ')[1])   # 视频编号
prev_categ = int(lines[0].strip().split(' ')[2])   # 类别标签

datas=[]
datas_label=[]
frames = []
train = []
valid = []
test  = []
train_label = []
valid_label = []
test_label  = []
m=0

for line in lines:
	line = line.strip().split(' ')
	vid = int(line[1])   # 视频编号
	aid = int(line[0])   # 任务编号
	cid = int(line[2])   # 类别标签
	label=list(map(int, line[:3]))
	features = list(map(float, line[3:]))   # 21个特征点
	
	# 若是视频标签相同,则都放入数组中,作为一条训练数据
	if prev_video == vid:
		frames.append(np.reshape(np.asarray(features), (-1,3)))   # 把一维转换成[15,3]的格式
	else:
		# 如果一条视频帧数过多,则选取前frame_nums帧,并连接起来,转成torch格式
		if len(frames) >= frame_nums:
			# frames = random.sample(frames, frame_nums)   # 随机取帧
			frames = frames[0:frame_nums]    # 按顺序取帧
			frames = torch.from_numpy(np.stack(frames, 0))  # 把每一帧在0维连接起来,转成torch格式

		# 若是视频帧数不够多,则利用线性插值,把数据补充到frame_nums帧
		else:
			frames = np.stack(frames, 0) # 把每一帧连接起来,如:n帧 n*[1,15,3]=[n,15,3] 作为一条数据
			xloc = np.arange(frames.shape[0])   # np.arange:生成n个自然数,即等于frame_nums帧数
			new_xloc = np.linspace(0, frames.shape[0], frame_nums)  # 生成start和end之间frame_nums个等差间隔的元素,如:1、2、··n
			frames = np.reshape(frames, (frames.shape[0], -1)).transpose()  # transpose:矩阵转置
			# print(frames.shape,xloc.shape,new_xloc.shape)
			
			new_datas = []
			for data in frames:
				new_datas.append(np.interp(new_xloc, xloc, data))   # interp:进行线性插值, 获得frame_nums帧数据
			frames = torch.from_numpy(np.stack(new_datas, 0)).t()  # 把n帧数据再次连接起来,转换torch格式

		frames = frames.view(frame_nums, -1, 3)  # 强制reshape矩阵形状
		datas.append(frames)   #数据
		if split_label==1:
			datas_label.append(label)   #标签
		else:
			datas_label.append(prev_categ)   #标签

		m+=1
		# 2.按人物编号分
		if aid in list1:
			test.append(frames)
			test_label.append(prev_categ)
		elif aid in list2:
			valid.append(frames)
			valid_label.append(prev_categ)
		else:
			train.append(frames)
			train_label.append(prev_categ)

		frames = [np.reshape(np.asarray(features), (-1,3))]  # frames重置,等于每条视频的第一帧的关键点 

	prev_actor = aid   # 人物编号重置
	prev_video = vid   # 视频编号重置
	prev_categ = cid   # 标签重置


# # 3.随机划分
# lens=len(datas)
# num=random.sample(range(lens),lens)   #获取随机数

# for i in range(lens):
# 	index=num[i]
# 	if i <=int(lens*0.7):
# 		train.append(datas[index])
# 		train_label.append(datas_label[index])
# 	elif i <=int(lens*0.9):
# 		valid.append(datas[index])
# 		valid_label.append(datas_label[index])
# 	else:
# 		test.append(datas[index])
# 		test_label.append(datas_label[index])



train_label = torch.from_numpy(np.asarray(train_label))
valid_label = torch.from_numpy(np.asarray(valid_label))
test_label  = torch.from_numpy(np.asarray(test_label))
print(len(train_label),len(valid_label),len(test_label))
print(train[0].shape)


torch.save((torch.stack(train, 0), train_label), save+'train.pkl')
torch.save((torch.stack(valid, 0), valid_label), save+'valid.pkl')
torch.save((torch.stack(test, 0),  test_label),  save+'test.pkl')


# 数据处理:
# 1.一段2min的视频,每1s剪裁下来,编号1-n,得到n段视频(n条数据);加上人物编号1-m;每段加动作类型标签:xx
# 2.每一帧得到关键坐标点,以某个稳定的中心点作为坐标原点,其他的点都减去这个原点
# 原点坐标改为(0,0),得到整体相对坐标;
# 3. 把坐标变为一维存储,后面要用再从代码reshape成(n,-1,2)

main.py

import os
import numpy as np
import torch
import torch.optim as optim
import torch.utils.data as data
import time
from model import *
from metric import accuracy
from config import get_args
args = get_args()

# 判断是否有gpu
device = torch.device('cpu' if torch.cuda.is_available() else 'cpu')

# torch的数据加载方法
train_tensor, train_label = torch.load(args.train_path)
valid_tensor, valid_label = torch.load(args.valid_path)
test_tensor , test_label  = torch.load(args.test_path)

# 数据加载器,一次性加载所有数据,每次取出batch个数据
train_loader = data.DataLoader(data.TensorDataset(train_tensor.to(device)),
							   batch_size = args.batch_size, shuffle=False)
valid_loader = data.DataLoader(data.TensorDataset(valid_tensor.to(device)),
							   batch_size = args.batch_size, shuffle=False)
test_loader  = data.DataLoader(data.TensorDataset(test_tensor.to(device)),
							   batch_size = args.batch_size, shuffle=False)
train_label = train_label.to(device)
valid_label = valid_label.to(device)
test_label  = test_label.to(device)

# 权重矩阵
A = [[0,1,0,0,0,0,0,0,0,0,0,0,0,0,0],
	 [1,0,1,0,0,0,0,0,0,0,0,0,0,0,0],
	 [0,1,0,1,0,0,1,0,0,1,0,0,0,0,0],
	 [0,0,1,0,1,0,0,0,0,0,0,0,0,0,0],
	 [0,0,0,1,0,1,0,0,0,0,0,0,0,0,0],
	 [0,0,0,0,1,0,0,0,0,0,0,0,0,0,0],
	 [0,0,1,0,0,0,0,1,0,0,0,0,0,0,0],
	 [0,0,0,0,0,0,1,0,1,0,0,0,0,0,0],
	 [0,0,0,0,0,0,0,1,0,0,0,0,0,0,0],
	 [0,0,1,0,0,0,0,0,0,0,1,0,1,0,0],
	 [0,0,0,0,0,0,0,0,0,1,0,1,0,0,0],
	 [0,0,0,0,0,0,0,0,0,0,1,0,0,0,0],
	 [0,0,0,0,0,0,0,0,0,1,0,0,1,0,0],
	 [0,0,0,0,0,0,0,0,0,0,0,1,0,1,0],
	 [0,0,0,0,0,0,0,0,0,0,0,0,1,0,0]]
A = torch.from_numpy(np.asarray(A)).to(device)

# 定义GCN模型
model = GGCN(A, train_tensor.size(3), args.num_classes,
			 [train_tensor.size(3), train_tensor.size(3)*3], [train_tensor.size(3)*3, 16, 32, 64],
			 args.feat_dims, args.frame_nums, args.dropout_rate)
# print([train_tensor.size(3), train_tensor.size(3)*3], [train_tensor.size(3)*3, 16, 32, 64])

if device == 'cuda':
	model.cuda()

# 查看模型参数
num_params = 0
for p in model.parameters():
	num_params += p.numel()
# print(model)

# 定义Loss,优化器,学习率衰减
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = args.learning_rate,
					   betas=[args.beta1, args.beta2], weight_decay = args.weight_decay)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma = 0.1)

best_epoch = 0
best_acc = 0
def train():
	global best_epoch, best_acc

	# 恢复模型,训练
	if args.start_epoch:
		model.load_state_dict(torch.load(os.path.join(args.model_path, 'model-%d.pkl'%(args.start_epoch))))

	# Training
	for epoch in range(args.start_epoch, args.num_epochs):
		train_loss = 0
		train_acc  = 0
		scheduler.step()   # 用了scheduler.step(),按epoch更新lr
		model.train()  # model.train():启用 BatchNormalization、Dropout    model.eval():不启用


		for i, x in enumerate(train_loader):
			logit = model(x[0].float())   # 模型预测结果
			target = train_label[i]  # 真实标签
			loss = criterion(logit, target.view(1).long())   #计算loss
			
			model.zero_grad()  # 每个batch清除一次梯度
			loss.backward()    # 反向传播loss,计算梯度
			optimizer.step()   # 优化器根据梯度更新网络参数

			train_loss += loss.item()   # 按epoch统计loss
			train_acc  += accuracy(logit, target.view(1).long())   # 按epoch统计acc
		print('[epoch',epoch+1,'] Train loss:',train_loss/(i+1), 'Train Acc:',train_acc/(i+1))

		# 保存模型
		if os.path.exists(args.model_path) == False:
			os.makedirs(args.model_path)
		if (epoch+1) % 20 ==0:
			torch.save(model.state_dict(), os.path.join(args.model_path, 'model-%d.pkl'%(epoch+1)))

		# 训练中进行验证
		if (epoch+1) % args.val_step == 0:
			model.eval()
			val_loss = 0
			val_acc  = 0
			with torch.no_grad():
				for i, x in enumerate(valid_loader):
					logit = model(x[0].float())
					target = valid_label[i]

					val_loss += criterion(logit, target.view(1).long()).item()
					val_acc += accuracy(logit, target.view(1).long())

				if best_acc >= (val_acc/(i+1)):
					best_epoch = epoch+1
					torch.save(model.state_dict(), os.path.join(args.model_path, 'best_model-%d.pkl'%(best_epoch)))
				best_acc = (val_acc/(i+1))

			print('Val loss:',val_loss/(i+1), 'Val Acc:',val_acc/(i+1))

def test():
	global best_epoch
	
	model.load_state_dict(torch.load(os.path.join(args.model_path, 
												  'model-%d.pkl'%(best_epoch))))
	print("load model from 'model-%d.pkl'"%(best_epoch))

	model.eval()
	test_loss = 0
	test_acc  = 0
	with torch.no_grad():
		for i, x in enumerate(test_loader):
			star=time.time()
			logit = model(x[0].float())
			target = test_label[i]

			test_loss += criterion(logit, target.view(1).long()).item()
			test_acc  += accuracy(logit, target.view(1).long())

			end=int((time.time()-star)*1000)
			# print('pred:',torch.max(logit, 1)[1].float()
			# .cpu().numpy(),  'true:',target.cpu().numpy(),'time:',end, 'index:',i)
	print('Test loss:',test_loss/(i+1), 'Test Acc:',test_acc/(i+1))

if __name__ == '__main__':
	# if args.mode == 'train':
	# 	train()
	# elif args.mode == 'test':
	# 	best_epoch = args.test_epoch
	# 	test()
	
	train()
	# best_epoch = 200
	# test()

config.py

import argparse

def get_args():
	parser = argparse.ArgumentParser()

	parser.add_argument('--mode',  type=str, default='train')
	parser.add_argument('--test_epoch',type=int, default=80)
	
	parser.add_argument('--start_epoch',type=int, default=0)
	parser.add_argument('--num_epochs',type=int, default=100)
	parser.add_argument('--val_step',type=int, default=20)
	
	model_forder = "2+3+4+5+6_v3"
	data_forder = "2+3+4+5+6_v3"
	parser.add_argument('--train_path', type=str, default='dataset/test9/clear/'+str(data_forder)+'/train.pkl')
	parser.add_argument('--valid_path', type=str, default='dataset/test9/clear/'+str(data_forder)+'/valid.pkl')
	parser.add_argument('--test_path',  type=str, default='dataset/test9/clear/'+str(data_forder)+'/test.pkl')

	parser.add_argument('--model_path',  type=str, default='model/test9/clear/'+str(model_forder)+'/')

	parser.add_argument('--batch_size',  type=int, default=1)
	parser.add_argument('--learning_rate',type=int, default=0.01)
	parser.add_argument('--beta1',type=int, default=0.5)
	parser.add_argument('--beta2',type=int, default=0.99)
	parser.add_argument('--dropout_rate',type=int, default=0.5)
	parser.add_argument('--weight_decay',type=int, default=0.0)

	parser.add_argument('--frame_nums',type=int, default=64)   #32-192, 64-960, 128-2496
	parser.add_argument('--num_classes',type=int, default=4)
	parser.add_argument('--feat_dims',type=int, default=13)
	
	
	args = parser.parse_args()

	return args

test.py

import os
import numpy as np
import torch
import torch.optim as optim
import torch.utils.data as data
import time
from model import *
from metric import accuracy
from test_config import get_args
args = get_args()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_tensor, train_label = torch.load(args.train_path)
valid_tensor, valid_label = torch.load(args.valid_path)
test_tensor , test_label  = torch.load(args.test_path)

train_loader = data.DataLoader(data.TensorDataset(train_tensor.to(device)),
							   batch_size = args.batch_size, shuffle=False)
valid_loader = data.DataLoader(data.TensorDataset(valid_tensor.to(device)),
							   batch_size = args.batch_size, shuffle=False)
test_loader  = data.DataLoader(data.TensorDataset(test_tensor.to(device)),
							   batch_size = args.batch_size, shuffle=False)
train_label = train_label.to(device)
valid_label = valid_label.to(device)
test_label  = test_label.to(device)

A = [[0,1,0,0,0,0,0,0,0,0,0,0,0,0,0],
	 [1,0,1,0,0,0,0,0,0,0,0,0,0,0,0],
	 [0,1,0,1,0,0,1,0,0,1,0,0,0,0,0],
	 [0,0,1,0,1,0,0,0,0,0,0,0,0,0,0],
	 [0,0,0,1,0,1,0,0,0,0,0,0,0,0,0],
	 [0,0,0,0,1,0,0,0,0,0,0,0,0,0,0],
	 [0,0,1,0,0,0,0,1,0,0,0,0,0,0,0],
	 [0,0,0,0,0,0,1,0,1,0,0,0,0,0,0],
	 [0,0,0,0,0,0,0,1,0,0,0,0,0,0,0],
	 [0,0,1,0,0,0,0,0,0,0,1,0,1,0,0],
	 [0,0,0,0,0,0,0,0,0,1,0,1,0,0,0],
	 [0,0,0,0,0,0,0,0,0,0,1,0,0,0,0],
	 [0,0,0,0,0,0,0,0,0,1,0,0,1,0,0],
	 [0,0,0,0,0,0,0,0,0,0,0,1,0,1,0],
	 [0,0,0,0,0,0,0,0,0,0,0,0,1,0,0]]
A = torch.from_numpy(np.asarray(A)).to(device)

model = GGCN(A, train_tensor.size(3), args.num_classes, 
			 [train_tensor.size(3), train_tensor.size(3)*3], [train_tensor.size(3)*3, 16, 32, 64], 
			 args.feat_dims, args.frame_nums, args.dropout_rate)
if device == 'cuda':
	model.cuda()

num_params = 0
for p in model.parameters():
	num_params += p.numel()
# print(model)
# print('The number of parameters: {}'.format(num_params))

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = args.learning_rate,
					   betas=[args.beta1, args.beta2], weight_decay = args.weight_decay)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma = 0.1)

best_epoch = 0
best_acc = 0

def test():
	global best_epoch

	model.load_state_dict(torch.load(os.path.join(args.model_path, 
												  'model-%d.pkl'%(best_epoch))))
	print("load model from 'model-%d.pkl'"%(best_epoch))

	model.eval()
	test_loss = 0
	test_acc  = 0
	with torch.no_grad():
		for i, x in enumerate(test_loader):
			star=time.time()
			logit = model(x[0].float())
			target = test_label[i]

			test_loss += criterion(logit, target.view(1).long()).item()
			test_acc  += accuracy(logit, target.view(1).long())

			end=int((time.time()-star)*1000)
			# print('pred:',torch.max(logit, 1)[1].float().cpu().numpy(),  'true:',target.cpu().numpy(),'time:',end, 'index:',i)
	print('Test loss:',test_loss/(i+1), 'Test Acc:',test_acc/(i+1))

if __name__ == '__main__':
	# if args.mode == 'train':
	# 	train()
	# elif args.mode == 'test':
	# 	best_epoch = args.test_epoch
	# 	test()
	
	# train()
	best_epoch = 100
	test()

test_config.py

import argparse

def get_args():
	parser = argparse.ArgumentParser()

	parser.add_argument('--mode',  type=str, default='train')
	parser.add_argument('--test_epoch',type=int, default=80)

	parser.add_argument('--start_epoch',type=int, default=0)
	parser.add_argument('--num_epochs',type=int, default=100)
	parser.add_argument('--val_step',type=int, default=20)
	
	model_forder = "2+3+4+5+6_v1"
	data_forder = "2+3+4+5+6_v1"
	parser.add_argument('--train_path', type=str, default='dataset/test9/src/'+str(data_forder)+'/train.pkl')
	parser.add_argument('--valid_path', type=str, default='dataset/test9/src/'+str(data_forder)+'/valid.pkl')
	parser.add_argument('--test_path',  type=str, default='dataset/test9/src/'+str(data_forder)+'/test.pkl')

	parser.add_argument('--model_path',  type=str, default='model/test9/clear/'+str(model_forder)+'/')

	parser.add_argument('--batch_size',  type=int, default=1)
	parser.add_argument('--learning_rate',type=int, default=0.01)
	parser.add_argument('--beta1',type=int, default=0.5)
	parser.add_argument('--beta2',type=int, default=0.99)
	parser.add_argument('--dropout_rate',type=int, default=0.5)
	parser.add_argument('--weight_decay',type=int, default=0.0)

	parser.add_argument('--frame_nums',type=int, default=64)   #32-192, 64-960, 128-2496
	parser.add_argument('--num_classes',type=int, default=4)
	parser.add_argument('--feat_dims',type=int, default=13)
	

	args = parser.parse_args()

	return args

model.py

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch
import numpy as np

from layer import GraphConvolution, StandConvolution, StandRecurrent

class GGCN(nn.Module):
	def __init__(self, adj, num_v, num_classes, gc_dims, sc_dims, feat_dims, frame_nums, dropout=0.5):
		super(GGCN, self).__init__()
		terminal_cnt = 5
		actor_cnt = 1
		self.frame_nums=frame_nums
		
		# torch.eye:返回一个2维张量,对角线位置全1,其它位置全0。 shape:adj.size(0)*adj.size(0)
		# detach():创建一个新的tensor,将其从当前的计算图中分离出来.新的tensor与之前的共享data,但是不具有梯度.(可解决爆显存问题)
		adj = adj + torch.eye(adj.size(0)).to(adj).detach()   # [15,15]
		ident = torch.eye(adj.size(0)).to(adj)   # [15,15]
		zeros = torch.zeros(adj.size(0), adj.size(1)).to(adj)   # [15,15]
		a=torch.cat([adj, ident, zeros], 1)   # [15,15]*3=[15,45]
		b=torch.cat([ident, adj, ident], 1)   # [15,15]*3=[15,45]
		c=torch.cat([zeros, ident, adj], 1)   # [15,15]*3=[15,45]
		self.adj = torch.cat([a,b,c], 0).float()   # [15,45]*3=[45,45]


		# nn.Parameter:把xx转成模型中根据训练可以改动的参数
		# randn:返回一个正态分布的随机数的张量,均值为“0”,方差为“1”
		self.terminal = nn.Parameter(torch.randn(terminal_cnt, actor_cnt, feat_dims))   #[5,1,13]
		self.gcl = GraphConvolution(gc_dims[0]+feat_dims, gc_dims[1], num_v, dropout=dropout) #3+13、9、3
		self.conv= StandConvolution(sc_dims, num_classes, dropout=dropout)   # 使用标准卷积,自定义模型
		# self.conv= StandRecurrent(sc_dims, num_classes, dropout=dropout)   # 使用LSTM模型
		
		nn.init.xavier_normal_(self.terminal)   # Xavier正态分布初始化
		
	def forward(self, x):
		# F.interpolate(input,x,mode='nearest'):
		# 采样函数,x为采样倍数(1/2则为下采样,2为上采样); model为采样方法,默认nearest
		head_la = F.interpolate(torch.stack([self.terminal[0],self.terminal[1]],2), 6)  # [1,13,6]
		head_ra = F.interpolate(torch.stack([self.terminal[0],self.terminal[2]],2), 6)  # [1,13,6]
		lw_ra = F.interpolate(torch.stack([self.terminal[3],self.terminal[4]],2), 6)  # [1,13,6]
		# print(lw_ra.shape)
		
		# 在2维度合并以下5个矩阵, node_features=[1,13,3]*5--[1,13,15]   x=[1, 32, 15, 3]
		node_features = torch.cat([
								   (head_la[:,:,:3] + head_ra[:,:,:3])/2,   # [1,13,3]   head_la、head_ra的0-3相加
								   torch.stack((lw_ra[:,:,2], lw_ra[:,:,1], lw_ra[:,:,0]), 2),  # [1,13,3]  lw_ra的0-2合并
								   lw_ra[:,:,3:], head_la[:,:,3:], head_ra[:,:,3:]], 2).to(x)   # [1,13,3]  三者都是取3-5
		
		
		# permute:将tensor的维度换位;  unsqueeze:指定位置1(或者n),加上一个维数为1的维度; repeat:沿着纵轴(1)方向,重复增加n倍列数
		# 数据变化:[1,13,15]-[1,15,13]-[1,1,15,13]-[1,32,15,13]
		node_features=node_features.permute(0,2,1).unsqueeze(1).repeat(1,self.frame_nums,1,1)
		# [1,32,15,3]+[1,32,15,13]=[1,32,15,16]
		x = torch.cat((x, node_features), 3)
		# [1,30,15,16]*3=[1,30,45,16]
		concat_seq = torch.cat([x[:,:-2], x[:,1:-1], x[:,2:]], 2)
		
		# print(self.adj.shape, concat_seq.shape)
		multi_conv = self.gcl(self.adj, concat_seq)
		logit = self.conv(multi_conv)
		
		return logit
		

layer.py

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 图卷积结构
class GraphConvolution(nn.Module):
	def __init__(self, input_dim, output_dim, num_vetex, act=F.relu, dropout=0.5, bias=True):
		super(GraphConvolution, self).__init__()

		self.alpha = 1.

		self.act = act
		self.dropout = nn.Dropout(dropout)
		self.weight = nn.Parameter(torch.randn(input_dim, output_dim)).to(device)  # [16,9]
		if bias:
			self.bias = nn.Parameter(torch.randn(output_dim)).to(device)
		else:
			self.bias = None

		for w in [self.weight]:
			nn.init.xavier_normal_(w)

	# gcn的网络结构---计算公式
	def normalize(self, m):
		rowsum = torch.sum(m, 0)
		r_inv = torch.pow(rowsum, -0.5)
		r_mat_inv = torch.diag(r_inv).float()

		m_norm = torch.mm(r_mat_inv, m)
		m_norm = torch.mm(m_norm, r_mat_inv)

		return m_norm

	def forward(self, adj, x):
		x = self.dropout(x)  # [1,30,45,16]
		adj_norm = self.normalize(adj)  # [45,45]
		# 如果mat1 是一个n×m张量,mat2 是一个 m×p 张量,将会输出一个 n×p 张量
		sqr_norm = self.normalize(torch.mm(adj,adj))  # [45,45]
		m_norm = self.alpha*adj_norm + (1.-self.alpha)*sqr_norm  # [45,45]
		
		x_tmp = np.einsum('abcd,de->abce', x, self.weight)  # # [1,30,45,16]+[16,9]=[1, 30, 45, 9]
		x_out = np.einsum('ij,abid->abjd', m_norm, x_tmp)  # [1, 30, 45, 9]
		# x_tmp = torch.einsum('abcd,de->abce', x, self.weight)
		# x_out = torch.einsum('ij,abid->abjd', m_norm, x_tmp)
		
		if self.bias is not None:
			x_out += self.bias

		x_out=torch.from_numpy(x_out)
		# x_out=torch.from_numpy(x_out).cuda()
		x_out = self.act(x_out)

		return x_out
		

# 使用标准卷积,自定义模型
class StandConvolution(nn.Module):
	def __init__(self, dims, num_classes, dropout):
		super(StandConvolution, self).__init__()
		# h/w = (h/w - kennel_size + 2padding) / stride + 1
		# x = ([10,16,30,32]),其中h=30,w=32,对于卷积核长分别是 h:3,w:2 ;对于步长分别是h:2,w:1;padding默认0;
		# h = (30 - 3 + 20)/ 2 +1 = 27/2 +1 = 13+1 =14
		# w =(32 - 2 + 2*0)/ 1 +1 = 30/1 +1 = 30+1 =31
		# batch = 10, out_channel = 33
		# 故: y= ([10, 33, 14, 31])

		self.num_classes=num_classes
		self.dropout = nn.Dropout(dropout)
		self.conv = nn.Sequential(
								   nn.Conv2d(dims[0], dims[1], kernel_size=5, stride=2),
								   nn.InstanceNorm2d(dims[1]),
								   nn.ReLU(inplace=True),
								   # nn.AvgPool2d(5, stride=1),
								   nn.Conv2d(dims[1], dims[2], kernel_size=5, stride=2),
								   nn.InstanceNorm2d(dims[2]),
								   nn.ReLU(inplace=True),
								   #nn.AvgPool2d(3, stride=1),
								   nn.Conv2d(dims[2], dims[3], kernel_size=5, stride=2),
								   nn.InstanceNorm2d(dims[3]),
								   nn.ReLU(inplace=True),
								   #nn.AvgPool2d(3, stride=2)
								   ).to(device)

		# self.fc = nn.Linear(dims[3]*3, num_classes).to(device)
		self.fc = nn.Linear(960, num_classes).to(device)   # 32-192, 64-960, 128-2496
		

	def forward(self, x):
		x = self.dropout(x.permute(0,3,1,2))  # [1, 9, 30, 45]  [1,9,62,45]
		x_tmp = self.conv(x)   # [1, 64, 1, 3]  [1,64,5,3]
		# view:按照行优先的顺序排成一个一维的数据,再重新组成需要的形状  [1, 64, 1, 3]---[1, 192]
		x_tmp = x_tmp.view(x.size(0), -1)  # [1, 64, 1, 3]---[1, 192]
		# print(x_tmp.shape)
		
		x_out = self.fc(x_tmp)   # [1,4],返回分类标签
		
		return x_out


# 使用LSTM模型
class StandRecurrent(nn.Module):
	def __init__(self, dims, num_classes, dropout):
		super(StandRecurrent, self).__init__()
		self.lstm = nn.LSTM(dims[0]*45, dims[1], batch_first=True,dropout=0.5).to(device)
		self.fc = nn.Linear(dims[1], num_classes).to(device)

	def forward(self, x):
		x_tmp,_ = self.lstm(x.contiguous().view(x.size(0), x.size(1), -1))
		x_out = self.fc(x_tmp[:,-1])

		return x_out

metric.py

import torch
from sklearn.metrics import accuracy_score

def accuracy(preds, target):
	# torch.max(a,dim):返回torch中的最大值a、以及a对应的下标;   dim=0行对比最大值,dim=1则是列对比最大值
	preds = torch.max(preds, 1)[1].float()
	acc = accuracy_score(preds.cpu().numpy(), target.cpu().numpy())   # 返回准度率

	# print(preds.cpu().numpy(), target.cpu().numpy())
	return acc

上一篇:带你读《计算机体系结构:量化研究方法(英文版·原书第6版)》之一:Fundamentals of Quantitative Design and Analysis


下一篇:vue mounted中监听div的变化