AAS HTTP Client Documentation
Loading...
Searching...
No Matches
authentication.py
Go to the documentation of this file.
1"""Implements authentication methods for the HTTP client."""
2
3import json
4import logging
5import time
6from enum import Enum
7
8import requests
9from requests.auth import HTTPBasicAuth
10
11from aas_http_client.classes.Configuration.config_classes import OAuth
12from aas_http_client.utilities.http_helper import log_response
13
14_logger = logging.getLogger(__name__)
15
16
class AuthMethod(Enum):
    """Defines authentication methods.

    Each member identifies one way a client can authenticate; the integer
    values are part of the public interface and must not be renumbered.
    """

    # No authentication.
    No = 1
    # HTTP basic authentication.
    basic_auth = 2
    # OAuth token based authentication.
    o_auth = 3
    # Bearer token authentication.
    bearer = 4
class TokenData:
    """Holds token data."""

    def __init__(self, access_token: str, token_type: str, token_expiry: float):
        """Initializes the TokenData with the given parameters.

        :param access_token: Access token string
        :param token_type: Type of the token as reported by the token endpoint
        :param token_expiry: Point in time at which the token is treated as expired
        """
        self.access_token: str = access_token
        self.token_type: str = token_type
        self.token_expiry: float = token_expiry
37
38
def get_token(o_auth_configuration: OAuth) -> TokenData | None:
    """Get token based on the provided OAuth configuration.

    The grant type ``"password"`` is always attempted; the grant type
    ``"client_credentials"`` is only attempted when the configuration reports
    itself as active. Any other combination yields ``None``.

    :param o_auth_configuration: OAuth configuration
    :return: Access token or None if an error occurred
    """
    # Bind the result up front so an unsupported grant type (or an inactive
    # client-credentials configuration) reaches the error path below instead
    # of raising UnboundLocalError.
    token: TokenData | None = None

    # NOTE(review): the call targets of the two branches were not visible in
    # the reviewed listing; they are reconstructed from the grant types and
    # the sibling functions of this module - confirm against the repository.
    if o_auth_configuration.grant_type == "password":
        token = get_token_by_password(
            o_auth_configuration.token_url,
            o_auth_configuration.client_id,
            o_auth_configuration.get_client_secret(),
        )

    elif o_auth_configuration.is_active() and o_auth_configuration.grant_type == "client_credentials":
        token = get_token_by_basic_auth(
            o_auth_configuration.token_url,
            o_auth_configuration.client_id,
            o_auth_configuration.get_client_secret(),
        )

    if not token:
        _logger.error(f"Failed to receive token from endpoint '{o_auth_configuration.token_url}'")
        return None

    return token
64
65
def get_token_by_basic_auth(endpoint: str, username: str, password: str, timeout: int = 200) -> TokenData | None:
    """Get token from a specific authentication service provider by basic authentication.

    The credentials are sent as HTTP basic authentication while the request
    data announces the ``client_credentials`` grant type.

    :param endpoint: Get token endpoint for the authentication service provider
    :param username: Username for the authentication service provider
    :param password: Password for the authentication service provider
    :param timeout: Timeout for the API calls, defaults to 200
    :return: Access token or None if an error occurred
    """
    data = {"grant_type": "client_credentials"}

    auth = HTTPBasicAuth(username, password)

    return _get_token_from_endpoint(endpoint, data, auth, timeout)
80
81
def get_token_by_password(endpoint: str, username: str, password: str, timeout: int = 200) -> TokenData | None:
    """Get token from a specific authentication service provider by username and password.

    The credentials are sent as part of the request data together with the
    ``password`` grant type; no HTTP authentication is attached to the request.

    :param endpoint: Get token endpoint for the authentication service provider
    :param username: Username for the authentication service provider
    :param password: Password for the authentication service provider
    :param timeout: Timeout for the API calls, defaults to 200
    :return: Access token or None if an error occurred
    """
    data = {"grant_type": "password", "username": username, "password": password}

    return _get_token_from_endpoint(endpoint, data, None, timeout)
94
95
def _get_token_from_endpoint(endpoint: str, data: dict[str, str], auth: HTTPBasicAuth | None = None, timeout: int = 200) -> TokenData | None:
    """Get token from a specific authentication service provider.

    Posts ``data`` to ``endpoint`` and builds a ``TokenData`` from the JSON
    body of the response. Every failure (request error, status code other
    than 200, empty, undecodable or incomplete response body) is logged and
    reported as ``None`` instead of raising.

    :param endpoint: Get token endpoint for the authentication service provider
    :param data: Data for the authentication service provider
    :param auth: Optional HTTP basic authentication for the request, defaults to None
    :param timeout: Timeout for the API calls, defaults to 200
    :return: Access token or None if an error occurred
    """
    # Only the request itself can raise a RequestException, so keep the try minimal.
    try:
        response = requests.post(endpoint, auth=auth, data=data, timeout=timeout)
    except requests.exceptions.RequestException as e:
        _logger.error(f"Error call REST API: {e}")
        return None

    _logger.debug(f"Call REST API url '{response.url}'")

    if response.status_code != 200:
        log_response(response)
        return None

    try:
        content = response.content.decode("utf-8")
    except UnicodeDecodeError as e:
        _logger.error(f"Token response is not valid UTF-8: {e}")
        return None

    if not content:
        _logger.error("No content in token response")
        return None

    # A malformed body must end in the documented None result, not in an
    # exception escaping to the caller.
    try:
        payload = json.loads(content)
    except json.JSONDecodeError as e:
        _logger.error(f"Invalid JSON in token response: {e}")
        return None

    if not payload:
        _logger.error("No data in token response")
        return None

    # Valid JSON is not necessarily an object; the field lookups below need a dict.
    if not isinstance(payload, dict):
        _logger.error("Invalid token data in response")
        return None

    raw_access_token = payload.get("access_token")
    access_token: str = raw_access_token.strip() if isinstance(raw_access_token, str) else ""

    # A missing, null or non-numeric lifetime is treated like a lifetime of zero.
    try:
        expires_in: int = int(payload.get("expires_in", 0))
    except (TypeError, ValueError):
        expires_in = 0

    # A non-positive lifetime would describe a token that is already expired.
    if not access_token or expires_in <= 0:
        _logger.error("Invalid token data in response")
        return None

    raw_token_type = payload.get("token_type")
    token_type: str = raw_token_type.strip() if isinstance(raw_token_type, str) else ""
    now: float = time.time()
    token_expiry: float = now + expires_in - 60  # Subtract 60 seconds as buffer

    return TokenData(access_token=access_token, token_type=token_type, token_expiry=token_expiry)
Defines authentication methods.
__init__(self, str access_token, str token_type, float token_expiry)
Initializes the TokenData with the given parameters.
TokenData|None get_token_by_basic_auth(str endpoint, str username, str password, timeout=200)
Get token from a specific authentication service provider by basic authentication.
TokenData|None _get_token_from_endpoint(str endpoint, dict[str, str] data, HTTPBasicAuth|None auth=None, int timeout=200)
Get token from a specific authentication service provider.
TokenData|None get_token_by_password(str endpoint, str username, str password, timeout=200)
Get token from a specific authentication service provider by username and password.