Source code for linode_api4.objects.linode

import string
import sys
from datetime import datetime
from enum import Enum
from os import urandom
from random import randint

from linode_api4.common import load_and_validate_keys
from linode_api4.errors import UnexpectedResponseError
from linode_api4.objects import Base, DerivedBase, Image, Property, Region
from linode_api4.objects.base import MappedObject
from linode_api4.objects.filtering import FilterableAttribute
from linode_api4.objects.networking import IPAddress, IPv6Pool
from linode_api4.paginated_list import PaginatedList

PASSWORD_CHARS = string.ascii_letters + string.digits + string.punctuation


[docs]class Backup(DerivedBase): api_endpoint = '/linode/instances/{linode_id}/backups/{id}' derived_url_path = 'backups' parent_id_name='linode_id' properties = { 'id': Property(identifier=True), 'created': Property(is_datetime=True), 'duration': Property(), 'updated': Property(is_datetime=True), 'finished': Property(is_datetime=True), 'message': Property(), 'status': Property(volatile=True), 'type': Property(), 'linode_id': Property(identifier=True), 'label': Property(), 'configs': Property(), 'disks': Property(), 'region': Property(slug_relationship=Region), }
[docs] def restore_to(self, linode, **kwargs): d = { "linode_id": linode.id if issubclass(type(linode), Base) else linode, } d.update(kwargs) self._client.post("{}/restore".format(Backup.api_endpoint), model=self, data=d) return True
[docs]class Disk(DerivedBase): api_endpoint = '/linode/instances/{linode_id}/disks/{id}' derived_url_path = 'disks' parent_id_name='linode_id' properties = { 'id': Property(identifier=True), 'created': Property(is_datetime=True), 'label': Property(mutable=True, filterable=True), 'size': Property(filterable=True), 'status': Property(filterable=True, volatile=True), 'filesystem': Property(), 'updated': Property(is_datetime=True), 'linode_id': Property(identifier=True), }
[docs] def duplicate(self): result = self._client.post(Disk.api_endpoint, model=self, data={}) if not 'id' in result: raise UnexpectedResponseError('Unexpected response duplicating disk!', json=result) d = Disk(self._client, result['id'], self.linode_id, result) return d
[docs] def reset_root_password(self, root_password=None): rpass = root_password if not rpass: rpass = Instance.generate_root_password() params = { 'password': rpass, } result = self._client.post(Disk.api_endpoint, model=self, data=params) if not 'id' in result: raise UnexpectedResponseError('Unexpected response duplicating disk!', json=result) self._populate(result) if not root_password: return True, rpass return True
[docs] def resize(self, new_size): """ Resizes this disk. The Linode Instance this disk belongs to must have sufficient space available to accommodate the new size, and must be offline. **NOTE** If resizing a disk down, the filesystem on the disk must still fit on the new disk size. You may need to resize the filesystem on the disk first before performing this action. :param new_size: The intended new size of the disk, in MB :type new_size: int :returns: True if the resize was initiated successfully. :rtype: bool """ self._client.post('{}/resize'.format(Disk.api_endpoint), model=self, data={"size": new_size}) return True
[docs]class Kernel(Base): api_endpoint="/linode/kernels/{id}" properties = { "created": Property(is_datetime=True), "deprecated": Property(filterable=True), "description": Property(), "id": Property(identifier=True), "kvm": Property(filterable=True), "label": Property(filterable=True), "updates": Property(), "version": Property(filterable=True), "architecture": Property(filterable=True), "xen": Property(filterable=True), }
[docs]class Type(Base): api_endpoint = "/linode/types/{id}" properties = { 'disk': Property(filterable=True), 'id': Property(identifier=True), 'label': Property(filterable=True), 'network_out': Property(filterable=True), 'price': Property(), 'addons': Property(), 'memory': Property(filterable=True), 'transfer': Property(filterable=True), 'vcpus': Property(filterable=True), 'gpus': Property(filterable=True), # type_class is populated from the 'class' attribute of the returned JSON } def _populate(self, json): """ Allows changing the name "class" in JSON to "type_class" in python """ super()._populate(json) if 'class' in json: setattr(self, 'type_class', json['class']) else: setattr(self, 'type_class', None) # allow filtering on this converted type type_class = FilterableAttribute('class')
[docs]class ConfigInterface: """ This is a helper class used to populate 'interfaces' in the Config calss below. """ def __init__(self, purpose, label="", ipam_address=""): """ Creates a new ConfigInterface """ #: The Label for the VLAN this interface is connected to. Blank for public #: interfaces. self.label = label #: The IPAM Address this interface will bring up. Blank for public interfaces. self.ipam_address = ipam_address #: The purpose of this interface. "public" means this interface can access #: the internet, "vlan" means it is a VLAN interface. self.purpose = purpose def __repr__(self): if self.purpose == "public": return "Public Interface" return "Interface {}; purpose: {}; ipam_address: {}".format( self.label, self.purpose, self.ipam_address ) def _serialize(self): """ Returns this object as a dict """ return { "label": self.label, "ipam_address": self.ipam_address, "purpose": self.purpose, }
[docs]class Config(DerivedBase): api_endpoint="/linode/instances/{linode_id}/configs/{id}" derived_url_path="configs" parent_id_name="linode_id" properties = { "id": Property(identifier=True), "linode_id": Property(identifier=True), "helpers": Property(),#TODO: mutable=True), "created": Property(is_datetime=True), "root_device": Property(mutable=True), "kernel": Property(relationship=Kernel, mutable=True, filterable=True), "devices": Property(filterable=True),#TODO: mutable=True), "initrd": Property(relationship=Disk), "updated": Property(), "comments": Property(mutable=True, filterable=True), "label": Property(mutable=True, filterable=True), "run_level": Property(mutable=True, filterable=True), "virt_mode": Property(mutable=True, filterable=True), "memory_limit": Property(mutable=True, filterable=True), "interfaces": Property(mutable=True), # gets setup in _populate below } def _populate(self, json): """ Map devices more nicely while populating. """ # needed here to avoid circular imports from .volume import Volume # pylint: disable=import-outside-toplevel DerivedBase._populate(self, json) devices = {} for device_index, device in json['devices'].items(): if not device: devices[device_index] = None continue dev = None if 'disk_id' in device and device['disk_id']: # this is a disk dev = Disk.make_instance(device['disk_id'], self._client, parent_id=self.linode_id) else: dev = Volume.make_instance(device['volume_id'], self._client, parent_id=self.linode_id) devices[device_index] = dev self._set('devices', MappedObject(**devices)) interfaces = [] if "interfaces" in json: interfaces = [ ConfigInterface(c["purpose"], label=c["label"], ipam_address=c["ipam_address"]) for c in json["interfaces"] ] self._set("interfaces", interfaces) def _serialize(self): """ Overrides _serialize to transform interfaces into json """ partial = DerivedBase._serialize(self) interfaces = [] for c in self.interfaces: if isinstance(c, ConfigInterface): interfaces.append(c._serialize()) else: interfaces.append(c) partial["interfaces"] = interfaces return partial
[docs]class Instance(Base): api_endpoint = '/linode/instances/{id}' properties = { 'id': Property(identifier=True, filterable=True), 'label': Property(mutable=True, filterable=True), 'group': Property(mutable=True, filterable=True), 'status': Property(volatile=True), 'created': Property(is_datetime=True), 'updated': Property(volatile=True, is_datetime=True), 'region': Property(slug_relationship=Region, filterable=True), 'alerts': Property(mutable=True), 'image': Property(slug_relationship=Image, filterable=True), 'disks': Property(derived_class=Disk), 'configs': Property(derived_class=Config), 'type': Property(slug_relationship=Type), 'backups': Property(), 'ipv4': Property(), 'ipv6': Property(), 'hypervisor': Property(), 'specs': Property(), 'tags': Property(mutable=True), } @property def ips(self): """ The ips related collection is not normalized like the others, so we have to make an ad-hoc object to return for its response """ if not hasattr(self, '_ips'): result = self._client.get("{}/ips".format(Instance.api_endpoint), model=self) if not "ipv4" in result: raise UnexpectedResponseError('Unexpected response loading IPs', json=result) v4pub = [] for c in result['ipv4']['public']: i = IPAddress(self._client, c['address'], c) v4pub.append(i) v4pri = [] for c in result['ipv4']['private']: i = IPAddress(self._client, c['address'], c) v4pri.append(i) shared_ips = [] for c in result['ipv4']['shared']: i = IPAddress(self._client, c['address'], c) shared_ips.append(i) slaac = IPAddress(self._client, result['ipv6']['slaac']['address'], result['ipv6']['slaac']) link_local = IPAddress(self._client, result['ipv6']['link_local']['address'], result['ipv6']['link_local']) pools = [] for p in result['ipv6']['global']: pools.append(IPv6Pool(self._client, p['range'])) ips = MappedObject(**{ "ipv4": { "public": v4pub, "private": v4pri, "shared": shared_ips, }, "ipv6": { "slaac": slaac, "link_local": link_local, "pools": pools, }, }) self._set('_ips', ips) return self._ips @property def available_backups(self): """ The backups response contains what backups are available to be restored. """ if not hasattr(self, '_avail_backups'): result = self._client.get("{}/backups".format(Instance.api_endpoint), model=self) if not 'automatic' in result: raise UnexpectedResponseError('Unexpected response loading available backups!', json=result) automatic = [] for a in result['automatic']: cur = Backup(self._client, a['id'], self.id, a) automatic.append(cur) snap = None if result['snapshot']['current']: snap = Backup(self._client, result['snapshot']['current']['id'], self.id, result['snapshot']['current']) psnap = None if result['snapshot']['in_progress']: psnap = Backup(self._client, result['snapshot']['in_progress']['id'], self.id, result['snapshot']['in_progress']) self._set('_avail_backups', MappedObject(**{ "automatic": automatic, "snapshot": { "current": snap, "in_progress": psnap, } })) return self._avail_backups @property def transfer(self): """ Get per-linode transfer """ if not hasattr(self, '_transfer'): result = self._client.get("{}/transfer".format(Instance.api_endpoint), model=self) if not 'used' in result: raise UnexpectedResponseError('Unexpected response when getting Transfer Pool!') mapped = MappedObject(**result) setattr(self, '_transfer', mapped) return self._transfer def _populate(self, json): if json is not None: # fixes ipv4 and ipv6 attribute of json to make base._populate work if 'ipv4' in json and 'address' in json['ipv4']: json['ipv4']['id'] = json['ipv4']['address'] if 'ipv6' in json and isinstance(json['ipv6'], list): for j in json['ipv6']: j['id'] = j['range'] Base._populate(self, json)
[docs] def invalidate(self): """ Clear out cached properties """ if hasattr(self, '_avail_backups'): del self._avail_backups if hasattr(self, '_ips'): del self._ips if hasattr(self, '_transfer'): del self._transfer Base.invalidate(self)
[docs] def boot(self, config=None): resp = self._client.post("{}/boot".format(Instance.api_endpoint), model=self, data={'config_id': config.id} if config else None) if 'error' in resp: return False return True
[docs] def shutdown(self): resp = self._client.post("{}/shutdown".format(Instance.api_endpoint), model=self) if 'error' in resp: return False return True
[docs] def reboot(self): resp = self._client.post("{}/reboot".format(Instance.api_endpoint), model=self) if 'error' in resp: return False return True
[docs] def resize(self, new_type, **kwargs): new_type = new_type.id if issubclass(type(new_type), Base) else new_type params = { "type": new_type, } params.update(kwargs) resp = self._client.post("{}/resize".format(Instance.api_endpoint), model=self, data=params) if 'error' in resp: return False return True
[docs] @staticmethod def generate_root_password(): def _func(value): if sys.version_info[0] < 3: value = int(value.encode('hex'), 16) return value password = ''.join([ PASSWORD_CHARS[_func(c) % len(PASSWORD_CHARS)] for c in urandom(randint(50, 110)) ]) # ensure the generated password is not too long if len(password) > 110: password = password[:110] return password
# create derived objects
[docs] def config_create(self, kernel=None, label=None, devices=[], disks=[], volumes=[], **kwargs): """ Creates a Linode Config with the given attributes. :param kernel: The kernel to boot with. :param label: The config label :param disks: The list of disks, starting at sda, to map to this config. :param volumes: The volumes, starting after the last disk, to map to this config :param devices: A list of devices to assign to this config, in device index order. Values must be of type Disk or Volume. If this is given, you may not include disks or volumes. :param **kwargs: Any other arguments accepted by the api. :returns: A new Linode Config """ # needed here to avoid circular imports from .volume import Volume # pylint: disable=import-outside-toplevel hypervisor_prefix = 'sd' if self.hypervisor == 'kvm' else 'xvd' device_names = [hypervisor_prefix + string.ascii_lowercase[i] for i in range(0, 8)] device_map = {device_names[i]: None for i in range(0, len(device_names))} if devices and (disks or volumes): raise ValueError('You may not call config_create with "devices" and ' 'either of "disks" or "volumes" specified!') if not devices: if not isinstance(disks, list): disks = [disks] if not isinstance(volumes, list): volumes = [volumes] devices = [] for d in disks: if d is None: devices.append(None) elif isinstance(d, Disk): devices.append(d) else: devices.append(Disk(self._client, int(d), self.id)) for v in volumes: if v is None: devices.append(None) elif isinstance(v, Volume): devices.append(v) else: devices.append(Volume(self._client, int(v))) if not devices: raise ValueError('Must include at least one disk or volume!') for i, d in enumerate(devices): if d is None: pass elif isinstance(d, Disk): device_map[device_names[i]] = {'disk_id': d.id } elif isinstance(d, Volume): device_map[device_names[i]] = {'volume_id': d.id } else: raise TypeError('Disk or Volume expected!') params = { 'kernel': kernel.id if issubclass(type(kernel), Base) else kernel, 'label': label if label else "{}_config_{}".format(self.label, len(self.configs)), 'devices': device_map, } params.update(kwargs) result = self._client.post("{}/configs".format(Instance.api_endpoint), model=self, data=params) self.invalidate() if not 'id' in result: raise UnexpectedResponseError('Unexpected response creating config!', json=result) c = Config(self._client, result['id'], self.id, result) return c
[docs] def disk_create(self, size, label=None, filesystem=None, read_only=False, image=None, root_pass=None, authorized_keys=None, authorized_users=None, stackscript=None, **stackscript_args): """ Creates a new Disk for this Instance. :param size: The size of the disk, in MB :param label: The label of the disk. If not given, a default label will be generated. :param filesystem: The filesystem type for the disk. If not given, the default for the image deployed the disk will be used. Required if creating a disk without an image. :param read_only: If True, creates a read-only disk :param image: The Image to deploy to the disk. :param root_pass: The password to configure for the root user when deploying an image to this disk. Not used if image is not given. If an image is given and root_pass is not, a password will be generated and returned alongside the new disk. :param authorized_keys: A list of SSH keys to install as trusted for the root user. :param authorized_users: A list of usernames whose keys should be installed as trusted for the root user. These user's keys should already be set up, see :any:`ProfileGroup.ssh_keys` for details. :param stackscript: A StackScript object, or the ID of one, to deploy to this disk. Requires deploying a compatible image. :param **stackscript_args: Any arguments to pass to the StackScript, as defined by its User Defined Fields. """ gen_pass = None if image and not root_pass: gen_pass = Instance.generate_root_password() root_pass = gen_pass authorized_keys = load_and_validate_keys(authorized_keys) if image and not label: label = "My {} Disk".format(image.label) params = { 'size': size, 'label': label if label else "{}_disk_{}".format(self.label, len(self.disks)), 'read_only': read_only, 'filesystem': filesystem, 'authorized_keys': authorized_keys, 'authorized_users': authorized_users, } if image: params.update({ 'image': image.id if issubclass(type(image), Base) else image, 'root_pass': root_pass, }) if stackscript: params['stackscript_id'] = stackscript.id if stackscript_args: params['stackscript_data'] = stackscript_args result = self._client.post("{}/disks".format(Instance.api_endpoint), model=self, data=params) self.invalidate() if not 'id' in result: raise UnexpectedResponseError('Unexpected response creating disk!', json=result) d = Disk(self._client, result['id'], self.id, result) if gen_pass: return d, gen_pass return d
[docs] def enable_backups(self): """ Enable Backups for this Instance. When enabled, we will automatically backup your Instance's data so that it can be restored at a later date. For more information on Instance's Backups service and pricing, see our `Backups Page`_ .. _Backups Page: https://www.linode.com/backups """ self._client.post("{}/backups/enable".format(Instance.api_endpoint), model=self) self.invalidate() return True
[docs] def cancel_backups(self): """ Cancels Backups for this Instance. All existing Backups will be lost, including any snapshots that have been taken. This cannot be undone, but Backups can be re-enabled at a later date. """ self._client.post("{}/backups/cancel".format(Instance.api_endpoint), model=self) self.invalidate() return True
[docs] def snapshot(self, label=None): result = self._client.post("{}/backups".format(Instance.api_endpoint), model=self, data={ "label": label }) if not 'id' in result: raise UnexpectedResponseError('Unexpected response taking snapshot!', json=result) # so the changes show up the next time they're accessed if hasattr(self, '_avail_backups'): del self._avail_backups b = Backup(self._client, result['id'], self.id, result) return b
[docs] def ip_allocate(self, public=False): """ Allocates a new :any:`IPAddress` for this Instance. Additional public IPs require justification, and you may need to open a :any:`SupportTicket` before you can add one. You may only have, at most, one private IP per Instance. :param public: If the new IP should be public or private. Defaults to private. :type public: bool :returns: The new IPAddress :rtype: IPAddress """ result = self._client.post( "{}/ips".format(Instance.api_endpoint), model=self, data={ "type": "ipv4", "public": public, }) if not 'address' in result: raise UnexpectedResponseError('Unexpected response allocating IP!', json=result) i = IPAddress(self._client, result['address'], result) return i
[docs] def rebuild(self, image, root_pass=None, authorized_keys=None, **kwargs): """ Rebuilding an Instance deletes all existing Disks and Configs and deploys a new :any:`Image` to it. This can be used to reset an existing Instance or to install an Image on an empty Instance. :param image: The Image to deploy to this Instance :type image: str or Image :param root_pass: The root password for the newly rebuilt Instance. If omitted, a password will be generated and returned. :type root_pass: str :param authorized_keys: The ssh public keys to install in the linode's /root/.ssh/authorized_keys file. Each entry may be a single key, or a path to a file containing the key. :type authorized_keys: list or str :returns: The newly generated password, if one was not provided (otherwise True) :rtype: str or bool """ ret_pass = None if not root_pass: ret_pass = Instance.generate_root_password() root_pass = ret_pass authorized_keys = load_and_validate_keys(authorized_keys) params = { 'image': image.id if issubclass(type(image), Base) else image, 'root_pass': root_pass, 'authorized_keys': authorized_keys, } params.update(kwargs) result = self._client.post('{}/rebuild'.format(Instance.api_endpoint), model=self, data=params) if not 'id' in result: raise UnexpectedResponseError('Unexpected response issuing rebuild!', json=result) # update ourself with the newly-returned information self._populate(result) if not ret_pass: return True else: return ret_pass
[docs] def rescue(self, *disks): if disks: disks = { x: { 'disk_id': y } for x,y in zip(('sda','sdb','sdc','sdd','sde','sdf','sdg'), disks) } else: disks=None result = self._client.post('{}/rescue'.format(Instance.api_endpoint), model=self, data={ "devices": disks }) return result
[docs] def kvmify(self): """ Converts this linode to KVM from Xen """ self._client.post('{}/kvmify'.format(Instance.api_endpoint), model=self) return True
[docs] def mutate(self): """ Upgrades this Instance to the latest generation type """ self._client.post('{}/mutate'.format(Instance.api_endpoint), model=self) return True
[docs] def initiate_migration(self): """ Initiates a pending migration that is already scheduled for this Linode Instance """ self._client.post('{}/migrate'.format(Instance.api_endpoint), model=self)
[docs] def clone(self, to_linode=None, region=None, service=None, configs=[], disks=[], label=None, group=None, with_backups=None): """ Clones this linode into a new linode or into a new linode in the given region """ if to_linode and region: raise ValueError('You may only specify one of "to_linode" and "region"') if region and not service: raise ValueError('Specifying a region requires a "service" as well') if not isinstance(configs, list) and not isinstance(configs, PaginatedList): configs = [configs] if not isinstance(disks, list) and not isinstance(disks, PaginatedList): disks = [disks] cids = [ c.id if issubclass(type(c), Base) else c for c in configs ] dids = [ d.id if issubclass(type(d), Base) else d for d in disks ] params = { "linode_id": to_linode.id if issubclass(type(to_linode), Base) else to_linode, "region": region.id if issubclass(type(region), Base) else region, "type": service.id if issubclass(type(service), Base) else service, "configs": cids if cids else None, "disks": dids if dids else None, "label": label, "group": group, "with_backups": with_backups, } result = self._client.post('{}/clone'.format(Instance.api_endpoint), model=self, data=params) if not 'id' in result: raise UnexpectedResponseError('Unexpected response cloning Instance!', json=result) l = Instance(self._client, result['id'], result) return l
@property def stats(self): """ Returns the JSON stats for this Instance """ # TODO - this would be nicer if we formatted the stats return self._client.get('{}/stats'.format(Instance.api_endpoint), model=self)
[docs] def stats_for(self, dt): """ Returns stats for the month containing the given datetime """ # TODO - this would be nicer if we formatted the stats if not isinstance(dt, datetime): raise TypeError('stats_for requires a datetime object!') return self._client.get('{}/stats/{}'.format(Instance.api_endpoint, dt.strftime('%Y/%m')), model=self)
[docs]class UserDefinedFieldType(Enum): text = 1 select_one = 2 select_many = 3
[docs]class UserDefinedField(): def __init__(self, name, label, example, field_type, choices=None): self.name = name self.label = label self.example = example self.field_type = field_type self.choices = choices def __repr__(self): return "{}({}): {}".format(self.label, self.field_type.name, self.example)
[docs]class StackScript(Base): api_endpoint = '/linode/stackscripts/{id}' properties = { "user_defined_fields": Property(), "label": Property(mutable=True, filterable=True), "rev_note": Property(mutable=True), "username": Property(filterable=True), "user_gravatar_id": Property(), "is_public": Property(mutable=True, filterable=True), "created": Property(is_datetime=True), "deployments_active": Property(), "script": Property(mutable=True), "images": Property(mutable=True, filterable=True), # TODO make slug_relationship "deployments_total": Property(), "description": Property(mutable=True, filterable=True), "updated": Property(is_datetime=True), } def _populate(self, json): """ Override the populate method to map user_defined_fields to fancy values """ Base._populate(self, json) mapped_udfs = [] for udf in self.user_defined_fields: t = UserDefinedFieldType.text choices = None if hasattr(udf, 'oneof'): t = UserDefinedFieldType.select_one choices = udf.oneof.split(',') elif hasattr(udf, 'manyof'): t = UserDefinedFieldType.select_many choices = udf.manyof.split(',') mapped_udfs.append(UserDefinedField(udf.name, udf.label if hasattr(udf, 'label') else None, udf.example if hasattr(udf, 'example') else None, t, choices=choices)) self._set('user_defined_fields', mapped_udfs) ndist = [ Image(self._client, d) for d in self.images ] self._set('images', ndist) def _serialize(self): dct = Base._serialize(self) dct['images'] = [ d.id for d in self.images ] return dct