Tensorflow提供了一个队列机制,通过多线程将读取数据与计算数据分开,因为在处理海量数据集的训练时,无法把数据集一次全部载入内存中,需要以便从硬盘中读取,一边训练计算。
队列(queue)
启动线程
1 | tf.train.start_queue_runners() |
那什么时候程序会进入挂起状态呢?
源于上面第四行代码,意思是我们要从队列中拿出指定批次的数据,但是队列里没有数据,所以程序会进入挂起状态
在session内部的退出机制
如果把session部分改为with语法:
再次运行程序后,程序虽然能够正常运行,但是结束后会报错。原因是with语法的session是自动关闭的, 当运行结束后session自动关闭的同时会把里面所有的操作都关掉, 而此时的队列还在等待另一个进程往里写数据, 所以就会报错。
解决方法:使用sess=tf.InteractiveSession()实现,或者像第一张图片一样创建
tf.InteractiveSession()和tf.Session()的区别:
使用InteractiveSession()来创建会话,我们要先构建Session()然后定义操作。如果使用Session来创建会话,我们需要在会话之前定义好全部的操作然后再构建会话。
上面的代码在单例程序中没什么问题, 资源会随着程序关闭而整体销毁。 但如果在复杂的代码中, 需要某个线程自动关闭, 而不是依赖进程的结束而销毁, 这种情况下需要使用tf.train.Coordinator函数来创建一个协调器, 以信号量的方式来协调线程间的关系, 完成线程间的同步。
协调器
下面这个例子是先建立一个100大小的队列,主线程使用计数器不停的加1,队列线程把主线程里的计数器放到队列中,当队列为空时,主线程会在sess.run(queue.dequeue())语句位置挂起。当队列线程写入队列中时,主线程的计数器同步开始工作。
1 | import tensorflow as tf |
还可以使用coord.join(enqueue_threads)指定等待某个进程结束
为session中的队列加上协调器,只需要将上例的coord放到启动队列中。