物理世界的鲁棒语音对抗样本(源码阅读笔记)下

这篇博客记录IJCAI19发表的Robust Audio Adversarial Example for a Physical Attack的源代码的Attack部分的阅读过程。
物理世界的鲁棒语音对抗样本(源码阅读笔记)下

Attack类有两个方法,init和attack。
def init 是构造函数,用来定义实例的属性,在创建对象时自动执行
def attack中实现对抗样本的生成(只有三个参数,self实例,输出路径,迭代次数)

init方法

init的作用:配置攻击过程,创建TF图以使用它来实际生成对抗性示例。

这一串参数不明觉厉

  def __init__(self, sess, restore_path, audio, impulse, phrase, freq_min, freq_max, batch_size, learning_rate, weight_decay):

assert检查audio和impulse的合法性,目的是减少bug,是个好的编程习惯但是也可以不要。之后几行为实例对象的属性赋值

        assert len(audio.shape) == 1
        assert len(impulse.shape) == 2 ## impulse有两个维度

        self.sess = sess
        self.phrase = phrase

        self.impulse_size = impulse.shape[0]
        self.batch_size = batch_size

将原始语音放到计算图里

original = tf.constant(audio.astype(np.float32))

在频域中创建脉冲滤波器(audio.shape[0]=51072 impulse.shape[1]=64000)
math.ceil(x) 返回一个大于等于x的最小整数,nfft即为大于等于conv_length的最小二的幂次
对输入脉冲做快速傅里叶变换,并扩展到nfft位

        conv_length = audio.shape[0] + impulse.shape[1] - 1
        nfft = 2 ** int(math.ceil(math.log(conv_length, 2)))
        imp_filters = tf.constant(np.fft.rfft(impulse, nfft).astype(np.complex64))

改变过滤器,以动态应用
tf.gather的用法如下

tf.gather
该接口的作用:就是抽取出params的第axis维度上在indices里面所有的index(看后面的例子,就会懂)
 
tf.gather(
    params,
    indices,
    validate_indices=None,
    name=None,
    axis=0
)

以下面为例,即收集(gather) imp_filters的第indices位

        apply_filters = tf.gather(imp_filters, self.imp_indices)

这里又涉及一个技巧,将个人定义的变量以qq_为前缀命名,这样在恢复计算图时这些数据就不会被改变了,这里不想被改变的数据有qq_delta
这里产生一个困惑,就是为什么变量的name和这个实例的name不同,这个问题在https://blog.csdn.net/xiaohuihui1994/article/details/81022043中得到了很好的回答。tf.Variable的name是真实存在的属性,脚本运行结束后仍存在,而python空间的name仅仅是一个符号(地址引用)而已。
这里如果不是为了tensorboard的可读性,可以直接用qq(或v1,v2…)表示这些变量的name

         self.imp_indices = tf.Variable(np.zeros((batch_size, ), dtype=np.int32), name='qq_filters')
         self.noise_ratio = tf.Variable(np.ones((1, ), dtype=np.float32), name='qq_noise_ratio')
         self.delta = tf.Variable(np.random.normal(0, np.sqrt(np.abs(audio).mean()), audio.shape).astype(np.float32), name='qq_delta')

创建一个带通滤波器以应用于扰动。

        freq = np.fft.rfftfreq(audio.shape[0], 1.0 / Fs)
        bp_filter = ((freq_min < freq) & (freq < freq_max)).astype(np.int32)
        bp_filter = tf.constant(bp_filter.astype(np.complex64))

应用增量过滤器以模拟现实世界并创建一个对抗样本。

        self.delta_filtered = tf.spectral.irfft(tf.spectral.rfft(self.delta) * bp_filter)
        self.ae_input = original + tf.pad(self.delta_filtered, [[0, original.get_shape().as_list()[0] - self.delta_filtered.get_shape().as_list()[0]]])

卷积对输入的脉冲响应

        fft_length = tf.constant(np.array([nfft], dtype=np.int32))
        ae_frequency = tf.spectral.rfft(self.ae_input, fft_length=[nfft]) * apply_filters
        ae_convolved = tf.spectral.irfft(ae_frequency, fft_length=[nfft])[:, :conv_length]

标准化卷积音频

        max_audio = tf.reduce_max(tf.abs(ae_convolved), axis=1, keepdims=True)
        self.ae_transformed = ae_convolved / max_audio * tf.reduce_max(tf.abs(self.ae_input))

添加一点点噪音

        small_noise = tf.random_normal(self.ae_transformed.get_shape().as_list(), stddev=2 ** 14) * ([1] - self.noise_ratio)
        final_input = tf.clip_by_value(self.ae_transformed + small_noise, -2 ** 15, 2 ** 15 - 1)

