Overview
This Colab gives a brief introduction to the TPUEmbeddingLayer of TensorFlow 2.
The TPUEmbeddingLayer can use the embedding accelerator on the Cloud TPU to speed up embedding lookups when you have many large embedding tables. This is particularly useful when creating recommendation models as these models typically have very large embedding tables.
Please follow the Google Cloud TPU quickstart to create a GCP account and a GCS bucket. You get $300 in free credit to get started with any GCP product. You can learn more about Cloud TPU at https://cloud.google.com/tpu/docs.
Setup
Install TensorFlow 2.x and TensorFlow Recommenders
pip install -U tensorflow-recommenders
import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs
Connect to the TPU node or local TPU and initialize the TPU system.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver.connect(tpu='')
Create the TPU strategy. Models that need to run on the TPU should be created under TPUStrategy.
strategy = tf.distribute.TPUStrategy(resolver)
You can also inspect the TPU hardware features through the TPUStrategy object.
For example, you can check which version of the embedding feature is supported on this TPU. See tf.tpu.experimental.HardwareFeature for detailed documentation.
embedding_feature = strategy.extended.tpu_hardware_feature.embedding_feature
assert embedding_feature == tf.tpu.experimental.HardwareFeature.EmbeddingFeature.V1, 'Make sure that you have the right TPU Hardware'
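The assert above requires the embedding accelerator. If you would rather degrade gracefully on hardware without it, you can branch on the reported value instead. This is a small illustrative sketch; the EmbeddingFeature enum also defines UNSUPPORTED and V2 values.
# Illustrative sketch: branch on the reported embedding feature instead of
# asserting a specific version.
EmbeddingFeature = tf.tpu.experimental.HardwareFeature.EmbeddingFeature
if embedding_feature == EmbeddingFeature.UNSUPPORTED:
  print('This TPU has no embedding accelerator.')
else:
  print('Embedding accelerator version:', embedding_feature)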
TPUEmbedding API breakdown
Feature and table configuration
When creating an instance of this layer, you must specify:
- The complete set of embedding tables,
- The features you expect to look up in those tables, and
- The optimizer(s) you wish to use on the tables.
See the documentation of tf.tpu.experimental.embedding.TableConfig and tf.tpu.experimental.embedding.FeatureConfig for more details on the complete set of options. We will cover the basic usage here.
Multiple FeatureConfig objects can use the same TableConfig object, allowing different features to share the same table:
table_config_one = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=8, dim=8)
table_config_two = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=16, dim=4)
feature_config = {
    'feature_one':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_one),
    'feature_two':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_one),
    'feature_three':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_two)
}
Optimizers
An optimizer can be globally specified by passing one of the following types of input to the optimizer argument:
- A string, one of 'sgd', 'adagrad' or 'adam', which uses the given optimizer with the default parameters.
- An instance of a Keras optimizer.
- An instance of an optimizer class from the tf.tpu.experimental.embedding module.
You may also specify an optimizer at the table level via the optimizer argument of tf.tpu.experimental.embedding.TableConfig. This will completely override the global optimizer for this table. For performance reasons, it is recommended that you minimize the total number of distinct optimizers.
optimizer = tf.tpu.experimental.embedding.SGD(0.1)
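As an illustration of the table-level override described above (the table name below is made up for this sketch), an optimizer attached to a TableConfig takes precedence over the global optimizer for that table only:
# Illustrative sketch: this table overrides the global SGD optimizer above
# with its own Adagrad optimizer; other tables keep the global optimizer.
table_config_with_adagrad = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=16,
    dim=4,
    optimizer=tf.tpu.experimental.embedding.Adagrad(learning_rate=0.05))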
Model Creation
Here are two examples of creating a Keras model with a TPU embedding layer in it.
For a functional style Keras model:
with strategy.scope():
  embedding_inputs = {
      'feature_one':
          tf.keras.Input(batch_size=1024, shape=(1,), dtype=tf.int32),
      'feature_two':
          tf.keras.Input(
              batch_size=1024, shape=(1,), dtype=tf.int32, ragged=True),
      'feature_three':
          tf.keras.Input(batch_size=1024, shape=(1,), dtype=tf.int32)
  }
  # embedding, feature_config and embedding_inputs all have the same nested
  # structure.
  embedding = tfrs.layers.embedding.TPUEmbedding(
      feature_config=feature_config, optimizer=optimizer)(embedding_inputs)
  logits = tf.keras.layers.Dense(1)(
      tf.concat(tf.nest.flatten(embedding), axis=1))
  model = tf.keras.Model(embedding_inputs, logits)
For a subclass-style Keras model:
class ModelWithEmbeddings(tf.keras.Model):

  def __init__(self):
    super(ModelWithEmbeddings, self).__init__()
    self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config, optimizer=optimizer)
    self.dense = tf.keras.layers.Dense(1)

  def call(self, inputs):
    embedding = self.embedding_layer(inputs)
    logits = self.dense(tf.concat(tf.nest.flatten(embedding), axis=1))
    return logits
# Make sure that the TPU is reinitialized when you try to create another model.
tf.tpu.experimental.initialize_tpu_system(resolver)
with strategy.scope():
  model = ModelWithEmbeddings()
WARNING:tensorflow:TPU system grpc://10.3.32.50:8470 has already been initialized. Reinitializing the TPU can cause previously created variables on TPU to be lost.
<tensorflow.python.tpu.topology.Topology at 0x7f2085f74400>
Simple TPUEmbeddingLayer example
In this tutorial, we build a simple ranking model using the MovieLens 100K dataset with the TPUEmbeddingLayer. We can use this model to predict ratings based on user_id and movie_id.
Install and import TensorFlow Datasets
pip install -q --upgrade tensorflow-datasets
import tensorflow_datasets as tfds
Read the data
To make the dataset accessible to the Cloud TPU worker, you need to create a GCS bucket and download the dataset to it. Follow these instructions to create your GCS bucket.
gcs_bucket = 'gs://YOUR-BUCKET-NAME'
from google.colab import auth
auth.authenticate_user()
First we fetch the data using tensorflow_datasets. The data we need are movie_id, user_id and user_rating.
Then we preprocess the data and convert the string IDs to numeric values.
# Ratings data.
ratings = tfds.load(
    "movielens/100k-ratings", data_dir=gcs_bucket, split="train")

# Select the basic features.
ratings = ratings.map(
    lambda x: {
        "movie_id": tf.strings.to_number(x["movie_id"]),
        "user_id": tf.strings.to_number(x["user_id"]),
        "user_rating": x["user_rating"],
    })
Prepare the dataset and model
Here we define some hyperparameters for the model.
per_replica_batch_size = 16
movie_vocabulary_size = 2048
movie_embedding_size = 64
user_vocabulary_size = 2048
user_embedding_size = 64
We'll split the data by putting 80% of the ratings in the train set, and 20% in the test set.
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)
train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)
Batch the dataset and convert it to a distributed dataset.
train_dataset = train.batch(
    per_replica_batch_size * strategy.num_replicas_in_sync,
    drop_remainder=True).cache()
test_dataset = test.batch(
    per_replica_batch_size * strategy.num_replicas_in_sync,
    drop_remainder=True).cache()

distribute_train_dataset = strategy.experimental_distribute_dataset(
    train_dataset,
    options=tf.distribute.InputOptions(experimental_fetch_to_device=False))
distribute_test_dataset = strategy.experimental_distribute_dataset(
    test_dataset,
    options=tf.distribute.InputOptions(experimental_fetch_to_device=False))
Here we create the optimizer and specify the feature and table configurations. Then we create the model with the embedding layer.
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1)
user_table = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=user_vocabulary_size, dim=user_embedding_size)
movie_table = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=movie_vocabulary_size, dim=movie_embedding_size)

feature_config = {
    "movie_id": tf.tpu.experimental.embedding.FeatureConfig(table=movie_table),
    "user_id": tf.tpu.experimental.embedding.FeatureConfig(table=user_table)
}
# Define a ranking model with embedding layer.
class EmbeddingModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()
    self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config, optimizer=optimizer)
    self.ratings = tf.keras.Sequential([
        # Learn multiple dense layers.
        tf.keras.layers.Dense(256, activation="relu"),
        tf.keras.layers.Dense(64, activation="relu"),
        # Make rating predictions in the final layer.
        tf.keras.layers.Dense(1)
    ])
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
        loss=tf.keras.losses.MeanSquaredError(
            reduction=tf.keras.losses.Reduction.NONE),
        metrics=[tf.keras.metrics.RootMeanSquaredError()])

  def compute_loss(self, features, training=False):
    embedding = self.embedding_layer({
        "user_id": features["user_id"],
        "movie_id": features["movie_id"]
    })
    rating_predictions = self.ratings(
        tf.concat([embedding["user_id"], embedding["movie_id"]], axis=1))

    return tf.reduce_sum(
        self.task(
            labels=features["user_rating"],
            predictions=rating_predictions)) * (
                1 / (per_replica_batch_size * strategy.num_replicas_in_sync))

  def call(self, features, serving_config=None):
    embedding = self.embedding_layer(
        {
            "user_id": features["user_id"],
            "movie_id": features["movie_id"]
        },
        serving_config=serving_config)
    return self.ratings(
        tf.concat([embedding["user_id"], embedding["movie_id"]], axis=1))
Make sure that you initialize the model under the TPUStrategy scope.
with strategy.scope():
  model = EmbeddingModel()
  model.compile(optimizer=optimizer)
Train and evaluate the model
import os
Train the model
model.fit(distribute_train_dataset, steps_per_epoch=10, epochs=10)
Epoch 1/10
10/10 [==============================] - 7s 32ms/step - root_mean_squared_error: 2.7897 - loss: 0.0564 - regularization_loss: 0.0000e+00 - total_loss: 0.0564
Epoch 2/10
10/10 [==============================] - 0s 26ms/step - root_mean_squared_error: 1.1963 - loss: 0.0088 - regularization_loss: 0.0000e+00 - total_loss: 0.0088
Epoch 3/10
10/10 [==============================] - 0s 25ms/step - root_mean_squared_error: 1.1261 - loss: 0.0089 - regularization_loss: 0.0000e+00 - total_loss: 0.0089
Epoch 4/10
10/10 [==============================] - 0s 35ms/step - root_mean_squared_error: 1.1403 - loss: 0.0094 - regularization_loss: 0.0000e+00 - total_loss: 0.0094
Epoch 5/10
10/10 [==============================] - 0s 40ms/step - root_mean_squared_error: 1.1269 - loss: 0.0103 - regularization_loss: 0.0000e+00 - total_loss: 0.0103
Epoch 6/10
10/10 [==============================] - 0s 36ms/step - root_mean_squared_error: 1.1162 - loss: 0.0100 - regularization_loss: 0.0000e+00 - total_loss: 0.0100
Epoch 7/10
10/10 [==============================] - 0s 36ms/step - root_mean_squared_error: 1.1365 - loss: 0.0097 - regularization_loss: 0.0000e+00 - total_loss: 0.0097
Epoch 8/10
10/10 [==============================] - 0s 47ms/step - root_mean_squared_error: 1.1171 - loss: 0.0110 - regularization_loss: 0.0000e+00 - total_loss: 0.0110
Epoch 9/10
10/10 [==============================] - 0s 48ms/step - root_mean_squared_error: 1.1037 - loss: 0.0100 - regularization_loss: 0.0000e+00 - total_loss: 0.0100
Epoch 10/10
10/10 [==============================] - 0s 51ms/step - root_mean_squared_error: 1.0953 - loss: 0.0092 - regularization_loss: 0.0000e+00 - total_loss: 0.0092
<keras.callbacks.History at 0x7f2084d7ddf0>
Evaluate the model on the test dataset
model.evaluate(distribute_test_dataset, steps=10)
10/10 [==============================] - 4s 27ms/step - root_mean_squared_error: 1.1339 - loss: 0.0090 - regularization_loss: 0.0000e+00 - total_loss: 0.0090
[1.1338995695114136, 0.009662957862019539, 0, 0.009662957862019539]
Save and restore the checkpoint
You can use a GCS bucket to store your checkpoint.
Make sure that you give the TPU worker access to the bucket by following the instructions.
model_dir = os.path.join(gcs_bucket, "saved_model")
Create the checkpoint for the TPU model and save the model to the bucket.
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
saved_tpu_model_path = checkpoint.save(os.path.join(model_dir, "ckpt"))
You can list the variables stored under that path.
tf.train.list_variables(saved_tpu_model_path)
[('_CHECKPOINTABLE_OBJECT_GRAPH', []),
 ('model/embedding_layer/_tpu_embedding/.ATTRIBUTES/TPUEmbedding_saveable', []),
 ('model/embedding_layer/_tpu_embedding/table_0/.ATTRIBUTES/VARIABLE_VALUE', [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_0/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_1/.ATTRIBUTES/VARIABLE_VALUE', [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_1/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [2048, 64]),
 ('model/ratings/layer_with_weights-0/bias/.ATTRIBUTES/VARIABLE_VALUE', [256]),
 ('model/ratings/layer_with_weights-0/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [256]),
 ('model/ratings/layer_with_weights-0/kernel/.ATTRIBUTES/VARIABLE_VALUE', [128, 256]),
 ('model/ratings/layer_with_weights-0/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [128, 256]),
 ('model/ratings/layer_with_weights-1/bias/.ATTRIBUTES/VARIABLE_VALUE', [64]),
 ('model/ratings/layer_with_weights-1/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [64]),
 ('model/ratings/layer_with_weights-1/kernel/.ATTRIBUTES/VARIABLE_VALUE', [256, 64]),
 ('model/ratings/layer_with_weights-1/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [256, 64]),
 ('model/ratings/layer_with_weights-2/bias/.ATTRIBUTES/VARIABLE_VALUE', [1]),
 ('model/ratings/layer_with_weights-2/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [1]),
 ('model/ratings/layer_with_weights-2/kernel/.ATTRIBUTES/VARIABLE_VALUE', [64, 1]),
 ('model/ratings/layer_with_weights-2/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [64, 1]),
 ('model/task/_ranking_metrics/0/count/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('model/task/_ranking_metrics/0/total/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/decay/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/iter/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/learning_rate/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('save_counter/.ATTRIBUTES/VARIABLE_VALUE', [])]
You can restore the checkpoint later. It is common practice to checkpoint your model after every epoch and restore it afterwards.
with strategy.scope():
  checkpoint.restore(saved_tpu_model_path)
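If you checkpoint after every epoch, a tf.train.CheckpointManager can rotate the checkpoint files and hand back the latest one to restore. This is a minimal sketch; the directory name, number of checkpoints to keep and epoch count are illustrative.
# Illustrative sketch: keep the three most recent checkpoints and restore the
# latest one afterwards.
manager = tf.train.CheckpointManager(
    checkpoint,
    directory=os.path.join(model_dir, 'managed_ckpts'),
    max_to_keep=3)

for _ in range(3):
  model.fit(distribute_train_dataset, steps_per_epoch=10)
  manager.save()  # Write one checkpoint per epoch.

with strategy.scope():
  checkpoint.restore(manager.latest_checkpoint)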
Additionally, you can create a CPU model and restore the weights that were trained on the TPU.
cpu_model = EmbeddingModel()
# Create the CPU checkpoint and restore the TPU checkpoint.
cpu_checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=cpu_model)
cpu_checkpoint.restore(saved_tpu_model_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f20830fe5b0>
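Because the TPU checkpoint contains a TPU-specific saveable that has no counterpart in the CPU model, the restore above may log warnings about unmatched values. If that is expected, you can mark the restore as intentionally partial:
# Silence warnings about values that only exist in the TPU checkpoint.
cpu_checkpoint.restore(saved_tpu_model_path).expect_partial()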
You can also restore embedding weights partially.
embedding_checkpoint = tf.train.Checkpoint(embedding=model.embedding_layer)
saved_embedding_path = embedding_checkpoint.save(
    os.path.join(model_dir, 'tpu-embedding'))

# Restore the embedding parameters on the CPU model.
cpu_embedding_checkpoint = tf.train.Checkpoint(
    embedding=cpu_model.embedding_layer)
cpu_embedding_checkpoint.restore(saved_embedding_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f20831bbeb0>
# Save the embedding parameters from the CPU model and restore them to the TPU model.
saved_cpu_embedding_path = cpu_embedding_checkpoint.save(
    os.path.join(model_dir, 'cpu-embedding'))
with strategy.scope():
  embedding_checkpoint.restore(saved_cpu_embedding_path)
Serving
Finally, you can use the exported CPU model to do serving. Serving is accomplished through the tf.saved_model API.
@tf.function
def serve_tensors(features):
  return cpu_model(features)


signatures = {
    'serving':
        serve_tensors.get_concrete_function(
            features={
                'movie_id':
                    tf.TensorSpec(shape=(1,), dtype=tf.int32, name='movie_id'),
                'user_id':
                    tf.TensorSpec(shape=(1,), dtype=tf.int32, name='user_id'),
            }),
}
tf.saved_model.save(
    cpu_model,
    export_dir=os.path.join(model_dir, 'exported_model'),
    signatures=signatures)
WARNING:tensorflow:Skipping full serialization of Keras layer <tensorflow_recommenders.tasks.ranking.Ranking object at 0x7f20831ead00>, because it is not built.
The exported model can now be loaded (in Python or C) and used for serving.
imported = tf.saved_model.load(os.path.join(model_dir, 'exported_model'))
predict_fn = imported.signatures['serving']
# Dummy serving data.
input_batch = {
    'movie_id': tf.constant(np.array([100]), dtype=tf.int32),
    'user_id': tf.constant(np.array([30]), dtype=tf.int32)
}
# The prediction it generates.
prediction = predict_fn(**input_batch)['output_0']
WARNING:tensorflow:Detecting that an object or model or tf.train.Checkpoint is being deleted with unrestored values. See the following logs for the specific values in question. To silence these warnings, use `status.expect_partial()`. See https://www.tensorflow.org/api_docs/python/tf/train/Checkpoint#restore for details about the status object returned by the restore function.
WARNING:tensorflow:An attribute in the restored object could not be found in the checkpoint. Object: (root).embedding_layer._tpu_embedding, attribute: ['TPUEmbedding_saveable']
Additionally, you can pass a serving config to do serving.
Note that you can serve with a subset of the trained embedding tables by using a serving config, as sketched after the example below.
serving_config = {
    'movie_id': tf.tpu.experimental.embedding.FeatureConfig(table=movie_table),
    'user_id': tf.tpu.experimental.embedding.FeatureConfig(table=user_table)
}
prediction = cpu_model(input_batch, serving_config=serving_config)
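For example, to look up only movie embeddings, a sketch could restrict the serving config to that single feature and call the embedding layer directly. This assumes the layer accepts a serving config covering only the features you actually pass in; movie_only_config and movie_embeddings are made-up names for this illustration.
# Illustrative sketch: serve with only the movie table by restricting the
# serving config to the 'movie_id' feature.
movie_only_config = {
    'movie_id': tf.tpu.experimental.embedding.FeatureConfig(table=movie_table)
}
movie_embeddings = cpu_model.embedding_layer(
    {'movie_id': input_batch['movie_id']},
    serving_config=movie_only_config)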