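Introduction
- Autonomous driving is an important area of current AI development.
- This post looks at an interesting piece of work that proposes a new kind of neural network model.
- Inspired by the brains of small animals such as nematodes, the method controls a self-driving car with only a few dozen neurons, whereas conventional deep neural network approaches (such as Inception, ResNet, and VGG) need millions of neurons.
- The new network uses only 75,000 parameters and 19 neurons, a reduction of tens of thousands of times compared with earlier models.
- A demo of the autonomous driving system is shown in the animation below:
- The autonomous driving dataset used in the paper is large and hard to obtain, so this post covers only the NCP model itself: porting the NCP project to the Paddle framework and using the model to fit a simple sequence.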
References
- Official project: mlech26l/keras-ncp
- Paddle implementation: jm12138/keras-ncp
- Citation:
@article{lechner2020neural,
  title={Neural circuit policies enabling auditable autonomy},
  author={Lechner, Mathias and Hasani, Ramin and Amini, Alexander and Henzinger, Thomas A and Rus, Daniela and Grosu, Radu},
  journal={Nature Machine Intelligence},
  volume={2},
  number={10},
  pages={642--652},
  year={2020},
  publisher={Nature Publishing Group}
}
NCP
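- Neural Circuit Policies (NCP) are sparse recurrent neural networks built on the LTC (liquid time-constant) neuron and synapse model.
- The LTC neuron is defined by the formula below. See the paper for the full derivation (I honestly could not follow all of it, so I will not attempt an explanation here):
- The code implementation is as follows: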
import paddle
import paddle.nn as nn
import numpy as np


class LTCCell(nn.Layer):
    def __init__(
        self,
        wiring,
        in_features=None,
        input_mapping="affine",
        output_mapping="affine",
        ode_unfolds=6,
        epsilon=1e-8,
    ):
        super(LTCCell, self).__init__()
        if in_features is not None:
            wiring.build((None, in_features))
        if not wiring.is_built():
            raise ValueError(
                "Wiring error! Unknown number of input features. Please pass the parameter 'in_features' or call the 'wiring.build()'."
            )
        # (min, max) range used to initialize each parameter uniformly
        self._init_ranges = {
            "gleak": (0.001, 1.0),
            "vleak": (-0.2, 0.2),
            "cm": (0.4, 0.6),
            "w": (0.001, 1.0),
            "sigma": (3, 8),
            "mu": (0.3, 0.8),
            "sensory_w": (0.001, 1.0),
            "sensory_sigma": (3, 8),
            "sensory_mu": (0.3, 0.8),
        }
        self._wiring = wiring
        self._input_mapping = input_mapping
        self._output_mapping = output_mapping
        self._ode_unfolds = ode_unfolds
        self._epsilon = epsilon
        self._allocate_parameters()

    @property
    def state_size(self):
        return self._wiring.units

    @property
    def sensory_size(self):
        return self._wiring.input_dim

    @property
    def motor_size(self):
        return self._wiring.output_dim

    @property
    def output_size(self):
        return self.motor_size

    @property
    def synapse_count(self):
        return np.sum(np.abs(self._wiring.adjacency_matrix))

    @property
    def sensory_synapse_count(self):
        # Count sensory synapses from the sensory adjacency matrix
        return np.sum(np.abs(self._wiring.sensory_adjacency_matrix))

    def add_weight(self, name, init_value):
        # Create a trainable parameter initialized from the given tensor
        param = self.create_parameter(
            init_value.shape, attr=nn.initializer.Assign(init_value))
        self.add_parameter(name, param)
        return param

    def _get_init_value(self, shape, param_name):
        minval, maxval = self._init_ranges[param_name]
        if minval == maxval:
            return paddle.ones(shape) * minval
        else:
            return paddle.rand(shape) * (maxval - minval) + minval

    def _allocate_parameters(self):
        self._params = {}
        # Per-neuron parameters
        self._params["gleak"] = self.add_weight(
            name="gleak", init_value=self._get_init_value((self.state_size,), "gleak")
        )
        self._params["vleak"] = self.add_weight(
            name="vleak", init_value=self._get_init_value((self.state_size,), "vleak")
        )
        self._params["cm"] = self.add_weight(
            name="cm", init_value=self._get_init_value((self.state_size,), "cm")
        )
        # Neuron-to-neuron synapse parameters
        self._params["sigma"] = self.add_weight(
            name="sigma",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "sigma"
            ),
        )
        self._params["mu"] = self.add_weight(
            name="mu",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "mu"),
        )
        self._params["w"] = self.add_weight(
            name="w",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "w"),
        )
        self._params["erev"] = self.add_weight(
            name="erev",
            init_value=paddle.to_tensor(self._wiring.erev_initializer()),
        )
        # Sensory-input-to-neuron synapse parameters
        self._params["sensory_sigma"] = self.add_weight(
            name="sensory_sigma",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_sigma"
            ),
        )
        self._params["sensory_mu"] = self.add_weight(
            name="sensory_mu",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_mu"
            ),
        )
        self._params["sensory_w"] = self.add_weight(
            name="sensory_w",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_w"
            ),
        )
        self._params["sensory_erev"] = self.add_weight(
            name="sensory_erev",
            init_value=paddle.to_tensor(
                self._wiring.sensory_erev_initializer()),
        )
        # Fixed (non-trainable) masks that enforce the sparse wiring
        self._params["sparsity_mask"] = paddle.to_tensor(
            np.abs(self._wiring.adjacency_matrix)
        )
        self._params["sensory_sparsity_mask"] = paddle.to_tensor(
            np.abs(self._wiring.sensory_adjacency_matrix)
        )
        # Optional element-wise input and output mappings
        if self._input_mapping in ["affine", "linear"]:
            self._params["input_w"] = self.add_weight(
                name="input_w",
                init_value=paddle.ones((self.sensory_size,)),
            )
        if self._input_mapping == "affine":
            self._params["input_b"] = self.add_weight(
                name="input_b",
                init_value=paddle.zeros((self.sensory_size,)),
            )
        if self._output_mapping in ["affine", "linear"]:
            self._params["output_w"] = self.add_weight(
                name="output_w",
                init_value=paddle.ones((self.motor_size,)),
            )
        if self._output_mapping == "affine":
            self._params["output_b"] = self.add_weight(
                name="output_b",
                init_value=paddle.zeros((self.motor_size,)),
            )

    def _sigmoid(self, v_pre, mu, sigma):
        v_pre = paddle.unsqueeze(v_pre, -1)  # For broadcasting
        mues = v_pre - mu
        x = sigma * mues
        return nn.functional.sigmoid(x)

    def _ode_solver(self, inputs, state, elapsed_time):
        v_pre = state
        # We can pre-compute the effects of the sensory neurons here
        sensory_w_activation = self._params["sensory_w"] * self._sigmoid(
            inputs, self._params["sensory_mu"], self._params["sensory_sigma"]
        )
        sensory_w_activation *= self._params["sensory_sparsity_mask"]
        sensory_rev_activation = sensory_w_activation * \
            self._params["sensory_erev"]
        # Reduce over dimension 1 (=source sensory neurons)
        w_numerator_sensory = paddle.sum(sensory_rev_activation, axis=1)
        w_denominator_sensory = paddle.sum(sensory_w_activation, axis=1)
        # cm/t is loop invariant
        cm_t = self._params["cm"] / (elapsed_time / self._ode_unfolds)
        # Unfold the ODE solver multiple times into one RNN step
        for t in range(self._ode_unfolds):
            w_activation = self._params["w"] * self._sigmoid(
                v_pre, self._params["mu"], self._params["sigma"]
            )
            w_activation *= self._params["sparsity_mask"]
            rev_activation = w_activation * self._params["erev"]
            # Reduce over dimension 1 (=source neurons)
            w_numerator = paddle.sum(
                rev_activation, axis=1) + w_numerator_sensory
            w_denominator = paddle.sum(
                w_activation, axis=1) + w_denominator_sensory
            numerator = (
                cm_t * v_pre
                + self._params["gleak"] * self._params["vleak"]
                + w_numerator
            )
            denominator = cm_t + self._params["gleak"] + w_denominator
            # Avoid dividing by 0
            v_pre = numerator / (denominator + self._epsilon)
        return v_pre

    def _map_inputs(self, inputs):
        if self._input_mapping in ["affine", "linear"]:
            inputs = inputs * self._params["input_w"]
        if self._input_mapping == "affine":
            inputs = inputs + self._params["input_b"]
        return inputs

    def _map_outputs(self, state):
        output = state
        if self.motor_size < self.state_size:
            output = output[:, 0: self.motor_size]  # slice
        if self._output_mapping in ["affine", "linear"]:
            output = output * self._params["output_w"]
        if self._output_mapping == "affine":
            output = output + self._params["output_b"]
        return output

    def _clip(self, w):
        return nn.functional.relu(w)

    def apply_weight_constraints(self):
        # Keep w, sensory_w, cm and gleak non-negative
        self._params["w"].set_value(self._clip(self._params["w"].detach()))
        self._params["sensory_w"].set_value(self._clip(self._params["sensory_w"].detach()))
        self._params["cm"].set_value(self._clip(self._params["cm"].detach()))
        self._params["gleak"].set_value(self._clip(self._params["gleak"].detach()))

    def forward(self, inputs, states):
        # Regularly sampled mode (elapsed time = 1 second)
        elapsed_time = 1.0
        inputs = self._map_inputs(inputs)
        next_state = self._ode_solver(inputs, states, elapsed_time)
        outputs = self._map_outputs(next_state)
        return outputs, next_state
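To make the shapes in `_ode_solver` easier to follow, here is a minimal NumPy sketch of a single unfolding step. It mirrors the arithmetic in the code above and is not a derivation from the paper; the function name `ltc_unfold_step`, the dictionary keys, and the random toy parameters are my own choices for illustration, and the sensory terms are simply set to zero.

```python
import numpy as np


def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))


def ltc_unfold_step(v, p, num_sensory, den_sensory, cm_t, eps=1e-8):
    """One unfolding of the fused update for states v of shape (batch, units)."""
    # Synapse activation for every (source, target) pair: (batch, units, units)
    act = p["w"] * sigmoid(p["sigma"] * (v[:, :, None] - p["mu"]))
    act = act * p["mask"]
    # Sum over the source neurons (axis 1) to get one value per target neuron
    w_num = np.sum(act * p["erev"], axis=1) + num_sensory
    w_den = np.sum(act, axis=1) + den_sensory
    numerator = cm_t * v + p["gleak"] * p["vleak"] + w_num
    denominator = cm_t + p["gleak"] + w_den
    return numerator / (denominator + eps)


# Toy parameters (hypothetical values drawn from the same ranges as _init_ranges)
rng = np.random.default_rng(0)
batch, units = 3, 4
p = {
    "w": rng.uniform(0.001, 1.0, size=(units, units)),
    "sigma": rng.uniform(3, 8, size=(units, units)),
    "mu": rng.uniform(0.3, 0.8, size=(units, units)),
    "erev": rng.choice([-1.0, 1.0], size=(units, units)),
    "mask": np.ones((units, units)),
    "gleak": rng.uniform(0.001, 1.0, size=units),
    "vleak": rng.uniform(-0.2, 0.2, size=units),
}
cm = rng.uniform(0.4, 0.6, size=units)
ode_unfolds, elapsed_time = 6, 1.0
cm_t = cm / (elapsed_time / ode_unfolds)

v = np.zeros((batch, units))
no_sensory = np.zeros((batch, units))  # sensory contribution left out of this toy run
for _ in range(ode_unfolds):
    v = ltc_unfold_step(v, p, no_sensory, no_sensory, cm_t)
print(v.shape)
```

With all synaptic weights set to zero, the step reduces to `(cm_t * v + gleak * vleak) / (cm_t + gleak + eps)`, which is a quick way to sanity-check the broadcasting.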
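Basic usage
- The theory looks deep, and it really is, but an LTC neuron is used in much the same way as an ordinary RNN cell.
- The simple sequence-fitting task below shows how to use the NCP model. It is also one of the example tasks provided in the official project.
Clone the project code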
!git clone https://github.com/jm12138/keras-ncp
Switch to the project directory
%cd ~/keras-ncp
%matplotlib inline
/home/aistudio/keras-ncp
Import the basic modules
import paddle
import numpy as np
import paddle.nn as nn
import kerasncp as kncp
import matplotlib.pyplot as plt
from paddle.optimizer import Adam
from kerasncp.paddle import LTCCell
from paddle.io import DataLoader, TensorDataset
Build a simple RNN sequence model
class RNNSequence(nn.Layer):
    def __init__(
        self,
        rnn_cell,
    ):
        super(RNNSequence, self).__init__()
        self.rnn_cell = rnn_cell

    def forward(self, x):
        # x has shape (batch_size, seq_len, in_features)
        batch_size, seq_len = x.shape[:2]
        hidden_state = paddle.zeros((batch_size, self.rnn_cell.state_size))
        outputs = []
        for t in range(seq_len):
            inputs = x[:, t]
            new_output, hidden_state = self.rnn_cell.forward(
                inputs, hidden_state)
            outputs.append(new_output)
        outputs = paddle.stack(outputs, axis=1)  # return entire sequence
        return outputs
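The unrolling loop in `RNNSequence.forward` does not depend on what the cell computes. As a rough illustration, the NumPy sketch below runs the same loop with a made-up cell; `ToyCell` and `unroll` are hypothetical names and have nothing to do with LTC, they only show how the state is threaded through time and how the per-step outputs are stacked.

```python
import numpy as np


class ToyCell:
    """Hypothetical stand-in for an RNN cell: a leaky running average."""
    state_size = 2

    def __call__(self, inputs, state):
        new_state = 0.5 * state + 0.5 * inputs
        # Use the first state unit as the output, similar to slicing motor neurons
        return new_state[:, :1], new_state


def unroll(cell, x):
    """Apply `cell` step by step over x of shape (batch, seq_len, features)."""
    batch_size, seq_len = x.shape[:2]
    state = np.zeros((batch_size, cell.state_size))
    outputs = []
    for t in range(seq_len):
        out, state = cell(x[:, t], state)
        outputs.append(out)
    return np.stack(outputs, axis=1)  # (batch, seq_len, output_size)


x = np.ones((1, 5, 2))
y = unroll(ToyCell(), x)
print(y.shape)     # (1, 5, 1)
print(y[0, :, 0])  # the state approaches 1 as more steps are seen
```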
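Build a simple model learner
- This learner is a lightly hacked subclass of paddle.Model.
- It trains, evaluates, and runs inference correctly, though it is not very elegant.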
class SequenceLearner(paddle.Model):
    def train_batch(self, inputs, labels=None, update=True):
        x, y = inputs[0], labels[0]
        y_hat = self.network.forward(x)
        y_hat = y_hat.reshape(y.shape)
        loss = self._loss(y_hat, y)
        loss.backward()
        if update:
            self._optimizer.step()
            self._optimizer.clear_grad()
            # Clip the constrained parameters back to non-negative values
            self.network.rnn_cell.apply_weight_constraints()
        return [loss.numpy()]

    def eval_batch(self, inputs, labels=None):
        x, y = inputs[0], labels[0]
        y_hat = self.network.forward(x)
        y_hat = y_hat.reshape(y.shape)
        loss = self._loss(y_hat, y)
        return [loss.numpy()]

    def predict_batch(self, inputs):
        x = inputs[0]
        y_hat = self.network.forward(x)
        return [x.numpy(), y_hat.numpy()]
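The call to `apply_weight_constraints()` after each optimizer step amounts to a projection: the cell's `_clip` applies ReLU to `w`, `sensory_w`, `cm`, and `gleak`, so any value pushed below zero by the update is reset to zero. A minimal NumPy sketch of that pattern follows. It uses a plain gradient step instead of Adam, and `sgd_step_with_projection` is a hypothetical helper, not part of Paddle or keras-ncp.

```python
import numpy as np


def sgd_step_with_projection(w, grad, lr):
    """Plain gradient step followed by projection onto w >= 0."""
    w = w - lr * grad
    return np.maximum(w, 0.0)  # same effect as applying ReLU to the weights


w = np.array([0.05, 0.5, 1.0])
grad = np.array([1.0, -1.0, 0.0])
w_new = sgd_step_with_projection(w, grad, lr=0.1)
print(w_new)  # the first weight would have become -0.05 and is clipped to 0
```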
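Build the data
- Generate one sin sequence and one cos sequence as the input features; the label to fit is another sin function with twice the frequency
- The sequence length is 128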
# Data parameters
in_features = 2
out_features = 1
N = 128
# Generate the data
data_x = np.stack(
    [np.sin(np.linspace(0, 3 * np.pi, N)),
     np.cos(np.linspace(0, 3 * np.pi, N))],
    axis=1
)
data_x = np.expand_dims(data_x, axis=0).astype(np.float32)
data_y = np.sin(np.linspace(0, 6 * np.pi, N)).reshape([1, N, 1]).astype(np.float32)
# Preview the data
print("data_x.shape: ", str(data_x.shape))
print("data_y.shape: ", str(data_y.shape))
for i in range(in_features):
    plt.plot(range(N), data_x[0, :, i], color='black')
for i in range(out_features):
    plt.plot(range(N), data_y[0, :, i], color='red')
# Convert to Tensors
data_x = paddle.to_tensor(data_x)
data_y = paddle.to_tensor(data_y)
# Build the datasets and data loaders
train_dataloader = DataLoader(TensorDataset(
    [data_x, data_y]), batch_size=1, shuffle=True, num_workers=0)
val_dataloader = DataLoader(TensorDataset(
    [data_x, data_y]), batch_size=1, shuffle=False, num_workers=0)
data_x.shape: (1, 128, 2)
data_y.shape: (1, 128, 1)
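One thing worth noticing about this toy dataset: because sin(2θ) = 2 sin(θ) cos(θ), the label at every time step is exactly twice the product of the two input features. The short check below is my own observation about the data, written with NumPy only, and is not part of the official example.

```python
import numpy as np

N = 128
theta = np.linspace(0, 3 * np.pi, N)
x_sin, x_cos = np.sin(theta), np.cos(theta)
target = np.sin(np.linspace(0, 6 * np.pi, N))

# Double-angle identity: sin(2*theta) == 2 * sin(theta) * cos(theta)
matches = np.allclose(target, 2 * x_sin * x_cos)
print(matches)
```

So the model only has to learn a fixed nonlinear function of its two inputs, which helps explain why such a small network is enough here.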
Build the RNN sequence model with NCP
wiring = kncp.wirings.FullyConnected(8, out_features)
ltc_cell = LTCCell(wiring, in_features)
ltc_sequence = RNNSequence(ltc_cell)
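For a sense of how small this model is, the trainable parameters can be counted by hand from the shapes in `_allocate_parameters`. The sketch below assumes that `erev` has shape (state_size, state_size) and `sensory_erev` has shape (sensory_size, state_size), which is what the broadcasting in `_ode_solver` suggests but is not stated explicitly above; `ltc_parameter_count` is a hypothetical helper, and the sparsity masks are left out because they are not trainable.

```python
def ltc_parameter_count(state_size, sensory_size, motor_size,
                        input_mapping="affine", output_mapping="affine"):
    """Count trainable LTCCell parameters from the shapes used in the cell."""
    per_neuron = 3 * state_size                # gleak, vleak, cm
    recurrent = 4 * state_size * state_size    # sigma, mu, w, erev (assumed shape)
    sensory = 4 * sensory_size * state_size    # sensory_sigma/mu/w/erev (assumed shape)
    # "affine" adds a weight and a bias per feature, "linear" only a weight
    per_feature = {"affine": 2, "linear": 1}
    n_in = per_feature.get(input_mapping, 0) * sensory_size
    n_out = per_feature.get(output_mapping, 0) * motor_size
    return per_neuron + recurrent + sensory + n_in + n_out


# 8 neurons, 2 input features, 1 output, as in the wiring above
count = ltc_parameter_count(state_size=8, sensory_size=2, motor_size=1)
print(count)  # 24 + 256 + 64 + 4 + 2 = 350
```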
Configure the model trainer
loss = nn.MSELoss()
opt = Adam(learning_rate=0.01, parameters=ltc_sequence.parameters())
learn = SequenceLearner(ltc_sequence)
learn.prepare(opt, loss)
Model training and evaluation
- The fitting process is shown in the figure below:
- For a simple task like this one, NCP fits the target quite well.
learn.fit(train_dataloader, epochs=400, verbose=2)
learn.evaluate(val_dataloader)
Model testing
x, y = learn.predict(val_dataloader)
x = x[0]
y = y[0]
# data_x and data_y are Tensors at this point, so convert them for plotting
for i in range(in_features):
    plt.plot(range(N), data_x[0, :, i].numpy(), color='black')
for i in range(out_features):
    plt.plot(range(N), data_y[0, :, i].numpy(), color='red')
for i in range(out_features):
    plt.plot(range(N), y[0, :, i], color='green')
Predict begin...
step 1/1 [==============================] - 834ms/step
Predict samples: 1
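Summary
- This post ported the NCP model to the Paddle framework and used it for a simple sequence-fitting task.
- As for the autonomous driving application described in the paper, I hope to get hold of the dataset and try it at some point. It looks like a fun thing to explore.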