Skip to content
This repository was archived by the owner on Feb 3, 2021. It is now read-only.
Open
18 changes: 13 additions & 5 deletions aztk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ def __delete_pool_and_job(self, pool_id: str, keep_logs: bool = False):

return job_exists or pool_exists

def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, VmImageModel):
def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, vm_image_model):
"""
Create a pool and job
:param cluster_conf: the configuration object used to create the cluster
:type cluster_conf: aztk.models.ClusterConfiguration
:parm software_metadata_key: the id of the software being used on the cluster
:param start_task: the start task for the cluster
:param VmImageModel: the type of image to provision for the cluster
:param vm_image_model: the type of image to provision for the cluster
:param wait: wait until the cluster is ready
"""
self._get_cluster_data(cluster_conf.cluster_id).save_cluster_config(cluster_conf)
Expand All @@ -85,7 +85,7 @@ def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, softw
# Get a verified node agent sku
sku_to_use, image_ref_to_use = \
helpers.select_latest_verified_vm_image_with_node_agent_sku(
VmImageModel.publisher, VmImageModel.offer, VmImageModel.sku, self.batch_client)
vm_image_model.publisher, vm_image_model.offer, vm_image_model.sku, self.batch_client)

network_conf = None
if cluster_conf.subnet_id is not None:
Expand All @@ -99,7 +99,11 @@ def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, softw
id=pool_id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use,
node_agent_sku_id=sku_to_use),
node_agent_sku_id=sku_to_use,
data_disks=[batch_models.DataDisk(
lun=i,
disk_size_gb=data_disk.disk_size_gb
) for i, data_disk in enumerate(vm_image_model.data_disks)]),
vm_size=cluster_conf.vm_size,
enable_auto_scale=True,
auto_scale_formula=auto_scale_formula,
Expand Down Expand Up @@ -382,7 +386,11 @@ def __submit_job(self,
display_name=job_configuration.id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use,
node_agent_sku_id=sku_to_use),
node_agent_sku_id=sku_to_use,
data_disks=[batch_models.DataDisk(
lun=i,
disk_size_gb=data_disk.disk_size_gb
) for i, data_disk in enumerate(vm_image_model.data_disks)]),
vm_size=job_configuration.vm_size,
enable_auto_scale=True,
auto_scale_formula=autoscale_formula,
Expand Down
11 changes: 10 additions & 1 deletion aztk/core/models/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ def __set__(self, instance, value):
value = []
super().__set__(instance, value)

def __get__(self, instance, _):
    # Descriptor protocol read for a list-valued model field.
    # When accessed through an instance, return the stored value, lazily
    # creating and caching the field default on first read. When accessed
    # on the class itself (instance is None), return the field descriptor,
    # matching standard Python descriptor behavior.
    if instance is not None:
        value = instance._data.get(self)
        if value is None:
            # setdefault caches the freshly-built default so repeated reads
            # (and in-place mutation of a mutable default such as a list)
            # all see the same object.
            return instance._data.setdefault(self, self._default(instance))
        return value

    return self

def _resolve(self, value):
result = []
for item in value:
Expand All @@ -158,7 +167,7 @@ def merge(self, instance, value):
value = []

if self.merge_strategy == ListMergeStrategy.Append:
current = instance._data.get(self)
current = instance._data.get(self)
if current is None:
current = []
value = current + value
Expand Down
2 changes: 1 addition & 1 deletion aztk/core/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def validate(self):
def merge(self, other):
if not isinstance(other, self.__class__):
raise AztkError("Cannot merge {0} as is it not an instance of {1}".format(other, self.__class__.__name__))

for field in other._fields.values():
if field in other._data:
field.merge(self, other._data[field])
Expand Down
2 changes: 1 addition & 1 deletion aztk/internal/docker_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def pass_env(self, env: str):
self.cmd.add_option('-e', '{0}'.format(env))

def share_folder(self, folder: str):
    """Bind-mount ``folder`` from the host into the container at the same path.

    Uses ``--mount type=bind`` rather than ``-v``: with ``--mount`` docker
    errors out when the host path does not exist instead of silently
    creating an empty directory.
    """
    # NOTE(review): the scraped diff contained both the old '-v' line and
    # this replacement; only the '--mount' form is kept.
    self.cmd.add_option('--mount', 'type=bind,src={0},dst={0}'.format(folder))

