Source code for ciscoisesdk.restsession

# -*- coding: utf-8 -*-
"""RestSession class for creating connections to the Identity Services Engine APIs.

Copyright (c) 2021 Cisco and/or its affiliates.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

from __future__ import absolute_import, division, print_function, unicode_literals

from future import standard_library

standard_library.install_aliases()

import errno
import logging
import os
import re
import socket
import time
import urllib.parse
import warnings
from builtins import *

import requests
from past.builtins import basestring
from requests.packages.urllib3.response import HTTPResponse
from requests_toolbelt.multipart import encoder

from .config import (
    DEFAULT_SINGLE_REQUEST_TIMEOUT,
    DEFAULT_VERIFY,
    DEFAULT_WAIT_ON_RATE_LIMIT,
)
from .exceptions import (
    ApiError,
    DownloadFailure,
    RateLimitError,
    RateLimitWarning,
    ciscoisesdkException,
)
from .misc import check_response_code
from .response_codes import EXPECTED_RESPONSE_CODE
from .restresponse import RestResponse
from .utils import (
    check_type,
    get_exception_additional_data,
    pprint_request_info,
    pprint_response_info,
    validate_base_url,
)

logger = logging.getLogger(__name__)


[docs]class DownloadResponse(HTTPResponse): """Download Response wrapper. Bases: urllib3.response.HTTPResponse. For more information check the `urlib3 documentation <https://urllib3.readthedocs.io/en/latest/reference/urllib3.response.html>`_ HTTP Response container. """ def __init__(self, response, path, filename, dirpath, collected_data): """ Creates a new DownloadResponse object. By calling urllib3.response.HTTPResponse __init__ method with the raw property of requests.Response, it recovers data from the API response It adds properties regarding the download: filename, dirpath and path Args: response(requests.Response): The Response object, which contains a server's response to an HTTP request. path(basestring): The downloaded file path. filename(basestring): The downloaded filename. dirpath(basestring): The download directory path. collected_data(bytes): HTTP response's data. """ super(DownloadResponse, self).__init__( body=response.raw, headers=response.headers, status=response.status_code, reason=response.reason, request_method=response.request.method, request_url=response.request.url, ) # adds additional and important information self._filename = filename self._dirpath = dirpath self._path = path self._collected_data = collected_data @property def data(self): """The HTTPResponse's data""" # Call HTTPResponse's data property original_data = super(DownloadResponse, self).data # It uses the one that has value prioritizing the HTTPResponse's data return original_data or self._collected_data @property def filename(self): """The downloaded filename""" return self._filename @property def dirpath(self): """The downloaded directory path""" return self._dirpath @property def path(self): """The download file path""" return self._path
# Main module interface class RestSession(object): """RESTful HTTP session class for making calls to the Identity Services Engine APIs.""" def __init__(self, get_access_token, access_token, base_url, single_request_timeout=DEFAULT_SINGLE_REQUEST_TIMEOUT, wait_on_rate_limit=DEFAULT_WAIT_ON_RATE_LIMIT, verify=DEFAULT_VERIFY, version=None, headers={'Content-type': 'application/json;charset=utf-8', 'Accept': 'application/json'}, debug=False, uses_csrf_token=None, get_csrf_token=None): """Initialize a new RestSession object. Args: get_access_token(callable): The Identity Services Engine method to get a new access token. access_token(basestring): The Identity Services Engine access token to be used for this session. base_url(basestring): The base URL that will be suffixed onto API endpoint relative URLs to produce a callable absolute URL. single_request_timeout(int): The timeout (seconds) for a single HTTP REST API request. wait_on_rate_limit(bool): Enable or disable automatic rate-limit handling. verify(bool,basestring): Controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. version(basestring): Controls which version of IDENTITY_SERVICES_ENGINE to use. Defaults to ciscoisesdk.config.IDENTITY_SERVICES_ENGINE_VERSION headers(dict): Allows to add headers to RestSession requests. debug(bool,basestring): Controls whether to log information about Identity Services Engine APIs' request and response process. Defaults to the DEBUG environment variable or False if the environment variable is not set. uses_csrf_token(bool): Controls whether we send the CSRF token to ISE's ERS APIs. get_csrf_token(callable): The Identity Services Engine method to get a new CSRF token. Raises: TypeError: If the parameter types are incorrect. """ check_type(access_token, basestring, may_be_none=False) check_type(base_url, basestring, may_be_none=False) check_type(single_request_timeout, int) check_type(wait_on_rate_limit, bool, may_be_none=False) check_type(verify, (bool, basestring), may_be_none=False) check_type(version, basestring, may_be_none=False) check_type(debug, (bool), may_be_none=False) check_type(uses_csrf_token, (bool), may_be_none=False) super(RestSession, self).__init__() # Initialize attributes and properties self._base_url = str(validate_base_url(base_url)) self._get_access_token = get_access_token self._get_csrf_token = get_csrf_token self._csrf_token = None self._uses_csrf_token = uses_csrf_token self._access_token = str(access_token) self._single_request_timeout = single_request_timeout self._wait_on_rate_limit = wait_on_rate_limit self._verify = verify self._version = version self._debug = debug if self._debug: logger.setLevel(logging.DEBUG) logger.propagate = True else: logger.setLevel(logging.INFO) if verify is False: requests.packages.urllib3.disable_warnings() # Initialize a new `requests` session self._req_session = requests.session() # Update the headers of the `requests` session self.update_headers({'authorization': 'Basic ' + access_token}) if headers and isinstance(headers, dict): self.update_headers(headers) @property def version(self): """The API version of Identity Services Engine.""" return self._version @property def verify(self): """The verify (TLS Certificate) for the API endpoints.""" return self._verify @verify.setter def verify(self, value): """The verify (TLS Certificate) for the API endpoints.""" check_type(value, (bool, basestring), may_be_none=False) self._verify = value @property def base_url(self): """The base URL for the API endpoints.""" return self._base_url @base_url.setter def base_url(self, value): """The base URL for the API endpoints.""" check_type(value, basestring, may_be_none=False) self._base_url = str(validate_base_url(value)) @property def single_request_timeout(self): """The timeout (seconds) for a single HTTP REST API request.""" return self._single_request_timeout @single_request_timeout.setter def single_request_timeout(self, value): """The timeout (seconds) for a single HTTP REST API request.""" check_type(value, int) assert value is None or value > 0 self._single_request_timeout = value @property def wait_on_rate_limit(self): """Automatic rate-limit handling. This setting enables or disables automatic rate-limit handling. When enabled, rate-limited requests will be automatically be retried after waiting `Retry-After` seconds (provided by Identity Services Engine in the rate-limit response header). """ return self._wait_on_rate_limit @wait_on_rate_limit.setter def wait_on_rate_limit(self, value): """Enable or disable automatic rate-limit handling.""" check_type(value, bool, may_be_none=False) self._wait_on_rate_limit = value @property def headers(self): """The HTTP headers used for requests in this session.""" return self._req_session.headers.copy() @property def debug(self): """If log information about this REST session of the Identity Services Engine API's request and response process is shown.""" return self._debug @debug.setter def debug(self, value): """If log information about this REST session of the Identity Services Engine API's request and response process is shown.""" self._debug = value if self._debug: logger.setLevel(logging.DEBUG) logger.propagate = True else: logger.setLevel(logging.INFO) @property def uses_csrf_token(self): """If this RestSession requires the X-CSRF-Token to be sent.""" return self._uses_csrf_token @uses_csrf_token.setter def uses_csrf_token(self, value): """If this RestSession requires the X-CSRF-Token to be sent.""" check_type(value, (bool, basestring), may_be_none=False) self._uses_csrf_token = value if isinstance(self._uses_csrf_token, str): self._uses_csrf_token = 'true' in self._uses_csrf_token.lower() def update_headers(self, headers): """Update the HTTP headers used for requests in this session. Note: Updates provided by the dictionary passed as the `headers` parameter to this method are merged into the session headers by adding new key-value pairs and/or updating the values of existing keys. The session headers are not replaced by the provided dictionary. Args: headers(dict): Updates to the current session headers. """ check_type(headers, dict, may_be_none=False) self._req_session.headers.update(headers) def refresh_token(self): """Call the get_access_token method and update the session's auth header with the new token. """ self._access_token = self._get_access_token() self.update_headers({'authorization': 'Basic {}'.format(self._access_token)}) def set_csrf_token(self): """Call the get_csrf_token method and update the session's X-CSRF-Token header with the new token. """ if self._get_csrf_token is not None and callable(self._get_csrf_token): if self._csrf_token is None and self._uses_csrf_token: logger.debug('Refreshing CSRF token') self._csrf_token = self._get_csrf_token() logger.debug('Refreshed CSRF token.') self.update_headers({'X-CSRF-Token': self._csrf_token}) def reset_csrf_token(self): self._csrf_token = None def abs_url(self, url): """Given a relative or absolute URL; return an absolute URL. Args: url(basestring): A relative or absolute URL. Returns: str: An absolute URL. """ parsed_url = urllib.parse.urlparse(url) if not parsed_url.scheme and not parsed_url.netloc: # url is a relative URL; combine with base_url return urllib.parse.urljoin(str(self.base_url), str(url)) else: # url is already an absolute URL; return as is return url def get_filename(self, content): """Get the filename from the Content-Disposition's header Args: content(basestring): the Content-Disposition's header Returns: str: the filename from the Content-Disposition's header Raises: Exception: If was not able to find the header's filename value. """ content_file_list = re.findall('filename=(.*)', content) if len(content_file_list) > 0: content_file_name = content_file_list[0].replace('"', '') else: raise Exception("Could not find the header's filename value") return content_file_name def download(self, method, url, erc, custom_refresh, **kwargs): """It immediately downloads the response content. Args: method(basestring): The request-method type ('GET', 'POST', etc.). url(basestring): The URL of the API endpoint to be called. erc(int): The expected response code that should be returned by the Identity Services Engine API endpoint to indicate success. **kwargs: Passed on to the requests package. To download it to a file use the `save_file` kwarg equal to True. It defaults to False. If False is only 'downloaded' to a data property. To specify the downloaded file use the `filename` kwarg. It defaults to the value of the Content-Disposition header's filename. To specify the downloaded directory path use the `dirpath` kwarg. It defaults to the os.getcwd() result. Returns: DownloadResponse: The DownloadResponse wrapper. Wraps the urllib3.response.HTTPResponse. For more information check the `urlib3 documentation <https://urllib3.readthedocs.io/en/latest/reference/urllib3.response.html>`_ Raises: DownloadFailure: If was not able to download the raw response to a file. """ save_file = kwargs.pop('save_file', False) dirpath = kwargs.pop('dirpath', None) filename = kwargs.pop('filename', None) filepath = None collected_data = bytes() if not(dirpath) or not(os.path.isdir(dirpath)): dirpath = os.getcwd() with self.request(method, url, erc, 0, **kwargs) as resp: if resp.headers and resp.headers.get('Content-Disposition'): try: content = resp.headers.get('Content-Disposition') filename = filename or self.get_filename(content) filepath = os.path.join(dirpath, filename) except Exception as e: raise DownloadFailure(resp, e) if save_file and filepath: try: with open(filepath, 'wb') as f: logger.debug('Downloading {0}'.format(filepath)) for chunk in resp.iter_content(chunk_size=1024): if chunk: collected_data += chunk f.write(chunk) except Exception as e: raise DownloadFailure(resp, e) logger.debug('Downloaded {0}'.format(filepath)) final_response = DownloadResponse(resp, filepath, filename, dirpath, collected_data) return final_response def request(self, method, url, erc, custom_refresh, **kwargs): """Abstract base method for making requests to the Identity Services Engine APIs. This base method: * Expands the API endpoint URL to an absolute URL * Makes the actual HTTP request to the API endpoint * Provides support for Identity Services Engine rate-limiting * Inspects response codes and raises exceptions as appropriate * Updates the token if response code is 401 - Unauthorized and makes the request to the API endpoint again Args: method(basestring): The request-method type ('GET', 'POST', etc.). url(basestring): The URL of the API endpoint to be called. erc(int): The expected response code that should be returned by the Identity Services Engine API endpoint to indicate success. **kwargs: Passed on to the requests package. Returns: requests.Response: The Response object, which contains a server's response to an HTTP request. Raises: ApiError: If anything other than the expected response code is returned by the Identity Services Engine API endpoint. """ # Ensure the url is an absolute URL abs_url = self.abs_url(url) # Update request kwargs with session defaults kwargs.setdefault('timeout', self.single_request_timeout) kwargs.setdefault('verify', self.verify) # Fixes requests inconsistent behavior with additional parameters if not kwargs.get('json'): kwargs.pop('json', None) if not kwargs.get('data'): kwargs.pop('data', None) requires_csrf = method in ['POST', 'PUT', 'DELETE'] if requires_csrf: self.set_csrf_token() c = custom_refresh while True: c += 1 # Make the HTTP request to the API endpoint try: logger.debug('Attempt {}'.format(c)) logger.debug(pprint_request_info(abs_url, method, _headers=self.headers, **kwargs)) response = self._req_session.request(method, abs_url, **kwargs) except socket.error: # A socket error self.reset_csrf_token() try: c += 1 logger.debug('Attempt {}'.format(c)) response = self._req_session.request(method, abs_url, **kwargs) except Exception as e: raise ciscoisesdkException('Socket error {}'.format(e)) except IOError as e: self.reset_csrf_token() if e.errno == errno.EPIPE: # EPIPE error try: c += 1 logger.debug('Attempt {}'.format(c)) response = self._req_session.request(method, abs_url, **kwargs) except Exception as e: raise ciscoisesdkException('PipeError {}'.format(e)) else: raise ciscoisesdkException('IOError {}'.format(e)) try: # Check the response code for error conditions check_response_code(response, erc, additional_data=get_exception_additional_data(headers=self.headers, response=response)) except RateLimitError as e: self.reset_csrf_token() # Catch rate-limit errors # Wait and retry if automatic rate-limit handling is enabled if self.wait_on_rate_limit: warnings.warn(RateLimitWarning(response)) time.sleep(e.retry_after) continue else: # Re-raise the RateLimitError raise except ApiError as e: self.reset_csrf_token() if e.status_code == 401 and custom_refresh < 1: logger.debug(pprint_response_info(response)) logger.debug('Refreshing access token') self.refresh_token() logger.debug('Refreshed token.') return self.request(method, url, erc, 1, **kwargs) elif e.status_code == 403 and custom_refresh < 1: logger.debug(pprint_response_info(response)) return self.request(method, url, erc, 1, **kwargs) else: # Re-raise the ApiError logger.debug(pprint_response_info(response)) raise else: logger.debug(pprint_response_info(response)) return response def multipart_data(self, fields, create_callback): """Creates a multipart/form-data body. Args: fields(dict,list): form data values. create_callback(function): function that creates a function that monitors the progress of the upload. boundary: MultipartEncoder's boundary. Default value: None. encoding(string): MultipartEncoder's encoding. Default value: utf-8. """ if fields is not None: e = encoder.MultipartEncoder( fields=fields ) if create_callback is not None: callback = create_callback(e) m = encoder.MultipartEncoderMonitor(e, callback) return m else: return e else: return None def get(self, url, params=None, **kwargs): """Sends a GET request. Args: url(basestring): The URL of the API endpoint. params(dict): The parameters for the HTTP GET request. **kwargs: erc(int): The expected (success) response code for the request. others: Passed on to the requests package. Returns: DownloadResponse: If it has `stream` kwarg with a True value. Any: Result of the `json.loads` of the server's response to an HTTP request. Raises: ApiError: If anything other than the expected response code is returned by the Identity Services Engine API endpoint. """ check_type(url, basestring, may_be_none=False) check_type(params, dict) # Expected response code erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['GET']) stream = kwargs.get('stream', None) if stream: return self.download('GET', url, erc, 0, params=params, **kwargs) else: response = self.request('GET', url, erc, 0, params=params, **kwargs) return RestResponse(response) def post(self, url, params=None, json=None, data=None, **kwargs): """Sends a POST request. Args: url(basestring): The URL of the API endpoint. json: Data to be sent in JSON format in tbe body of the request. data: Data to be sent in the body of the request. **kwargs: erc(int): The expected (success) response code for the request. others: Passed on to the requests package. Returns: DownloadResponse: If it has `stream` kwarg with a True value. Any: Result of the `json.loads` of the server's response to an HTTP request. Raises: ApiError: If anything other than the expected response code is returned by the Identity Services Engine API endpoint. """ check_type(url, basestring, may_be_none=False) check_type(params, dict) # Expected response code erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['POST']) stream = kwargs.get('stream', None) if stream: return self.download('POST', url, erc, 0, params=params, json=json, data=data, **kwargs) else: response = self.request('POST', url, erc, 0, params=params, json=json, data=data, **kwargs) return RestResponse(response) def put(self, url, params=None, json=None, data=None, **kwargs): """Sends a PUT request. Args: url(basestring): The URL of the API endpoint. json: Data to be sent in JSON format in tbe body of the request. data: Data to be sent in the body of the request. **kwargs: erc(int): The expected (success) response code for the request. others: Passed on to the requests package. Returns: DownloadResponse: If it has `stream` kwarg with a True value. Any: Result of the `json.loads` of the server's response to an HTTP request. Raises: ApiError: If anything other than the expected response code is returned by the Identity Services Engine API endpoint. """ check_type(url, basestring, may_be_none=False) check_type(params, dict) # Expected response code erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['PUT']) stream = kwargs.get('stream', None) if stream: return self.download('PUT', url, erc, 0, params=params, json=json, data=data, **kwargs) else: response = self.request('PUT', url, erc, 0, params=params, json=json, data=data, **kwargs) return RestResponse(response) def delete(self, url, params=None, **kwargs): """Sends a DELETE request. Args: url(basestring): The URL of the API endpoint. **kwargs: erc(int): The expected (success) response code for the request. others: Passed on to the requests package. Raises: ApiError: If anything other than the expected response code is returned by the Identity Services Engine API endpoint. """ check_type(url, basestring, may_be_none=False) check_type(params, dict) # Expected response code erc = kwargs.pop('erc', EXPECTED_RESPONSE_CODE['DELETE']) response = self.request('DELETE', url, erc, 0, params=params, **kwargs) return RestResponse(response)