TensorFlow.org-এ দেখুন | Google Colab-এ চালান | GitHub-এ উৎস দেখুন | নোটবুক ডাউনলোড করুন |
ওভারভিউ
এই টিউটোরিয়ালটি কাস্টম ট্রেনিং লুপ API সহ মাল্টি-ওয়ার্কার প্রশিক্ষণ প্রদর্শন করে, মাল্টিওয়ার্কার মিররড স্ট্র্যাটেজির মাধ্যমে বিতরণ করা হয়েছে, তাই একক-কর্মীর উপর চালানোর জন্য ডিজাইন করা কেরাস মডেলটি ন্যূনতম কোড পরিবর্তন সহ একাধিক কর্মীদের উপর নির্বিঘ্নে কাজ করতে পারে।
আমরা আমাদের মডেলকে প্রশিক্ষণ দেওয়ার জন্য কাস্টম প্রশিক্ষণ লুপ ব্যবহার করছি কারণ তারা আমাদের নমনীয়তা দেয় এবং প্রশিক্ষণের উপর আরও বেশি নিয়ন্ত্রণ দেয়। তাছাড়া, মডেল এবং প্রশিক্ষণ লুপ ডিবাগ করা সহজ। স্ক্র্যাচ থেকে একটি প্রশিক্ষণ লুপ লেখাতে আরও বিশদ তথ্য পাওয়া যায়।
আপনি যদি keras model.fit এর সাথে MultiWorkerMirroredStrategy কীভাবে ব্যবহার করবেন তা খুঁজছেন তবে পরিবর্তে এই টিউটোরিয়ালটি পড়ুন।
যারা tf.distribute.Strategy APIs সম্পর্কে গভীরভাবে বুঝতে আগ্রহী তাদের জন্য TensorFlow সমর্থন করে এমন বিতরণ কৌশলগুলির একটি সংক্ষিপ্ত বিবরণের জন্য TensorFlow গাইডে বিতরণকৃত প্রশিক্ষণ উপলব্ধ।
সেটআপ
প্রথমত, কিছু প্রয়োজনীয় আমদানি।
import json
import os
import sys
TensorFlow আমদানি করার আগে, পরিবেশে কিছু পরিবর্তন করুন।
সমস্ত GPU নিষ্ক্রিয় করুন। এটি একই GPU ব্যবহার করার চেষ্টাকারী কর্মীদের দ্বারা সৃষ্ট ত্রুটিগুলিকে প্রতিরোধ করে৷ একটি বাস্তব প্রয়োগের জন্য প্রতিটি কর্মী একটি ভিন্ন মেশিনে থাকবে।
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
TF_CONFIG এনভায়রনমেন্ট ভেরিয়েবল রিসেট করুন, আপনি এই সম্পর্কে পরে আরও দেখতে পাবেন।
os.environ.pop('TF_CONFIG', None)
নিশ্চিত করুন যে বর্তমান ডিরেক্টরিটি পাইথনের পথে রয়েছে। এটি %%writefile দ্বারা লিখিত ফাইলগুলি আমদানি করার অনুমতি দেয়।
if '.' not in sys.path:
sys.path.insert(0, '.')
এখন TensorFlow আমদানি করুন।
import tensorflow as tf
ডেটাসেট এবং মডেল সংজ্ঞা
এরপর একটি সাধারণ মডেল এবং ডেটাসেট সেটআপ সহ একটি mnist.py ফাইল তৈরি করুন। এই টিউটোরিয়ালটিতে কর্মী-প্রক্রিয়াগুলি দ্বারা এই পাইথন ফাইলটি ব্যবহার করা হবে:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
মাল্টি-কর্মী কনফিগারেশন
এখন মাল্টি-কর্মী প্রশিক্ষণের জগতে প্রবেশ করা যাক। TensorFlow-এ, একাধিক মেশিনে প্রশিক্ষণের জন্য TF_CONFIG এনভায়রনমেন্ট ভেরিয়েবল প্রয়োজন, যার প্রতিটিরই সম্ভবত আলাদা ভূমিকা রয়েছে। নীচে ব্যবহৃত TF_CONFIG হল একটি JSON স্ট্রিং যা ক্লাস্টারের অংশ প্রতিটি কর্মীর উপর ক্লাস্টার কনফিগারেশন নির্দিষ্ট করতে ব্যবহৃত হয়। এটি একটি ক্লাস্টার নির্দিষ্ট করার জন্য ডিফল্ট পদ্ধতি, cluster_resolver.TFConfigClusterResolver ব্যবহার করে, কিন্তু distribute.cluster_resolver মডিউলে অন্যান্য বিকল্প উপলব্ধ রয়েছে।
আপনার ক্লাস্টার বর্ণনা করুন
এখানে একটি উদাহরণ কনফিগারেশন আছে:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
এখানে একটি JSON স্ট্রিং হিসাবে একই TF_CONFIG সিরিয়াল করা হয়েছে:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
TF_CONFIG এর দুটি উপাদান রয়েছে: cluster এবং task ।
clusterসকল কর্মীদের জন্য একই এবং প্রশিক্ষণ ক্লাস্টার সম্পর্কে তথ্য প্রদান করে, যেটি বিভিন্ন ধরণের কাজ যেমনworkerসমন্বিত একটি ডিক্ট।MultiWorkerMirroredStrategyএর সাথে মাল্টি-ওয়ার্কার প্রশিক্ষণে, সাধারণত একজনworkerথাকে যে একজন নিয়মিতworkerযা করে তার পাশাপাশি টেনসরবোর্ডের জন্য চেকপয়েন্ট সংরক্ষণ এবং সারাংশ ফাইল লেখার মতো একটু বেশি দায়িত্ব নেয়। এই ধরনের একজন কর্মীকেchiefকর্মী হিসাবে উল্লেখ করা হয়, এবং এটি প্রথাগত যেindex0 সহworkerপ্রধানworkerহিসাবে নিযুক্ত করা হয় (আসলে এইভাবেtf.distribute.Strategyপ্রয়োগ করা হয়)।taskবর্তমান কাজের তথ্য প্রদান করে এবং প্রতিটি কর্মীর জন্য আলাদা। এটি সেই কর্মীরtypeএবংindexনির্দিষ্ট করে।
এই উদাহরণে, আপনি টাস্ক type "worker" এবং টাস্ক index 0 এ সেট করুন। এই মেশিনটিই প্রথম কর্মী এবং প্রধান কর্মী হিসেবে নিযুক্ত হবেন এবং অন্যদের চেয়ে বেশি কাজ করবেন। মনে রাখবেন যে অন্যান্য মেশিনগুলিতেও TF_CONFIG এনভায়রনমেন্ট ভেরিয়েবল সেট থাকতে হবে, এবং এটিতে একই cluster ডিক্ট থাকা উচিত, কিন্তু সেই মেশিনগুলির ভূমিকা কী তার উপর নির্ভর করে ভিন্ন টাস্ক type বা টাস্ক index ।
উদাহরণের উদ্দেশ্যে, এই টিউটোরিয়ালটি দেখায় কিভাবে একজন localhost 2 জন কর্মী নিয়ে একটি TF_CONFIG সেট করতে পারে। অনুশীলনে, ব্যবহারকারীরা বাহ্যিক IP ঠিকানা/পোর্টে একাধিক কর্মী তৈরি করবে এবং প্রতিটি কর্মীকে যথাযথভাবে TF_CONFIG সেট করবে।
এই উদাহরণে আপনি 2 জন কর্মী ব্যবহার করবেন, প্রথম কর্মীর TF_CONFIG উপরে দেখানো হয়েছে। দ্বিতীয় কর্মীর জন্য আপনি tf_config['task']['index']=1 সেট করবেন
উপরে, tf_config একটি স্থানীয় পরিবর্তনশীল। আসলে প্রশিক্ষণ কনফিগার করতে এটি ব্যবহার করার জন্য, এই অভিধানটিকে JSON হিসাবে সিরিয়াল করা দরকার এবং TF_CONFIG এনভায়রনমেন্ট ভেরিয়েবলে স্থাপন করা দরকার।
নোটবুকগুলিতে পরিবেশের পরিবর্তনশীল এবং সাবপ্রসেস
সাবপ্রসেসগুলি তাদের পিতামাতার কাছ থেকে এনভায়রনমেন্ট ভেরিয়েবলের উত্তরাধিকারী হয়। সুতরাং আপনি যদি এই jupyter notebook প্রক্রিয়াতে একটি পরিবেশ পরিবর্তনশীল সেট করেন:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
আপনি একটি সাবপ্রসেস থেকে পরিবেশ পরিবর্তনশীল অ্যাক্সেস করতে পারেন:
echo ${GREETINGS}
Hello TensorFlow!
পরবর্তী বিভাগে, আপনি TF_CONFIG কর্মী সাবপ্রসেসে পাস করতে এটি ব্যবহার করবেন। আপনি সত্যিই এইভাবে আপনার কাজগুলি কখনই চালু করবেন না, তবে এই টিউটোরিয়ালের উদ্দেশ্যে এটি যথেষ্ট: একটি ন্যূনতম বহু-কর্মী উদাহরণ প্রদর্শন করার জন্য।
মাল্টিওয়ার্কার মিররড স্ট্র্যাটেজি
মডেলটি প্রশিক্ষণের জন্য, tf.distribute.MultiWorkerMirroredStrategy এর একটি উদাহরণ ব্যবহার করুন, যা সমস্ত কর্মী জুড়ে প্রতিটি ডিভাইসে মডেলের স্তরগুলিতে সমস্ত ভেরিয়েবলের অনুলিপি তৈরি করে৷ tf.distribute.Strategy এই কৌশল সম্পর্কে আরও বিশদ রয়েছে।
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
আপনার মডেল তৈরি করার সময় একটি কৌশল ব্যবহার করা উচিত তা নির্দিষ্ট করতে tf.distribute.Strategy.scope ব্যবহার করুন। এটি আপনাকে এই কৌশলটির জন্য " ক্রস-প্রতিলিপি প্রসঙ্গে " রাখে, যার অর্থ কৌশলটি পরিবর্তনশীল স্থান নির্ধারণের মতো জিনিসগুলির নিয়ন্ত্রণে রাখা হয়।
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
কর্মীদের জুড়ে আপনার ডেটা স্বয়ংক্রিয়ভাবে ভাগ করুন
বহু-কর্মী প্রশিক্ষণে, ডেটাসেট শার্ডিংয়ের প্রয়োজন হয় না, তবে এটি আপনাকে ঠিক-একবার শব্দার্থবিদ্যা দেয় যা আরও প্রশিক্ষণকে আরও পুনরুত্পাদনযোগ্য করে তোলে, অর্থাৎ একাধিক কর্মীদের প্রশিক্ষণ একজন শ্রমিকের প্রশিক্ষণের মতোই হওয়া উচিত। দ্রষ্টব্য: কিছু ক্ষেত্রে কর্মক্ষমতা প্রভাবিত হতে পারে।
দেখুন: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
কাস্টম ট্রেনিং লুপ সংজ্ঞায়িত করুন এবং মডেলটিকে প্রশিক্ষণ দিন
একটি অপ্টিমাইজার নির্দিষ্ট করুন
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
tf.function সহ একটি প্রশিক্ষণ ধাপ সংজ্ঞায়িত করুন
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
চেকপয়েন্ট সংরক্ষণ এবং পুনরুদ্ধার
একটি কাস্টম ট্রেনিং লুপে চেকপয়েন্টিং বাস্তবায়নের জন্য ব্যবহারকারীকে কেরাস কলব্যাক ব্যবহার করার পরিবর্তে এটি পরিচালনা করতে হবে। এটি আপনাকে মডেলের ওজন সংরক্ষণ করতে এবং পুরো মডেলটি সংরক্ষণ না করেই সেগুলি পুনরুদ্ধার করতে দেয়।
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
এখানে, আপনি একটি tf.train.Checkpoint তৈরি করবেন যা মডেলটিকে ট্র্যাক করে, যেটি একটি tf.train.CheckpointManager দ্বারা পরিচালিত হয় যাতে শুধুমাত্র সর্বশেষ চেকপয়েন্টটি সংরক্ষিত থাকে৷
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
এখন, যখন আপনাকে পুনরুদ্ধার করতে হবে, আপনি সুবিধাজনক tf.train.latest_checkpoint ফাংশন ব্যবহার করে সংরক্ষিত সর্বশেষ চেকপয়েন্টটি খুঁজে পেতে পারেন।
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
চেকপয়েন্ট পুনরুদ্ধার করার পরে, আপনি আপনার কাস্টম প্রশিক্ষণ লুপ প্রশিক্ষণ চালিয়ে যেতে পারেন।
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
কর্মীদের উপর সম্পূর্ণ কোড সেটআপ
আসলে MultiWorkerMirroredStrategy সাথে চালানোর জন্য আপনাকে কর্মী প্রক্রিয়া চালাতে হবে এবং তাদের কাছে একটি TF_CONFIG হবে।
আগে লেখা mnist.py ফাইলের মতো, এখানে main.py একই কোড রয়েছে যা আমরা এই কোল্যাবে আগে ধাপে ধাপে দেখেছি, আমরা এটিকে একটি ফাইলে লিখছি যাতে প্রতিটি কর্মী এটি চালাতে পারে:
ফাইল: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
ট্রেন এবং মূল্যায়ন
বর্তমান ডিরেক্টরিতে এখন উভয় পাইথন ফাইল রয়েছে:
ls *.py
main.py mnist.py
তাই TF_CONFIG-কে TF_CONFIG ক্রমিক করুন এবং পরিবেশের ভেরিয়েবলে যোগ করুন:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
এখন, আপনি একটি কর্মী প্রক্রিয়া চালু করতে পারেন যা TF_CONFIG চালাবে এবং main.py ব্যবহার করবে:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
উপরের কমান্ড সম্পর্কে নোট করার জন্য কয়েকটি জিনিস রয়েছে:
- এটি
%%bashব্যবহার করে যা কিছু ব্যাশ কমান্ড চালানোর জন্য একটি নোটবুক "জাদু" । - এটি ব্যাকগ্রাউন্ডে
bashপ্রক্রিয়া চালানোর জন্য--bgপতাকা ব্যবহার করে, কারণ এই কর্মী শেষ হবে না। এটি শুরু হওয়ার আগে সমস্ত কর্মীদের জন্য অপেক্ষা করে।
ব্যাকগ্রাউন্ডেড ওয়ার্কার প্রসেস এই নোটবুকে আউটপুট প্রিন্ট করবে না, তাই &> তার আউটপুটকে একটি ফাইলে রিডাইরেক্ট করে, যাতে আপনি দেখতে পারেন কি হয়েছে।
সুতরাং, প্রক্রিয়াটি শুরু হওয়ার জন্য কয়েক সেকেন্ড অপেক্ষা করুন:
import time
time.sleep(20)
এখন পর্যন্ত কর্মীর লগফাইলে কী আউটপুট হয়েছে তা দেখুন:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
লগ ফাইলের শেষ লাইনে বলা উচিত: Started server with target: grpc://localhost:12345 । প্রথম কর্মী এখন প্রস্তুত, এবং অন্য সমস্ত কর্মী(গুলি) এগিয়ে যাওয়ার জন্য প্রস্তুত হওয়ার জন্য অপেক্ষা করছে৷
সুতরাং দ্বিতীয় কর্মীর প্রক্রিয়া বাছাই করার জন্য tf_config আপডেট করুন:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
এখন দ্বিতীয় কর্মী চালু করুন। এটি প্রশিক্ষণ শুরু করবে যেহেতু সমস্ত কর্মী সক্রিয় রয়েছে (তাই এই প্রক্রিয়াটির ব্যাকগ্রাউন্ড করার দরকার নেই):
python main.py > /dev/null 2>&1
এখন আপনি যদি প্রথম কর্মী দ্বারা লিখিত লগগুলি পুনরায় পরীক্ষা করেন তবে আপনি দেখতে পাবেন যে এটি সেই মডেলের প্রশিক্ষণে অংশগ্রহণ করেছে:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
গভীরভাবে বহু কর্মী প্রশিক্ষণ
এই টিউটোরিয়ালটি মাল্টি-ওয়ার্কার সেটআপের একটি Custom Training Loop ওয়ার্কফ্লো প্রদর্শন করেছে। মাল্টি-ওয়ার্কার সেটআপের model.fit's guide ক্ষেত্রে প্রযোজ্য।
আরো দেখুন
- TensorFlow গাইডে বিতরণকৃত প্রশিক্ষণ উপলব্ধ বিতরণ কৌশলগুলির একটি ওভারভিউ প্রদান করে।
- অফিসিয়াল মডেল , যার মধ্যে অনেকগুলি একাধিক বিতরণ কৌশল চালানোর জন্য কনফিগার করা যেতে পারে।
- গাইডের পারফরম্যান্স বিভাগটি আপনার টেনসরফ্লো মডেলের পারফরম্যান্স অপ্টিমাইজ করতে আপনি ব্যবহার করতে পারেন এমন অন্যান্য কৌশল এবং সরঞ্জাম সম্পর্কে তথ্য সরবরাহ করে।
TensorFlow.org-এ দেখুন
Google Colab-এ চালান
GitHub-এ উৎস দেখুন
নোটবুক ডাউনলোড করুন