NCP: Neural Circuit Policies for Autonomous Driving Control

Introduction

  • Autonomous driving is an important area in the current development of AI technology

  • This post introduces an interesting piece of work that proposes a new type of neural network model

  • Inspired by the brains of small animals such as the nematode, the method can control an autonomous vehicle with only a few dozen neurons, whereas conventional deep network approaches (e.g. Inception, ResNet, VGG) require millions of neurons

  • The new network uses only 75,000 parameters and 19 neurons, tens of thousands of times fewer than previous approaches

  • A demo of the autonomous driving system is shown in the animation below:

    [Animation: NCP autonomous driving demo]

  • However, since the autonomous driving dataset used in the paper is large and hard to obtain, this post covers only the NCP model itself: porting the NCP project to the Paddle framework and using the model to fit a simple sequence

References

  • Paper: Neural circuit policies enabling auditable autonomy

  • Official repository: mlech26l/keras-ncp

  • Paddle implementation: jm12138/keras-ncp

  • Citation:

    @article{lechner2020neural,
      title={Neural circuit policies enabling auditable autonomy},
      author={Lechner, Mathias and Hasani, Ramin and Amini, Alexander and Henzinger, Thomas A and Rus, Daniela and Grosu, Radu},
      journal={Nature Machine Intelligence},
      volume={2},
      number={10},
      pages={642--652},
      year={2020},
      publisher={Nature Publishing Group}
    }
    

NCP

  • Neural Circuit Policies (NCP) are sparse recurrent neural networks built from LTC (liquid time-constant) neuron and synapse models

    [Figure: NCP network architecture]

  • The LTC neuron is expressed by the equations below; the full derivation is involved and not reproduced here, so refer to the paper for details:

    [Figure: LTC neuron equations]
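Since the equation figure does not reproduce here, the following is a hedged reconstruction of the LTC dynamics, inferred from the `_ode_solver` implementation in the code below (the notation is assumed for illustration, not copied from the paper):

```latex
% Membrane dynamics of neuron i (reconstructed from the solver code):
C_m \frac{dv_i}{dt} = g^{\mathrm{leak}}_i \left(v^{\mathrm{leak}}_i - v_i\right)
  + \sum_j w_{ji}\, \sigma\!\left(\sigma_{ji}(v_j - \mu_{ji})\right) \left(E_{ji} - v_i\right)

% Fused semi-implicit Euler update with step \Delta t' = \Delta t / k
% (k = \text{ode\_unfolds}), as applied in each unfold:
v_i \leftarrow
  \frac{\frac{C_m}{\Delta t'}\, v_i + g^{\mathrm{leak}}_i v^{\mathrm{leak}}_i
        + \sum_j w_{ji}\,\sigma(\cdot)\, E_{ji}}
       {\frac{C_m}{\Delta t'} + g^{\mathrm{leak}}_i + \sum_j w_{ji}\,\sigma(\cdot) + \epsilon}
```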

  • The concrete code implementation is as follows:

import paddle
import paddle.nn as nn
import numpy as np


class LTCCell(nn.Layer):
    def __init__(
        self,
        wiring,
        in_features=None,
        input_mapping="affine",
        output_mapping="affine",
        ode_unfolds=6,
        epsilon=1e-8,
    ):
        super(LTCCell, self).__init__()
        if in_features is not None:
            wiring.build((None, in_features))
        if not wiring.is_built():
            raise ValueError(
                "Wiring error! Unknown number of input features. Please pass the 'in_features' parameter or call 'wiring.build()'."
            )
        self._init_ranges = {
            "gleak": (0.001, 1.0),
            "vleak": (-0.2, 0.2),
            "cm": (0.4, 0.6),
            "w": (0.001, 1.0),
            "sigma": (3, 8),
            "mu": (0.3, 0.8),
            "sensory_w": (0.001, 1.0),
            "sensory_sigma": (3, 8),
            "sensory_mu": (0.3, 0.8),
        }
        self._wiring = wiring
        self._input_mapping = input_mapping
        self._output_mapping = output_mapping
        self._ode_unfolds = ode_unfolds
        self._epsilon = epsilon
        self._allocate_parameters()

    @property
    def state_size(self):
        return self._wiring.units

    @property
    def sensory_size(self):
        return self._wiring.input_dim

    @property
    def motor_size(self):
        return self._wiring.output_dim

    @property
    def output_size(self):
        return self.motor_size

    @property
    def synapse_count(self):
        return np.sum(np.abs(self._wiring.adjacency_matrix))

    @property
    def sensory_synapse_count(self):
        return np.sum(np.abs(self._wiring.sensory_adjacency_matrix))

    def add_weight(self, name, init_value):
        param = self.create_parameter(
            init_value.shape, attr=nn.initializer.Assign(init_value))
        self.add_parameter(name, param)
        return param

    def _get_init_value(self, shape, param_name):
        minval, maxval = self._init_ranges[param_name]
        if minval == maxval:
            return paddle.ones(shape) * minval
        else:
            return paddle.rand(shape) * (maxval - minval) + minval

    def _allocate_parameters(self):
        self._params = {}
        self._params["gleak"] = self.add_weight(
            name="gleak", init_value=self._get_init_value((self.state_size,), "gleak")
        )
        self._params["vleak"] = self.add_weight(
            name="vleak", init_value=self._get_init_value((self.state_size,), "vleak")
        )
        self._params["cm"] = self.add_weight(
            name="cm", init_value=self._get_init_value((self.state_size,), "cm")
        )
        self._params["sigma"] = self.add_weight(
            name="sigma",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "sigma"
            ),
        )
        self._params["mu"] = self.add_weight(
            name="mu",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "mu"),
        )
        self._params["w"] = self.add_weight(
            name="w",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "w"),
        )
        self._params["erev"] = self.add_weight(
            name="erev",
            init_value=paddle.to_tensor(self._wiring.erev_initializer()),
        )
        self._params["sensory_sigma"] = self.add_weight(
            name="sensory_sigma",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_sigma"
            ),
        )
        self._params["sensory_mu"] = self.add_weight(
            name="sensory_mu",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_mu"
            ),
        )
        self._params["sensory_w"] = self.add_weight(
            name="sensory_w",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_w"
            ),
        )
        self._params["sensory_erev"] = self.add_weight(
            name="sensory_erev",
            init_value=paddle.to_tensor(
                self._wiring.sensory_erev_initializer()),
        )

        self._params["sparsity_mask"] = paddle.to_tensor(
            np.abs(self._wiring.adjacency_matrix)
        )
        self._params["sensory_sparsity_mask"] = paddle.to_tensor(
            np.abs(self._wiring.sensory_adjacency_matrix)
        )

        if self._input_mapping in ["affine", "linear"]:
            self._params["input_w"] = self.add_weight(
                name="input_w",
                init_value=paddle.ones((self.sensory_size,)),
            )
        if self._input_mapping == "affine":
            self._params["input_b"] = self.add_weight(
                name="input_b",
                init_value=paddle.zeros((self.sensory_size,)),
            )

        if self._output_mapping in ["affine", "linear"]:
            self._params["output_w"] = self.add_weight(
                name="output_w",
                init_value=paddle.ones((self.motor_size,)),
            )
        if self._output_mapping == "affine":
            self._params["output_b"] = self.add_weight(
                name="output_b",
                init_value=paddle.zeros((self.motor_size,)),
            )

    def _sigmoid(self, v_pre, mu, sigma):
        v_pre = paddle.unsqueeze(v_pre, -1)  # For broadcasting
        mues = v_pre - mu
        x = sigma * mues
        return nn.functional.sigmoid(x)

    def _ode_solver(self, inputs, state, elapsed_time):
        v_pre = state

        # We can pre-compute the effects of the sensory neurons here
        sensory_w_activation = self._params["sensory_w"] * self._sigmoid(
            inputs, self._params["sensory_mu"], self._params["sensory_sigma"]
        )
        sensory_w_activation *= self._params["sensory_sparsity_mask"]

        sensory_rev_activation = sensory_w_activation * \
            self._params["sensory_erev"]

        # Reduce over dimension 1 (=source sensory neurons)
        w_numerator_sensory = paddle.sum(sensory_rev_activation, axis=1)
        w_denominator_sensory = paddle.sum(sensory_w_activation, axis=1)

        # cm/t is loop invariant
        cm_t = self._params["cm"] / (elapsed_time / self._ode_unfolds)

        # Unfold the ODE multiple times within one RNN step
        for t in range(self._ode_unfolds):
            w_activation = self._params["w"] * self._sigmoid(
                v_pre, self._params["mu"], self._params["sigma"]
            )

            w_activation *= self._params["sparsity_mask"]

            rev_activation = w_activation * self._params["erev"]

            # Reduce over dimension 1 (=source neurons)
            w_numerator = paddle.sum(
                rev_activation, axis=1) + w_numerator_sensory
            w_denominator = paddle.sum(
                w_activation, axis=1) + w_denominator_sensory

            numerator = (
                cm_t * v_pre
                + self._params["gleak"] * self._params["vleak"]
                + w_numerator
            )
            denominator = cm_t + self._params["gleak"] + w_denominator

            # Avoid dividing by 0
            v_pre = numerator / (denominator + self._epsilon)

        return v_pre

    def _map_inputs(self, inputs):
        if self._input_mapping in ["affine", "linear"]:
            inputs = inputs * self._params["input_w"]
        if self._input_mapping == "affine":
            inputs = inputs + self._params["input_b"]
        return inputs

    def _map_outputs(self, state):
        output = state
        if self.motor_size < self.state_size:
            output = output[:, 0: self.motor_size]  # slice

        if self._output_mapping in ["affine", "linear"]:
            output = output * self._params["output_w"]
        if self._output_mapping == "affine":
            output = output + self._params["output_b"]
        return output

    def _clip(self, w):
        return nn.functional.relu(w)

    def apply_weight_constraints(self):
        self._params["w"].set_value(self._clip(self._params["w"].detach()))
        self._params["sensory_w"].set_value(self._clip(self._params["sensory_w"].detach()))
        self._params["cm"].set_value(self._clip(self._params["cm"].detach()))
        self._params["gleak"].set_value(self._clip(self._params["gleak"].detach()))

    def forward(self, inputs, states):
        # Regularly sampled mode (elapsed time = 1 second)
        elapsed_time = 1.0
        inputs = self._map_inputs(inputs)

        next_state = self._ode_solver(inputs, states, elapsed_time)

        outputs = self._map_outputs(next_state)

        return outputs, next_state
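To make one unfold of `_ode_solver` concrete, here is a minimal NumPy sketch of the semi-implicit update for a toy two-neuron network. All names and parameter values here are illustrative only, not part of the class's API:

```python
import numpy as np

def ltc_unfold_step(v, w, mu, sigma, erev, gleak, vleak, cm_t, eps=1e-8):
    """One semi-implicit Euler step of the LTC ODE (recurrent synapses only)."""
    # Synaptic activation w_ji * sigmoid(sigma_ji * (v_j - mu_ji));
    # rows index source neurons j, columns index target neurons i
    act = w / (1.0 + np.exp(-sigma * (v[:, None] - mu)))
    w_numerator = np.sum(act * erev, axis=0)    # reduce over source neurons
    w_denominator = np.sum(act, axis=0)
    numerator = cm_t * v + gleak * vleak + w_numerator
    denominator = cm_t + gleak + w_denominator
    return numerator / (denominator + eps)      # epsilon avoids division by zero

# Toy two-neuron network with hand-picked parameters
v = np.zeros(2)                                  # initial membrane potentials
w = np.ones((2, 2))
mu = np.full((2, 2), 0.5)
sigma = np.full((2, 2), 5.0)
erev = np.array([[1.0, -1.0], [-1.0, 1.0]])      # +1 excitatory, -1 inhibitory
v_next = ltc_unfold_step(v, w, mu, sigma, erev, gleak=1.0, vleak=0.3, cm_t=0.5)
print(v_next.shape)  # (2,)
```

In the real cell this step is repeated `ode_unfolds` times per RNN step, with the sensory-synapse contributions precomputed once outside the loop.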

Basic Usage

  • Although the theory looks (and indeed is) quite deep, an LTC neuron is used much like an ordinary RNN cell

  • Below, a simple sequence-fitting task, which is also the example task provided in the official repository, shows how to use the NCP model

Clone the project code

!git clone https://github.com/jm12138/keras-ncp

Switch to the project directory

%cd ~/keras-ncp
%matplotlib inline
/home/aistudio/keras-ncp

Import the basic modules

import paddle
import numpy as np
import paddle.nn as nn
import kerasncp as kncp
import matplotlib.pyplot as plt
from paddle.optimizer import Adam
from kerasncp.paddle import LTCCell
from paddle.io import DataLoader, TensorDataset

Build a simple RNN sequence network

class RNNSequence(nn.Layer):
    def __init__(
        self,
        rnn_cell,
    ):
        super(RNNSequence, self).__init__()
        self.rnn_cell = rnn_cell

    def forward(self, x):
        batch_size, seq_len = x.shape[:2]
        hidden_state = paddle.zeros((batch_size, self.rnn_cell.state_size))
        outputs = []
        for t in range(seq_len):
            inputs = x[:, t]
            new_output, hidden_state = self.rnn_cell.forward(
                inputs, hidden_state)
            outputs.append(new_output)
        outputs = paddle.stack(outputs, axis=1)  # return entire sequence
        return outputs
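The unrolling pattern above is framework-agnostic; the same loop can be sketched in plain NumPy with a stand-in cell (the `DummyCell` here is purely illustrative, not `LTCCell`):

```python
import numpy as np

class DummyCell:
    """Stand-in RNN cell: new state = output = tanh(x + h)."""
    state_size = 2

    def forward(self, x, h):
        new_h = np.tanh(x + h)
        return new_h, new_h

def run_sequence(cell, x):
    # x: (batch, seq_len, features) -- unroll the cell along the time axis
    batch_size, seq_len = x.shape[:2]
    h = np.zeros((batch_size, cell.state_size))
    outputs = []
    for t in range(seq_len):
        out, h = cell.forward(x[:, t], h)
        outputs.append(out)
    return np.stack(outputs, axis=1)  # entire output sequence

y = run_sequence(DummyCell(), np.zeros((1, 5, 2)))
print(y.shape)  # (1, 5, 2)
```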

Build a simple sequence learner

  • This learner is a lightly hacked adaptation of paddle.Model

  • It handles training, evaluation, and inference correctly, though the implementation is not particularly elegant

class SequenceLearner(paddle.Model):
    def train_batch(self, inputs, labels=None, update=True):
        x, y = inputs[0], labels[0]
        y_hat = self.network.forward(x)
        y_hat = y_hat.reshape(y.shape)
        loss = self._loss(y_hat, y)
        loss.backward()
        if update:
            self._optimizer.step()
            self._optimizer.clear_grad()
            self.network.rnn_cell.apply_weight_constraints()
        return [loss.numpy()]

    def eval_batch(self, inputs, labels=None):
        x, y = inputs[0], labels[0]
        y_hat = self.network.forward(x)
        y_hat = y_hat.reshape(y.shape)
        loss = self._loss(y_hat, y)
        return [loss.numpy()]

    def predict_batch(self, inputs):
        x = inputs[0]
        y_hat = self.network.forward(x)
        return [x.numpy(), y_hat.numpy()]
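One detail worth noting in `train_batch` above: `apply_weight_constraints` runs right after the optimizer step, which amounts to projected gradient descent — take an unconstrained step, then clip the constrained parameters (here, to be non-negative) back onto the feasible set. A minimal plain-Python illustration of the idea (the objective is made up for this example):

```python
# Minimize (w + 1)^2 subject to w >= 0 via projected gradient descent.
# The unconstrained minimum is w = -1; the projection keeps w feasible,
# so the iterates settle at the constrained optimum w = 0.
w, lr = 1.0, 0.1
for _ in range(100):
    grad = 2.0 * (w + 1.0)  # gradient of (w + 1)^2
    w = w - lr * grad       # unconstrained optimizer step
    w = max(w, 0.0)         # projection, like apply_weight_constraints()
print(w)  # 0.0
```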

Build the data

  • Generate one sin sequence and one cos sequence as input features; the fitting target is another sin function
  • The sequence length is 128
# 数据参数
in_features = 2
out_features = 1
N = 128

# 生成数据
data_x = np.stack(
    [np.sin(np.linspace(0, 3 * np.pi, N)), 
    np.cos(np.linspace(0, 3 * np.pi, N))], 
    axis=1
)
data_x = np.expand_dims(data_x, axis=0).astype(np.float32)
data_y = np.sin(np.linspace(0, 6 * np.pi, N)).reshape([1, N, 1]).astype(np.float32)

# 预览数据
print("data_x.shape: ", str(data_x.shape))
print("data_y.shape: ", str(data_y.shape))
for i in range(in_features):
    plt.plot(range(N), data_x[0, :, i], color='black')
for i in range(out_features):
    plt.plot(range(N), data_y[0, :, i], color='red')

# 转换为 Tensor
data_x = paddle.to_tensor(data_x)
data_y = paddle.to_tensor(data_y)

# 组建数据集和读取器
train_dataloader = DataLoader(TensorDataset(
    [data_x, data_y]), batch_size=1, shuffle=True, num_workers=0)
val_dataloader = DataLoader(TensorDataset(
    [data_x, data_y]), batch_size=1, shuffle=False, num_workers=0)
data_x.shape:  (1, 128, 2)
data_y.shape:  (1, 128, 1)



[Figure: input features (black) and target sequence (red)]

Build an RNN sequence model with NCP

wiring = kncp.wirings.FullyConnected(8, out_features)
ltc_cell = LTCCell(wiring, in_features)
ltc_sequence = RNNSequence(ltc_cell)
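For intuition about what the `FullyConnected` wiring provides, here is an illustrative NumPy sketch (not the kerasncp API): an all-to-all adjacency matrix whose nonzero entries carry a synapse polarity, which the cell consumes as the reversal-potential initializer and the sparsity mask. The random ±1 polarities here are an assumption for illustration:

```python
import numpy as np

rng = np.random.default_rng(0)
units = 8                              # neurons in the wiring, as above

# All-to-all wiring: every entry is a synapse with a polarity
# (+1 excitatory, -1 inhibitory; randomly assigned here for illustration)
adjacency = rng.choice([-1, 1], size=(units, units))
sparsity_mask = np.abs(adjacency)      # 1 where a synapse exists
synapse_count = int(np.sum(sparsity_mask))
print(synapse_count)  # 64 = 8 * 8 synapses
```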

Configure the model trainer

loss = nn.MSELoss()
opt = Adam(learning_rate=0.01, parameters=ltc_sequence.parameters())

learn = SequenceLearner(ltc_sequence)
learn.prepare(opt, loss)

Model training and validation

  • The fitting process is shown in the animation below:

    [Animation: fitting process during training]

  • For such a simple task, NCP fits quite well

learn.fit(train_dataloader, epochs=400, verbose=2)
learn.evaluate(val_dataloader)

Model testing

x, y = learn.predict(val_dataloader)
x = x[0]
y = y[0]


for i in range(in_features):
    plt.plot(range(N), data_x[0, :, i], color='black')
for i in range(out_features):
    plt.plot(range(N), data_y[0, :, i], color='red')
for i in range(out_features):
    plt.plot(range(N), y[0, :, i], color='green')
Predict begin...
step 1/1 [==============================] - 834ms/step
Predict samples: 1

[Figure: prediction (green) vs. target (red) and inputs (black)]

Summary

  • This post ported the NCP model to the Paddle framework and implemented a simple sequence-fitting task

  • As for the autonomous driving application described in the paper, it would be interesting to try if the dataset can be obtained someday
