流水线-Stage
背景
在一个通常的TensorFlow训练任务中通常由样本数据的读取,图计算构成,样本数据的读取属于IO bound操作,在整个E2E的耗时占据表较大的百分比,从而拖慢训练任务,同时并不能高效的利用计算资源(CPU、GPU)。DeepRec已经提供了stage 功能,它的思想来源于TensorFlow的StagingArea功能,我们在DeepRec提供了API tf.staged
,用户通过显式的指定图中哪一部分需要stage,以及在Session创建的时候加入tf.make_prefetch_hook()
在TensorFlow runtime驱动异步执行,从而提高整张图的执行效率。
用户接口
tf.staged
,对输入的 features
进行预取,返回预取后的 tensor。
参数 |
含义 |
默认值 |
---|---|---|
features |
需要异步化执行的 op,可以是 tensor、list of tensor(list 中每一个元素都是 tensor ) 或者 dict of tensor (dict 中的 key 都是 string,value 都是 tensor) |
必选参数 |
capacity |
缓存的 |
1 |
num_threads |
异步化执行 |
1 |
items |
|
None,即 |
feed_generator |
|
None,即 |
closed_exception_types |
被识别为正常退出的异常类型 |
( |
ignored_exception_types |
被识别可忽略跳过的异常类型 |
() |
use_stage_subgraph_thread_pool |
是否在独立线程池上运行Stage子图,需要先创建独立线程池 |
False(可选,若为True则必须先创建独立线程池) |
stage_subgraph_thread_pool_id |
如果开启了在独立线程池上运行Stage子图,用于指定独立线程池索引,需要先创建独立线程池,并打开use_stage_subgraph_thread_pool选项。 |
0,索引范围为[0, 创建的独立线程池数量-1] |
stage_subgraph_stream_id |
GPU Multi-Stream 场景下, stage子图执行使用的gpu stream的索引 |
0(可选,0表示stage子图共享计算主图使用的gpu stream, 索引范围为[0, gpu stream总数-1]) |
Session中加入tf.make_prefetch_hook()
hook
hooks=[tf.make_prefetch_hook()]
with tf.train.MonitoredTrainingSession(hooks=hooks, config=sess_config) as sess:
创建独立线程池(可选)
sess_config = tf.ConfigProto()
sess_config.session_stage_subgraph_thread_pool.add() # 增加一个独立线程池
sess_config.session_stage_subgraph_thread_pool[0].inter_op_threads_num = 8 # 独立线程池中inter线程数量
sess_config.session_stage_subgraph_thread_pool[0].intra_op_threads_num = 8 # 独立线程池中intra线程数量
sess_config.session_stage_subgraph_thread_pool[0].global_name = "StageThreadPool_1" # 独立线程池名称
GPU Multi-Stream Stage(可选)
sess_config = tf.ConfigProto()
sess_config.graph_options.rewrite_options.use_multi_stream = (rewriter_config_pb2.RewriterConfig.ON) # 开启GPU Multi-Stream功能
sess_config.graph_options.rewrite_options.multi_stream_opts.multi_stream_num = 2 # 设定可用的stream数量, 其中0号stream提供给计算主图使用
sess_config.graph_options.optimizer_options.stage_multi_stream = True # 开启GPU Multi-Stream Stage
GPU Multi-Stream Stage功能还可以通过开启GPU MPS来实现更好的性能, 请见GPU-MultiStream.
注意事项
待异步化的计算应该尽可能少和后续主体计算争抢资源(gpu、cpu、线程池等)
capacity
更大会消耗更多的内存或显存,同时可能会抢占后续模型训练的 CPU 资源,建议设置为后续计算时间/待异步化时间。可以从 1 开始逐渐向上调整num_threads
并不是越大越好,只需要可以让计算和预处理重叠起来即可,数量更大会抢占模型训练的 CPU 资源。计算公式:num_threads >= 预处理时间 / 训练时间,可以从 1 开始向上调整tf.make_prefetch_hook()
一定要加上,否则会hang住
代码示例
import tensorflow as tf
filename_queue = tf.train.string_input_producer(['1.txt'])
reader = tf.TextLineReader()
k, v = reader.read(filename_queue)
var = tf.get_variable("var", shape=[100, 3], initializer=tf.ones_initializer())
v = tf.train.batch([v], batch_size=2, capacity=20 * 3)
v0, v1 = tf.decode_csv(v, record_defaults=[[''], ['']], field_delim=',')
xx = tf.staged([v0, v1])
xx[0]=tf.string_to_hash_bucket(xx[0],num_buckets=10)
xx[0] = tf.nn.embedding_lookup(var, xx[0])
xx[1]=tf.concat([xx[1], ['xxx']], axis = 0)
target = tf.concat([tf.as_string(xx[0]), [xx[1], xx[1]]], 0)
# mark target 节点
tf.train.mark_target_node([target])
with tf.train.MonitoredTrainingSession(hooks=[tf.make_prefetch_hook()]) as sess:
for i in range(5):
print(sess.run([target]))