Tensorflow中的队列

Tensorflow提供了一个队列机制,通过多线程将读取数据与计算数据分开,因为在处理海量数据集的训练时,无法把数据集一次全部载入内存中,需要以便从硬盘中读取,一边训练计算。

队列(queue)

启动线程

1
tf.train.start_queue_runners()

那什么时候程序会进入挂起状态呢?

源于上面第四行代码,意思是我们要从队列中拿出指定批次的数据,但是队列里没有数据,所以程序会进入挂起状态

在session内部的退出机制

如果把session部分改为with语法:

再次运行程序后,程序虽然能够正常运行,但是结束后会报错。原因是with语法的session是自动关闭的, 当运行结束后session自动关闭的同时会把里面所有的操作都关掉, 而此时的队列还在等待另一个进程往里写数据, 所以就会报错。

解决方法:使用sess=tf.InteractiveSession()实现,或者像第一张图片一样创建

tf.InteractiveSession()和tf.Session()的区别:

使用InteractiveSession()来创建会话,我们要先构建Session()然后定义操作。如果使用Session来创建会话,我们需要在会话之前定义好全部的操作然后再构建会话。

上面的代码在单例程序中没什么问题, 资源会随着程序关闭而整体销毁。 但如果在复杂的代码中, 需要某个线程自动关闭, 而不是依赖进程的结束而销毁, 这种情况下需要使用tf.train.Coordinator函数来创建一个协调器, 以信号量的方式来协调线程间的关系, 完成线程间的同步。

协调器

下面这个例子是先建立一个100大小的队列,主线程使用计数器不停的加1,队列线程把主线程里的计数器放到队列中,当队列为空时,主线程会在sess.run(queue.dequeue())语句位置挂起。当队列线程写入队列中时,主线程的计数器同步开始工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import tensorflow as tf
#创建一个长度为100的队列
queue=tf.FIFOQueue(100,"float")
c=tf.Variable(0.0) #计数器
#c+1.0
op=tf.assign_add(c,tf.constant(1.0))
#将计数器的结果加入队列
enqueue_op=queue.enqueue(c)
#创建一个队列管理器queueRunner,用上面这两个操作向queue中添加元素。
qr=tf.train.QueueRunner(queue,enqueue_ops=[op,enqueue_op])
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
coord=tf.train.Coordinator()
#启动入队进程
enqueue_threads=qr.create_threads(sess,coord=coord,start=True)
#主线程
for i in range(0,10):
print(sess.run(queue.dequeue()))
#通知其它线程关闭,其它所有线程关闭后,这一函数才返回
coord.request_stop()

还可以使用coord.join(enqueue_threads)指定等待某个进程结束

为session中的队列加上协调器,只需要将上例的coord放到启动队列中。

----本文结束,感谢您的阅读。如有错,请指正。----
大哥大嫂过年好!支持我一下呗
0%