使用字符RNN生成莎士比亚文本

创建训练数据集

首先,使用Keras的get_file()函数来下载莎士比亚的所有作品,并从Andrej Karpathy的Char-RNN项目中下载数据:

import tensorflow as tf
from tensorflow import keras

shakespeare_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare'
filepath = keras.utils.get_file('input.txt', shakespeare_url)
with open(filepath) as f:
    shakespeare_text = f.read()

接下来,必须将每个字符编码为整数。一种选择是创建自定义的预处理层。在这种情况下,使用Keras的Tokenizer类会更简单。首先,为文本添加一个分词器:它会找到文本中使用的所有字符,并将它们映射到不同的字符ID,从1到不同字符的数量(它不从0开始,所以可以使用该值进行屏蔽)

tokenizer = keras.preprocessing.text.Tokenizer(char_level=True)
tokenizer.fit_on_texts(shakespeare_text)

设置char_level=True来得到字符级编码,而不是默认的单词级编码。默认情况下,该分词器将文本转换为小写(但是,如果不希望这样,可以将其设置为lower=False)。现在,分词器可以将一个句子(或句子列表)编码为字符ID列表并返回,并告诉有多少个不同的字符以及文本中的字符总数

tokenizer.texts_to_sequences(['First'])
[[20, 6, 9, 8, 3]]
tokenizer.sequences_to_texts([[20, 6, 9, 8, 3]])
['f i r s t']
max_id = len(tokenizer.word_index)
dataset_size = tokenizer.document_count
print(max_id)
print(dataset_size)
39
1115394

对全文进行编码,以便每个字符都由其ID表示(减去1即可得到从0到38的ID,而不是1到39的ID):

import numpy as np

[encoded] = np.array(tokenizer.texts_to_sequences([shakespeare_text])) - 1

在继续之前,需要将训练集分为训练集、验证集和测试集。不能只是将文本中的所有字符进行混洗

如何拆分数据集

避免训练集、验证集和测试集之间的任何重合都是非常重要的。例如,可以将文本的前90%用作训练集,其后的5%用作验证集,最后的5%作为测试集。在两个集合之前留一个空袭也是一个好主意,可以避免出现两个集合中的段落重合。

在处理时间序列时,通常会跨时间划分:例如,使用2000年到2012年之间的作为训练集,在2013年到2015年之间的作为验证集,在2016年到2018年之间的作为测试集。但是,在某些情况下,可以沿着其他维度来拆分,这将使得拥有更长的的训练时间。如果有2000年到2018年间10000家公司的财务状况数据,可以按照不同的公司将这些数据拆分。但是,其中许多公司很有可能会高度相关(例如,整个经济部门可能会一起上升或下降),如果在训练集和测试集中有相关联的公司,那测试集不会很有用,因为其对泛化误差的度量偏向于乐观

因此,跨时间划分通常更安全。但这隐含地假设RNN过去(在训练集中)学习到的模式将来仍会存在。换句话说,假设时间序列是稳定的(至少在广义上是这样)。对于许多时间序列,此假设是合理的(例如,化学反应就很好,因为化学定律不会每天改变),但是对于其他的并非如此(例如,金融市场并非一成不变,因为一旦交易者发现并开始利用弄个它们,模式就会消失)。为了确保时间序列足够稳定,可以按时间在验证集上绘制模型的误差:如果模型在验证集地第一部分比最后一部分表现得更好,则时间序列可能不过稳定,因此最好在较短的时间范围内训练模型

简而言之,将时间序列分为训练集、验证集和测试集不是一件容易的事,而如何划分将在很大程度上取决于手头的任务

回到莎士比亚,使用文本的前90%作为训练集(其余部分保留为验证集和测试集),并创建一个tf.data.Dataset,它将从该集合中逐个返回每个字符:

train_size = dataset_size * 90 // 100
dataset = tf.data.Dataset.from_tensor_slices(encoded[:train_size])

将顺序数据集切成多个窗口

训练集现在由超过一百万个字符的单个序列组成,所以不能直接在其上训练神经网络:RNN相当于一个超过一百万层的深度网络,会有一个很长的实例来训练它。取而代之的是,使用数据集的window()方法将这个长字符串序列转换为许多较小的文本窗口,数据集中的每个实例是将整个文本的很短的子字符串,并且RNN仅在这些子字符串的长度上展开。这称为时间截断反向传播。调用window()方法来创建短文本窗口数据集:

