1"""Implements authentication methods for the HTTP client."""
9from requests.auth
import HTTPBasicAuth
11from aas_http_client.classes.Configuration.config_classes
import OAuth
14_logger = logging.getLogger(__name__)
18 """Defines authentication methods.
20 :param Enum: Base class for enumerations
30 """Holds token data."""
32 def __init__(self, access_token: str, token_type: str, token_expiry: float):
33 """Initializes the TokenData with the given parameters."""
34 self.access_token: str = access_token
35 self.token_type: str = token_type
36 self.token_expiry: float = token_expiry
def get_token(o_auth_configuration: OAuth, ssl_verify: bool) -> TokenData | None:
    """Get token based on the provided OAuth configuration.

    The grant type of the configuration selects the flow. A grant type of
    ``"password"`` requests the token with the password flow; a grant type of
    ``"client_credentials"`` on an active configuration requests it with HTTP
    basic authentication. Any other configuration yields no token.

    :param o_auth_configuration: OAuth configuration (token URL, client ID, client secret, grant type)
    :param ssl_verify: Whether to verify SSL certificates
    :return: Token data or None if no token could be obtained
    """
    # Start from "no token" so that an unsupported grant type or an inactive
    # configuration ends in the error path below instead of an UnboundLocalError.
    token: TokenData | None = None

    # NOTE(review): the two call targets were reconstructed from a damaged listing;
    # confirm them against the repository before merging.
    if o_auth_configuration.grant_type == "password":
        token = get_token_by_password(
            o_auth_configuration.token_url,
            o_auth_configuration.client_id,
            o_auth_configuration.get_client_secret(),
            ssl_verify=ssl_verify,
        )

    elif o_auth_configuration.is_active() and o_auth_configuration.grant_type == "client_credentials":
        token = get_token_by_basic_auth(
            o_auth_configuration.token_url,
            o_auth_configuration.client_id,
            o_auth_configuration.get_client_secret(),
            ssl_verify=ssl_verify,
        )

    if not token:
        _logger.error(f"Failed to receive token from endpoint '{o_auth_configuration.token_url}'")
        return None

    return token
def get_token_by_basic_auth(
    endpoint: str,
    username: str,
    password: str,
    timeout=200,
    ssl_verify=True,
) -> TokenData | None:
    """Request a token with the client-credentials grant, authenticating via HTTP basic auth.

    :param endpoint: Token endpoint of the authentication service provider
    :param username: Username sent as the basic-auth user
    :param password: Password sent as the basic-auth password
    :param timeout: Timeout for the API call, defaults to 200
    :param ssl_verify: Whether to verify SSL certificates, defaults to True
    :return: Token data or None if an error occurred
    """
    credentials = HTTPBasicAuth(username, password)
    form_fields = {"grant_type": "client_credentials"}

    return __get_token_from_endpoint(endpoint, form_fields, credentials, timeout, ssl_verify)
def get_token_by_password(
    endpoint: str,
    username: str,
    password: str,
    timeout=200,
    ssl_verify=True,
) -> TokenData | None:
    """Request a token with the password grant, sending username and password as form fields.

    :param endpoint: Token endpoint of the authentication service provider
    :param username: Username sent in the request body
    :param password: Password sent in the request body
    :param timeout: Timeout for the API call, defaults to 200
    :param ssl_verify: Whether to verify SSL certificates, defaults to True
    :return: Token data or None if an error occurred
    """
    form_fields = dict(grant_type="password", username=username, password=password)

    # No basic-auth credentials for this flow; they travel in the form body instead.
    return __get_token_from_endpoint(endpoint, form_fields, None, timeout, ssl_verify)
def __get_token_from_endpoint(
    endpoint: str, data: dict[str, str], auth: HTTPBasicAuth | None = None, timeout: int = 200, ssl_verify: bool = True
) -> TokenData | None:
    """Get token from a specific authentication service provider.

    Posts ``data`` to ``endpoint`` and builds a ``TokenData`` from the JSON object in
    the response. Every failure (request error, non-200 status, undecodable or
    malformed body, missing or invalid token fields) is logged and reported as None;
    no exception from parsing the response escapes this function.

    :param endpoint: Get token endpoint for the authentication service provider
    :param data: Form data for the authentication service provider
    :param auth: Optional HTTP basic authentication sent with the request, defaults to None
    :param timeout: Timeout for the API calls, defaults to 200
    :param ssl_verify: Whether to verify SSL certificates, defaults to True
    :return: Token data or None if an error occurred
    """
    try:
        response = requests.post(endpoint, auth=auth, data=data, timeout=timeout, verify=ssl_verify)
        _logger.debug(f"Call REST API url '{response.url}'")

        if response.status_code != 200:
            log_response(response)
            return None

    except requests.exceptions.RequestException as e:
        _logger.error(f"Error call REST API: {e}")
        return None

    # UnicodeDecodeError and json.JSONDecodeError are both ValueError subclasses,
    # so a single handler covers an undecodable body and malformed JSON.
    try:
        content = response.content.decode("utf-8")

        if not content:
            _logger.error("No content in token response")
            return None

        payload = json.loads(content)
    except ValueError as e:
        _logger.error(f"Invalid JSON in token response: {e}")
        return None

    # Only a non-empty JSON object can carry the token fields read below.
    if not payload or not isinstance(payload, dict):
        _logger.error("No data in token response")
        return None

    raw_token = payload.get("access_token", "")
    access_token: str = raw_token.strip() if isinstance(raw_token, str) else ""

    try:
        expires_in: int = int(payload.get("expires_in", "0"))
    except (TypeError, ValueError):
        # Non-numeric lifetime (e.g. null or arbitrary text) is treated as invalid.
        expires_in = 0

    if not access_token or expires_in <= 0:
        _logger.error("Invalid token data in response")
        return None

    raw_type = payload.get("token_type", "")
    token_type: str = raw_type.strip() if isinstance(raw_type, str) else ""

    # Report the expiry 60 seconds early; presumably a safety margin so callers
    # renew before the provider rejects the token — confirm with the callers.
    token_expiry: float = time.time() + expires_in - 60

    return TokenData(access_token=access_token, token_type=token_type, token_expiry=token_expiry)
Defines authentication methods.
__init__(self, str access_token, str token_type, float token_expiry)
Initializes the TokenData with the given parameters.
TokenData|None get_token_by_password(str endpoint, str username, str password, timeout=200, ssl_verify=True)
Get token from a specific authentication service provider by username and password.
TokenData|None get_token_by_basic_auth(str endpoint, str username, str password, timeout=200, ssl_verify=True)
Get token from a specific authentication service provider by basic authentication.