import datetime
from typing import Any, Dict, List, Optional
import polling
from linode_api4.objects import Event
class EventError(Exception):
    """
    Represents a failed Linode event.

    The exception text is ``Event <event_id> failed``, followed by
    ``: <message>`` when a non-empty message is supplied.

    :param event_id: The ID of the event that failed.
    :type event_id: int
    :param message: The failure message for the event, if any. ``None`` and
                    the empty string are both treated as "no message".
    :type message: Optional[str]
    """

    def __init__(self, event_id: int, message: Optional[str]):
        # Edge case, sometimes the message is populated with an empty string.
        # A truthiness check normalizes that to None and also accepts a
        # message that is already None (len(None) would raise TypeError).
        if not message:
            message = None

        self.event_id = event_id
        self.message = message

        error_fmt = f"Event {event_id} failed"
        if message is not None:
            error_fmt += f": {message}"

        super().__init__(error_fmt)
class TimeoutContext:
    """
    TimeoutContext should be used by polling resources to track their provisioning time.

    The context records a start time (initially the moment of construction)
    and a timeout window in seconds; the properties below report how much of
    that window has elapsed.

    :param timeout_seconds: The length of the timeout window in seconds.
    :type timeout_seconds: int
    """

    def __init__(self, timeout_seconds=120):
        self._start_time = datetime.datetime.now()
        self._timeout_seconds = timeout_seconds

    def start(self, start_time=None):
        """
        Sets the timeout start time, defaulting to the current time.

        :param start_time: The moment when the context started. When omitted
                           (or None), the current time at the moment of the
                           call is used.
        :type start_time: Optional[datetime]
        """
        # The default is resolved here rather than in the signature: a
        # default of ``datetime.datetime.now()`` would be evaluated only
        # once, when the function is defined.
        if start_time is None:
            start_time = datetime.datetime.now()

        self._start_time = start_time

    def extend(self, seconds: int):
        """
        Extends the timeout window.

        :param seconds: The number of seconds to extend the timeout period by.
        :type seconds: int
        """
        self._timeout_seconds += seconds

    @property
    def expired(self):
        """
        Whether the current timeout period has been exceeded.

        :returns: Whether this context is expired.
        :rtype: bool
        """
        return self.seconds_remaining < 0

    @property
    def valid(self):
        """
        Whether the current timeout period has not been exceeded.

        :returns: Whether this context is valid.
        :rtype: bool
        """
        return not self.expired

    @property
    def seconds_remaining(self):
        """
        The number of seconds until the timeout period has expired.

        :returns: The number of seconds remaining in this context.
        :rtype: int
        """
        return self._timeout_seconds - self.seconds_since_started

    @property
    def seconds_since_started(self):
        """
        The number of seconds since the timeout period started.

        :returns: The number of whole seconds since the context started.
        :rtype: int
        """
        elapsed = datetime.datetime.now() - self._start_time

        # total_seconds() covers the whole duration; the ``seconds``
        # attribute only holds the sub-day component and would wrap around
        # once a full day has elapsed.
        return int(elapsed.total_seconds())