n_steps = 100
window_length = n_steps + 1
dataset = dataset.window(size=window_length, shift=1,
                         drop_remainder=True)  # 可以调整n_steps:在较短的输入序列上训练RNN会更容易,但是RNN不能学习比n_steps更长的模式,因此不要使其太短

默认情况下,window()方法会创建不重叠的窗口,但是为了获得最大可能地数据集,使用shift=1,以便使得第一个窗口包含0到100的字符,第二个窗口包含1到101的字符,依次类推。为确保所有窗口正好是101个字符长度(这使得不需要进行任何填充就可以创建批处理),设置drop_remainder=True(否则最后100个窗口将包含100个字符,99个字符,以此类推直至1个字符)。

window()方法创建一个包含窗口的数据集,每个窗口也表示为一个数据集。它是一个嵌套的数据集,类似列表的列表。想通过调用窗口数据集方法来转换每个窗口时(例如对它们进行混洗和批处理),此功能非常有用。但是,不能直接使用嵌套数据集进行训练,因为模型希望输入张量,而不是数据集。因此,必须调用flat_map()方法:它将嵌套的数据集转换为一个展平的数据集(一个不包含数据集的数据集)。例如,假设{1,2,3}表示一个包含张量1、2、3的序列的数据集。如果展平嵌套的数据集{{1,2},{3,4,5,6}},将获得展平的数据集{1,2,3,4,5,6}。此外,flat_map()方法将一个函数用作参数,它允许在展平前变换嵌套数据集中的每个数据集。例如,如果将函数lambda ds:ds.batch(2)传递给flat_map(),则它将嵌套数据集{{1,2},{3,4,5,6}}转换为平数据集{[1,2],[3,4],[5,6]}:它是大小为2的张量的数据集。考虑到这一点,这里需要将数据展平:

dataset = dataset.flat_map(lambda window: window.batch(window_length))

在每个窗口上调用batch(window_length):由于所有窗口的长度都恰好相同,因此每个窗口都获得一个张量。现在数据集包含101个字符的连续窗口。由于当训练集中的实例独立且分布相同时,梯度下降效果最好,因此需要对这些窗口进行混洗,然后,可以批量处理这些窗口并将输入(前100个字符)与目标(最后一个字符)分开:

batch_size = 32
dataset = dataset.shuffle(10000).batch(batch_size)
dataset = dataset.map(lambda windows: (windows[:, :-1], windows[:, 1:]))  # X从0到100,Y从1到101

将类别输入特征编码为独热向量或嵌入。在这里,将使用独热向量对每个字符进行编码,因为只有很少的不同字符:

dataset = dataset.map(lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))
# 添加预取
dataset = dataset.prefetch(1)

创建和训练Char-RNN模型

要基于前100个字符来预测下一个字符,可以使用有2个GRU层的RNN,每个GRU层有128个单元,输入(dropout)和隐藏状态(recurrent_dropout)的dropout率均为20%。如果需要,可以稍后调整这些超参数。输出层是一个时间分布的Dense层,这一层必须有39个单元(max_id),因为文本中有39个不同的字符,并且想为每个可能的字符输出一个概率(在每个时间步长)。每个时间步长的输出概率总和为1,因此将softmax激活函数应用于Dense层的输出。然后,使用‘sparse_categorical_crossentropy’损失和Adam优化器来编译此模型:

model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, input_shape=[None, max_id], dropout=0, recurrent_dropout=0),
    keras.layers.GRU(128, return_sequences=True, dropout=0, recurrent_dropout=0),
    # 设置了dropout不为0,发现不能在GPU上运行,在CPU上太慢了,就改成了0
    keras.layers.TimeDistributed(keras.layers.Dense(max_id, activation='softmax'))
])
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')
history = model.fit(dataset, epochs=1)
31368/31368 [==============================] - 343s 11ms/step - loss: 0.9550

使用Char-RNN模型

现在有了一个可以预测莎士比亚写的文本中下一个字符的模型。为了提供一些文本,需要向之前一样对它进行预处理:

def preprocess(texts):
    X = np.array(tokenizer.texts_to_sequences(texts)) - 1
    return tf.one_hot(X, max_id)


X_new = preprocess(['How are yo'])
Y_pred = np.argmax(model.predict(X_new), axis=2)
tokenizer.sequences_to_texts(Y_pred + 1)[0][-1]  # 输出是0到38索引表示,输出+1为ID表示从1到39,第一句最后一个字母
'u'

生成假莎士比亚文本

