In [None]:
!pip install --user kubeflow
!pip install --user git+https://github.com/kubeflow/training-operator.git#subdirectory=sdk/python

In [None]:
def train_pytorch_model(parameters):
    """Train a small CNN on Fashion-MNIST with PyTorch DistributedDataParallel.

    All imports and the model class live inside this function. It is passed as
    ``train_func`` to ``TrainingClient.create_job``, presumably by source, so
    keep any new helpers/imports inside it as well.

    Args:
        parameters: dict with key ``"NUM_EPOCHS"`` (any value accepted by
            ``int()``, e.g. the string ``"3"``).

    Environment:
        RANK, WORLD_SIZE: read explicitly below (set by the Training Operator
            in a PyTorchJob, or manually for a local run).
        MASTER_ADDR, MASTER_PORT: read by ``init_process_group``'s default
            ``env://`` rendezvous.
    """
    import logging
    import os

    import torch
    import torch.distributed as dist
    import torch.nn.functional as F
    from torch import nn
    from torch.utils.data import DistributedSampler
    from torchvision import datasets, transforms

    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%SZ",
        level=logging.INFO,
    )

    class Net(nn.Module):
        """LeNet-style CNN producing log-probabilities over 10 classes.

        Expects single-channel 28x28 images: the 4*4*50 flatten size follows from
        28 -conv5-> 24 -pool-> 12 -conv5-> 8 -pool-> 4.
        """

        def __init__(self):
            super().__init__()
            self.conv1 = nn.Conv2d(1, 20, 5, 1)
            self.conv2 = nn.Conv2d(20, 50, 5, 1)
            self.fc1 = nn.Linear(4 * 4 * 50, 500)
            self.fc2 = nn.Linear(500, 10)

        def forward(self, x):
            x = F.relu(self.conv1(x))
            x = F.max_pool2d(x, 2, 2)
            x = F.relu(self.conv2(x))
            x = F.max_pool2d(x, 2, 2)
            x = x.view(-1, 4 * 4 * 50)
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    # The Kubeflow Training Operator sets RANK and WORLD_SIZE based on the job configuration.
    RANK = int(os.environ["RANK"])
    WORLD_SIZE = int(os.environ["WORLD_SIZE"])

    # If a GPU is available, use the nccl backend; otherwise use gloo.
    if torch.cuda.is_available():
        device = "cuda"
        backend = "nccl"
    else:
        device = "cpu"
        backend = "gloo"

    logging.info(f"Using Device: {device}, Backend: {backend}")

    model = Net().to(device)

    # Use the backend chosen above. It used to be hard-coded to "gloo", so GPU
    # runs logged "nccl" while actually communicating over gloo.
    dist.init_process_group(backend=backend, rank=RANK, world_size=WORLD_SIZE)
    try:
        model = nn.parallel.DistributedDataParallel(model)

        dataset = datasets.FashionMNIST(
            "./data",
            train=True,
            download=True,
            transform=transforms.Compose([transforms.ToTensor()]),
        )

        # Each worker iterates over its own shard of the dataset.
        sampler = DistributedSampler(dataset)
        train_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=128,
            sampler=sampler,
        )

        logging.info(f"Start training for RANK: {RANK}. WORLD_SIZE: {WORLD_SIZE}")

        # Create the optimizer once so SGD momentum buffers carry over between
        # epochs instead of being reset every epoch.
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

        for epoch in range(int(parameters["NUM_EPOCHS"])):
            model.train()
            # Without set_epoch, DistributedSampler reuses the same shuffle order every epoch.
            sampler.set_epoch(epoch)

            for batch_idx, (data, target) in enumerate(train_loader):
                data = data.to(device)
                target = target.to(device)

                optimizer.zero_grad()
                output = model(data)
                loss = F.nll_loss(output, target)
                loss.backward()
                optimizer.step()
                if batch_idx % 10 == 0:
                    # len(sampler) is this rank's shard size. The progress counter
                    # only advances over that shard, not over the whole dataset.
                    logging.info(
                        "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
                            epoch,
                            batch_idx * len(data),
                            len(sampler),
                            100.0 * batch_idx / len(train_loader),
                            loss.item(),
                        )
                    )

        logging.info("Training is finished")
    finally:
        # Release the process group so the function can be called again in the
        # same process (e.g. re-running the local notebook cell).
        dist.destroy_process_group()

In [None]:
# Provide the distributed env variables that train_pytorch_model and
# torch.distributed read, describing a single-process (WORLD_SIZE=1) job in this kernel.
import os

os.environ.update(
    RANK="0",
    WORLD_SIZE="1",
    MASTER_ADDR="localhost",
    MASTER_PORT="1234",
)

# Run one epoch locally, inside the notebook, before submitting to the cluster.
train_pytorch_model({"NUM_EPOCHS": "1"})

In [None]:
import time

from kubeflow.training import TrainingClient, constants

# Name of the PyTorchJob; the later cells look the job up by this name.
pytorchjob_name = "train-pytorch"

# Passing job_kind here makes PyTorchJob the default Job kind for this client's APIs.
training_client = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)

# Submit train_pytorch_model as a PyTorchJob: `parameters` is the input to the
# train function, and num_worker_replicas is how many PyTorch workers are created.
training_client.create_job(
    name=pytorchjob_name,
    train_func=train_pytorch_model,
    parameters={"NUM_EPOCHS": "3"},
    num_worker_replicas=3,
)

# Short pause, presumably so the job exists before the status cell queries it.
time.sleep(5)

In [None]:
# Report whether the PyTorchJob is currently running (presumably a True/False value).
job_is_running = training_client.is_job_running(name=pytorchjob_name)
print(f"PyTorchJob Status: {job_is_running}")

In [None]:
# Display the names of the pods belonging to the PyTorchJob as the cell output.
training_client.get_job_pod_names(name=pytorchjob_name)

In [None]:
import time

# Poll for a terminal state instead of sleeping a fixed 120 s. A fixed sleep
# either fetches logs from a job that is still training (truncated output) or
# waits longer than necessary.
LOG_WAIT_TIMEOUT_S = 600  # upper bound on how long to wait for the job to finish
LOG_POLL_INTERVAL_S = 10

deadline = time.monotonic() + LOG_WAIT_TIMEOUT_S
while not (
    training_client.is_job_succeeded(name=pytorchjob_name)
    or training_client.is_job_failed(name=pytorchjob_name)
):
    if time.monotonic() >= deadline:
        print(f"PyTorchJob did not finish within {LOG_WAIT_TIMEOUT_S}s; fetching logs so far.")
        break
    time.sleep(LOG_POLL_INTERVAL_S)

# Fetch logs even if the job failed; they are what is needed to debug it.
logs = training_client.get_job_logs(pytorchjob_name)
print(logs)

In [None]:
# Clean up: remove the PyTorchJob created earlier in this notebook.
training_client.delete_job(name=pytorchjob_name)