# Pipeline-Stage ## Background A TensorFlow training task is usually composed of sample data reading and graph calculation. The reading of sample data is an IO bound operation, which occupies a large percentage of the entire E2E time, then slowing down the training task. It cannot efficiently use computing resources (CPU, GPU). DeepRec has already provided the stage function. Its idea comes from the StagingArea function of TensorFlow. We provide the API `tf.staged` in DeepRec. The user explicitly specifies which part of the graph needs the stage, and adds `tf.make_prefetch_hook()` when the Session is created. Under the asynchronous execution of TensorFlow runtime, the execution efficiency of the entire graph is improved. ## API `tf.staged`, prefetch the input `features`, and return the prefetched tensor. | parameter | description | default value | | ------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------- | | features | The tensor that needs to be executed asynchronously: tensor, list of tensor (each element in the list is a tensor) or dict of tensor (the keys in the dict are all strings, and the values are all tensors). | required | | feed_dict | A dictionary that maps graph elements to tensors. | {} | | capacity | The maximum number of cached `items` asynchronous execution results. | 1 | | num_threads | Number of threads to execute `items` asynchronously. | 1 | | num_client | Number of clients of prefetched sample. | 1 | | timeout_millis | Max milliseconds put op can take. | 300000 ms | | closed_exception_types | Exception types recognized as graceful exits. | (`tf.errors.OUT_OF_RANGE`,) | | ignored_exception_types | Exception types that are recognized to be ignored and skipped. | () | | use_stage_subgraph_thread_pool | Whether to run the Stage subgraph on an independent thread pool, you need to create an independent thread pool first. | False (If it is True, a separate thread pool must be created first) | | stage_subgraph_thread_pool_id | If you enable the stage subgraph to run on the independent thread pool to specify the independent thread pool index, you need to create an independent thread pool first, and enable the use_stage_subgraph_thread_pool option. | 0, The index range is [0, the number of independent thread pools created - 1] | | stage_subgraph_stream_id | In the GPU Multi-Stream scenario, the index of gpu stream used by stage subgraph. | 0 (0 means that the stage subgraph shares the gpu stream used by the main graph, the index range is [0, total number of GPU streams -1]) | | name | Name of prefetching operations. | None (Automatic generated) | Adds `tf.make_prefetch_hook()`hook when create session. ```python hooks=[tf.make_prefetch_hook()] with tf.train.MonitoredTrainingSession(hooks=hooks, config=sess_config) as sess: ``` - Create a separate thread pool (optional) ```python sess_config = tf.ConfigProto() sess_config.session_stage_subgraph_thread_pool.add() # Add a thread pool sess_config.session_stage_subgraph_thread_pool[0].inter_op_threads_num = 8 # inter thread number in thread pool sess_config.session_stage_subgraph_thread_pool[0].intra_op_threads_num = 8 # intra thread number in thread pool sess_config.session_stage_subgraph_thread_pool[0].global_name = "StageThreadPool_1" # thread pool name ``` - GPU Multi-Stream Stage (optional) ```python sess_config = tf.ConfigProto() sess_config.graph_options.rewrite_options.use_multi_stream = (rewriter_config_pb2.RewriterConfig.ON) # enable GPU Multi-Stream sess_config.graph_options.rewrite_options.multi_stream_opts.multi_stream_num = 2 # The number of gpu streams, stream 0 is used by the main graph sess_config.graph_options.optimizer_options.stage_multi_stream = True # enable GPU Multi-Stream Stage ``` > GPU Mulit-Stream Stage can achieve better performance by enabling GPU MPS, please refer to [GPU-MultiStream](./GPU-MultiStream.md). **Attention:** - Computations to be asynchronous should compete with subsequent main computations for resources as little as possible (gpu, cpu, thread pool, etc.) - A larger `capacity` will consume more memory or video memory, and may occupy CPU resources for subsequent model training. It is recommended to set it to follow-up calculation time/waiting for asynchronization time. It can be adjusted gradually upwards starting from 1. - `num_threads` is not as big as possible, it just needs to allow calculation and preprocessing to overlap, and a larger number will preempt CPU resources for model training. Calculation formula: num_threads >= preprocessing time / training time, can be adjusted upwards from 1. - `tf.make_prefetch_hook()` must be added, otherwise it will hang. ## Example ```python import tensorflow as tf filename_queue = tf.train.string_input_producer(['1.txt']) reader = tf.TextLineReader() k, v = reader.read(filename_queue) var = tf.get_variable("var", shape=[100, 3], initializer=tf.ones_initializer()) v = tf.train.batch([v], batch_size=2, capacity=20 * 3) v0, v1 = tf.decode_csv(v, record_defaults=[[''], ['']], field_delim=',') xx = tf.staged([v0, v1]) xx[0]=tf.string_to_hash_bucket(xx[0],num_buckets=10) xx[0] = tf.nn.embedding_lookup(var, xx[0]) xx[1]=tf.concat([xx[1], ['xxx']], axis = 0) target = tf.concat([tf.as_string(xx[0]), [xx[1], xx[1]]], 0) # mark target node tf.train.mark_target_node([target]) with tf.train.MonitoredTrainingSession(hooks=[tf.make_prefetch_hook()]) as sess: for i in range(5): print(sess.run([target])) ```