def open_port(self, port: int):
self.cmd.add_option('-p', '{0}:{0}'.format(port)) # Spark Master UI
Expand Down
2 changes: 2 additions & 0 deletions aztk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from .remote_login import RemoteLogin
from .ssh_log import SSHLog
from .vm_image import VmImage
from .data_disk import DataDisk
from .data_disk_format_type import DataDiskFormatType
from .software import Software
from .cluster import Cluster
from .scheduling_target import SchedulingTarget
Expand Down
3 changes: 3 additions & 0 deletions aztk/models/cluster_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
from aztk.utils import deprecated,deprecate, helpers

from .custom_script import CustomScript
from .data_disk import DataDisk
from .file_share import FileShare
from .plugins import PluginConfiguration
from .toolkit import Toolkit
from .user_configuration import UserConfiguration
from .scheduling_target import SchedulingTarget


class ClusterConfiguration(Model):
"""
Cluster configuration model
Expand Down Expand Up @@ -36,6 +38,7 @@ class ClusterConfiguration(Model):
plugins = fields.List(PluginConfiguration)
custom_scripts = fields.List(CustomScript)
file_shares = fields.List(FileShare)
data_disks = fields.List(DataDisk)
user_configuration = fields.Model(UserConfiguration, default=None)
scheduling_target = fields.Enum(SchedulingTarget, default=None)

Expand Down
17 changes: 17 additions & 0 deletions aztk/models/data_disk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from aztk.core.models import Model, fields

from .data_disk_format_type import DataDiskFormatType

class DataDisk(Model):
    """
    Configuration for an additional local storage disk that is attached to the virtual machine,
    formatted and mounted into the Spark Docker container

    Args:
        disk_size_gb (int): size of the data disk in gigabytes; the node setup
            matches configured disks to attached devices by this size
        mount_path (:obj:`str`, optional): the path where the disk should be mounted
        format_type (:obj:`aztk.models.DataDiskFormatType`, optional): the type of file system
            format; defaults to ext4
    """
    disk_size_gb = fields.Integer()
    mount_path = fields.String()
    format_type = fields.String(default=DataDiskFormatType.ext4)
24 changes: 24 additions & 0 deletions aztk/models/data_disk_format_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
class DataDiskFormatType:
    """
    The valid file system formats for a Data Disk.

    Each attribute holds the string suffix passed to ``mkfs.<type>`` when
    the disk is formatted on the node.

    Attributes:
        bfs (:obj:`str`)
        btrfs (:obj:`str`)
        cramfs (:obj:`str`)
        ext2 (:obj:`str`)
        ext3 (:obj:`str`)
        ext4 (:obj:`str`)
        fat (:obj:`str`)
        minix (:obj:`str`)
        xfs (:obj:`str`)
    """
    bfs = "bfs"
    btrfs = "btrfs"
    cramfs = "cramfs"
    ext2 = "ext2"
    ext3 = "ext3"
    ext4 = "ext4"
    fat = "fat"
    minix = "minix"
    xfs = "xfs"
9 changes: 9 additions & 0 deletions aztk/models/file_share.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from aztk.core.models import Model, fields

class FileShare(Model):
"""
Azure Files file share to mount to each node in the cluster

