23 changes: 23 additions & 0 deletions src/azure-cli/azure/cli/command_modules/vm/_vm_utils.py
@@ -757,3 +757,26 @@ def _open(filename, mode):
         f.write(public_bytes)
 
     return public_bytes.decode()
+
+
+def safe_get(d: dict, path: str, default=None):
+    """
+    Safely fetch nested keys from a dict.
+    Path format supports lists by index, e.g. 'storageProfile.dataDisks.0.managedDisk.id'.
+    Returns `default` when any segment is missing or the type doesn't match.
+    """
+    cur = d
+    for key in path.split('.'):
+        if isinstance(cur, list):
+            try:
+                idx = int(key)
+                cur = cur[idx]
+            except (ValueError, IndexError):
+                return default
+        elif isinstance(cur, dict):
+            if key not in cur:
+                return default
+            cur = cur[key]
+        else:
+            return default
+    return cur
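
For reviewers, a minimal sketch of how `safe_get` traverses mixed dict/list paths; the VM fragment and resource IDs below are invented for illustration:

    # Hypothetical fragment of an AAZ `vm show` result, for illustration only.
    vm = {
        'storageProfile': {
            'dataDisks': [
                {'lun': 0, 'managedDisk': {'id': '/subscriptions/0000/resourceGroups/rg/providers/Microsoft.Compute/disks/d0'}},
                {'lun': 1, 'managedDisk': None},
            ]
        }
    }

    # List segments are addressed by integer index inside the dotted path.
    safe_get(vm, 'storageProfile.dataDisks.0.managedDisk.id')    # -> the d0 disk ID
    # Missing keys, bad indexes, and type mismatches all fall back to the default.
    safe_get(vm, 'storageProfile.dataDisks.1.managedDisk.id')    # -> None (managedDisk is None)
    safe_get(vm, 'storageProfile.osDisk.id', default='n/a')      # -> 'n/a' (key missing)
    safe_get(vm, 'storageProfile.dataDisks.9.lun', default=-1)   # -> -1 (index out of range)
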
326 changes: 198 additions & 128 deletions src/azure-cli/azure/cli/command_modules/vm/custom.py
@@ -38,7 +38,7 @@
 from azure.cli.core.profiles import ResourceType
 from azure.cli.core.util import sdk_no_wait
 
-from ._vm_utils import read_content_if_is_file, import_aaz_by_profile
+from ._vm_utils import read_content_if_is_file, import_aaz_by_profile, safe_get
 from ._vm_diagnostics_templates import get_default_diag_config
 
 from ._actions import (load_images_from_aliases_doc, load_extension_images_thru_services,
@@ -2233,121 +2233,161 @@ def show_default_diagnostics_configuration(is_windows_os=False):
 
 
 # region VirtualMachines Disks (Managed)
-def attach_managed_data_disk(cmd, resource_group_name, vm_name, disk=None, ids=None, disks=None, new=False, sku=None,
-                             size_gb=None, lun=None, caching=None, enable_write_accelerator=False, disk_ids=None,
-                             source_snapshots_or_disks=None, source_disk_restore_point=None,
-                             new_names_of_source_snapshots_or_disks=None, new_names_of_source_disk_restore_point=None):
-    # attach multiple managed disks using disk attach API
-    vm = get_vm_to_update(cmd, resource_group_name, vm_name)
+def attach_managed_data_disk(cmd, resource_group_name, vm_name,
+                             disk=None, ids=None, disks=None, new=False, sku=None,
+                             size_gb=None, lun=None, caching=None,
+                             enable_write_accelerator=False, disk_ids=None,
+                             source_snapshots_or_disks=None,
+                             source_disk_restore_point=None,
+                             new_names_of_source_snapshots_or_disks=None,
+                             new_names_of_source_disk_restore_point=None,
+                             no_wait=False):
+    if caching is None:
+        caching = 'None'
 
     # attach existing managed disks
     if not new and not sku and not size_gb and disk_ids is not None:
-        if lun:
-            disk_lun = lun
-        else:
-            disk_lun = _get_disk_lun(vm.storage_profile.data_disks)
-
-        data_disks = []
-        for disk_item in disk_ids:
-            disk = {
-                'diskId': disk_item,
-                'caching': caching,
-                'lun': disk_lun,
-                'writeAcceleratorEnabled': enable_write_accelerator
-            }
-            data_disks.append(disk)
-            disk_lun += 1
-        result = AttachDetachDataDisk(cli_ctx=cmd.cli_ctx)(command_args={
-            'vm_name': vm_name,
-            'resource_group': resource_group_name,
-            'data_disks_to_attach': data_disks
-        })
-        return result
+        from .operations.vm import VMShow
+        vm = VMShow(cli_ctx=cmd.cli_ctx)(command_args={
+            'resource_group': resource_group_name,
+            'vm_name': vm_name
+        })
+        vm_dict = vm if isinstance(vm, dict) else getattr(vm, 'result', vm)
+        data_disks = vm_dict.get('storageProfile', {}).get('dataDisks', []) or []
+        used_luns = {d.get('lun') for d in data_disks if isinstance(d, dict) and d.get('lun') is not None}
+
+        def _next_lun(start=0):
+            i = start
+            while i in used_luns:
+                i += 1
+            used_luns.add(i)
+            return i
+
+        attach_payload = []
+        current_lun = lun
+        for disk_id in disk_ids:
+            if current_lun is not None:
+                disk_lun = _next_lun(start=current_lun)
+                current_lun = disk_lun + 1
+            else:
+                disk_lun = _next_lun()
+            payload = {
+                'diskId': disk_id,
+                'lun': disk_lun,
+                'caching': caching,
+            }
+            if enable_write_accelerator:
+                payload['writeAcceleratorEnabled'] = enable_write_accelerator
+            attach_payload.append(payload)
+        return AttachDetachDataDisk(cli_ctx=cmd.cli_ctx)(command_args={
+            'vm_name': vm_name,
+            'resource_group': resource_group_name,
+            'data_disks_to_attach': attach_payload,
+            'no_wait': no_wait
+        })
     else:
-        # attach multiple managed disks using vm PUT API
-        from azure.mgmt.core.tools import parse_resource_id
-        DataDisk, ManagedDiskParameters, DiskCreateOption = cmd.get_models(
-            'DataDisk', 'ManagedDiskParameters', 'DiskCreateOptionTypes')
-        if size_gb is None:
-            default_size_gb = 1023
-
-        if disk_ids is not None:
-            disks = disk_ids
-
-        for disk_item in disks:
-            if lun:
-                disk_lun = lun
-            else:
-                disk_lun = _get_disk_lun(vm.storage_profile.data_disks)
-
-            if new:
-                data_disk = DataDisk(lun=disk_lun, create_option=DiskCreateOption.empty,
-                                     name=parse_resource_id(disk_item)['name'],
-                                     disk_size_gb=size_gb if size_gb else default_size_gb, caching=caching,
-                                     managed_disk=ManagedDiskParameters(storage_account_type=sku))
-            else:
-                params = ManagedDiskParameters(id=disk_item, storage_account_type=sku)
-                data_disk = DataDisk(lun=disk_lun, create_option=DiskCreateOption.attach, managed_disk=params,
-                                     caching=caching)
-
-            if enable_write_accelerator:
-                data_disk.write_accelerator_enabled = enable_write_accelerator
-
-            vm.storage_profile.data_disks.append(data_disk)
-        disk_lun = _get_disk_lun(vm.storage_profile.data_disks)
-        if source_snapshots_or_disks is not None:
-            if new_names_of_source_snapshots_or_disks is None:
-                new_names_of_source_snapshots_or_disks = [None] * len(source_snapshots_or_disks)
-            for disk_id, disk_name in zip(source_snapshots_or_disks, new_names_of_source_snapshots_or_disks):
-                disk = {
-                    'name': disk_name,
-                    'create_option': 'Copy',
-                    'caching': caching,
-                    'lun': disk_lun,
-                    'writeAcceleratorEnabled': enable_write_accelerator,
-                    "sourceResource": {
-                        "id": disk_id
-                    }
-                }
-                if size_gb is not None:
-                    disk.update({
-                        'diskSizeGb': size_gb
-                    })
-                if sku is not None:
-                    disk.update({
-                        "managedDisk": {
-                            "storageAccountType": sku
-                        }
-                    })
-                disk_lun += 1
-                vm.storage_profile.data_disks.append(disk)
-        if source_disk_restore_point is not None:
-            if new_names_of_source_disk_restore_point is None:
-                new_names_of_source_disk_restore_point = [None] * len(source_disk_restore_point)
-            for disk_id, disk_name in zip(source_disk_restore_point, new_names_of_source_disk_restore_point):
-                disk = {
-                    'name': disk_name,
-                    'create_option': 'Restore',
-                    'caching': caching,
-                    'lun': disk_lun,
-                    'writeAcceleratorEnabled': enable_write_accelerator,
-                    "sourceResource": {
-                        "id": disk_id
-                    }
-                }
-                if size_gb is not None:
-                    disk.update({
-                        'diskSizeGb': size_gb
-                    })
-                if sku is not None:
-                    disk.update({
-                        "managedDisk": {
-                            "storageAccountType": sku
-                        }
-                    })
-                disk_lun += 1
-                vm.storage_profile.data_disks.append(disk)
-
-        set_vm(cmd, vm)
+        # new / copy / restore
+        from azure.mgmt.core.tools import parse_resource_id
+        from .operations.vm import VMUpdate as _VMUpdate
+
+        class VMUpdate(_VMUpdate):
+            def pre_instance_update(self, instance):
+                storage_profile = instance.properties.storage_profile
+                data_disks = storage_profile.data_disks
+                data_disks_list = data_disks.to_serialized_data() if hasattr(data_disks, 'to_serialized_data') else data_disks
+                used_luns = set()
+                if isinstance(data_disks_list, list):
+                    for d in data_disks_list:
+                        if isinstance(d, dict) and 'lun' in d and d['lun'] is not None:
+                            used_luns.add(d['lun'])
+
+                def _next_lun(start=0):
+                    i = start
+                    while i in used_luns:
+                        i += 1
+                    used_luns.add(i)
+                    return i
+
+                default_size_gb = 1023
+                disks_to_process = disk_ids if disk_ids is not None else disks
+
+                # attach existing / new disks
+                if disks_to_process:
+                    # If a user-specified LUN is provided, allocate it (or the next available
+                    # one starting from that value) for the first disk, then auto-increment for
+                    # subsequent disks to avoid LUN conflicts.
+                    next_lun = _next_lun(start=lun) if lun is not None else None
+                    for disk_item in disks_to_process:
+                        if lun is not None:
+                            disk_lun = next_lun
+                            next_lun = _next_lun()
+                        else:
+                            disk_lun = _next_lun()
+
+                        if new:
+                            disk_name = parse_resource_id(disk_item)['name']
+                            disk_obj = {
+                                'name': disk_name,
+                                'lun': disk_lun,
+                                'createOption': 'Empty',
+                                'diskSizeGb': size_gb if size_gb else default_size_gb,
+                                'caching': caching
+                            }
+                            if sku:
+                                disk_obj['managedDisk'] = {'storageAccountType': sku}
+                        else:
+                            disk_obj = {
+                                'lun': disk_lun,
+                                'createOption': 'Attach',
+                                'caching': caching,
+                                'managedDisk': {'id': disk_item}
+                            }
+                            if sku:
+                                disk_obj['managedDisk']['storageAccountType'] = sku
+                        if enable_write_accelerator:
+                            disk_obj['writeAcceleratorEnabled'] = True
+                        data_disks.append(disk_obj)
+
+                # snapshot / copy
+                if source_snapshots_or_disks:
+                    _new_names = new_names_of_source_snapshots_or_disks or [None] * len(source_snapshots_or_disks)
+                    for src_id, name in zip(source_snapshots_or_disks, _new_names):
+                        disk_lun = _next_lun()
+                        disk_obj = {
+                            'name': name,
+                            'lun': disk_lun,
+                            'createOption': 'Copy',
+                            'sourceResource': {'id': src_id},
+                            'caching': caching,
+                            'writeAcceleratorEnabled': enable_write_accelerator
+                        }
+                        if size_gb is not None:
+                            disk_obj['diskSizeGb'] = size_gb
+                        if sku is not None:
+                            disk_obj['managedDisk'] = {'storageAccountType': sku}
+                        data_disks.append(disk_obj)
+
+                # restore point
+                if source_disk_restore_point:
+                    _new_names_rp = new_names_of_source_disk_restore_point or [None] * len(source_disk_restore_point)
+                    for src_id, name in zip(source_disk_restore_point, _new_names_rp):
+                        disk_lun = _next_lun()
+                        disk_obj = {
+                            'name': name,
+                            'lun': disk_lun,
+                            'createOption': 'Restore',
+                            'sourceResource': {'id': src_id},
+                            'caching': caching,
+                            'writeAcceleratorEnabled': enable_write_accelerator
+                        }
+                        if size_gb is not None:
+                            disk_obj['diskSizeGb'] = size_gb
+                        if sku is not None:
+                            disk_obj['managedDisk'] = {'storageAccountType': sku}
+                        data_disks.append(disk_obj)
+
+        args = {
+            'resource_group': resource_group_name,
+            'vm_name': vm_name,
+            'no_wait': no_wait
+        }
+        return VMUpdate(cli_ctx=cmd.cli_ctx)(command_args=args)
 
 
 def detach_unmanaged_data_disk(cmd, resource_group_name, vm_name, disk_name):
     # here we handle unmanaged disk
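
Reviewer note: a minimal standalone sketch of the LUN allocation both attach paths now share (`used_luns` is seeded by hand here; in the command it is built from the VM's existing data disks):

    used_luns = {0, 1, 3}          # LUNs already occupied on the VM

    def _next_lun(start=0):
        # Scan upward from `start`, claim the first free LUN, and remember it
        # so later calls in the same request never hand out a duplicate.
        i = start
        while i in used_luns:
            i += 1
        used_luns.add(i)
        return i

    print(_next_lun())          # -> 2   (0 and 1 are taken; 2 is the first gap)
    print(_next_lun())          # -> 4   (2 is now claimed, 3 was already taken)
    print(_next_lun(start=10))  # -> 10  (an explicit --lun is honored when free)

With an explicit `lun`, the first disk gets that slot (or the next free one above it) and subsequent disks keep incrementing from there, so multi-disk attaches cannot collide with each other or with disks already on the VM.
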
@@ -2361,7 +2401,9 @@ def detach_unmanaged_data_disk(cmd, resource_group_name, vm_name, disk_name):
 # endregion
 
 
-def detach_managed_data_disk(cmd, resource_group_name, vm_name, disk_name=None, force_detach=None, disk_ids=None):
+def detach_managed_data_disk(cmd, resource_group_name, vm_name, disk_name=None,
+                             force_detach=None, disk_ids=None,
+                             no_wait=False):
     if disk_ids is not None:
         data_disks = []
         for disk_item in disk_ids:
@@ -2375,27 +2417,55 @@ def detach_managed_data_disk(cmd, resource_group_name, vm_name, disk_name=None,
         return result
     else:
         # here we handle managed disk
-        vm = get_vm_to_update(cmd, resource_group_name, vm_name)
-        if not force_detach:
-            # pylint: disable=no-member
-            leftovers = [d for d in vm.storage_profile.data_disks if d.name.lower() != disk_name.lower()]
-            if len(vm.storage_profile.data_disks) == len(leftovers):
-                raise ResourceNotFoundError("No disk with the name '{}' was found".format(disk_name))
-        else:
-            DiskDetachOptionTypes = cmd.get_models('DiskDetachOptionTypes', resource_type=ResourceType.MGMT_COMPUTE,
-                                                   operation_group='virtual_machines')
-            leftovers = vm.storage_profile.data_disks
-            is_contains = False
-            for d in leftovers:
-                if d.name.lower() == disk_name.lower():
-                    d.to_be_detached = True
-                    d.detach_option = DiskDetachOptionTypes.FORCE_DETACH
-                    is_contains = True
-                    break
-            if not is_contains:
-                raise ResourceNotFoundError("No disk with the name '{}' was found".format(disk_name))
-        vm.storage_profile.data_disks = leftovers
-        set_vm(cmd, vm)
+        from .operations.vm import VMShow
+
+        vm = VMShow(cli_ctx=cmd.cli_ctx)(command_args={
+            'resource_group': resource_group_name,
+            'vm_name': vm_name
+        })
+
+        # Work on a local copy of the VM dict to avoid mutating the original object.
+        vm_result = vm if isinstance(vm, dict) else getattr(vm, 'result', vm)
+        vm_dict = json.loads(json.dumps(vm_result))
+
+        # Drop the image reference to avoid an unnecessary permission check on the image.
+        storage_profile = vm_dict.get('storageProfile', {})
+        storage_profile['imageReference'] = None
+
+        target_disk = None
+        data_disks = safe_get(vm_dict, 'storageProfile.dataDisks', default=[]) or []
+        for d in data_disks:
+            # Use dict-style access; AAZ commands return plain dicts.
+            name = (d.get('name') or '').lower()
+            if name == (disk_name or '').lower():
+                target_disk = d
+                break
+
+        if not target_disk:
+            attached_names = [d.get('name') for d in data_disks]
+            raise ResourceNotFoundError(
+                "No disk with the name '{}' was found. Attached: {}".format(disk_name, attached_names)
+            )
+
+        disk_id = safe_get(target_disk, 'managedDisk.id')
+        if not disk_id:
+            raise CLIError(
+                "Disk '{}' is not a managed disk (no managedDisk.id). "
+                "Only managed disks are supported for this operation.".format(disk_name)
+            )
+
+        args = {
+            'vm_name': vm_name,
+            'resource_group': resource_group_name,
+            'data_disks_to_detach': [{
+                'diskId': disk_id,
+                'detachOption': 'ForceDetach' if force_detach else None
+            }],
+            'no_wait': no_wait
+        }
+
+        result = AttachDetachDataDisk(cli_ctx=cmd.cli_ctx)(command_args=args)
+        return result
 # endregion
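
Assuming `safe_get` from `_vm_utils` above is in scope, a sketch of the lookup-then-detach flow against a fabricated `vm show` result (only the fields this path reads; the disk ID is made up):

    import json

    vm_result = {
        'storageProfile': {
            'imageReference': {'publisher': 'Canonical'},
            'dataDisks': [
                {'name': 'data-disk-1', 'lun': 0,
                 'managedDisk': {'id': '/subscriptions/0000/resourceGroups/rg/providers/Microsoft.Compute/disks/data-disk-1'}}
            ]
        }
    }

    # json round-trip = cheap deep copy of a plain dict, as in the diff above.
    vm_dict = json.loads(json.dumps(vm_result))

    disk_name = 'Data-Disk-1'  # the lookup is case-insensitive
    target = next((d for d in safe_get(vm_dict, 'storageProfile.dataDisks', default=[]) or []
                   if (d.get('name') or '').lower() == disk_name.lower()), None)
    disk_id = safe_get(target, 'managedDisk.id') if target else None
    payload = [{'diskId': disk_id, 'detachOption': 'ForceDetach'}]  # force_detach case
    print(payload)

The name-to-ID resolution happens client-side so the detach request itself only carries the managed disk ID plus the optional `detachOption`.
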

