ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
ภาพรวม
บทช่วยสอนนี้สาธิตวิธีดำเนินการฝึกอบรมแบบกระจายผู้ปฏิบัติงานหลายคนด้วยโมเดล Keras และ Model.fit API โดยใช้ tf.distribute.Strategy API โดยเฉพาะคลาส tf.distribute.MultiWorkerMirroredStrategy ด้วยความช่วยเหลือของกลยุทธ์นี้ โมเดล Keras ที่ออกแบบมาให้ทำงานบนผู้ปฏิบัติงานคนเดียวสามารถทำงานกับผู้ปฏิบัติงานหลายคนได้อย่างราบรื่นโดยมีการเปลี่ยนแปลงโค้ดเพียงเล็กน้อย
สำหรับผู้ที่สนใจเข้าใจอย่างลึกซึ้งยิ่งขึ้นเกี่ยวกับ tf.distribute.Strategy APIs การ ฝึกอบรมแบบกระจายในคู่มือ TensorFlow จะมีให้สำหรับภาพรวมของกลยุทธ์การจัดจำหน่ายที่ TensorFlow รองรับ
หากต้องการเรียนรู้วิธีใช้ MultiWorkerMirroredStrategy กับ Keras และลูปการฝึกแบบกำหนดเอง โปรดดูที่ Custom training loop กับ Keras และ MultiWorkerMirroredStrategy
โปรดทราบว่าจุดประสงค์ของบทช่วยสอนนี้คือเพื่อแสดงตัวอย่างผู้ปฏิบัติงานหลายคนขั้นต่ำที่มีผู้ปฏิบัติงานสองคน
ติดตั้ง
เริ่มต้นด้วยการนำเข้าที่จำเป็น:
import json
import os
import sys
ก่อนนำเข้า TensorFlow ให้ทำการเปลี่ยนแปลงบางอย่างกับสภาพแวดล้อม:
- ปิดการใช้งาน GPU ทั้งหมด ซึ่งจะป้องกันข้อผิดพลาดที่เกิดจากพนักงานทุกคนที่พยายามใช้ GPU เดียวกัน ในการใช้งานจริง ผู้ปฏิบัติงานแต่ละคนจะอยู่คนละเครื่องกัน
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- รีเซ็ตตัวแปรสภาพแวดล้อม
TF_CONFIG(คุณจะได้เรียนรู้เพิ่มเติมเกี่ยวกับสิ่งนี้ในภายหลัง):
os.environ.pop('TF_CONFIG', None)
- ตรวจสอบให้แน่ใจว่าไดเร็กทอรีปัจจุบันอยู่บนเส้นทางของ Python ซึ่งจะทำให้โน้ตบุ๊กนำเข้าไฟล์ที่เขียนโดย
%%writefileได้ในภายหลัง:
if '.' not in sys.path:
sys.path.insert(0, '.')
ตอนนี้นำเข้า TensorFlow:
import tensorflow as tf
นิยามชุดข้อมูลและโมเดล
ถัดไป สร้างไฟล์ mnist_setup.py ด้วยการตั้งค่าโมเดลและชุดข้อมูลอย่างง่าย ไฟล์ Python นี้จะถูกใช้โดยกระบวนการของผู้ปฏิบัติงานในบทช่วยสอนนี้:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
อบรมต้นแบบกับคนทำงานคนเดียว
ลองฝึกแบบจำลองสำหรับยุคสมัยจำนวนน้อยๆ และสังเกตผลลัพธ์ของ ผู้ปฏิบัติงานคนเดียว เพื่อให้แน่ใจว่าทุกอย่างทำงานอย่างถูกต้อง เมื่อการฝึกดำเนินไป การสูญเสียควรลดลงและความแม่นยำควรเพิ่มขึ้น
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
การกำหนดค่าผู้ปฏิบัติงานหลายคน
มาเข้าสู่โลกของการฝึกอบรมผู้ปฏิบัติงานหลายคนกันเถอะ
คลัสเตอร์ที่มีงานและงาน
ใน TensorFlow การฝึกอบรมแบบกระจายเกี่ยวข้องกับ: 'cluster' ที่มีหลายงาน และแต่ละงานอาจมี 'task' หนึ่งงานขึ้นไป
คุณจะต้องใช้ตัวแปรสภาพแวดล้อมการกำหนดค่า TF_CONFIG สำหรับการฝึกอบรมบนเครื่องหลายเครื่อง ซึ่งแต่ละเครื่องอาจมีบทบาทที่แตกต่างกัน TF_CONFIG เป็นสตริง JSON ที่ใช้ในการระบุการกำหนดค่าคลัสเตอร์สำหรับผู้ปฏิบัติงานแต่ละคนที่เป็นส่วนหนึ่งของคลัสเตอร์
มีสององค์ประกอบของตัวแปร TF_CONFIG : 'cluster' และ 'task'
'cluster'จะเหมือนกันสำหรับผู้ปฏิบัติงานทุกคน และให้ข้อมูลเกี่ยวกับคลัสเตอร์การฝึกอบรม ซึ่งเป็นคำสั่งที่ประกอบด้วยงานประเภทต่างๆ เช่น'worker'หรือ'chief'- ในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย
tf.distribute.MultiWorkerMirroredStrategyมักจะมี'worker'หนึ่งที่รับผิดชอบ เช่น การบันทึกจุดตรวจและการเขียนไฟล์สรุปสำหรับ TensorBoard นอกเหนือจากสิ่งที่'worker'ปกติทำ'worker'ดังกล่าวเรียกว่า chief worker (มีชื่องานว่า'chief') - เป็นเรื่องปกติสำหรับ
'chief'ที่จะต้องแต่งตั้ง'index'0ให้ (อันที่จริง นี่คือวิธีการใช้งานtf.distribute.Strategy)
- ในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย
'task'ให้ข้อมูลของงานปัจจุบันและแตกต่างกันไปสำหรับผู้ปฏิบัติงานแต่ละคน ระบุ'type'และ'index'ของผู้ปฏิบัติงานนั้น
ด้านล่างนี้คือตัวอย่างการกำหนดค่า:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
นี่คือ TF_CONFIG เดียวกันที่จัดลำดับเป็นสตริง JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
โปรดทราบว่า tf_config เป็นเพียงตัวแปรโลคัลใน Python เพื่อให้สามารถใช้สำหรับการกำหนดค่าการฝึกอบรม dict นี้ต้องได้รับการจัดลำดับเป็น JSON และวางไว้ในตัวแปรสภาพแวดล้อม TF_CONFIG
ในการกำหนดค่าตัวอย่างด้านบน คุณตั้งค่างาน 'type' เป็น 'worker' และงาน 'index' เป็น 0 ดังนั้นเครื่องนี้จึงเป็นผู้ปฏิบัติงานคน แรก โดยจะได้รับการแต่งตั้งให้เป็น 'chief' ผู้ปฏิบัติงานและทำงานมากกว่าคนอื่นๆ
เพื่อจุดประสงค์ในการอธิบายประกอบ บทช่วยสอนนี้จะแสดงวิธีตั้งค่าตัวแปร TF_CONFIG กับผู้ปฏิบัติงานสองคนบน localhost
ในทางปฏิบัติ คุณจะต้องสร้างผู้ปฏิบัติงานหลายคนบนที่อยู่/พอร์ต IP ภายนอก และตั้งค่าตัวแปร TF_CONFIG ให้กับผู้ปฏิบัติงานแต่ละคนตามลำดับ
ในบทช่วยสอนนี้ คุณจะใช้คนงานสองคน:
-
TF_CONFIGของผู้ปฏิบัติงานคนแรก ('chief') แสดงไว้ด้านบน - สำหรับผู้ปฏิบัติงานคนที่สอง คุณจะต้องตั้งค่า
tf_config['task']['index']=1
ตัวแปรสภาพแวดล้อมและกระบวนการย่อยในโน้ตบุ๊ก
กระบวนการย่อยรับช่วงตัวแปรสภาพแวดล้อมจากพาเรนต์
ตัวอย่างเช่น คุณสามารถตั้งค่าตัวแปรสภาพแวดล้อมในกระบวนการ Jupyter Notebook ได้ดังนี้:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
จากนั้น คุณสามารถเข้าถึงตัวแปรสภาพแวดล้อมจากกระบวนการย่อยได้:
echo ${GREETINGS}
Hello TensorFlow!
ในส่วนถัดไป คุณจะใช้วิธีการที่คล้ายกันเพื่อส่ง TF_CONFIG ไปยังกระบวนการย่อยของผู้ปฏิบัติงาน ในสถานการณ์จริง คุณจะไม่เริ่มงานด้วยวิธีนี้ แต่ในตัวอย่างนี้ก็เพียงพอแล้ว
เลือกกลยุทธ์ที่เหมาะสม
ใน TensorFlow การฝึกอบรมแบบกระจายมีสองรูปแบบหลัก:
- การฝึกอบรมแบบซิงโครนัส โดยที่ขั้นตอนของการฝึกอบรมจะซิงค์กันระหว่างผู้ปฏิบัติงานและแบบจำลอง และ
- การฝึกอบรมแบบอะซิงโครนัส โดยที่ขั้นตอนการฝึกอบรมไม่ได้ซิงค์อย่างเคร่งครัด (เช่น การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ )
บทช่วยสอนนี้สาธิตวิธีดำเนินการฝึกอบรมผู้ปฏิบัติงานหลายคนแบบซิงโครนัสโดยใช้อินสแตนซ์ของ tf.distribute.MultiWorkerMirroredStrategy
MultiWorkerMirroredStrategy สร้างสำเนาของตัวแปรทั้งหมดในเลเยอร์ของโมเดลบนอุปกรณ์แต่ละเครื่องของผู้ปฏิบัติงานทุกคน มันใช้ CollectiveOps ซึ่งเป็น TensorFlow op สำหรับการสื่อสารแบบกลุ่ม เพื่อรวมการไล่ระดับสีและทำให้ตัวแปรซิงค์กัน คู่มือ tf.distribute.Strategy มีรายละเอียดเพิ่มเติมเกี่ยวกับกลยุทธ์นี้
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy มีการใช้งานหลายอย่าง tf.distribute.experimental.CommunicationOptions RING 2) NCCL ใช้ NVIDIA Collective Communication Library เพื่อใช้งานส่วนรวม; และ 3) AUTO เลื่อนตัวเลือกไปที่รันไทม์ ทางเลือกที่ดีที่สุดของการใช้งานแบบรวมขึ้นอยู่กับจำนวนและประเภทของ GPU และการเชื่อมต่อเครือข่ายในคลัสเตอร์
ฝึกโมเดล
ด้วยการรวม tf.distribute.Strategy API เข้ากับ tf.keras การเปลี่ยนแปลงเพียงอย่างเดียวที่คุณจะทำเพื่อแจกจ่ายการฝึกอบรมให้กับผู้ปฏิบัติงานหลายคนคือการปิด model building และ model.compile() ไว้ภายใน strategy.scope() ขอบเขตของกลยุทธ์การกระจายกำหนดวิธีการและตำแหน่งที่ตัวแปรถูกสร้างขึ้น และในกรณีของ MultiWorkerMirroredStrategy ตัวแปรที่สร้างขึ้นคือ MirroredVariable และจะถูกจำลองแบบกับผู้ปฏิบัติงานแต่ละคน
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
หากต้องการใช้งาน MultiWorkerMirroredStrategy คุณจะต้องเรียกใช้กระบวนการของผู้ปฏิบัติงานและส่ง TF_CONFIG ให้พวกเขา
เช่นเดียวกับไฟล์ mnist_setup.py ที่เขียนไว้ก่อนหน้านี้ นี่คือ main.py ที่พนักงานแต่ละคนจะทำงาน:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
ในข้อมูลโค้ดด้านบน โปรดทราบว่า global_batch_size ซึ่งส่งผ่านไปยัง Dataset.batch ถูกตั้งค่าเป็น per_worker_batch_size * num_workers เพื่อให้แน่ใจว่าผู้ปฏิบัติงานแต่ละคนประมวลผลกลุ่มตัวอย่าง per_worker_batch_size โดยไม่คำนึงถึงจำนวนผู้ปฏิบัติงาน
ไดเร็กทอรีปัจจุบันมีทั้งไฟล์ Python:
ls *.py
main.py mnist_setup.pyตัวยึดตำแหน่ง22
ดังนั้น json-serialize TF_CONFIG และเพิ่มลงในตัวแปรสภาพแวดล้อม:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
ตอนนี้คุณสามารถเปิดกระบวนการของผู้ปฏิบัติงานที่จะเรียกใช้ main.py และใช้ TF_CONFIG :
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
มีบางสิ่งที่ควรทราบเกี่ยวกับคำสั่งข้างต้น:
- มันใช้
%%bashซึ่งเป็น "เวทย์มนตร์" ของโน้ตบุ๊ก เพื่อเรียกใช้คำสั่งทุบตี - มันใช้แฟ
--bgเพื่อรันกระบวนการbashในพื้นหลัง เนื่องจากผู้ปฏิบัติงานนี้จะไม่ยุติการทำงาน มันรอคนงานทั้งหมดก่อนที่จะเริ่ม
กระบวนการทำงานเบื้องหลังจะไม่พิมพ์ผลลัพธ์ไปยังสมุดบันทึกนี้ ดังนั้น &> จึงเปลี่ยนเส้นทางเอาต์พุตไปยังไฟล์ เพื่อให้คุณสามารถตรวจสอบสิ่งที่เกิดขึ้นในไฟล์บันทึกได้ในภายหลัง
ดังนั้น รอสักครู่เพื่อให้กระบวนการเริ่มต้นขึ้น:
import time
time.sleep(10)
ตอนนี้ ให้ตรวจสอบสิ่งที่ส่งออกไปยังล็อกไฟล์ของพนักงาน:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
บรรทัดสุดท้ายของล็อกไฟล์ควรระบุว่า: Started server with target: grpc://localhost:12345 ตอนนี้ผู้ปฏิบัติงานคนแรกพร้อมแล้ว และกำลังรอผู้ปฏิบัติงานคนอื่นๆ ให้พร้อมดำเนินการต่อไป
ดังนั้นให้อัปเดต tf_config เพื่อให้กระบวนการของผู้ปฏิบัติงานคนที่สองได้รับ:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
เปิดตัวคนงานที่สอง การดำเนินการนี้จะเริ่มการฝึกอบรมเนื่องจากพนักงานทุกคนทำงานอยู่ (ดังนั้นจึงไม่จำเป็นต้องดำเนินการตามขั้นตอน):
python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_INT64
}
}
}
attr {
key: "_cardinality"
value {
i: 60000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:0"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 28
}
dim {
size: 28
}
}
shape {
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
}
2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
ตัวยึดตำแหน่ง32หากคุณตรวจสอบบันทึกที่เขียนโดยผู้ปฏิบัติงานคนแรกอีกครั้ง คุณจะได้เรียนรู้ว่าบันทึกนั้นได้เข้าร่วมในการฝึกอบรมโมเดลดังกล่าว:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_INT64
}
}
}
attr {
key: "_cardinality"
value {
i: 60000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:0"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 28
}
dim {
size: 28
}
}
shape {
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
}
2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
ไม่น่าแปลกใจเลยที่การดำเนินการนี้ ช้า กว่าการทดสอบในตอนต้นของบทช่วยสอนนี้
การเรียกใช้คนงานหลายคนในเครื่องเดียวจะเพิ่มค่าใช้จ่ายเท่านั้น
เป้าหมายที่นี่ไม่ใช่เพื่อปรับปรุงเวลาการฝึกอบรม แต่เพียงเพื่อให้ตัวอย่างการฝึกอบรมหลายคน
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.ตัวยึดตำแหน่ง36
การฝึกอบรมพนักงานหลายคนในเชิงลึก
จนถึงตอนนี้ คุณได้เรียนรู้วิธีตั้งค่าพื้นฐานสำหรับผู้ปฏิบัติงานหลายคนแล้ว
ในช่วงที่เหลือของบทช่วยสอน คุณจะได้เรียนรู้เกี่ยวกับปัจจัยอื่นๆ ที่อาจเป็นประโยชน์หรือสำคัญสำหรับกรณีการใช้งานจริงโดยละเอียด
การแบ่งกลุ่มข้อมูล
ในการฝึกอบรมผู้ปฏิบัติงานหลายคน จำเป็นต้องมีการ แบ่งกลุ่มข้อมูล เพื่อให้แน่ใจว่ามีการบรรจบกันและประสิทธิภาพ
ตัวอย่างในส่วนก่อนหน้านี้อาศัย autosharding เริ่มต้นที่จัดเตรียมโดย tf.distribute.Strategy API คุณสามารถควบคุมการแบ่งส่วนข้อมูลโดยการตั้งค่า tf.data.experimental.AutoShardPolicy ของ tf.data.experimental.DistributeOptions
หากต้องการเรียนรู้เพิ่มเติมเกี่ยวกับการชา ร์ดอัตโนมัติ โปรดดู คู่มือการป้อนข้อมูลแบบกระจาย
ต่อไปนี้คือตัวอย่างโดยย่อของวิธีปิดการแบ่งกลุ่มอัตโนมัติ เพื่อให้แต่ละแบบจำลองประมวลผลทุกตัวอย่าง ( ไม่แนะนำ ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
การประเมิน
หากคุณส่งผ่าน validation_data ไปที่ Model.fit มันจะสลับกันระหว่างการฝึกอบรมและการประเมินสำหรับแต่ละยุค การประเมินที่ใช้ validation_data จะถูกกระจายไปยังผู้ปฏิบัติงานชุดเดียวกัน และผลการประเมินจะถูกรวบรวมและพร้อมใช้งานสำหรับผู้ปฏิบัติงานทั้งหมด
เช่นเดียวกับการฝึก ชุดข้อมูลการตรวจสอบจะถูกแบ่งโดยอัตโนมัติที่ระดับไฟล์ คุณต้องตั้งค่าขนาดแบตช์ส่วนกลางในชุดข้อมูลการตรวจสอบและตั้งค่า validation_steps
ขอแนะนำให้ใช้ชุดข้อมูลซ้ำสำหรับการประเมิน
อีกวิธีหนึ่ง คุณยังสามารถสร้างงานอื่นที่อ่านจุดตรวจสอบเป็นระยะและดำเนินการประเมินได้ นี่คือสิ่งที่ Estimator ทำ แต่นี่ไม่ใช่วิธีที่แนะนำในการประเมิน ดังนั้นจึงละเว้นรายละเอียด
ประสิทธิภาพ
ตอนนี้คุณมีโมเดล Keras ที่ตั้งค่าให้ทำงานในผู้ปฏิบัติงานหลายคนด้วย MultiWorkerMirroredStrategy
ในการปรับแต่งประสิทธิภาพของการฝึกอบรมผู้ปฏิบัติงานหลายคน คุณสามารถลองทำสิ่งต่อไปนี้:
tf.distribute.MultiWorkerMirroredStrategyจัดเตรียม การใช้งานการสื่อสารแบบรวม หลายรายการ:-
RINGใช้กลุ่มตามวงแหวนโดยใช้ gRPC เป็นเลเยอร์การสื่อสารข้ามโฮสต์ -
NCCLใช้ NVIDIA Collective Communication Library เพื่อใช้งานส่วนรวม -
AUTOเลื่อนตัวเลือกไปที่รันไทม์
ทางเลือกที่ดีที่สุดของการใช้งานแบบรวมขึ้นอยู่กับจำนวนของ GPU ประเภทของ GPU และการเชื่อมต่อเครือข่ายในคลัสเตอร์ หากต้องการลบล้างตัวเลือกอัตโนมัติ ให้ระบุพารามิเตอร์
communication_optionsของ Constructor ของMultiWorkerMirroredStrategyตัวอย่างเช่น:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)-
ส่งตัวแปรไปที่
tf.floatถ้าเป็นไปได้:- โมเดล ResNet อย่างเป็นทางการมี ตัวอย่าง วิธีการทำสิ่งนี้
ความทนทานต่อความผิดพลาด
ในการฝึกอบรมแบบซิงโครนัส คลัสเตอร์จะล้มเหลวหากพนักงานคนใดคนหนึ่งล้มเหลวและไม่มีกลไกการกู้คืนความล้มเหลวอยู่
การใช้ Keras กับ tf.distribute.Strategy มาพร้อมกับข้อดีของความทนทานต่อข้อผิดพลาดในกรณีที่คนงานเสียชีวิตหรือไม่เสถียร คุณสามารถทำได้โดยคงสถานะการฝึกไว้ในระบบไฟล์แบบกระจายที่คุณเลือก เช่น เมื่อรีสตาร์ทอินสแตนซ์ที่ล้มเหลวหรือจองไว้ก่อนหน้านี้ สถานะการฝึกจะถูกกู้คืน
เมื่อผู้ปฏิบัติงานไม่พร้อมใช้งาน ผู้ปฏิบัติงานอื่นจะล้มเหลว (อาจหลังจากหมดเวลา) ในกรณีดังกล่าว ผู้ปฏิบัติงานที่ไม่พร้อมใช้งานจำเป็นต้องเริ่มต้นใหม่ เช่นเดียวกับผู้ปฏิบัติงานอื่นที่ล้มเหลว
ModelCheckpoint โทรกลับ
การเรียกกลับ ModelCheckpoint ไม่มีฟังก์ชันความทนทานต่อข้อผิดพลาดอีกต่อไป โปรดใช้การเรียกกลับ BackupAndRestore แทน
ยังสามารถใช้การเรียกกลับ ModelCheckpoint เพื่อบันทึกจุดตรวจได้ แต่ด้วยสิ่งนี้ หากการฝึกอบรมถูกขัดจังหวะหรือเสร็จสิ้น เพื่อที่จะดำเนินการฝึกอบรมต่อจากจุดตรวจ ผู้ใช้มีหน้าที่โหลดแบบจำลองด้วยตนเอง
ผู้ใช้สามารถเลือกที่จะบันทึกและกู้คืนรุ่น/น้ำหนักภายนอกการเรียกกลับของ ModelCheckpoint
การบันทึกและการโหลดโมเดล
ในการบันทึกแบบจำลองของคุณโดยใช้ model.save หรือ tf.saved_model.save ปลายทางการบันทึกจะต้องแตกต่างกันสำหรับผู้ปฏิบัติงานแต่ละคน
- สำหรับผู้ปฏิบัติงานที่ไม่ใช่หัวหน้างาน คุณจะต้องบันทึกโมเดลลงในไดเร็กทอรีชั่วคราว
- สำหรับหัวหน้า คุณจะต้องบันทึกลงในไดเร็กทอรีโมเดลที่ให้มา
ไดเร็กทอรีชั่วคราวของผู้ปฏิบัติงานต้องไม่ซ้ำกันเพื่อป้องกันข้อผิดพลาดที่เกิดจากผู้ปฏิบัติงานหลายคนพยายามเขียนไปยังตำแหน่งเดียวกัน
แบบจำลองที่บันทึกไว้ในไดเร็กทอรีทั้งหมดนั้นเหมือนกัน และโดยทั่วไปควรอ้างอิงเฉพาะแบบจำลองที่บันทึกไว้โดยหัวหน้าเท่านั้นเพื่อการกู้คืนหรือให้บริการ
คุณควรมีตรรกะการล้างข้อมูลที่จะลบไดเร็กทอรีชั่วคราวที่สร้างโดยผู้ปฏิบัติงานเมื่อการฝึกอบรมของคุณเสร็จสิ้น
เหตุผลในการประหยัดค่าใช้จ่ายสำหรับหัวหน้าและพนักงานก็เพราะว่าคุณอาจกำลังรวมตัวแปรระหว่างจุดตรวจ ซึ่งต้องใช้ทั้งหัวหน้าและพนักงานเพื่อเข้าร่วมในโปรโตคอลการสื่อสาร allreduce ในทางกลับกัน การปล่อยให้หัวหน้าและพนักงานบันทึกลงในไดเร็กทอรีรุ่นเดียวกันจะส่งผลให้เกิดข้อผิดพลาดเนื่องจากการโต้แย้ง
การใช้ MultiWorkerMirroredStrategy โปรแกรมจะทำงานกับผู้ปฏิบัติงานทุกคน และเพื่อที่จะทราบว่าผู้ปฏิบัติงานปัจจุบันเป็นหัวหน้าหรือไม่ โปรแกรมจะใช้ประโยชน์จากอ็อบเจ็กต์ตัวแก้ไขคลัสเตอร์ที่มีแอตทริบิวต์ task_type และ task_id :
-
task_typeจะบอกคุณว่างานปัจจุบันคืออะไร (เช่น'worker') -
task_idจะบอกให้คุณทราบถึงตัวระบุของผู้ปฏิบัติงาน - ผู้ปฏิบัติงานที่มี
task_id == 0ถูกกำหนดให้เป็นหัวหน้าคนงาน
ในข้อมูลโค้ดด้านล่าง ฟังก์ชัน write_filepath จัดเตรียมพาธของไฟล์ที่จะเขียน ซึ่งขึ้นอยู่กับ task_id ของผู้ปฏิบัติงาน:
- สำหรับหัวหน้าคนงาน (ด้วย
task_id == 0) จะเขียนไปยังพาธไฟล์ดั้งเดิม - สำหรับผู้ปฏิบัติงานอื่นๆ จะสร้างไดเร็กทอรีชั่วคราว—
temp_dir— โดยมีtask_idในพาธไดเร็กทอรีที่จะเขียนใน:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
เท่านี้คุณก็พร้อมที่จะบันทึกแล้ว:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assetsตัวยึดตำแหน่ง41
ตามที่อธิบายไว้ข้างต้น ในภายหลังควรโหลดโมเดลจากเส้นทางที่หัวหน้าบันทึกไว้เท่านั้น ดังนั้นเรามาลบโมเดลชั่วคราวที่คนงานที่ไม่ใช่หัวหน้าบันทึกไว้:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
ตอนนี้ เมื่อถึงเวลาโหลด ลองใช้ tf.keras.models.load_model API ที่สะดวก และทำงานต่อไป
ในที่นี้ สมมติโดยใช้ผู้ปฏิบัติงานคนเดียวในการโหลดและดำเนินการฝึกอบรมต่อ ซึ่งในกรณีนี้ คุณจะไม่เรียก tf.keras.models.load_model ภายใน strategy.scope() อื่น (โปรดทราบว่า strategy = tf.distribute.MultiWorkerMirroredStrategy() ตามที่กำหนดไว้ก่อนหน้านี้ ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
การบันทึกและฟื้นฟูจุดตรวจ
ในทางกลับกัน จุดตรวจสอบช่วยให้คุณสามารถบันทึกน้ำหนักของแบบจำลองของคุณและเรียกคืนได้โดยไม่ต้องบันทึกทั้งแบบจำลอง
ที่นี่ คุณจะต้องสร้าง tf.train.Checkpoint ที่ติดตามโมเดล ซึ่งจัดการโดย tf.train.CheckpointManager เพื่อให้คงไว้เฉพาะจุดตรวจล่าสุดเท่านั้น:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
เมื่อตั้งค่า CheckpointManager แล้ว คุณก็พร้อมที่จะบันทึกและลบจุดตรวจที่ผู้ปฏิบัติงานที่ไม่ใช่หัวหน้าบันทึกไว้:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
ตอนนี้ เมื่อคุณต้องการคืนค่าโมเดล คุณสามารถค้นหาจุดตรวจล่าสุดที่บันทึกไว้โดยใช้ฟังก์ชัน tf.train.latest_checkpoint ที่สะดวก หลังจากฟื้นฟูจุดตรวจแล้วคุณสามารถฝึกต่อได้
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_INT64
}
}
}
attr {
key: "_cardinality"
value {
i: 60000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:5"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 28
}
dim {
size: 28
}
}
shape {
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>
BackupAndRestore โทรกลับ
การเรียกกลับ tf.keras.callbacks.BackupAndRestore จัดเตรียมฟังก์ชันความทนทานต่อข้อผิดพลาด โดยการสำรองข้อมูลรุ่นและหมายเลขยุคปัจจุบันในไฟล์จุดตรวจสอบชั่วคราวภายใต้อาร์กิวเมนต์ backup_dir ไปยัง BackupAndRestore สิ่งนี้จะทำในตอนท้ายของแต่ละยุค
เมื่องานถูกขัดจังหวะและเริ่มต้นใหม่ การเรียกกลับจะคืนค่าจุดตรวจสุดท้าย และการฝึกอบรมจะดำเนินต่อไปตั้งแต่เริ่มต้นยุคที่ถูกขัดจังหวะ การฝึกบางส่วนที่ทำไปแล้วในยุคที่ยังไม่เสร็จก่อนการหยุดชะงักจะถูกโยนทิ้งไป เพื่อไม่ให้กระทบต่อสถานะของโมเดลขั้นสุดท้าย
หากต้องการใช้งาน ให้ระบุอินสแตนซ์ของ tf.keras.callbacks.BackupAndRestore ที่การโทร Model.fit
ด้วย MultiWorkerMirroredStrategy หากผู้ปฏิบัติงานถูกขัดจังหวะ ทั้งคลัสเตอร์จะหยุดชั่วคราวจนกว่าผู้ปฏิบัติงานที่ถูกขัดจังหวะจะเริ่มต้นใหม่ ผู้ปฏิบัติงานคนอื่นๆ จะรีสตาร์ทด้วย และผู้ปฏิบัติงานที่ถูกขัดจังหวะจะเข้าร่วมคลัสเตอร์อีกครั้ง จากนั้น พนักงานทุกคนจะอ่านไฟล์จุดตรวจสอบที่บันทึกไว้ก่อนหน้านี้และเลือกสถานะเดิม ซึ่งจะทำให้คลัสเตอร์กลับมาซิงค์กันได้ จากนั้นการฝึกอบรมจะดำเนินต่อไป
การเรียกกลับของ BackupAndRestore ใช้ CheckpointManager เพื่อบันทึกและกู้คืนสถานะการฝึก ซึ่งจะสร้างไฟล์ที่เรียกว่าจุดตรวจสอบ ซึ่งติดตามจุดตรวจสอบที่มีอยู่พร้อมกับจุดตรวจสอบล่าสุด ด้วยเหตุผลนี้ ไม่ควรใช้ backup_dir ซ้ำเพื่อจัดเก็บจุดตรวจอื่นๆ เพื่อหลีกเลี่ยงความขัดแย้งของชื่อ
ในปัจจุบัน การเรียกกลับของ BackupAndRestore รองรับการฝึกอบรมพนักงานคนเดียวโดยไม่มีกลยุทธ์— MirroredStrategy — และการฝึกอบรมหลายคนด้วย MultiWorkerMirroredStrategy
ด้านล่างนี้คือตัวอย่างสองตัวอย่างสำหรับการฝึกอบรมทั้งแบบหลายคนและคนเดียว:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
key: "Toutput_types"
value {
list {
type: DT_FLOAT
type: DT_INT64
}
}
}
attr {
key: "_cardinality"
value {
i: 60000
}
}
attr {
key: "is_files"
value {
b: false
}
}
attr {
key: "metadata"
value {
s: "\n\024TensorSliceDataset:5"
}
}
attr {
key: "output_shapes"
value {
list {
shape {
dim {
size: 28
}
dim {
size: 28
}
}
shape {
}
}
}
}
experimental_type {
type_id: TFT_PRODUCT
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
args {
type_id: TFT_DATASET
args {
type_id: TFT_PRODUCT
args {
type_id: TFT_TENSOR
args {
type_id: TFT_FLOAT
}
}
args {
type_id: TFT_TENSOR
args {
type_id: TFT_INT64
}
}
}
}
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>
หากคุณตรวจสอบไดเร็กทอรีของ backup_dir ที่คุณระบุไว้ใน BackupAndRestore คุณอาจสังเกตเห็นไฟล์จุดตรวจสอบที่สร้างขึ้นชั่วคราว ไฟล์เหล่านี้จำเป็นสำหรับการกู้คืนอินสแตนซ์ที่หายไปก่อนหน้านี้ และไลบรารีจะถูกลบออกเมื่อสิ้นสุด Model.fit เมื่อคุณออกจากการฝึกสำเร็จ
แหล่งข้อมูลเพิ่มเติม
- คู่มือ การฝึกอบรมแบบกระจายใน TensorFlow จะให้ภาพรวมของกลยุทธ์การจัดจำหน่ายที่มีอยู่
- ลูปการฝึกแบบกำหนดเองด้วย Keras และ บทช่วยสอน MultiWorkerMirroredStrategy จะแสดงวิธีใช้
MultiWorkerMirroredStrategyกับ Keras และลูปการฝึกแบบกำหนดเอง - ตรวจสอบ รุ่นอย่างเป็นทางการ ซึ่งหลายรุ่น สามารถกำหนดค่าให้เรียกใช้กลยุทธ์การจัดจำหน่ายได้หลายแบบ
- คู่มือประสิทธิภาพที่ ดีขึ้นด้วย tf.function ให้ข้อมูลเกี่ยวกับกลยุทธ์และเครื่องมืออื่นๆ เช่น TensorFlow Profiler ที่คุณสามารถใช้เพื่อเพิ่มประสิทธิภาพการทำงานของโมเดล TensorFlow ของคุณ
ดูบน TensorFlow.org
ทำงานใน Google Colab
ดูแหล่งที่มาบน GitHub
ดาวน์โหลดโน๊ตบุ๊ค