在使用TensorFlow进行异步计算时,队列是一种强大的机制。
为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQueue),并将其内部所有元素初始化为某些值。然后,我们构建一个TensorFlow图,它从队列前端取走一个元素,加上1之后,放回队列的后端。慢慢地,队列的元素的值就会增加。
TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner。Coordinator类可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常,QueueRunner类用来协调多个工作线程同时将多个张量推入同一个队列中。
Tensorflow队列
队列,如FIFOQueue(先进先出队列,按顺序出队列)和RandomShuffleQueue(随机出队列),在TensorFlow的张量异步计算时都非常重要。
FIFOQueue(capacity, dtypes, name='fifo_queue'):创建一个以先进先出的顺序对元素进行排队的队列
- capacity:整数。可能存储在此队列中的元素数量的上限
- dtypes:DType对象列表。长度dtypes必须等于每个队列元素中的张量数,dtype的类型形状,决定了后面进队列元素形状
- 方法
- dequeue(name=None)
- enqueue(vals, name=None):
- enqueue_many(vals, name=None):vals列表或者元组,返回一个进队列操作
- size(name=None)
同步执行队列
完成一个出队列、+1、入队列操作(同步操作):
import tensorflow as tf
# 同步操作,如队列,+1,出队列
# 创建一个队列
Q = tf.FIFOQueue(3, dtypes=tf.float32)
# 数据进队列
init_q = Q.enqueue_many([[1.0, 2.0, 3.0], ])
# 定义操作
de_q = Q.dequeue()
data = de_q + 1
en_q = Q.enqueue(data)
with tf.Session() as sess:
# 初始化队列
sess.run(init_q)
# 执行10次 +1 操作
for i in range(10):
sess.run(en_q)
# 取出数据
for i in range(Q.size().eval()):
print(Q.dequeue().eval())
输出:
5.0
6.0
5.0
当数据量很大时,入队操作从硬盘中读取数据,放入内存中,主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个线程,实现异步读取。
队列管理器
QueueRunner类会创建一组线程, 这些线程可以重复的执行Enquene操作, 他们使用同一个Coordinator来处理线程同步终止。此外,一个QueueRunner会运行一个closer thread,当Coordinator收到异常报告时,这个closer thread会自动关闭队列。
您可以使用一个queue runner,来实现上述结构。 首先建立一个TensorFlow图表,这个图表使用队列来输入样本。增加处理样本并将样本推入队列中的操作。增加training操作来移除队列中的样本。
tf.train.QueueRunner(queue, enqueue_ops=None):创建一个QueueRunner
- queue:A Queue
- enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程
- 方法
- create_threads(sess, coord=None,start=False):创建线程来运行给定会话的入队操作
- start:布尔值,如果True启动线程;如果为False调用者必须调用start()启动线程
- coord:线程协调器,后面线程管理需要用到
- create_threads(sess, coord=None,start=False):创建线程来运行给定会话的入队操作
异步执行队列
通过队列管理器来实现变量加1,入队,主线程出队列的操作(异步操作):
# 异步操作,变量+1,入队,出队列
Q = tf.FIFOQueue(100, dtypes=tf.float32)
# 要做的事情
var = tf.Variable(0.0)
data = tf.assign_add(var, 1)
en_q = Q.enqueue(data)
# 队列管理器op
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 5)
# 变量初始化op
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
# 初始化变量
sess.run(init_op)
# 开始子线程
threads = qr.create_threads(sess, start=True)
# 主线程读取数据
for i in range(50):
print(sess.run(Q.dequeue()))
分析:这时候有一个问题就是,入队自顾自的去执行,在需要的出队操作完成之后,程序没法结束。需要一个实现线程间的同步,终止其他线程。程序执行完成报如下的错误:
tensorflow.python.framework.errors_impl.CancelledError: Enqueue operation was cancelled
[[{{node fifo_queue_enqueue}}]]
线程协调器
tf.train.Coordinator():线程协调员,实现一个简单的机制来协调一组线程的终止
- should_stop():如果线程应该停止则返回True。
- request_stop(): 请求该线程停止。
- join():等待被指定的线程终止。
首先创建一个Coordinator对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行,一直到should_stop()返回True时停止。 任何线程都可以决定计算什么时候应该停止。它只需要调用request_stop(),同时其他线程的should_stop()将会返回True,然后都停下来。
加入线程协调器的程序:
# 异步操作,变量+1,入队,出队列
Q = tf.FIFOQueue(100, dtypes=tf.float32)
# 要做的事情
var = tf.Variable(0.0)
data = tf.assign_add(var, 1)
en_q = Q.enqueue(data)
# 队列管理器op
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 5)
# 变量初始化op
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
# 初始化变量
sess.run(init_op)
# 开启线程协调器
coord = tf.train.Coordinator()
# 开始子线程
threads = qr.create_threads(sess, coord=coord, start=True)
# 主线程读取数据
for i in range(50):
print(sess.run(Q.dequeue()))
# 请求停止线程
coord.request_stop()
coord.join()