要使用Char-RNN模型来生成新文本,可以向其提供一些文本,使模型预测最可能的下一个字母,把它添加在文本末尾,然后将拓展的文本提供给模型来预测下一个字母,以此类推。但是实际上,这经常导致相同的单词一遍又一遍重复。相反,可以使用tf.random.categorical()函数估计出来的概率随机选择下一个字符。这将产生更多不同和有趣的文本。对给定类对数概率(logits),categorical()函数会对随机索引进行采样。为了更好地控制生成的文本的多样性,可以把logits除以一个称为温度的数字,这样可以根据需要进行调整:接近0的温度倾向于高概率的字符,而非常高的温度会给予所有的字符相同的概率。下面的next_char()函数使用这种方法来选择要添加到输入文本的下一个字符:

def next_char(text, temperature=1):
    X_new = preprocess([text])
    y_proba = model.predict(X_new)[0, -1:, :]
    rescaled_logits = tf.math.log(y_proba) / temperature
    char_id = tf.random.categorical(rescaled_logits, num_samples=1) + 1
    return tokenizer.sequences_to_texts(char_id.numpy())[0]


# 编写函数,反复调用next_char()来获得下一个字符并将其添加到给定的文本中
def complete_text(text, n_chars=50, temperature=1):
    for _ in range(n_chars):
        text += next_char(text, temperature)
    return text

有状态RNN

早目前为止,仅仅使用了无状态RNN:在每次训练迭代中,模型都从一个充满零的隐藏状态开始,然后在每个时间步长更新状态,在最后一个时间步长之后将其丢弃,因为不再需要了。如果告诉RNN在处理一个训练批次后保留此最终状态并将其作为下一个训练批次的初始状态。这样,尽管反向传播只是通过短序列,模型仍可以学习长期模式。这称为有状态RNN。

首先,只有当批次中的每个输入序列均从上一批次中对应序列中断的确切位置开始,有状态RNN才有意义。因此,创建有状态RNN所需要做的第一件事时使用顺序和非重合的输入序列(而不是用来训练无状态RNN的混洗和重叠的序列)。因此,在创建Dataset时,在调用window()方法时必须使用shift=n_steps(而不是shift=1)。而且,不能调用shuffle方法。在为有状态RNN准备数据集时,批处理要比无状态RNN困难得多。如果要调用batch(32),那32个连续的窗口应该放入同一批处理中,而下一批处理不会在这些窗口中的每个中断处继续。第一批包含窗口1至32,第二批包含窗口33至64,因此,如果考虑每个批次中的第一个窗口(即窗口1和窗口33),可以看到它们是不连续的。解决此问题的最简单方法是只是用包含单个窗口的“批处理”:

dataset = tf.data.Dataset.from_tensor_slices(encoded[:train_size])
dataset = dataset.window(window_length, shift=n_steps, drop_remainder=True)
dataset = dataset.flat_map(lambda windows: windows.batch(window_length))
dataset = dataset.batch(1)
dataset = dataset.map(lambda windows: (windows[:, :-1], windows[:, 1:]))
dataset = dataset.map(lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))
dataset = dataset.prefetch(1)

批处理比较困难,但并非不可能。例如,可以将莎士比亚的文本切成等长的32个文本,为每个文本创建一个连续输入序列的数据集,最后使用tf.train.Dataset.zip(dataset).map(lambda *windows:tf.stack(windows))来创建合适的连续批处理,其中批处理中的第n个输入序列恰好是从上一个批处理中第n个输入序列结束的位置开始

创建有状态RNN,在创建每个循环层时设置stateful=True,其次,有状态RNN需要知道批处理大小(因为它会为批处理中每个输入序列保留一个状态),因此必须在第一层中设置batch_input_shape参数,可以不指定第二个维度,因为输入可以有任意长度:

model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, state_ful=True, dropout=0.2, recurrent_dropout=0.2,
                     batch_input_shape=[batch_size, None, max_id]),
    keras.layers.GRU(128, return_sequences=True, state_ful=True, dropout=0.2, recurrent_dropout=0.2),
    keras.layers.TimeDistributed(keras.layers.Dense(max_id, activation='softmax'))
])

在每个轮次结束时,需要先重置状态,然后再回到文本的开头,为此,可以使用一个回调函数:

class ResetStatesCallback(keras.callbacks.Callback):
    def on_epoch_begin(self, epoch, logs):
        self.model.reset_states()


model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')
model.fit(dataset, epochs=50, callbacks=[ResetStatesCallback()])
上一篇:rnn--重新温习实现MNIST手写体识别


下一篇:全连接神经网络、RNN到LSTM理解