在使用TensorFlow 1.X版本的estimator的时候经常会碰到类似于ValueError:GraphDef cannot be larger than 2GB
的报错信息,可能的原因是数据太大无法写入graph。
一般来说,常见的数据构建方法如下:
def input_fn():
features, labels = (np.random.sample((100,2)), np.random.sample((100,1)))
dataset = tf.data.Dataset.from_tensor_slices((features,labels))
dataset = dataset.shuffle(100000).repeat().batch(batch_size)
return dataset
...
estimator.train(input_fn)
TensorFlow在读取数据的时候会将数据也写入Graph,所以当数据量很大的时候会碰到这种情况,之前做实验在多GPU的时候也会遇到这种情况,即使我把batch size调到很低。所以解决办法有两种思路,一直不保存graph,而是使用feed_dict
的方式来构建input pipeline。
不写入graph
我的代码环境是TensorFlow1.14,所以我以这个版本为例进行介绍。
首先总结一下estimator的运行原理(假设在单卡情况下),以estimator.train
为例(eval和predict类似),其调用顺序如下:
class Estimator():
...
def train():
...
loss = self._train_model(input_fn, hooks, saving_listeners)
...
def _train_model(self, input_fn, hooks, saving_listeners):
if self._train_distribution:
return self._train_model_distributed(input_fn, hooks, saving_listeners)
else:
return self._train_model_default(input_fn, hooks, saving_listeners)
def _train_model_default(self, input_fn, hooks, saving_listeners):
...
return self._train_with_estimator_spec(estimator_spec, worker_hooks,
hooks, global_step_tensor,
saving_listeners)
def _train_with_estimator_spec(self, estimator_spec, worker_hooks, hooks,
global_step_tensor, saving_listeners):
....
with training.MonitoredTrainingSession(
master=self._config.master,
is_chief=self._config.is_chief,
checkpoint_dir=self._model_dir,
scaffold=estimator_spec.scaffold,
hooks=worker_hooks,
chief_only_hooks=(tuple(chief_hooks) +
tuple(estimator_spec.training_chief_hooks)),
save_checkpoint_secs=0, # Saving is handled by a hook.
save_summaries_steps=save_summary_steps,
config=self._session_config,
max_wait_secs=self._config.session_creation_timeout_secs,
log_step_count_steps=log_step_count_steps) as mon_sess:
单步调试后发现,estimator写入event文件发生在调用MonitoredTrainingSession
的时刻,而真正写入event是在执行hook的时候,例如在我的实验中我设置了log_step_count_steps
这个值,这个值会每隔指定次数steps就会打印出计算速度和当前的loss值。而实现这一功能的是StepCounterHook
,它定义在tensorflow/tensorflow/python/training/basic_session_run_hooks.py
中,部分定义如下:
class StepCounterHook(session_run_hook.SessionRunHook):
"""Hook that counts steps per second."""
def __init__(...):
...
self._summary_writer = summary_writer
def begin(self):
if self._summary_writer is None and self._output_dir:
self._summary_writer = SummaryWriterCache.get(self._output_dir)
self._summary_tag = training_util.get_global_step().op.name + "/sec"
def before_run(self, run_context): # pylint: disable=unused-argument
return SessionRunArgs(self._global_step_tensor)
def _log_and_record(self, elapsed_steps, elapsed_time, global_step):
steps_per_sec = elapsed_steps / elapsed_time
if self._summary_writer is not None:
summary = Summary(value=[
Summary.Value(tag=self._summary_tag, simple_value=steps_per_sec)
])
self._summary_writer.add_summary(summary, global_step)
logging.info("%s: %g", self._summary_tag, steps_per_sec)
所以我们只需要将出现类似于self._summary_writer.add_summary
的地方注释掉,这样estimator在运行过程中就不会再生成event文件,也就不会有2GB的问题了。
feed_dict
为了在大数据量时使用 dataset,我们可以用 placeholder 创建 dataset。这时数据就不会直接写到 graph 中,graph 中只有一个 placeholder 占位符。但是,用了 placeholder 就需要我们在一开始对它进行初始化填数据,需要调用 sess.run(iter.initializer, feed_dict={ x: data })
。
但是estimator并没有显示的session可以调用,那应该怎么办呢?其实我们可以使用SessionRunHook
来解决这个问题。tf.train.SessionRunHook()
类定义在tensorflow/python/training/session_run_hook.py
,该类的具体介绍可参见【转】tf.SessionRunHook使用方法。
仔细看一下 estimator 的 train 和 evaluate 函数定义可以发现它们都接收 hooks 参数,这个参数的定义是:List of tf.train.SessionRunHook subclass instances. Used for callbacks inside the training loop. 也就是说我们可以自己定义一个SessionRunHook作为参数传递到hook就可以了。
train(
input_fn,
hooks=None,
steps=None,
max_steps=None,
saving_listeners=None
)
我们现在想要在训练之前初始化 dataset 的 placeholder,那么我们就应该具体实现 SessionRunHook 的after_create_session 成员函数:
class IteratorInitializerHook(tf.train.SessionRunHook):
def __init__(self):
super(IteratorInitializerHook, self).__init__()
self.iterator_initializer_fn = None
def after_create_session(self, session, coord):
del coord
self.iterator_initializer_fn(session)
def make_input_fn():
iterator_initializer_hook = IteratorInitializerHook()
def input_fn():
x = tf.placeholder(tf.float32, shape=[None,2])
dataset = tf.data.Dataset.from_tensor_slices(x)
dataset = dataset.shuffle(100000).repeat().batch(batch_size)
iter = dataset.make_initializable_iterator()
data = np.random.sample((100,2))
iterator_initializer_hook.iterator_initializer_fn = (
lambda sess: sess.run(iter.initializer, feed_dict={x: data})
)
return iter.get_next()
return input_fn, iterator_initializer_hook
...
input_fn, iterator_initializer_hook = make_input_fn()
estimator.train(input_fn, hooks=[iterator_initializer_hook])