然后feed这个处理好的数据,获得未归一化概率

        lengths = tf.constant(np.array([(conv_length - 1) // 320] * batch_size, dtype=np.int32))
        self.logits = get_logits(final_input, lengths)

saver.restore用来恢复除了qq标记的其他所有variables

        saver = tf.train.Saver([x for x in tf.global_variables() if 'qq' not in x.name])
        saver.restore(sess, restore_path)

计算ctc-loss

        target_phrase = tf.constant(np.array([list(phrase) for _ in range(batch_size)], dtype=np.int32))
        target_phrase_lengths = tf.constant(np.array([len(phrase) for _ in range(batch_size)], dtype=np.int32))
        target = ctc_label_dense_to_sparse(target_phrase, target_phrase_lengths, batch_size)
        self.ctcloss = tf.nn.ctc_loss(labels=tf.cast(target, tf.int32), inputs=self.logits, sequence_length=lengths)

使用adam优化器实现梯度下降法

        start_vars = set(x.name for x in tf.global_variables())
        optimizer = AdamWOptimizer(weight_decay, learning_rate)
        
        gradients = optimizer.compute_gradients(self.ctcloss, [self.delta])
        self.train = optimizer.apply_gradients(gradients, decay_var_list=[self.delta]) # tf.sign(grad)?
        
        end_vars = tf.global_variables()
        new_vars = [x for x in end_vars if x.name not in start_vars]
        
        sess.run(tf.variables_initializer(new_vars + [self.delta]))

从Logit解码器,以了解我们的情况

        self.decoded, _ = tf.nn.ctc_beam_search_decoder(self.logits, lengths, merge_repeated=False, beam_width=100) #beam_width为比例

attack方法

tf在会话中提供了一个run方法,用来执行计算图的整体或局部的操作
调用run方法执行计算图是,首先要传入一个tensor,称为feed(填充),最后通过sess.run得到一个返回结果,我们通常将这个过程称为fetch(取回)
会话是图和执行者之间的媒介,在使用Session.run方法运行图是,实际上是将graph,fetch,feed_dict序列化到字节数组当中去,再将这些操作分发到CPU或GPU中,并行执行这些op,执行完成后返回一个tensor

      sess = self.sess
      sess.run(tf.variables_initializer([self.delta, self.noise_ratio]))

记录脉冲反应的损失值

      imp_losses = np.full(self.impulse_size, 1e10)

创建其他变量
prefix为3位的随机小写字符串,好像没有什么特殊意义
time.time()为时间记录

prefix = ''.join([random.choice(string.lowercase) for _ in range(3)])
time_last, time_start = time.time(), time.time()

梯度下降迭代过程
np.random.choice的作用是从中0-impulse_size中按概率选取一个batch_size大小的数据,概率为imp_losses / imp_losses.sum()
物理世界的鲁棒语音对抗样本(源码阅读笔记)下

 for itr in range(num_iterations + 1):
            indice = np.random.choice(self.impulse_size, self.batch_size, p=(imp_losses / imp_losses.sum()))

将参数都丢进去,正式开始执行训练图

decoded, logits, ctcloss, ae_transformed, ae_input, delta_filtered, _ = sess.run([self.decoded, self.logits, self.ctcloss, self.ae_transformed, self.ae_input, self.delta_filtered, self.train], {self.imp_indices: indice})
            imp_losses[indice] = ctcloss

输出中间过程

 #输出过程
            print('Iter: %d, Elapsed Time: %.3f, Iter Time: %.3f\n\tLosses: %s\n\t Delta: %s' % \
                  (itr, time.time() - time_start, time.time() - time_last, ' '.join('% 6.2f' % x for x in ctcloss), np.array_str(delta_filtered, max_line_width=120)))
            time_last = time.time()
            print("%f is  processing",itr)

每五次迭代输出debug信息

 if itr % 5 == 0:
                res = np.zeros(decoded[0].dense_shape) + len(toks) - 1
                for j in range(len(decoded[0].values)):
                    x, y = decoded[0].indices[j]
                    res[x, y] = decoded[0].values[j]

                # 打印出当前识别的字符串 
                res = [''.join(toks[int(x)] for x in y).replace('-', '') for y in res]
                print('Recognition:\n\t' + '\n\t'.join(res))

                # And here we print the argmax of the alignment.
                res_al = np.argmax(logits, axis=2).T
                res_al = [''.join(toks[int(x)] for x in y) for y in res_al]
                print('Alignment:\n\t' + '\n\t'.join(res_al))

                # 检查我们是否成功 ,然后记录进度并减小缩放比例常数,求更小的扰动
                matched = filter(lambda index: res[index] == ''.join([toks[x] for x in self.phrase]), range(self.batch_size))
                if len(matched) > self.batch_size * 0.5:
                    # Get the current constant
                    ratio = sess.run(self.noise_ratio)
                    print('=> It: %d, Noise Ratio: %.3f' % (itr, 1.0 - ratio[0]))

                    # 图的对象替换为新的参数 
                    sess.run(self.noise_ratio.assign(ratio * 0.99))

                if itr % 100 == 0 or len(matched) > self.batch_size * 0.5:
                    # 每100代保存中间结果 
                    wav.write(os.path.join(outdir, '%s-adv-%d.wav' % (prefix, itr)), Fs, np.array(np.clip(np.round(ae_input), -2 ** 15, 2 ** 15 - 1), dtype=np.int16))
                    wav.write(os.path.join(outdir, '%s-delta-%d.wav' % (prefix, itr)), Fs, np.array(np.clip(np.round(delta_filtered), -2 ** 15, 2 ** 15 - 1), dtype=np.int16))
                    for i in range(ae_transformed.shape[0]):
                        wav.write(os.path.join(outdir, '%s-conv-%d-%d.wav' % (prefix, itr, i)), Fs, np.array(np.clip(np.round(ae_transformed[i]), -2 ** 15, 2 ** 15 - 1), dtype=np.int16))

                    # Save also the logits
                    np.save(os.path.join(outdir, '%s-logit-%d.npy' % (prefix, itr)), logits[:, matched, :])
上一篇:HDU 1710-Binary Tree Traversals(二进制重建)


下一篇:redis6.0源码学习(一)学习路径