• @tf.function :Graph Execution模式 *
    • @tf.function 基础使用方法
    • @tf.function 内在机制
    • AutoGraph:将Python控制流转换为TensorFlow计算图
    • 使用传统的 tf.Session

    @tf.function :Graph Execution模式 *

    @tf.function 基础使用方法

    在TensorFlow 2.0中,推荐使用 @tf.function (而非1.X中的 tf.Session )实现Graph Execution,从而将模型转换为易于部署且高性能的TensorFlow图模型。只需要将我们希望以Graph Execution模式运行的代码封装在一个函数内,并在函数前加上 @tf.function 即可,如下例所示。关于TensorFlow 1.X版本中的Graph Execution可参考 附录 。

    警告

    并不是任何函数都可以被 @tf.function 修饰!@tf.function 使用静态编译将函数内的代码转换成计算图,因此对函数内可使用的语句有一定限制(仅支持Python语言的一个子集),且需要函数内的操作本身能够被构建为计算图。建议在函数内只使用TensorFlow的原生操作,不要使用过于复杂的Python语句,函数参数只包括TensorFlow张量或NumPy数组,并最好是能够按照计算图的思想去构建函数(换言之,@tf.function 只是给了你一种更方便的写计算图的方法,而不是一颗能给任何函数加速的 银子弹 )。详细内容可参考 AutoGraph Capabilities and Limitations 。

    1. import tensorflow as tf
    2. import time
    3. from zh.model.mnist.cnn import CNN
    4. from zh.model.utils import MNISTLoader
    5.  
    6. num_batches = 400
    7. batch_size = 50
    8. learning_rate = 0.001
    9. data_loader = MNISTLoader()
    10.  
    11. @tf.function
    12. def train_one_step(X, y):
    13. with tf.GradientTape() as tape:
    14. y_pred = model(X)
    15. loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
    16. loss = tf.reduce_mean(loss)
    17. # 注意这里使用了TensorFlow内置的tf.print()。@tf.function不支持Python内置的print方法
    18. tf.print("loss", loss)
    19. grads = tape.gradient(loss, model.variables)
    20. optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
    21.  
    22. if __name__ == '__main__':
    23. model = CNN()
    24. optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    25. start_time = time.time()
    26. for batch_index in range(num_batches):
    27. X, y = data_loader.get_batch(batch_size)
    28. train_one_step(X, y)
    29. end_time = time.time()
    30. print(end_time - start_time)

    运行400个Batch进行测试,加入 @tf.function 的程序耗时35.5秒,未加入 @tf.function 的纯Eager Execution程序耗时43.8秒。可见 @tf.function 带来了一定的性能提升。一般而言,当模型由较多小的操作组成的时候, @tf.function 带来的提升效果较大。而当模型的操作数量较少,但单一操作均很耗时的时候,则 @tf.function 带来的性能提升不会太大。

    @tf.function 内在机制

    当被 @tf.function 修饰的函数第一次被调用的时候,进行以下操作:

    • 在Eager Execution模式关闭的环境下,函数内的代码依次运行。也就是说,每个 tf. 方法都只是定义了计算节点,而并没有进行任何实质的计算。这与TensorFlow 1.X的Graph Execution是一致的;

    • 使用AutoGraph将函数中的Python控制流语句转换成TensorFlow计算图中的对应节点(比如说 whilefor 语句转换为 tf.whileif 语句转换为 tf.cond 等等;

    • 基于上面的两步,建立函数内代码的计算图表示(为了保证图的计算顺序,图中还会自动加入一些 tf.control_dependencies 节点);

    • 运行一次这个计算图;

    • 基于函数的名字和输入的函数参数的类型生成一个哈希值,并将建立的计算图缓存到一个哈希表中。

    在被 @tf.function 修饰的函数之后再次被调用的时候,根据函数名和输入的函数参数的类型计算哈希值,检查哈希表中是否已经有了对应计算图的缓存。如果是,则直接使用已缓存的计算图,否则重新按上述步骤建立计算图。

    以下是一个测试题:

    1. import tensorflow as tf
    2. import numpy as np
    3.  
    4. @tf.function
    5. def f(x):
    6. print("The function is running in Python")
    7. tf.print(x)
    8.  
    9. a = tf.constant(1, dtype=tf.int32)
    10. f(a)
    11. b = tf.constant(2, dtype=tf.int32)
    12. f(b)
    13. b_ = np.array(2, dtype=np.int32)
    14. f(b_)
    15. c = tf.constant(0.1, dtype=tf.float32)
    16. f(c)
    17. d = tf.constant(0.2, dtype=tf.float32)
    18. f(d)
    19. f(1)
    20. f(2)
    21. f(1)
    22. f(0.1)
    23. f(0.2)
    24. f(0.1)

    思考一下,上面这段程序的结果是什么?

    答案是:

    1. The function is running in Python
    2. 1
    3. 2
    4. 2
    5. The function is running in Python
    6. 0.1
    7. 0.2
    8. The function is running in Python
    9. 1
    10. The function is running in Python
    11. 2
    12. 1
    13. The function is running in Python
    14. 0.1
    15. The function is running in Python
    16. 0.2
    17. 0.1

    当计算 f(a) 时,由于是第一次调用该函数,TensorFlow进行了以下操作:

    • 将函数内的代码依次运行了一遍(因此输出了文本);

    • 构建了计算图,然后运行了一次该计算图(因此输出了1)。这里 tf.print(x) 可以作为计算图的节点,但Python内置的 print 则不能被转换成计算图的节点。因此,计算图中只包含了 tf.print(x) 这一操作;

    • 将该计算图缓存到了一个哈希表中(如果之后再有类型为 tf.int32 ,shape为空的张量输入,则重复使用已构建的计算图)。

    计算 f(b) 时,由于b的类型与a相同,所以TensorFlow重复使用了之前已构建的计算图并运行(因此输出了2)。这里由于并没有真正地逐行运行函数中的代码,所以函数第一行的文本输出代码没有运行。计算 f(b_) 时,TensorFlow自动将numpy的数据结构转换成了TensorFlow中的张量,因此依然能够复用之前已构建的计算图。

    计算 f(c) 时,虽然张量 c 的shape和 ab 均相同,但类型为 tf.float32 ,因此TensorFlow重新运行了函数内代码(从而再次输出了文本)并建立了一个输入为 tf.float32 类型的计算图。

    计算 f(d) 时,由于 dc 的类型相同,所以TensorFlow复用了计算图,同理没有输出文本。

    之后的计算结果则显示出 @tf.function 对Python内置的整数和浮点数类型的处理方式。简而言之,只有当值完全一致的时候, @tf.function 才会复用之前建立的计算图,而并不会自动将Python内置的整数或浮点数等转换成张量。因此,当函数参数包含Python内置整数或浮点数时,需要额外小心。一般而言,应当只在指定超参数等少数场合使用Python内置类型作为被 @tf.function 修饰的函数的参数。

    下一个思考题:

    1. import tensorflow as tf
    2.  
    3. a = tf.Variable(0.0)
    4.  
    5. @tf.function
    6. def g():
    7. a.assign(a + 1.0)
    8. return a
    9.  
    10. print(g())
    11. print(g())
    12. print(g())

    这段代码的输出是:

    1. tf.Tensor(1.0, shape=(), dtype=float32)
    2. tf.Tensor(2.0, shape=(), dtype=float32)
    3. tf.Tensor(3.0, shape=(), dtype=float32)

    正如同正文里的例子一样,你可以在被 @tf.function 修饰的函数里调用 tf.Variabletf.keras.optimizerstf.keras.Model 等包含有变量的数据结构。一旦被调用,这些结构将作为隐含的参数提供给函数。当这些结构内的值在函数内被修改时,在函数外也同样生效。

    AutoGraph:将Python控制流转换为TensorFlow计算图

    前面提到,@tf.function 使用名为AutoGraph的机制将函数中的Python控制流语句转换成TensorFlow计算图中的对应节点。以下是一个示例,使用 tf.autograph 模块的低层API tf.autograph.to_code 将函数 square_if_positive 转换成TensorFlow计算图:

    1. import tensorflow as tf
    2.  
    3. @tf.function
    4. def square_if_positive(x):
    5. if x > 0:
    6. x = x * x
    7. else:
    8. x = 0
    9. return x
    10.  
    11. a = tf.constant(1)
    12. b = tf.constant(-1)
    13. print(square_if_positive(a), square_if_positive(b))
    14. print(tf.autograph.to_code(square_if_positive.python_function))

    输出:

    1. tf.Tensor(1, shape=(), dtype=int32) tf.Tensor(0, shape=(), dtype=int32)
    2. def tf__square_if_positive(x):
    3. do_return = False
    4. retval_ = ag__.UndefinedReturnValue()
    5. cond = x > 0
    6.  
    7. def get_state():
    8. return ()
    9.  
    10. def set_state(_):
    11. pass
    12.  
    13. def if_true():
    14. x_1, = x,
    15. x_1 = x_1 * x_1
    16. return x_1
    17.  
    18. def if_false():
    19. x = 0
    20. return x
    21. x = ag__.if_stmt(cond, if_true, if_false, get_state, set_state)
    22. do_return = True
    23. retval_ = x
    24. cond_1 = ag__.is_undefined_return(retval_)
    25.  
    26. def get_state_1():
    27. return ()
    28.  
    29. def set_state_1(_):
    30. pass
    31.  
    32. def if_true_1():
    33. retval_ = None
    34. return retval_
    35.  
    36. def if_false_1():
    37. return retval_
    38. retval_ = ag__.if_stmt(cond_1, if_true_1, if_false_1, get_state_1, set_state_1)
    39. return retval_

    我们注意到,原函数中的Python控制流 if…else… 被转换为了 x = ag__.if_stmt(cond, if_true, if_false, get_state, set_state) 这种计算图式的写法。AutoGraph起到了类似编译器的作用,能够帮助我们通过更加自然的Python控制流轻松地构建带有条件/循环的计算图,而无需手动使用TensorFlow的API进行构建。

    使用传统的 tf.Session

    不过,如果你依然钟情于TensorFlow传统的Graph Execution模式也没有问题。TensorFlow 2.0提供了 tf.compat.v1 模块以支持TensorFlow 1.X版本的API。同时,只要在编写模型的时候稍加注意,Keras的模型是可以同时兼容Eager Execution模式和Graph Execution模式的。注意,在Graph Execution模式下, model(input_tensor) 只需运行一次以完成图的建立操作。

    例如,通过以下代码,同样可以在MNIST数据集上训练前面所建立的MLP或CNN模型:

    1. optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)
    2. num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
    3. # 建立计算图
    4. X_placeholder = tf.compat.v1.placeholder(name='X', shape=[None, 28, 28, 1], dtype=tf.float32)
    5. y_placeholder = tf.compat.v1.placeholder(name='y', shape=[None], dtype=tf.int32)
    6. y_pred = model(X_placeholder)
    7. loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y_placeholder, y_pred=y_pred)
    8. loss = tf.reduce_mean(loss)
    9. train_op = optimizer.minimize(loss)
    10. sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    11. # 建立Session
    12. with tf.compat.v1.Session() as sess:
    13. sess.run(tf.compat.v1.global_variables_initializer())
    14. for batch_index in range(num_batches):
    15. X, y = data_loader.get_batch(batch_size)
    16. # 使用Session.run()将数据送入计算图节点,进行训练以及计算损失函数
    17. _, loss_value = sess.run([train_op, loss], feed_dict={X_placeholder: X, y_placeholder: y})
    18. print("batch %d: loss %f" % (batch_index, loss_value))
    19.  
    20. num_batches = int(data_loader.num_test_data // batch_size)
    21. for batch_index in range(num_batches):
    22. start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
    23. y_pred = model.predict(data_loader.test_data[start_index: end_index])
    24. sess.run(sparse_categorical_accuracy.update(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred))
    25. print("test accuracy: %f" % sess.run(sparse_categorical_accuracy.result()))

    关于Graph Execution的更多内容可参见 图模型下的TensorFlow。