Source code for iland.api

# -*- coding: utf-8 -*-

"""A library that provides a Python interface to the iland cloud API."""

import json
import time

import requests

from .constant import BASE_URL, ACCESS_URL, REFRESH_URL
from .log import LOG
from .exception import ApiException, UnauthorizedException


class Api(object):
    """A Python interface into the iland cloud API.

    An instance obtains an access token with the ``password`` grant, renews
    it with the ``refresh_token`` grant once it is about to expire, and
    sends authenticated JSON requests to resource paths below the API base
    URL.
    """

    # HTTP methods that `_do_request` knows how to dispatch.
    _SUPPORTED_VERBS = ('GET', 'PUT', 'POST', 'DELETE')
    # Status codes treated as success for token requests / API requests.
    _TOKEN_SUCCESS_CODES = (200, 201, 202)
    _API_SUCCESS_CODES = (200, 201, 202, 204)

    _client_id = None
    _client_secret = None
    _username = None
    _password = None
    _base_url = BASE_URL
    # Backing field for the `_access_token_url` property below.
    _access_token_url_ = ACCESS_URL
    _refresh_token_url = REFRESH_URL
    _proxies = None
    _token = None
    # Absolute expiration time of `_token`, in milliseconds since the epoch.
    _token_expiration_time = None
    _verify_ssl = True
    _session = None

    def __init__(self, client_id, client_secret, username, password):
        """Instantiate a new iland.Api object.

        :param client_id: the client identifier
        :param client_secret: the client secret
        :param username: the iland cloud username
        :param password: the iland cloud password
        :return: Api Object
        """
        self._client_id = client_id
        self._client_secret = client_secret
        self._username = username
        self._password = password
        self._session = requests.Session()

    @property
    def _access_token_url(self):
        """URL used to request a new access token."""
        return self._access_token_url_

    @_access_token_url.setter
    def _access_token_url(self, access_token_url):
        """Set the access token URL and derive the refresh URL from it.

        A ``None`` value is ignored so both URLs keep their current value.
        """
        if access_token_url is not None:
            self._access_token_url_ = access_token_url
            self._refresh_token_url = access_token_url + '?refresh=1'

    def _request_token(self, url, params):
        """POST a token request and store the token found in the response.

        :param url: the token endpoint to call
        :param params: the form parameters describing the grant
        :raises: UnauthorizedException: the endpoint answered with an error \
            status or with a body that is not valid JSON
        :return: the decoded token payload
        """
        request_params = {'data': params, 'verify': self._verify_ssl}
        # Token requests go through the same proxies as API requests.
        if self._proxies and isinstance(self._proxies, dict):
            request_params['proxies'] = self._proxies
        r = self._session.post(url, **request_params)
        try:
            # UnicodeDecodeError is a ValueError subclass, so undecodable
            # bodies are reported the same way as malformed JSON.
            json_payload = json.loads(r.content.decode('utf-8'))
        except ValueError:
            raise UnauthorizedException(r.content)
        if r.status_code not in self._TOKEN_SUCCESS_CODES:
            raise UnauthorizedException(json_payload)
        self._token = json_payload
        self._set_token_expiration_time()
        return self._token

    def _get_access_token(self):
        """Return the current token, logging in first if it is not valid.

        :raises: UnauthorizedException: the credentials were rejected
        :return: the token payload
        """
        if self._valid_token():
            return self._token
        LOG.info("SSO Request %s" % self._access_token_url)
        params = {
            'client_id': self._client_id,
            'client_secret': self._client_secret,
            'username': self._username,
            'password': self._password,
            'grant_type': 'password'
        }
        return self._request_token(self._access_token_url, params)

    def _set_token_expiration_time(self):
        """Compute when the current token expires, in epoch milliseconds."""
        # Expire the token a few seconds early so that it is never sent
        # right at the edge of its lifetime.
        time_buffer = 10
        lifetime_ms = (self._token['expires_in'] - time_buffer) * 1000
        now_ms = int(round(time.time() * 1000))
        self._token_expiration_time = lifetime_ms + now_ms

    def _refresh_token(self):
        """Make sure a valid token is available.

        An expired token is renewed with its refresh token; when there is
        no token at all a regular login is performed.

        :raises: UnauthorizedException: the token could not be obtained
        :return: the token payload
        """
        if self._valid_token():
            return self._token
        if self._token is None:
            self._get_access_token()
            return self._token
        LOG.info("SSO Request %s" % self._refresh_token_url)
        params = {
            'client_id': self._client_id,
            'client_secret': self._client_secret,
            'grant_type': 'refresh_token',
            'refresh_token': self._token['refresh_token']
        }
        return self._request_token(self._refresh_token_url, params)

    def _valid_token(self):
        """Tell whether a token is held and has not expired yet."""
        # A token without a known expiration time cannot be trusted.
        if self._token is None or self._token_expiration_time is None:
            return False
        return int(round(time.time() * 1000)) < self._token_expiration_time

    def _get_access_token_string(self):
        """Return the bearer string of the current access token."""
        return self._get_access_token()['access_token']

    def _do_request(self, rpath, verb='GET', form_data=None, headers=None):
        """Send an authenticated request to the API and decode the answer.

        :param rpath: the resource path, appended to the base URL
        :param verb: one of ``GET``, ``PUT``, ``POST`` or ``DELETE``
        :param form_data: object serialized as the JSON body of ``PUT`` \
            and ``POST`` requests
        :param headers: an optional dictionary of extra http headers; the \
            headers set by this method cannot be overridden
        :raises: ApiException: unsupported verb, error status or a body \
            that is not valid JSON
        :raises: UnauthorizedException: credentials / grants invalid
        :return: the decoded JSON body, or ``None`` when a successful \
            response carries no body
        """
        # Reject unknown verbs before spending a round trip on the token.
        if verb not in self._SUPPORTED_VERBS:
            raise ApiException({'message': 'Unsupported HTTP verb %s' % verb})

        self._refresh_token()

        url = self._base_url + rpath
        LOG.info("Request %s rpath %s" % (verb, url))

        sends_body = verb in ('PUT', 'POST')
        default_headers = {
            'Authorization': 'Bearer %s' % self._get_access_token_string(),
            'Accept': 'application/vnd.ilandcloud.api.v1.0+json'
        }
        if sends_body:
            default_headers['Content-Type'] = 'application/json'

        merged_headers = default_headers.copy()
        if headers and isinstance(headers, dict):
            # HTTP header names are case-insensitive, so the protection of
            # the default headers must be case-insensitive as well.
            protected = set(name.lower() for name in default_headers)
            for header, value in headers.items():
                if header.lower() in protected:
                    LOG.warning("Header '%s' can't be overridden" % header)
                else:
                    merged_headers[header] = value

        request_params = {
            'headers': merged_headers,
            'verify': self._verify_ssl
        }
        if sends_body:
            data = None
            if form_data is not None:
                data = json.dumps(form_data,
                                  ensure_ascii=False).encode("UTF8")
            request_params['data'] = data
        if self._proxies and isinstance(self._proxies, dict):
            request_params['proxies'] = self._proxies

        # The verb was validated above, so the session method exists.
        send = getattr(self._session, verb.lower())
        r = send(url, **request_params)

        succeeded = r.status_code in self._API_SUCCESS_CODES
        # A successful response may legitimately be empty (e.g. 204 No
        # Content); there is nothing to decode in that case.
        if succeeded and not r.content:
            return None
        try:
            json_obj = r.json()
        except ValueError:
            raise ApiException(r.content)
        if not succeeded:
            raise ApiException(json_obj)
        return json_obj

    def get(self, rpath, headers=None):
        """ Perform a GET request against the iland cloud API given its
        resource path.

        `iland.Api` will refresh the access token if it is not valid.

        :param rpath: the resource path as a Python builtin String object
        :param headers: an optional dictionary of http headers to send with \
            the request
        :raises: ApiException: API request returns an error
        :raises: UnauthorizedException: credentials / grants invalid
        :return: a JSON Object or a list of JSON Objects.
        """
        return self._do_request(rpath, headers=headers)

    def put(self, rpath, form_data=None, headers=None):
        """ Perform a PUT request against the iland cloud API given its
        resource path.

        `iland.Api` will refresh the access token if it is not valid.

        :param rpath: the resource path as a Python builtin String object
        :param form_data: a Python builtin dict object
        :param headers: an optional dictionary of http headers to send with \
            the request
        :raises: ApiException: API request returns an error
        :raises: UnauthorizedException: credentials / grants invalid
        :return: a JSON Object or a list of JSON Objects.
        """
        return self._do_request(rpath, verb='PUT', form_data=form_data,
                                headers=headers)

    def post(self, rpath, form_data=None, headers=None):
        """ Perform a POST request against the iland cloud API given its
        resource path.

        `iland.Api` will refresh the access token if it is not valid.

        :param rpath: the resource path as a Python builtin String object
        :param form_data: a Python builtin dict object
        :param headers: an optional dictionary of http headers to send with \
            the request
        :raises: ApiException: API request returns an error
        :raises: UnauthorizedException: credentials / grants invalid
        :return: a JSON Object or a list of JSON Objects.
        """
        return self._do_request(rpath, verb='POST', form_data=form_data,
                                headers=headers)

    def delete(self, rpath, headers=None):
        """ Perform a DELETE request against the iland cloud API given its
        resource path.

        `iland.Api` will refresh the access token if it is not valid.

        :param rpath: the resource path as a Python builtin String object
        :param headers: an optional dictionary of http headers to send with \
            the request
        :raises: ApiException: API request returns an error
        :raises: UnauthorizedException: credentials / grants invalid
        :return: a JSON Object or a list of JSON Objects.
        """
        return self._do_request(rpath, verb='DELETE', headers=headers)

    def get_access_token(self):
        """ Returns the access token in use for this session.

        This method is exposed in case you are interested in managing the
        token life cycle yourself. `iland.Api` will refresh the token on
        your behalf while performing queries.

        :raises: UnauthorizedException: credentials / grants invalid
        :return: JSON Object containing the actual access token
        """
        return self._get_access_token()

    def refresh_access_token(self):
        """ Refresh token if token is not valid: None or expired.

        This method is exposed in case you are interested in managing the
        token life cycle yourself. `iland.Api` will refresh the token on
        your behalf while performing queries.

        :raises: UnauthorizedException: credentials / grants invalid
        :return: JSON Object containing the actual access token
        """
        return self._refresh_token()

    def login(self):
        """ Requests an access token.

        This method is exposed in case you are interested in managing the
        token life cycle yourself. `iland.Api` will refresh the token on
        your behalf while performing queries.

        :raises: UnauthorizedException: credentials / grants invalid
        :return: JSON Object containing the actual access token
        """
        return self._get_access_token()