# [docs] -- documentation-page link marker left over from extraction; not code.
class EventPoller:
    """
    EventPoller allows modules to dynamically poll for Linode events

    Events already present for the configured entity when the poller is
    created are cached by ID, so that only events which appear afterwards
    are reported as "new".

    :param client: The client used to issue ``/account/events`` requests.
    :type client: LinodeClient
    :param entity_type: The entity type to filter events on.
    :type entity_type: str
    :param action: The event action to filter events on.
    :type action: str
    :param entity_id: The ID of the entity to filter events on. May be
                      omitted and supplied later through set_entity_id.
    :type entity_id: Optional[int]
    """

    def __init__(
        self,
        client: "LinodeClient",
        entity_type: str,
        action: str,
        entity_id: Optional[int] = None,
    ):
        self._client = client
        self._entity_type = entity_type
        self._entity_id = entity_id
        self._action = action

        # Initialize with an empty cache if no entity is specified
        if self._entity_id is None:
            self._previous_event_cache = {}
            return

        # We only want the first page of this response
        result = client.get("/account/events", filters=self._build_filter())

        # Remember every pre-existing event by ID so it is never reported
        # as a new event later on.
        self._previous_event_cache = {v["id"]: v for v in result["data"]}

    def _build_filter(self) -> Dict[str, Any]:
        """Generates a filter dict to use in HTTP requests"""
        return {
            "+order": "asc",
            "+order_by": "created",
            "entity.id": self._entity_id,
            "entity.type": self._entity_type,
            "action": self._action,
        }

    def set_entity_id(self, entity_id: int) -> None:
        """
        Sets the ID of the entity to filter on.
        This is useful for create operations where
        the entity id might not be known in __init__.

        :param entity_id: The ID of the entity to poll for.
        :type entity_id: int
        """
        self._entity_id = entity_id

    def _attempt_merge_event_into_cache(self, event: Dict[str, Any]):
        """
        Attempts to merge the given event into the event cache.

        An event whose ID is already cached is left untouched.
        """
        if event["id"] in self._previous_event_cache:
            return

        self._previous_event_cache[event["id"]] = event

    def _check_has_new_event(
        self, events: List[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """
        If a new event is found in the given list, return it.

        :returns: The first event whose ID is not in the cache, or None
                  if every event is already cached.
        """
        for event in events:
            # Ignore cached events
            if event["id"] in self._previous_event_cache:
                continue

            return event

        return None

    def wait_for_next_event(
        self, timeout: int = 240, interval: int = 5
    ) -> Event:
        """
        Waits for and returns the next event matching the
        poller's configuration.

        :param timeout: The timeout in seconds before this polling operation will fail.
        :type timeout: int
        :param interval: The time in seconds to wait between polls.
        :type interval: int

        :returns: The resulting event.
        :rtype: Event
        """
        result_event: Dict[str, Any] = {}

        def poll_func():
            # Fetch the events matching the filter and look for one that
            # has not been seen before.
            new_event = self._check_has_new_event(
                self._client.get(
                    "/account/events", filters=self._build_filter()
                )["data"]
            )

            event_exists = new_event is not None

            if event_exists:
                nonlocal result_event
                result_event = new_event
                # Cache it so a later wait does not return the same event.
                self._attempt_merge_event_into_cache(new_event)

            return event_exists

        # Check once up front so an event that is already available is
        # returned without entering the polling loop.
        if poll_func():
            return Event(self._client, result_event["id"], json=result_event)

        polling.poll(
            poll_func,
            step=interval,
            timeout=timeout,
        )

        return Event(self._client, result_event["id"], json=result_event)

    def wait_for_next_event_finished(
        self, timeout: int = 240, interval: int = 5
    ) -> Event:
        """
        Waits for the next event to enter status `finished` or `notification`.

        :param timeout: The timeout in seconds before this polling operation will fail.
        :type timeout: int
        :param interval: The time in seconds to wait between polls.
        :type interval: int

        :returns: The resulting event.
        :rtype: Event

        :raises EventError: If the event enters status `failed`.
        """
        # One timeout budget is shared between waiting for the event to
        # appear and waiting for it to finish.
        timeout_ctx = TimeoutContext(timeout_seconds=timeout)

        # Forward the caller's interval so both polling phases use it.
        event = self.wait_for_next_event(
            timeout_ctx.seconds_remaining, interval=interval
        )

        def poll_func():
            # Refresh the event before inspecting its status.
            event._api_get()

            if event.status == "failed":
                raise EventError(event.id, event.message)

            return event.status in ["finished", "notification"]

        # Check once up front so an already-finished event is returned
        # without entering the polling loop.
        if poll_func():
            return event

        # NOTE(review): seconds_remaining may be zero or negative at this
        # point; confirm how polling.poll treats such a timeout.
        polling.poll(
            poll_func,
            step=interval,
            timeout=timeout_ctx.seconds_remaining,
        )

        return event