# 自动流水线-SmartStage
## 背景
DeepRec已经提供了stage 功能,该功能可以实现IO Bound操作和计算Bound操作在TensorFlow runtime的驱动下异步执行,从而提高整张图的执行效率。
由于`tf.staged`需要用户指定stage的边界,一方面会增加使用难度,另一方面会导致stage颗粒度不够精细,难以做到更多op的异步执行。因此我们提出了SmartStage功能。用户不需要对TF Graph有OP级别理解的情况下,就可以使stage发挥最大的性能提升。
## 功能说明
通过开启smart stage功能,自动化的寻优最大可以stage的范围,修改实际物理计算图(不影响Graphdef图),从而提高性能。
## 用户接口
### 1. 自动SmartStage(推荐)
自动SmartStage的前提是模型使用了`tf.data.Iterator`接口从`tf.data.Dataset`中读取样本数据。
1. `tf.SmartStageOptions`接口返回执行stage子图的配置,其参数如下:
| 参数 | 含义 | 默认 |
| ------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------ |
| capacity | 缓存的异步化执行结果的最大个数 | 1 |
| num_threads | 异步化执行stage子图的线程数 | 1 |
| num_clients | 消耗预取结果的消费者数量 | 1 |
| timeout_millis | 预取结果等待缓存区可用的最大等待时间,超时后本次预取结果将会被丢弃 | 300000 ms |
| closed_exception_types | 被识别为正常退出的异常类型 | (`tf.errors.OUT_OF_RANGE`,) |
| ignored_exception_types | 被识别可忽略跳过的异常类型 | () |
| use_stage_subgraph_thread_pool | 是否在独立线程池上运行Stage子图,需要先创建独立线程池 | False(若为True则必须先创建独立线程池) |
| stage_subgraph_thread_pool_id | 如果开启了在独立线程池上运行Stage子图,用于指定独立线程池索引,需要先创建独立线程池,并打开use_stage_subgraph_thread_pool选项 | 0,索引范围为[0, 创建的独立线程池数量-1] |
| stage_subgraph_stream_id | GPU Multi-Stream 场景下, stage子图执行使用的gpu stream的索引 | 0 (0表示stage子图共享计算主图使用的gpu stream, 索引范围为[0, gpu stream总数-1]) |
| graph | 需要执行SmartStage优化的Graph,需要与传递给Session的Graph相同 | None (表示使用默认Graph) |
| name | 预取操作的名称 | None (表示自动生成) |
> 关于如何创建独立线程池以及如何使用GPU Multi-Stream,请参见[流水线](./Stage.md)。
2. `tf.SmartStageOptions`接口生成的配置需要赋值给`tf.ConfigProto`。
```python
sess_config = tf.ConfigProto()
smart_stage_options = tf.SmartStageOptions(capacity=40, num_threads=4)
sess_config.graph_options.optimizer_options.smart_stage_options.CopyFrom(smart_stage_options)
```
3. 设置`tf.ConfigProto`中的如下选项来开启SmartStage。
- CPU场景
```python
sess_config = tf.ConfigProto()
sess_config.graph_options.optimizer_options.do_smart_stage = True # 开启SmartStage
```
- GPU场景
```python
sess_config = tf.ConfigProto()
sess_config.graph_options.optimizer_options.do_smart_stage = True # 开启SmartStage
sess_config.graph_options.optimizer_options.stage_subgraph_on_cpu = True # 针对GPU场景优化的选项
```
4. Session中加入`tf.make_prefetch_hook()` hook
### 2. 图中存在Stage阶段时的SmartStage
原图已经使用`tf.staged`接口手动分图。
> 关于`tf.staged`接口请参见[流水线](./Stage.md)。
1. 直接设置`tf.ConfigProto`中的相关选项即可开启SmartStage。
**CPU场景**
```python
sess_config = tf.ConfigProto()
sess_config.graph_options.optimizer_options.do_smart_stage = True # 开启SmartStage
```
**GPU场景**
```python
sess_config = tf.ConfigProto()
sess_config.graph_options.optimizer_options.do_smart_stage = True # 开启SmartStage
sess_config.graph_options.optimizer_options.stage_subgraph_on_cpu = True # 针对GPU场景优化的选项
```
2. Session中加入`tf.make_prefetch_hook()` hook
## 代码示例
### 自动SmartStage(推荐)
```python
import tensorflow as tf
def parse_csv(value):
v = tf.io.decode_csv(value, record_defaults=[[''], ['']])
return v
dataset = tf.data.TextLineDataset('./test_data.csv')
dataset = dataset.batch(2)
dataset = dataset.map(parse_csv, num_parallel_calls=2)
dataset_output_types = tf.data.get_output_types(dataset)
dataset_output_shapes = tf.data.get_output_shapes(dataset)
iterator = tf.data.Iterator.from_structure(dataset_output_types, dataset_output_shapes)
xx = iterator.get_next()
xx = list(xx)
init_op = iterator.make_initializer(dataset)
var = tf.get_variable("var", shape=[100, 3], initializer=tf.ones_initializer())
xx[0] = tf.string_to_hash_bucket(xx[0], num_buckets=10)
xx[0] = tf.nn.embedding_lookup(var, xx[0])
xx[1]=tf.concat([xx[1], ['xxx']], axis = 0)
target = tf.concat([tf.as_string(xx[0]), [xx[1], xx[1]]], 0)
config = tf.ConfigProto()
# enable smart stage
config.graph_options.optimizer_options.do_smart_stage = True
smart_stage_options = tf.SmartStageOptions(capacity=1, num_threads=1)
config.graph_options.optimizer_options.smart_stage_options.CopyFrom(smart_stage_options)
# 对于GPU训练,可以考虑开启以下选项来获得更好的性能
# config.graph_options.optimizer_options.stage_subgraph_on_cpu = True
# mark target 节点
tf.train.mark_target_node([target])
scaffold = tf.train.Scaffold(
local_init_op=tf.group(tf.local_variables_initializer(), init_op))
with tf.train.MonitoredTrainingSession(config=config, scaffold=scaffold,
hooks=[tf.make_prefetch_hook()]) as sess:
for i in range(5):
print(sess.run([target]))
```
### 图中存在Stage阶段时的SmartStage
```python
import tensorflow as tf
filename_queue = tf.train.string_input_producer(['1.txt'])
reader = tf.TextLineReader()
k, v = reader.read(filename_queue)
var = tf.get_variable("var", shape=[100, 3], initializer=tf.ones_initializer())
v = tf.train.batch([v], batch_size=2, capacity=20 * 3)
v0, v1 = tf.decode_csv(v, record_defaults=[[''], ['']], field_delim=',')
xx = tf.staged([v0, v1])
xx[0]=tf.string_to_hash_bucket(xx[0],num_buckets=10)
xx[0] = tf.nn.embedding_lookup(var, xx[0])
xx[1]=tf.concat([xx[1], ['xxx']], axis = 0)
target = tf.concat([tf.as_string(xx[0]), [xx[1], xx[1]]], 0)
config = tf.ConfigProto()
# enable smart stage
config.graph_options.optimizer_options.do_smart_stage = True
# 对于GPU训练,可以考虑开启以下选项来获得更好的性能
# config.graph_options.optimizer_options.stage_subgraph_on_cpu = True
# mark target 节点
tf.train.mark_target_node([target])
with tf.train.MonitoredTrainingSession(config=config,
hooks=[tf.make_prefetch_hook()]) as sess:
for i in range(5):
print(sess.run([target]))
```
## 性能对比
### CPU场景
在modelzoo中的DLRM模型中测试该功能
机型为Aliyun ECS 实例 ecs.hfg7.8xlarge
- Model name: Intel(R) Xeon(R) Platinum 8369HC CPU @ 3.30GHz
- CPU(s): 32
- Socket(s): 1
- Core(s) per socket: 16
- Thread(s) per core: 2
- Memory: 128G
| | case | global steps/sec |
| :--: | :-------------: | :--------------: |
| DLRM | w/o smart stage | 201 (baseline) |
| DLRM | w/ smart stage | 212 (+ 1.05x) |
### GPU场景
在modelzoo中的模型测试该功能在GPU训练场景下的性能。
机器配置:
| CPU | Intel(R) Xeon(R) Platinum 8369B CPU @ 2.90GHz | 64核心 |
| ---- | --------------------------------------------- | ------ |
| GPU | NVIDIA A100 80G | 单卡 |
| MEM | 492G | |
性能结果对比:
| 模型 | 不开启SmartStage
(global steps/sec) | do_smartstage
(global steps/sec) | do_smartstage_gpu
(global steps/sec) |
| ------ | --------------------------------------- | ------------------------------------ | ---------------------------------------- |
| DIEN | 17.1673 | 16.918 | 17.2557 |
| DIN | 137.584 | 132.619 | 165.069 |
| DLRM | 91.6982 | 67.735 | 188.105 |
| DSSM | 92.4544 | 83.7194 | 101.352 |
| DeepFM | 74.7011 | 62.1227 | 93.0858 |