AAS HTTP Client Documentation
Loading...
Searching...
No Matches
aas_client.py
Go to the documentation of this file.
1"""Client for HTTP API communication with AAS server."""
2
3import json
4import logging
5import time
6from pathlib import Path
7from typing import Any
8
9import requests
10from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, ValidationError
11from requests import Session
12from requests.auth import HTTPBasicAuth
13
14from aas_http_client.classes.client.implementations import (
15 AuthMethod,
16 ExperimentalImplementation,
17 ShellRegistryImplementation,
18 ShellRepoImplementation,
19 SubmodelRegistryImplementation,
20 SubmodelRepoImplementation,
21 TokenData,
22 get_token,
23)
24from aas_http_client.classes.Configuration.config_classes import AuthenticationConfig
26 STATUS_CODE_200,
27 STATUS_CODE_201,
28 STATUS_CODE_202,
29 STATUS_CODE_204,
30)
31
32_logger = logging.getLogger(__name__)
33
34
35class AASConnectionError(ConnectionError):
36 """Exception raised for errors in the AAS connection.
37
38 :param message: Error message
39 :param errors: Dictionary of error codes and their corresponding messages
40 """
41
42 def __init__(self, message: str, errors: dict[int, str]):
43 """
44 Initialize the AASConnectionError.
45
46 :param message: Error message
47 :param errors: Dictionary of error codes and their corresponding messages
48 """
49 super().__init__(message)
50 self.errors = errors
51 self.status_code = next(iter(errors), None) # Get the first status code if available
52
54class AasHttpClient(BaseModel):
55 """Represents a AasHttpClient to communicate with a REST API."""
56
57 model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
59 base_url: str = Field(..., alias="BaseUrl", description="Base URL of the AAS server.")
60 auth_settings: AuthenticationConfig = Field(
61 default_factory=AuthenticationConfig, alias="AuthenticationSettings", description="Authentication settings for the AAS server."
62 )
63 https_proxy: str | None = Field(default=None, alias="HttpsProxy", description="HTTPS proxy URL.")
64 http_proxy: str | None = Field(default=None, alias="HttpProxy", description="HTTP proxy URL.")
65 time_out: int = Field(default=200, alias="TimeOut", description="Timeout for HTTP requests.")
66 connection_time_out: int = Field(default=100, alias="ConnectionTimeOut", description="Connection timeout for HTTP requests.")
67 ssl_verify: bool = Field(default=True, alias="SslVerify", description="Enable SSL verification.")
68 trust_env: bool = Field(default=True, alias="TrustEnv", description="Trust environment variables.")
69 _session: Session | None = PrivateAttr(default=None)
70 _auth_method: AuthMethod = PrivateAttr(default=AuthMethod.basic_auth)
71 encoded_ids: bool = Field(default=True, alias="EncodedIds", description="If enabled, all IDs used in API requests have to be base64-encoded.")
72 shells: ShellRepoImplementation | None = Field(default=None)
73 submodels: SubmodelRepoImplementation | None = Field(default=None)
74 shell_registry: ShellRegistryImplementation | None = Field(default=None)
75 experimental: ExperimentalImplementation | None = Field(default=None)
76 submodel_registry: SubmodelRegistryImplementation | None = Field(default=None)
77 _cached_token: TokenData | None = PrivateAttr(default=None)
79 def initialize(self):
80 """Initialize the AasHttpClient with the given URL, username and password."""
81 if self.base_urlbase_url.endswith("/"):
82 self.base_urlbase_url = self.base_urlbase_url[:-1]
84 self._session_session = requests.Session()
88 self._session_session.verify = self.ssl_verify
89 self._session_session.trust_env = self.trust_env
90
91 if self.https_proxy:
92 self._session_session.proxies.update({"https": self.https_proxy})
93 if self.http_proxy:
94 self._session_session.proxies.update({"http": self.http_proxy})
95
96 self._session_session.headers.update(
97 {
98 "Accept": "*/*",
99 "User-Agent": "python-requests/2.32.5",
100 "Connection": "close",
101 }
102 )
103
104 self.shellsshells = ShellRepoImplementation(self)
105 self.submodelssubmodels = SubmodelRepoImplementation(self)
106 self.shell_registryshell_registry = ShellRegistryImplementation(self)
107 self.submodel_registrysubmodel_registry = SubmodelRegistryImplementation(self)
108 self.experimentalexperimental = ExperimentalImplementation(self)
110 def get_auth_method(self) -> AuthMethod:
111 """Get the authentication method used by the client.
112
113 :return: The authentication method used by the client
114 """
115 return self._auth_method_auth_method
116
117 def get_session(self) -> Session | None:
118 """Get the HTTP session used by the client.
119
120 :return: The requests.Session object used for HTTP communication
121 """
122 return self._session_session
123
125 """Handles the authentication method based on the provided settings."""
126 if self.auth_settings.o_auth.is_active():
127 self._auth_method_auth_method = AuthMethod.o_auth
128 _logger.debug(
129 f"Authentication method: OAuth | '{self.auth_settings.o_auth.client_id}' | '{self.auth_settings.o_auth.token_url}' | '{self.auth_settings.o_auth.grant_type}'"
131
132 elif self.auth_settings.basic_auth.is_active():
133 self._auth_method_auth_method = AuthMethod.basic_auth
134 _logger.debug(f"Authentication method: Basic Auth | '{self.auth_settings.basic_auth.username}'")
135 self._session_session.auth = HTTPBasicAuth(self.auth_settings.basic_auth.username, self.auth_settings.basic_auth.get_password())
136
137 elif self.auth_settings.bearer_auth.is_active():
138 self._auth_method_auth_method = AuthMethod.bearer
139 _logger.debug("Authentication method: Bearer Token")
140 self._session_session.headers.update({"Authorization": f"Bearer {self.auth_settings.bearer_auth.get_token()}"})
141
142 else:
143 self._auth_method_auth_method = AuthMethod.No
144 _logger.debug("Authentication method: No Authentication")
145
146 def get_root(self) -> dict | None:
147 """Get the root endpoint of the AAS server API to test connectivity.
148
149 This method calls the '/shells' endpoint to verify that the AAS server is accessible
150 and responding. It automatically handles authentication token setup if service
151 provider authentication is configured.
152
153 :return: Response data as a dictionary containing shell information, or None if an error occurred
154 """
155 if not self._session_session:
156 _logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
157 return None
158
159 urls: list[str] = []
160 urls.append(f"{self.base_url}/shells")
161 urls.append(f"{self.base_url}/submodels")
162 urls.append(f"{self.base_url}/shell-descriptors")
163 urls.append(f"{self.base_url}/submodel-descriptors")
164
165 self.set_token()
166
167 error_messages: dict[int, str] = {}
168
169 for url in urls:
170 _logger.debug(f"Testing connectivity with URL: {url}")
171 try:
172 response = self._session_session.get(url, timeout=10)
173 _logger.debug(f"Call REST API url '{response.url}'")
174
175 if response.status_code == STATUS_CODE_200:
176 content = response.content.decode("utf-8")
177 return json.loads(content)
178
179 if response.status_code not in (STATUS_CODE_200, STATUS_CODE_201, STATUS_CODE_204):
180 error_messages.update({response.status_code: response.reason})
181
182 except requests.exceptions.RequestException as e:
183 _logger.error(f"Error call REST API: {e}")
184
185 raise AASConnectionError("Failed to connect to AAS server API", error_messages)
186
187 def set_token(self) -> str | None:
188 """Set authentication token in session headers based on configured authentication method.
189
190 :return: The access token if set, otherwise None
191 """
192 if not self._session_session:
193 _logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
194 return None
195
196 if self._auth_method_auth_method != AuthMethod.o_auth:
197 return None
198
199 now = time.time()
200 # Check if cached token exists and is not expired
201 if self._cached_token_cached_token and self._cached_token_cached_token.token_expiry > now:
202 return self._cached_token_cached_token.access_token
203
204 # Obtain new token
205 token_data = get_token(self.auth_settings.o_auth, self.ssl_verify)
206
207 if token_data and token_data.access_token:
208 # Cache the token data
209 self._cached_token_cached_token = token_data
210 # Update session headers with the new token
211 self._session_session.headers.update({"Authorization": f"Bearer {self._cached_token.access_token}"})
212 return self._cached_token_cached_token.access_token
213
214 return None
215
216 def get_endpoint(self, end_point_url: str) -> None | dict:
217 """Generic GET request for endpoint.
218
219 :param end_point_url: The endpoint URL to send the GET request to.
220 :return: The base URL of the AAS server.
221 """
222 if not self._session_session:
223 _logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
224 return None
225
226 try:
227 response = self._session_session.get(end_point_url, timeout=self.time_out)
228 _logger.debug(f"Call REST API url '{response.url}'")
229
230 if response.status_code == STATUS_CODE_200:
231 content = response.content.decode("utf-8")
232 return json.loads(content)
233
234 except requests.exceptions.RequestException as e:
235 _logger.debug(f"Error call REST API: {e}")
236
237 return None
238
239 def put_endpoint(self, end_point_url: str, request_body: dict) -> None | dict:
240 """Generic PUT request for endpoint.
241
242 :param end_point_url: The endpoint URL to send the PUT request to.
243 :param request_body: The request body to send with the PUT request.
244 :return: The base URL of the AAS server.
245 """
246 if not self._session_session:
247 _logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
248 return None
249
250 try:
251 response = self._session_session.put(end_point_url, json=request_body, timeout=self.time_out)
252 _logger.debug(f"Call REST API url '{response.url}'")
253
254 if response.status_code not in (STATUS_CODE_200, STATUS_CODE_201, STATUS_CODE_204):
255 content = response.content.decode("utf-8")
256 return json.loads(content)
257
258 except requests.exceptions.RequestException as e:
259 _logger.debug(f"Error call REST API: {e}")
260
261 return None
262
263 def post_endpoint(self, end_point_url: str, request_body: dict) -> None | dict:
264 """Generic POST request for endpoint.
265
266 :param end_point_url: The endpoint URL to send the POST request to.
267 :param request_body: The request body to send with the POST request.
268 :return: The base URL of the AAS server.
269 """
270 if not self._session_session:
271 _logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
272 return None
273
274 try:
275 response = self._session_session.post(end_point_url, json=request_body, timeout=self.time_out)
276 _logger.debug(f"Call REST API url '{response.url}'")
277
278 if response.status_code not in (STATUS_CODE_201, STATUS_CODE_200, STATUS_CODE_202):
279 content = response.content.decode("utf-8")
280 return json.loads(content)
281
282 except requests.exceptions.RequestException as e:
283 _logger.debug(f"Error call REST API: {e}")
284
285 return None
286
287 def patch_endpoint(self, end_point_url: str, request_body: dict) -> None | dict:
288 """Generic PATCH request for endpoint.
289
290 :param end_point_url: The endpoint URL to send the PATCH request to.
291 :param request_body: The request body to send with the PATCH request.
292 :return: The base URL of the AAS server.
293 """
294 if not self._session_session:
295 _logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
296 return None
297
298 try:
299 response = self._session_session.patch(end_point_url, json=request_body, timeout=self.time_out)
300 _logger.debug(f"Call REST API url '{response.url}'")
301
302 if response.status_code not in (STATUS_CODE_200, STATUS_CODE_204):
303 content = response.content.decode("utf-8")
304 return json.loads(content)
305
306 except requests.exceptions.RequestException as e:
307 _logger.debug(f"Error call REST API: {e}")
308
309 return None
310
311 def delete_endpoint(self, end_point_url: str) -> None | dict:
312 """Generic DELETE request for endpoint.
313
314 :param end_point_url: The endpoint URL to send the DELETE request to.
315 :return: The base URL of the AAS server.
316 """
317 if not self._session_session:
318 _logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
319 return None
320
321 try:
322 response = self._session_session.delete(end_point_url, timeout=self.time_out)
323 _logger.debug(f"Call REST API url '{response.url}'")
324
325 if response.status_code not in (STATUS_CODE_200, STATUS_CODE_204, STATUS_CODE_202):
326 content = response.content.decode("utf-8")
327 return json.loads(content)
328
329 except requests.exceptions.RequestException as e:
330 _logger.debug(f"Error call REST API: {e}")
331
332 return None
333
334
335def create_by_url( # noqa: PLR0913
336 base_url: str,
337 basic_auth_username: str = "",
338 basic_auth_password: str = "",
339 o_auth_client_id: str = "",
340 o_auth_client_secret: str = "",
341 o_auth_token_url: str = "",
342 bearer_auth_token: str = "",
343 http_proxy: str = "",
344 https_proxy: str = "",
345 time_out: int = 200,
346 connection_time_out: int = 60,
347 ssl_verify: bool = True, # noqa: FBT001, FBT002
348 trust_env: bool = True, # noqa: FBT001, FBT002
349 encoded_ids: bool = True, # noqa: FBT001, FBT002
350) -> AasHttpClient | None:
351 """Create a HTTP client for a AAS server connection from the given parameters.
352
353 :param base_url: Base URL of the AAS server, e.g. "http://basyx_python_server:80/"
354 :param basic_auth_username: Username for the AAS server basic authentication, defaults to ""
355 :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
356 :param o_auth_client_id: Client ID for OAuth authentication, defaults to ""
357 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
358 :param o_auth_token_url: Token URL for OAuth authentication, defaults to ""
359 :param bearer_auth_token: Bearer token for authentication, defaults to ""
360 :param http_proxy: HTTP proxy URL, defaults to ""
361 :param https_proxy: HTTPS proxy URL, defaults to ""
362 :param time_out: Timeout for the API calls, defaults to 200
363 :param connection_time_out: Timeout for the connection to the API, defaults to 60
364 :param ssl_verify: Whether to verify SSL certificates, defaults to True
365 :param trust_env: Whether to trust environment variables for proxy settings, defaults to True
366 :param encoded_ids: If enabled, all IDs used in API requests have to be base64-encoded
367 :return: An instance of AasHttpClient initialized with the provided parameters or None if connection fails
368 """
369 _logger.info(f"Create AAS server http client from URL '{base_url}'.")
370 config_dict: dict[str, Any] = {}
371 config_dict["BaseUrl"] = base_url
372 config_dict["HttpProxy"] = http_proxy
373 config_dict["HttpsProxy"] = https_proxy
374 config_dict["TimeOut"] = str(time_out)
375 config_dict["ConnectionTimeOut"] = str(connection_time_out)
376 config_dict["SslVerify"] = str(ssl_verify)
377 config_dict["TrustEnv"] = str(trust_env)
378 config_dict["EncodedIds"] = str(encoded_ids)
379
380 config_dict["AuthenticationSettings"] = {
381 "BasicAuth": {"Username": basic_auth_username},
382 "OAuth": {
383 "ClientId": o_auth_client_id,
384 "TokenUrl": o_auth_token_url,
385 },
386 }
387
388 return create_by_dict(config_dict, basic_auth_password, o_auth_client_secret, bearer_auth_token)
389
390
391def create_by_dict(
392 configuration: dict, basic_auth_password: str = "", o_auth_client_secret: str = "", bearer_auth_token: str = ""
393) -> AasHttpClient | None:
394 """Create a HTTP client for a AAS server connection from the given configuration.
395
396 :param configuration: Dictionary containing the AAS server connection settings
397 :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
398 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
399 :param bearer_auth_token: Bearer token for authentication, defaults to ""
400 :return: An instance of AasHttpClient initialized with the provided parameters or None if validation fails
401 """
402 _logger.info("Create AAS server http client from dictionary.")
403
404 return _create_client(configuration, basic_auth_password, o_auth_client_secret, bearer_auth_token)
405
406
407def create_by_config(
408 config_file: Path, basic_auth_password: str = "", o_auth_client_secret: str = "", bearer_auth_token: str = ""
409) -> AasHttpClient | None:
410 """Create a HTTP client for a AAS server connection from a given configuration file.
411
412 :param config_file: Path to the configuration file containing the AAS server connection settings
413 :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
414 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
415 :param bearer_auth_token: Bearer token for authentication, defaults to ""
416 :return: An instance of AasHttpClient initialized with the provided parameters or None if validation fails
417 """
418 config_file = config_file.resolve()
419 _logger.info(f"Create AAS server http client from configuration file '{config_file}'.")
420 if not config_file.exists():
421 configuration = {}
422 _logger.warning(f"Configuration file '{config_file}' not found. Using default configuration.")
423 else:
424 config_string = config_file.read_text(encoding="utf-8")
425 try:
426 configuration = json.loads(config_string)
427 except json.JSONDecodeError as e:
428 _logger.error(f"Configuration file '{config_file}' is not a valid JSON file: {e}")
429 return None
430 _logger.debug(f"Configuration file '{config_file}' found.")
431
432 return _create_client(configuration, basic_auth_password, o_auth_client_secret, bearer_auth_token)
433
434
435def _create_client(config_dict: dict, basic_auth_password: str, o_auth_client_secret: str, bearer_auth_token: str) -> AasHttpClient | None:
436 """Create and initialize an AAS HTTP client from configuration dictionary.
437
438 This internal method validates the configuration, sets authentication credentials,
439 initializes the client, and tests the connection to the AAS server.
440
441 :param config_dict: Dictionary containing AAS server settings
442 :param basic_auth_password: Password for basic authentication, defaults to ""
443 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
444 :param bearer_auth_token: Bearer token for authentication, defaults to ""
445 :return: An initialized and connected AasHttpClient instance or None if connection fails
446 :raises ValidationError: If the configuration dictionary is invalid
447 :raises TimeoutError: If connection to the server times out
448 """
449 try:
450 client = AasHttpClient.model_validate(config_dict)
451 except ValidationError as ve:
452 raise ValidationError(f"Invalid BaSyx server configuration file: {ve}") from ve
453
454 client.auth_settings.basic_auth.set_password(basic_auth_password)
455 client.auth_settings.o_auth.set_client_secret(o_auth_client_secret)
456 client.auth_settings.bearer_auth.set_token(bearer_auth_token)
457
458 _logger.debug("Using server configuration:")
459 _logger.debug(f"BaseUrl: '{client.base_url}'")
460 _logger.debug(f"TimeOut: '{client.time_out}'")
461 _logger.debug(f"HttpsProxy: '{client.https_proxy}'")
462 _logger.debug(f"HttpProxy: '{client.http_proxy}'")
463 _logger.debug(f"ConnectionTimeOut: '{client.connection_time_out}'.")
464 _logger.debug(f"SSLVerify: '{client.ssl_verify}'.")
465 _logger.debug(f"TrustEnv: '{client.trust_env}'.")
466 _logger.debug(f"EncodedIds: '{client.encoded_ids}'.")
467
468 client.initialize()
469
470 # test the connection to the REST API
471 connected = __connect_to_api(client)
472
473 if not connected:
474 return None
475
476 return client
477
478
479def __connect_to_api(client: AasHttpClient) -> bool:
480 """Test the connection to the AAS server API with retry logic.
481
482 This internal method attempts to establish a connection to the AAS server by calling
483 the get_root() method. It retries the connection for the duration specified in the
484 client's connection_time_out setting, sleeping 1 second between attempts.
485
486 :param client: The AasHttpClient instance to test the connection for
487 :return: True if connection is successful, False otherwise
488 :raises TimeoutError: If connection attempts fail for the entire timeout duration
489 """
490 start_time = time.time()
491 _logger.info(f"Try to connect to REST API '{client.base_url}' for {client.connection_time_out} seconds.")
492 counter: int = 0
493 while True:
494 try:
495 root = client.get_root()
496 if root:
497 _logger.info(f"Connected to server API at '{client.base_url}' successfully.")
498 return True
499
500 _logger.error(f"Connection attempt to '{client.base_url}' failed.")
501
502 except requests.exceptions.ConnectionError:
503 pass
504 if time.time() - start_time > client.connection_time_out:
505 raise TimeoutError(f"Connection to server API timed out after {client.connection_time_out} seconds.")
506
507 counter += 1
508 _logger.warning(f"Retrying connection (attempt: {counter}).")
509 time.sleep(1)
Exception raised for errors in the AAS connection.
Definition aas_client.py:43
__init__(self, str message, dict[int, str] errors)
Initialize the AASConnectionError.
Definition aas_client.py:51
Represents a AasHttpClient to communicate with a REST API.
Definition aas_client.py:58
Session|None get_session(self)
Get the HTTP session used by the client.
_handle_auth_method(self)
Handles the authentication method based on the provided settings.
SubmodelRepoImplementation submodels
Definition aas_client.py:76
None|dict patch_endpoint(self, str end_point_url, dict request_body)
Generic PATCH request for endpoint.
None|dict delete_endpoint(self, str end_point_url)
Generic DELETE request for endpoint.
None|dict post_endpoint(self, str end_point_url, dict request_body)
Generic POST request for endpoint.
initialize(self)
Initialize the AasHttpClient with the given URL, username and password.
Definition aas_client.py:83
None|dict put_endpoint(self, str end_point_url, dict request_body)
Generic PUT request for endpoint.
ExperimentalImplementation experimental
Definition aas_client.py:78
AuthenticationConfig auth_settings
Definition aas_client.py:63
dict|None get_root(self)
Get the root endpoint of the AAS server API to test connectivity.
str|None set_token(self)
Set authentication token in session headers based on configured authentication method.
AuthMethod get_auth_method(self)
Get the authentication method used by the client.
None|dict get_endpoint(self, str end_point_url)
Generic GET request for endpoint.
SubmodelRegistryImplementation submodel_registry
Definition aas_client.py:79
ShellRepoImplementation shells
Definition aas_client.py:75
ShellRegistryImplementation shell_registry
Definition aas_client.py:77