自动流水线-SmartStage

背景

DeepRec已经提供了stage 功能,该功能可以实现IO Bound操作和计算Bound操作在TensorFlow runtime的驱动下异步执行,从而提高整张图的执行效率。

由于tf.staged需要用户指定stage的边界,一方面会增加使用难度,另一方面会导致stage颗粒度不够精细,难以做到更多op的异步执行。因此我们提出了SmartStage功能。用户不需要对TF Graph有OP级别理解的情况下,就可以使stage发挥最大的性能提升。

功能说明

通过开启smart stage功能,自动化的寻优最大可以stage的范围,修改实际物理计算图(不影响Graphdef图),从而提高性能。

用户接口

1. 自动SmartStage(推荐)

自动SmartStage的前提是模型使用了tf.data.Iterator接口从tf.data.Dataset中读取样本数据。

  1. tf.SmartStageOptions接口返回执行stage子图的配置,其参数如下:

参数

含义

默认

capacity

缓存的异步化执行结果的最大个数

1

num_threads

异步化执行stage子图的线程数

1

num_clients

消耗预取结果的消费者数量

1

timeout_millis

预取结果等待缓存区可用的最大等待时间,超时后本次预取结果将会被丢弃

300000 ms

closed_exception_types

被识别为正常退出的异常类型

(tf.errors.OUT_OF_RANGE,)

ignored_exception_types

被识别可忽略跳过的异常类型

()

use_stage_subgraph_thread_pool

是否在独立线程池上运行Stage子图,需要先创建独立线程池

False(若为True则必须先创建独立线程池)

stage_subgraph_thread_pool_id

如果开启了在独立线程池上运行Stage子图,用于指定独立线程池索引,需要先创建独立线程池,并打开use_stage_subgraph_thread_pool选项

0,索引范围为[0, 创建的独立线程池数量-1]

stage_subgraph_stream_id

GPU Multi-Stream 场景下, stage子图执行使用的gpu stream的索引

0 (0表示stage子图共享计算主图使用的gpu stream, 索引范围为[0, gpu stream总数-1])

graph

需要执行SmartStage优化的Graph,需要与传递给Session的Graph相同

None (表示使用默认Graph)

name

预取操作的名称

None (表示自动生成)

关于如何创建独立线程池以及如何使用GPU Multi-Stream,请参见流水线

  1. tf.SmartStageOptions接口生成的配置需要赋值给tf.ConfigProto

    sess_config = tf.ConfigProto()
    smart_stage_options = tf.SmartStageOptions(capacity=40, num_threads=4)
    sess_config.graph_options.optimizer_options.smart_stage_options.CopyFrom(smart_stage_options)
    
  2. 设置tf.ConfigProto中的如下选项来开启SmartStage。

    • CPU场景

    sess_config = tf.ConfigProto()
    sess_config.graph_options.optimizer_options.do_smart_stage = True # 开启SmartStage
    
    • GPU场景

    sess_config = tf.ConfigProto()
    sess_config.graph_options.optimizer_options.do_smart_stage = True # 开启SmartStage
    sess_config.graph_options.optimizer_options.stage_subgraph_on_cpu = True # 针对GPU场景优化的选项
    
  3. Session中加入tf.make_prefetch_hook() hook

2. 图中存在Stage阶段时的SmartStage

原图已经使用tf.staged接口手动分图。

关于tf.staged接口请参见流水线

  1. 直接设置tf.ConfigProto中的相关选项即可开启SmartStage。 CPU场景

    sess_config = tf.ConfigProto()
    sess_config.graph_options.optimizer_options.do_smart_stage = True # 开启SmartStage
    

    GPU场景

    sess_config = tf.ConfigProto()
    sess_config.graph_options.optimizer_options.do_smart_stage = True # 开启SmartStage
    sess_config.graph_options.optimizer_options.stage_subgraph_on_cpu = True # 针对GPU场景优化的选项
    
  2. Session中加入tf.make_prefetch_hook() hook

代码示例

自动SmartStage(推荐)

import tensorflow as tf

def parse_csv(value):
    v = tf.io.decode_csv(value, record_defaults=[[''], ['']])
    return v
    
