Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Este tutorial demonstra como usar tf.distribute.Strategy com loops de treinamento personalizados. Vamos treinar um modelo CNN simples no conjunto de dados fashion MNIST. O conjunto de dados fashion MNIST contém 60.000 imagens de trem de tamanho 28 x 28 e 10.000 imagens de teste de tamanho 28 x 28.
Estamos usando loops de treinamento personalizados para treinar nosso modelo porque eles nos dão flexibilidade e maior controle no treinamento. Além disso, é mais fácil depurar o modelo e o loop de treinamento.
# Import TensorFlow
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
Faça o download do conjunto de dados MNIST de moda
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]
# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Crie uma estratégia para distribuir as variáveis e o gráfico
Como funciona a estratégia tf.distribute.MirroredStrategy ?
- Todas as variáveis e o gráfico do modelo são replicados nas réplicas.
- A entrada é distribuída uniformemente pelas réplicas.
- Cada réplica calcula a perda e os gradientes para a entrada recebida.
- Os gradientes são sincronizados em todas as réplicas somando-os.
- Após a sincronização, a mesma atualização é feita nas cópias das variáveis em cada réplica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1
Configurar pipeline de entrada
Exporte o gráfico e as variáveis para o formato SavedModel independente de plataforma. Depois que seu modelo for salvo, você poderá carregá-lo com ou sem o escopo.
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10
Crie os conjuntos de dados e distribua-os:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
2022-01-26 05:45:53.991501: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_UINT8
}
}
}
attr {
key: "_cardinality"
value {
i: 60000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:0"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 28
}
dim {
size: 28
}
dim {
size: 1
}
}
shape {
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_UINT8
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_UINT8
}
}
}
}
}
2022-01-26 05:45:54.034762: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_UINT8
}
}
}
attr {
key: "_cardinality"
value {
i: 10000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:3"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 28
}
dim {
size: 28
}
dim {
size: 1
}
}
shape {
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_UINT8
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_UINT8
}
}
}
}
}
Crie o modelo
Crie um modelo usando tf.keras.Sequential . Você também pode usar a API Model Subclassing para fazer isso.
def create_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
Defina a função de perda
Normalmente, em uma única máquina com 1 GPU/CPU, a perda é dividida pelo número de exemplos no lote de entrada.
Então, como a perda deve ser calculada ao usar um tf.distribute.Strategy ?
Por exemplo, digamos que você tenha 4 GPUs e um tamanho de lote de 64. Um lote de entrada é distribuído pelas réplicas (4 GPUs), cada réplica recebendo uma entrada de tamanho 16.
O modelo em cada réplica faz uma passagem direta com sua respectiva entrada e calcula a perda. Agora, ao invés de dividir a perda pelo número de exemplos em sua respectiva entrada (BATCH_SIZE_PER_REPLICA = 16), a perda deve ser dividida pelo GLOBAL_BATCH_SIZE (64).
Por que fazer isso?
- Isso precisa ser feito porque depois que os gradientes são calculados em cada réplica, eles são sincronizados entre as réplicas somando -os.
Como fazer isso no TensorFlow?
Se você estiver escrevendo um loop de treinamento personalizado, como neste tutorial, você deve somar as perdas por exemplo e dividir a soma pelo GLOBAL_BATCH_SIZE:
scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)ou você pode usartf.nn.compute_average_lossque usa a perda por exemplo, pesos de amostra opcionais e GLOBAL_BATCH_SIZE como argumentos e retorna a perda em escala.Se você estiver usando perdas de regularização em seu modelo, precisará dimensionar o valor da perda por número de réplicas. Você pode fazer isso usando a função
tf.nn.scale_regularization_loss.O uso
tf.reduce_meannão é recomendado. Isso divide a perda pelo tamanho real do lote de réplica, que pode variar passo a passo.Essa redução e dimensionamento é feito automaticamente em keras
model.compileemodel.fitSe estiver usando as classes
tf.keras.losses(como no exemplo abaixo), a redução de perda precisa ser explicitamente especificada comoNONEouSUM.AUTOeSUM_OVER_BATCH_SIZEnão são permitidos quando usados comtf.distribute.Strategy.AUTOnão é permitido porque o usuário deve pensar explicitamente sobre qual redução deseja ter certeza de que está correta no caso distribuído.SUM_OVER_BATCH_SIZEnão é permitido porque atualmente ele só dividiria por tamanho de lote de réplica e deixaria a divisão pelo número de réplicas para o usuário, o que pode ser fácil de perder. Então, em vez disso, pedimos ao usuário que faça a redução explicitamente.Se os
labelsforem multidimensionais, calcule a média deper_example_lossno número de elementos em cada amostra. Por exemplo, se a forma daspredictionsfor(batch_size, H, W, n_classes)e oslabelsforem(batch_size, H, W), será necessário atualizarper_example_losscomo:per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)
with strategy.scope():
# Set reduction to `none` so we can do the reduction afterwards and divide by
# global batch size.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
Defina as métricas para rastrear a perda e a precisão
Essas métricas rastreiam a perda de teste e o treinamento e a precisão do teste. Você pode usar .result() para obter as estatísticas acumuladas a qualquer momento.
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Ciclo de treinamento
# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_loss.update_state(t_loss)
test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
@tf.function
def distributed_test_step(dataset_inputs):
return strategy.run(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
# TRAIN LOOP
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
# TEST LOOP
for x in test_dist_dataset:
distributed_test_step(x)
if epoch % 2 == 0:
checkpoint.save(checkpoint_prefix)
template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
"Test Accuracy: {}")
print (template.format(epoch+1, train_loss,
train_accuracy.result()*100, test_loss.result(),
test_accuracy.result()*100))
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
Epoch 1, Loss: 0.5106383562088013, Accuracy: 81.77999877929688, Test Loss: 0.39399346709251404, Test Accuracy: 85.79000091552734 Epoch 2, Loss: 0.3362727463245392, Accuracy: 87.91333770751953, Test Loss: 0.35871225595474243, Test Accuracy: 86.7699966430664 Epoch 3, Loss: 0.2928692400455475, Accuracy: 89.2683334350586, Test Loss: 0.2999486029148102, Test Accuracy: 89.04000091552734 Epoch 4, Loss: 0.2605818510055542, Accuracy: 90.41999816894531, Test Loss: 0.28474125266075134, Test Accuracy: 89.47000122070312 Epoch 5, Loss: 0.23641237616539001, Accuracy: 91.32166290283203, Test Loss: 0.26421546936035156, Test Accuracy: 90.41000366210938 Epoch 6, Loss: 0.2192477434873581, Accuracy: 91.90499877929688, Test Loss: 0.2650589942932129, Test Accuracy: 90.4800033569336 Epoch 7, Loss: 0.20016911625862122, Accuracy: 92.66999816894531, Test Loss: 0.25025954842567444, Test Accuracy: 90.9000015258789 Epoch 8, Loss: 0.18381091952323914, Accuracy: 93.26499938964844, Test Loss: 0.2585820257663727, Test Accuracy: 90.95999908447266 Epoch 9, Loss: 0.1699329912662506, Accuracy: 93.67500305175781, Test Loss: 0.26234227418899536, Test Accuracy: 91.0199966430664 Epoch 10, Loss: 0.15756534039974213, Accuracy: 94.16333770751953, Test Loss: 0.25516414642333984, Test Accuracy: 90.93000030517578
Pontos a serem observados no exemplo acima:
- Estamos iterando sobre o
train_dist_datasetetest_dist_datasetusando uma construçãofor x in ... - A perda escalonada é o valor de retorno do
distributed_train_step. Esse valor é agregado nas réplicas usando a chamadatf.distribute.Strategy.reducee, em seguida, nos lotes somando o valor de retorno das chamadastf.distribute.Strategy.reduce. -
tf.keras.Metricsdeve ser atualizado dentrotrain_stepetest_stepque é executado portf.distribute.Strategy.run. *tf.distribute.Strategy.runretorna resultados de cada réplica local na estratégia e há várias maneiras de consumir esse resultado. Você pode fazertf.distribute.Strategy.reducepara obter um valor agregado. Você também pode fazertf.distribute.Strategy.experimental_local_resultspara obter a lista de valores contidos no resultado, um por réplica local.
Restaure o último ponto de verificação e teste
Um modelo com checkpoint tf.distribute.Strategy pode ser restaurado com ou sem uma estratégia.
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='eval_accuracy')
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
for images, labels in test_dataset:
eval_step(images, labels)
print ('Accuracy after restoring the saved model without strategy: {}'.format(
eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0199966430664
Formas alternativas de iterar em um conjunto de dados
Usando iteradores
Se você deseja iterar em um determinado número de etapas e não em todo o conjunto de dados, você pode criar um iterador usando a chamada iter e explicitamente chamar next no iterador. Você pode optar por iterar sobre o conjunto de dados dentro e fora do tf.function. Aqui está um pequeno trecho demonstrando a iteração do conjunto de dados fora do tf.function usando um iterador.
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
Epoch 10, Loss: 0.17486707866191864, Accuracy: 93.4375 Epoch 10, Loss: 0.12386945635080338, Accuracy: 95.3125 Epoch 10, Loss: 0.16411852836608887, Accuracy: 93.90625 Epoch 10, Loss: 0.10728752613067627, Accuracy: 96.40625 Epoch 10, Loss: 0.11865834891796112, Accuracy: 95.625 Epoch 10, Loss: 0.12875251471996307, Accuracy: 95.15625 Epoch 10, Loss: 0.1189488023519516, Accuracy: 95.625 Epoch 10, Loss: 0.1456708014011383, Accuracy: 95.15625 Epoch 10, Loss: 0.12446556240320206, Accuracy: 95.3125 Epoch 10, Loss: 0.1380888819694519, Accuracy: 95.46875
Iterando dentro de um tf.function
Você também pode iterar sobre toda a entrada train_dist_dataset dentro de um tf.function usando a construção for x in ... ou criando iteradores como fizemos acima. O exemplo abaixo demonstra o envolvimento de uma época de treinamento em um tf.function e a iteração sobre train_dist_dataset dentro da função.
@tf.function
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.run(train_step, args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
warnings.warn("To make it possible to preserve tf.data options across "
Epoch 1, Loss: 0.14398494362831116, Accuracy: 94.63999938964844
Epoch 2, Loss: 0.13246288895606995, Accuracy: 94.97333526611328
Epoch 3, Loss: 0.11922841519117355, Accuracy: 95.63833618164062
Epoch 4, Loss: 0.11084160208702087, Accuracy: 95.99333190917969
Epoch 5, Loss: 0.10420522093772888, Accuracy: 96.0816650390625
Epoch 6, Loss: 0.09215126931667328, Accuracy: 96.63500213623047
Epoch 7, Loss: 0.0878651961684227, Accuracy: 96.67666625976562
Epoch 8, Loss: 0.07854588329792023, Accuracy: 97.09333038330078
Epoch 9, Loss: 0.07217177003622055, Accuracy: 97.34833526611328
Epoch 10, Loss: 0.06753655523061752, Accuracy: 97.48999786376953
Rastreamento de perda de treinamento em réplicas
Não recomendamos o uso de tf.metrics.Mean para rastrear a perda de treinamento em diferentes réplicas, devido ao cálculo de escala de perda que é realizado.
Por exemplo, se você executar um trabalho de treinamento com as seguintes características:
- Duas réplicas
- Duas amostras são processadas em cada réplica
- Valores de perda resultantes: [2, 3] e [4, 5] em cada réplica
- Tamanho global do lote = 4
Com o dimensionamento de perda, você calcula o valor de perda por amostra em cada réplica adicionando os valores de perda e, em seguida, dividindo pelo tamanho global do lote. Neste caso: (2 + 3) / 4 = 1.25 e (4 + 5) / 4 = 2.25 .
Se você usar tf.metrics.Mean para rastrear a perda nas duas réplicas, o resultado será diferente. Neste exemplo, você termina com um total de 3,50 e count de 2, o que resulta em total / count = 1,75 quando result() é chamado na métrica. A perda calculada com tf.keras.Metrics é dimensionada por um fator adicional que é igual ao número de réplicas em sincronia.
Guia e exemplos
Aqui estão alguns exemplos de uso da estratégia de distribuição com loops de treinamento personalizados:
- Guia de treinamento distribuído
- Exemplo DenseNet usando
MirroredStrategy. - Exemplo de BERT treinado usando
MirroredStrategyeTPUStrategy. Este exemplo é particularmente útil para entender como carregar de um ponto de verificação e gerar pontos de verificação periódicos durante o treinamento distribuído, etc. - Exemplo de NCF treinado usando
MirroredStrategyque pode ser habilitado usando o sinalizadorkeras_use_ctl. - Exemplo de NMT treinado usando
MirroredStrategy.
Mais exemplos listados no guia de estratégia de distribuição .
Próximos passos
- Experimente a nova API
tf.distribute.Strategyem seus modelos. - Visite a seção Desempenho no guia para saber mais sobre outras estratégias e ferramentas que você pode usar para otimizar o desempenho de seus modelos do TensorFlow.
Veja no TensorFlow.org
Executar no Google Colab
Ver fonte no GitHub
Baixar caderno