Args:
storage_account_name (int): the name of the Azure Storage Account
storage_account_key (:obj:`str`, optional): the shared key to the Azure Storage Account
file_share_path (:obj:`str`, optional): the path of the file share in Azure Files
mount_path (:obj:`str`, optional): the path on the node to mount the file share
"""
storage_account_name = fields.String()
storage_account_key = fields.String()
file_share_path = fields.String()
Expand Down
3 changes: 2 additions & 1 deletion aztk/models/vm_image.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
class VmImage:
    """
    Description of the VM image to provision for cluster nodes.

    Args:
        publisher (str): the image publisher (e.g. 'Canonical')
        offer (str): the image offer (e.g. 'UbuntuServer')
        sku (str): the image sku (e.g. '16.04')
        data_disks (list, optional): DataDisk configurations to attach to each
            virtual machine. Defaults to no additional disks, so callers using
            the previous three-argument signature keep working.
    """

    def __init__(self, publisher, offer, sku, data_disks=None):
        self.publisher = publisher
        self.offer = offer
        self.sku = sku
        # Normalize None to [] so consumers can iterate unconditionally
        # (client code enumerates vm_image_model.data_disks).
        self.data_disks = [] if data_disks is None else data_disks
16 changes: 13 additions & 3 deletions aztk/node_scripts/install/install.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import os
from core import config
from install import pick_master, spark, scripts, create_user, plugins, spark_container

import wait_until_master_selected
from aztk.models.plugins import PluginTarget
from aztk.internal import cluster_data
from aztk.models.plugins import PluginTarget
from core import config
from install import (create_user, pick_master, plugins, scripts, spark, spark_container)

from .node_scheduling import setup_node_scheduling
from .setup_data_disks import setup_data_disks


def read_cluster_config():
Expand All @@ -13,10 +16,14 @@ def read_cluster_config():
print("Got cluster config", cluster_config)
return cluster_config




def setup_host(docker_repo: str):
"""
Code to be run on the node(NOT in a container)
"""

client = config.batch_client

create_user.create_user(batch_client=client)
Expand All @@ -43,13 +50,16 @@ def setup_host(docker_repo: str):

cluster_conf = read_cluster_config()

setup_data_disks(cluster_conf)

setup_node_scheduling(client, cluster_conf, is_master)

#TODO pass azure file shares
spark_container.start_spark_container(
docker_repo=docker_repo,
gpu_enabled=os.environ.get("AZTK_GPU_ENABLED") == "true",
plugins=cluster_conf.plugins,
data_disks=cluster_conf.data_disks,
)
plugins.setup_plugins(target=PluginTarget.Host, is_master=is_master, is_worker=is_worker)

Expand Down
25 changes: 25 additions & 0 deletions aztk/node_scripts/install/setup_data_disk.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash
# Partition, format and mount a single data disk.
#
# Usage: setup_data_disk.sh <device> <format_type> <mount_path>
#   device       block device name without the /dev/ prefix (e.g. sdc)
#   format_type  file system type passed to mkfs (e.g. ext4)
#   mount_path   directory to mount the new partition on (created if missing)

set -e

devicename="/dev/"$1
format_type=$2
mount_path=$3
device_partition_name="${devicename}1"

# make partition: one primary partition spanning the whole disk.
# NOTE: 'ext4' here is only parted's fs-type hint; the real file system
# is created by the mkfs call below using ${format_type}.
parted --script --align optimal ${devicename} mklabel gpt
parted --script --align optimal ${devicename} mkpart primary ext4 0% 100%

# format partition (the sleep gives the kernel a moment to create the
# partition device node before mkfs runs)
sleep 1
mkfs.${format_type} ${device_partition_name}

# make partition directory
mkdir -p ${mount_path}

# auto mount partition on reboot ('nofail' lets boot continue if the disk
# is absent)
echo "${device_partition_name} ${mount_path} auto defaults,nofail 0 0" >> /etc/fstab

# mount partition
mount ${device_partition_name} ${mount_path}
49 changes: 49 additions & 0 deletions aztk/node_scripts/install/setup_data_disks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import subprocess
import sys
import os


def mount_data_disk(data_disk, device_name, number):
    """
    Partition, format and mount one data disk by running setup_data_disk.sh.

    Args:
        data_disk: DataDisk model providing disk_size_gb, format_type and
            mount_path; when mount_path is empty it is set in place to
            "/data-disk<number>"
        device_name (str): block device name without the /dev/ prefix (e.g. 'sdc')
        number (int): index of the disk, used to build the default mount path

    Returns:
        the data_disk, with mount_path resolved

    Exits the process with the script's return code when mounting fails.
    """
    script = os.environ["AZTK_WORKING_DIR"] + "/aztk/node_scripts/install/setup_data_disk.sh"
    if not data_disk.mount_path:
        data_disk.mount_path = "/data-disk" + str(number)
    # Pass arguments as a list with shell=False so a mount path containing
    # spaces or shell metacharacters cannot break or inject into the command.
    cmd = [script, device_name, data_disk.format_type, data_disk.mount_path]
    print("mount disk cmd:", " ".join(cmd))
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        # The original print passed device_name as a second argument without
        # calling .format, printing a literal "{}".
        print("ERROR: failed to mount data_disk device {}".format(device_name))
        sys.exit(p.returncode)

    return data_disk


def setup_data_disks(cluster_configuration):
    """
    Discover data disk devices attached to this node and mount each disk
    declared in the cluster configuration, matching configured disks to
    physical devices by size in GB.

    Args:
        cluster_configuration: ClusterConfiguration whose data_disks entries
            are updated in place with their resolved mount paths.

    Exits the process when a configured disk has no matching device.
    """
    # Count SCSI block devices; 3 or fewer means only the default host
    # devices (sda, sdb, sr0) are present, so there is nothing to mount.
    cmd = 'lsblk -lnS --sort name | wc -l'
    p = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, _ = p.communicate()
    if int(output) <= 3:
        return

    # by default, there are 3 devices on each host: sda, sdb, sr0
    # (raw string: "\|" is grep alternation, not a Python escape)
    cmd = r'lsblk -lnbS --sort=name --output NAME,SIZE | grep -v "sr0\|sd[ab]"'
    p = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    disks = stdout.decode('UTF-8').split('\n')[:-1]

    # Group device names by size in GB so configured disks can claim a
    # matching device. NOTE(review): the division keeps a float key, so a
    # device only matches when its size is an exact multiple of 1 GiB —
    # same behavior as the original code; confirm against Azure disk sizes.
    disk_size_mapping = {}
    for disk in disks:
        assert len(disk.split()) == 2
        name, size = disk.split()
        # convert size from bytes to gb
        size = int(size) / 1024 / 1024 / 1024
        disk_size_mapping.setdefault(size, []).append(name)

    for i, defined_data_disk in enumerate(cluster_configuration.data_disks):
        available = disk_size_mapping.get(defined_data_disk.disk_size_gb)
        if not available:
            # Explicit failure instead of an unhandled KeyError/IndexError
            # when no unclaimed device of the configured size exists.
            print("ERROR: no unused device of size {} gb found for data disk {}".format(
                defined_data_disk.disk_size_gb, i))
            sys.exit(1)
        device_name = available.pop()
        mounted_data_disk = mount_data_disk(data_disk=defined_data_disk, device_name=device_name, number=i)
        # Update the configured entry in case mount_path was defaulted.
        # Fixes the original `cluster_configuration[i] = ...`, which indexed
        # the configuration object instead of its data_disks list.
        cluster_configuration.data_disks[i] = mounted_data_disk
6 changes: 4 additions & 2 deletions aztk/node_scripts/install/spark_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ def start_spark_container(
docker_repo: str=None,
gpu_enabled: bool=False,
file_mounts=None,
plugins=None):
plugins=None,
data_disks=None):

cmd = DockerCmd(
name=constants.DOCKER_SPARK_CONTAINER_NAME,
Expand All @@ -18,7 +19,8 @@ def start_spark_container(
if file_mounts:
for mount in file_mounts:
cmd.share_folder(mount.mount_path)
cmd.share_folder('/mnt')
cmd.share_folder('/mnt/batch')
[cmd.share_folder(data_disk.mount_path) for data_disk in data_disks]

cmd.pass_env('AZTK_WORKING_DIR')
cmd.pass_env('AZ_BATCH_ACCOUNT_NAME')
Expand Down
1 change: 1 addition & 0 deletions aztk/node_scripts/setup_host.sh
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ main () {

# Unzip resource files and set permissions
chmod 777 $AZTK_WORKING_DIR/aztk/node_scripts/docker_main.sh
chmod +x $AZTK_WORKING_DIR/aztk/node_scripts/install/setup_data_disk.sh

# Check docker is running
docker info > /dev/null 2>&1
Expand Down
9 changes: 6 additions & 3 deletions aztk/spark/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
aztk.spark.models.Cluster
"""
cluster_conf = _apply_default_for_cluster_config(cluster_conf)

cluster_conf.validate()

cluster_data = self._get_cluster_data(cluster_conf.cluster_id)
Expand All @@ -56,11 +57,11 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
cluster_conf.worker_on_master)

software_metadata_key = "spark"

vm_image = models.VmImage(
publisher='Canonical',
offer='UbuntuServer',
sku='16.04')
sku='16.04',
data_disks=cluster_conf.data_disks)

cluster = self.__create_pool_and_job(
cluster_conf, software_metadata_key, start_task, vm_image)
Expand Down Expand Up @@ -249,7 +250,8 @@ def submit_job(self, job_configuration: models.JobConfiguration):
vm_image = models.VmImage(
publisher='Canonical',
offer='UbuntuServer',
sku='16.04')
sku='16.04',
data_disks=job_configuration.data_disks)

autoscale_formula = "$TargetDedicatedNodes = {0}; " \
"$TargetLowPriorityNodes = {1}".format(
Expand Down Expand Up @@ -352,6 +354,7 @@ def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration
cluster_conf.merge(configuration)
if cluster_conf.scheduling_target is None:
cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)

return cluster_conf

def _apply_default_for_job_config(job_conf: models.JobConfiguration):
Expand Down
Loading