Source code for linode_api4.login_client

import re
from datetime import datetime, timedelta
from enum import Enum

import requests

from linode_api4.errors import ApiError

try:
    from urllib.parse import urlencode, urlparse, urlunparse
except ImportError:
    from urllib import urlencode

    from urlparse import urlparse, urlunparse


class AllWrapper:
    def __repr__(self):
        return "*"


[docs] class OAuthScopes: """ Represents the OAuth Scopes available to an application. In general, an application should request no more scopes than it requires. This class should be treated like a Enum, and used as follows:: required_scopes = [OAuthScopes.Linodes.all, OAuthScopes.Domains.read_only] Lists of OAuth Scopes are accepted when calling the :any:`generate_login_url` method of the :any:`LinodeLoginClient`. All contained enumerations of OAuth Scopes have two levels, "read_only" and "read_write". "read_only" access grants you the ability to get resources and of that type, but not to change, create, or delete them. "read_write" access allows to full access to resources of the requested type. In the above example, you are requesting access to view, modify, create, and delete Linodes, and to view Domains. """ #: If necessary, an application may request all scopes by using OAuthScopes.all all = AllWrapper()
[docs] class Linodes(Enum): """ Access to Linodes """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "linodes:*" return "linodes:{}".format(self.name)
[docs] class Domains(Enum): """ Access to Domains """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "domains:*" return "domains:{}".format(self.name)
[docs] class StackScripts(Enum): """ Access to private StackScripts """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "stackscripts:*" return "stackscripts:{}".format(self.name)
[docs] class Users(Enum): read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "users:*" return "users:{}".format(self.name)
[docs] class NodeBalancers(Enum): """ Access to NodeBalancers """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "nodebalancers:*" return "nodebalancers:{}".format(self.name)
[docs] class Tokens(Enum): read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "tokens:*" return "tokens:{}".format(self.name)
[docs] class IPs(Enum): """ Access to IPs and networking managements """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "ips:*" return "ips:{}".format(self.name)
[docs] class Firewalls(Enum): """ Access to Firewalls """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "firewall:*" return "firewall:{}".format(self.name)
[docs] class Tickets(Enum): """ Access to view, open, and respond to Support Tickets """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "tickets:*" return "tickets:{}".format(self.name)
[docs] class Clients(Enum): read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "clients:*" return "clients:{}".format(self.name)
[docs] class Account(Enum): """ Access to the user's account, including billing information, tokens management, user management, etc. """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "account:*" return "account:{}".format(self.name)
[docs] class Events(Enum): """ Access to a user's Events """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "events:*" return "events:{}".format(self.name)
[docs] class Volumes(Enum): """ Access to Block Storage Volumes """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "volumes:*" return "volumes:{}".format(self.name)
[docs] class LKE(Enum): """ Access to LKE Endpoint """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "lke:*" return "lke:{}".format(self.name)
[docs] class ObjectStorage(Enum): """ Access to Object Storage """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "object_storage:*" return "object_storage:{}".format(self.name)
[docs] class Longview(Enum): """ Access to Longview """ read_only = 0 read_write = 1 all = 2 def __repr__(self): if self.name == "all": return "longview:*" return "longview:{}".format(self.name)
_scope_families = { "linodes": Linodes, "domains": Domains, "stackscripts": StackScripts, "users": Users, "tokens": Tokens, "ips": IPs, "firewall": Firewalls, "tickets": Tickets, "clients": Clients, "account": Account, "events": Events, "volumes": Volumes, "lke": LKE, "object_storage": ObjectStorage, "nodebalancers": NodeBalancers, "longview": Longview, } @staticmethod def parse(scopes): ret = [] # special all-scope case if scopes == "*": return [ getattr(scope, "all") for scope in OAuthScopes._scope_families.values() ] for scope in re.split("[, ]", scopes): resource = access = None if ":" in scope: resource, access = scope.split(":") else: resource = scope access = "*" parsed_scope = OAuthScopes._get_parsed_scope(resource, access) if parsed_scope: ret.append(parsed_scope) return ret @staticmethod def _get_parsed_scope(resource, access): resource = resource.lower() access = access.lower() if resource in OAuthScopes._scope_families: if access == "*": access = "all" if hasattr(OAuthScopes._scope_families[resource], access): return getattr(OAuthScopes._scope_families[resource], access) return None @staticmethod def serialize(scopes): ret = "" if not type(scopes) is list: scopes = [scopes] for scope in scopes: ret += "{},".format(repr(scope)) if ret: ret = ret[:-1] return ret
[docs] class LinodeLoginClient:
[docs] def __init__( self, client_id, client_secret, base_url="https://login.linode.com", ca_path=None, ): """ Create a new LinodeLoginClient. These clients do not make any requests on creation, and can safely be created and thrown away as needed. For complete usage information, see the :doc:`OAuth guide<../guides/oauth>`. :param client_id: The OAuth Client ID for this client. :type client_id: str :param client_secret: The OAuth Client Secret for this client. :type client_secret: str :param base_url: The URL for Linode's OAuth server. This should not be changed. :type base_url: str :param ca_path: The path to the CA file to use for requests run by this client. :type ca_path: str """ self.base_url = base_url self.client_id = client_id self.client_secret = client_secret self.ca_path = ca_path
def _login_uri(self, path): return "{}{}".format(self.base_url, path)
[docs] def generate_login_url(self, scopes=None, redirect_uri=None): """ Generates a url to send users so that they may authenticate to this application. This url is suitable for redirecting a user to. For example, in `Flask`_, a login route might be implemented like this:: @app.route("/login") def begin_oauth_login(): login_client = LinodeLoginClient(client_id, client_secret) return redirect(login_client.generate_login_url()) .. _Flask:: http://flask.pocoo.org :param scopes: The OAuth scopes to request for this login. :type scopes: list :param redirect_uri: The requested redirect uri. The login service enforces that this is under the registered redirect path. :type redirect_uri: str :returns: The uri to send users to for this login attempt. :rtype: str """ url = self.base_url + "/oauth/authorize" split = list(urlparse(url)) params = { "client_id": self.client_id, "response_type": "code", # needed for all logins } if scopes: params["scopes"] = OAuthScopes.serialize(scopes) if redirect_uri: params["redirect_uri"] = redirect_uri split[4] = urlencode(params) return urlunparse(split)
[docs] def finish_oauth(self, code): """ Given an OAuth Exchange Code, completes the OAuth exchange with the authentication server. This should be called once the user has already been directed to the login_uri, and has been sent back after successfully authenticating. For example, in `Flask`_, this might be implemented as a route like this:: @app.route("/oauth-redirect") def oauth_redirect(): exchange_code = request.args.get("code") login_client = LinodeLoginClient(client_id, client_secret) token, scopes, expiry, refresh_token = login_client.finish_oauth(exchange_code) # store the user's OAuth token in their session for later use # and mark that they are logged in. return redirect("/") .. _Flask: http://flask.pocoo.org :param code: The OAuth Exchange Code returned from the authentication server in the query string. :type code: str :returns: The new OAuth token, and a list of scopes the token has, when the token expires, and a refresh token that can generate a new valid token when this one is expired. :rtype: tuple(str, list, datetime, str) :raise ApiError: If the OAuth exchange fails. """ r = requests.post( self._login_uri("/oauth/token"), data={ "code": code, "client_id": self.client_id, "client_secret": self.client_secret, }, verify=self.ca_path or True, ) if r.status_code != 200: raise ApiError( "OAuth token exchange failed", status=r.status_code, json=r.json(), ) token = r.json()["access_token"] scopes = OAuthScopes.parse(r.json()["scopes"]) expiry = datetime.now() + timedelta(seconds=r.json()["expires_in"]) refresh_token = r.json()["refresh_token"] return token, scopes, expiry, refresh_token
[docs] def refresh_oauth_token(self, refresh_token): """ Some tokens are generated with refresh tokens (namely tokens generated through an OAuth Exchange). These tokens may be renewed, or "refreshed", with the auth server, generating a new OAuth Token with a new (later) expiry. This method handles refreshing an OAuth Token using the refresh token that was generated at the time of its issuance, and returns a new OAuth token and refresh token for the same client and user. :param refresh_token: The refresh token returned for the OAuth Token we are renewing. :type refresh_token: str :returns: The new OAuth token, and a list of scopes the token has, when the token expires, and a refresh token that can generate a new valid token when this one is expired. :rtype: tuple(str, list) :raise ApiError: If the refresh fails.. """ r = requests.post( self._login_uri("/oauth/token"), data={ "grant_type": "refresh_token", "client_id": self.client_id, "client_secret": self.client_secret, "refresh_token": refresh_token, }, verify=self.ca_path or True, ) if r.status_code != 200: raise ApiError("Refresh failed", r) token = r.json()["access_token"] scopes = OAuthScopes.parse(r.json()["scopes"]) expiry = datetime.now() + timedelta(seconds=r.json()["expires_in"]) refresh_token = r.json()["refresh_token"] return token, scopes, expiry, refresh_token
[docs] def expire_token(self, token): """ Given a token, makes a request to the authentication server to expire both access token and refresh token. This is considered a responsible way to log out a user. If you remove only the session your application has for the user without expiring their token, the user is not _really_ logged out. :param token: The OAuth token you wish to expire :type token: str :returns: If the expiration attempt succeeded. :rtype: bool :raises ApiError: If the expiration attempt failed. """ r = requests.post( self._login_uri("/oauth/revoke"), data={ "token_type_hint": "access_token", "client_id": self.client_id, "client_secret": self.client_secret, "token": token, }, verify=self.ca_path or True, ) if r.status_code != 200: raise ApiError("Failed to expire token!", r) return True