|
| 1 | +import requests |
| 2 | +import logging |
| 3 | +import urllib.parse |
| 4 | + |
| 5 | + |
| 6 | +logging.basicConfig(level=logging.INFO, format='dss-plugin-microstrategy %(levelname)s - %(message)s') |
| 7 | +logger = logging.getLogger() |
| 8 | + |
| 9 | + |
| 10 | +class RestApiAuth(requests.auth.AuthBase): |
| 11 | + def __init__(self, credential): |
| 12 | + login_type = credential.get("login_type", "no_auth") |
| 13 | + self.api_key_destination = None |
| 14 | + self.auth_key = None |
| 15 | + self.auth_value = None |
| 16 | + if login_type == "bearer_token": |
| 17 | + token = credential.get("token", "") |
| 18 | + bearer_template = credential.get("bearer_template", "Bearer {{token}}") |
| 19 | + bearer_template = bearer_template.replace("{{token}}", token) |
| 20 | + self.auth_key = "Authorization" |
| 21 | + self.auth_value = bearer_template |
| 22 | + self.api_key_destination = "header" |
| 23 | + elif login_type == "api_key": |
| 24 | + self.auth_key = credential.get("api_key_name", "") |
| 25 | + self.auth_value = credential.get("api_key_value", "") |
| 26 | + self.api_key_destination = credential.get("api_key_destination", "header") |
| 27 | + else: |
| 28 | + return None |
| 29 | + |
| 30 | + def __call__(self, request): |
| 31 | + if self.api_key_destination == "header": |
| 32 | + request.headers[self.auth_key] = self.auth_value |
| 33 | + elif self.api_key_destination == "params": |
| 34 | + request.url = update_query_string(request.url, {self.auth_key:self.auth_value}) |
| 35 | + return request |
| 36 | + |
| 37 | + |
| 38 | +def get_auth(credential): |
| 39 | + login_type = credential.get("login_type", "no_auth") |
| 40 | + if login_type == "basic_login": |
| 41 | + username = credential.get("username", credential.get("user", "")) |
| 42 | + password = credential.get("password", "") |
| 43 | + return (username, password) |
| 44 | + if login_type == "ntlm": |
| 45 | + from requests_ntlm import HttpNtlmAuth |
| 46 | + username = credential.get("username", credential.get("user", "")) |
| 47 | + password = credential.get("password", "") |
| 48 | + return HttpNtlmAuth(username, password) |
| 49 | + if login_type in ["bearer_token", "api_key"]: |
| 50 | + return RestApiAuth(credential) |
| 51 | + |
| 52 | + |
| 53 | +def update_query_string(old_url, request_params_to_update): |
| 54 | + url_parts = urllib.parse.urlparse(old_url) |
| 55 | + request_params = dict(urllib.parse.parse_qsl(url_parts.query)) |
| 56 | + request_params.update(request_params_to_update) |
| 57 | + request_params=urllib.parse.urlencode(request_params) |
| 58 | + new_url = url_parts._replace(query=request_params).geturl() |
| 59 | + return new_url |
0 commit comments