dataset = tf.data.TextLineDataset('./test_data.csv')
dataset = dataset.batch(2)
dataset = dataset.map(parse_csv, num_parallel_calls=2)
dataset_output_types = tf.data.get_output_types(dataset)
dataset_output_shapes = tf.data.get_output_shapes(dataset)
iterator = tf.data.Iterator.from_structure(dataset_output_types, dataset_output_shapes)
xx = iterator.get_next()
xx = list(xx)

init_op = iterator.make_initializer(dataset)

var = tf.get_variable("var", shape=[100, 3], initializer=tf.ones_initializer())
xx[0] = tf.string_to_hash_bucket(xx[0], num_buckets=10)
xx[0] = tf.nn.embedding_lookup(var, xx[0])
xx[1]=tf.concat([xx[1], ['xxx']], axis = 0)
target = tf.concat([tf.as_string(xx[0]), [xx[1], xx[1]]], 0)

config = tf.ConfigProto()
# enable smart stage
config.graph_options.optimizer_options.do_smart_stage = True
smart_stage_options = tf.SmartStageOptions(capacity=1, num_threads=1)
config.graph_options.optimizer_options.smart_stage_options.CopyFrom(smart_stage_options)

# 对于GPU训练,可以考虑开启以下选项来获得更好的性能
# config.graph_options.optimizer_options.stage_subgraph_on_cpu = True
    
# mark target 节点
tf.train.mark_target_node([target])

scaffold = tf.train.Scaffold(
    local_init_op=tf.group(tf.local_variables_initializer(), init_op))
with tf.train.MonitoredTrainingSession(config=config, scaffold=scaffold, 
                                       hooks=[tf.make_prefetch_hook()]) as sess:
    for i in range(5):
        print(sess.run([target]))

图中存在Stage阶段时的SmartStage

import tensorflow as tf

filename_queue = tf.train.string_input_producer(['1.txt'])
reader = tf.TextLineReader()
k, v = reader.read(filename_queue)

var = tf.get_variable("var", shape=[100, 3], initializer=tf.ones_initializer())
v = tf.train.batch([v], batch_size=2, capacity=20 * 3)
v0, v1 = tf.decode_csv(v, record_defaults=[[''], ['']], field_delim=',')
xx = tf.staged([v0, v1])

xx[0]=tf.string_to_hash_bucket(xx[0],num_buckets=10)
xx[0] = tf.nn.embedding_lookup(var, xx[0])
xx[1]=tf.concat([xx[1], ['xxx']], axis = 0)
target = tf.concat([tf.as_string(xx[0]), [xx[1], xx[1]]], 0)

config = tf.ConfigProto()
# enable smart stage
config.graph_options.optimizer_options.do_smart_stage = True
# 对于GPU训练,可以考虑开启以下选项来获得更好的性能
# config.graph_options.optimizer_options.stage_subgraph_on_cpu = True

# mark target 节点
tf.train.mark_target_node([target])

with tf.train.MonitoredTrainingSession(config=config,
                                       hooks=[tf.make_prefetch_hook()]) as sess:
for i in range(5):
    print(sess.run([target]))

性能对比

CPU场景

在modelzoo中的DLRM模型中测试该功能 机型为Aliyun ECS 实例 ecs.hfg7.8xlarge

  • Model name: Intel(R) Xeon(R) Platinum 8369HC CPU @ 3.30GHz

  • CPU(s): 32

  • Socket(s): 1

  • Core(s) per socket: 16

  • Thread(s) per core: 2

  • Memory: 128G

case

global steps/sec

DLRM

w/o smart stage

201 (baseline)

DLRM

w/ smart stage

212 (+ 1.05x)

GPU场景

在modelzoo中的模型测试该功能在GPU训练场景下的性能。

机器配置:

CPU

Intel(R) Xeon(R) Platinum 8369B CPU @ 2.90GHz

64核心

GPU

NVIDIA A100 80G

单卡

MEM

492G

性能结果对比:

模型

不开启SmartStage
(global steps/sec)

do_smartstage
(global steps/sec)

do_smartstage_gpu
(global steps/sec)

DIEN

17.1673

16.918

17.2557

DIN

137.584

132.619

165.069

DLRM

91.6982

67.735

188.105

DSSM

92.4544

83.7194

101.352

DeepFM

74.7011

62.1227

93.0858