AAS HTTP Client Documentation
Loading...
Searching...
No Matches
aas_client.py
Go to the documentation of this file.
1"""Client for HTTP API communication with AAS server."""
2
3import json
4import logging
5import time
6from pathlib import Path
7from typing import Any
8
9import requests
10from pydantic import BaseModel, Field, PrivateAttr, ValidationError
11from requests import Session
12from requests.auth import HTTPBasicAuth
13
14from aas_http_client.classes.client.implementations import (
15 AuthMethod,
16 ExperimentalImplementation,
17 ShellRegistryImplementation,
18 ShellRepoImplementation,
19 SubmodelRegistryImplementation,
20 SubmodelRepoImplementation,
21 TokenData,
22 get_token,
23)
24from aas_http_client.classes.Configuration.config_classes import AuthenticationConfig
26 STATUS_CODE_200,
27 STATUS_CODE_201,
28 STATUS_CODE_202,
29 STATUS_CODE_204,
30)
31
32logger = logging.getLogger(__name__)
33
34
35class AasHttpClient(BaseModel):
36 """Represents a AasHttpClient to communicate with a REST API."""
37
38 base_url: str = Field(..., alias="BaseUrl", description="Base URL of the AAS server.")
39 auth_settings: AuthenticationConfig = Field(
40 default_factory=AuthenticationConfig, alias="AuthenticationSettings", description="Authentication settings for the AAS server."
41 )
42 https_proxy: str | None = Field(default=None, alias="HttpsProxy", description="HTTPS proxy URL.")
43 http_proxy: str | None = Field(default=None, alias="HttpProxy", description="HTTP proxy URL.")
44 time_out: int = Field(default=200, alias="TimeOut", description="Timeout for HTTP requests.")
45 connection_time_out: int = Field(default=100, alias="ConnectionTimeOut", description="Connection timeout for HTTP requests.")
46 ssl_verify: bool = Field(default=True, alias="SslVerify", description="Enable SSL verification.")
47 trust_env: bool = Field(default=True, alias="TrustEnv", description="Trust environment variables.")
48 _session: Session | None = PrivateAttr(default=None)
49 _auth_method: AuthMethod = PrivateAttr(default=AuthMethod.basic_auth)
50 encoded_ids: bool = Field(default=True, alias="EncodedIds", description="If enabled, all IDs used in API requests have to be base64-encoded.")
51 shells: ShellRepoImplementation | None = Field(default=None)
52 submodels: SubmodelRepoImplementation | None = Field(default=None)
53 shell_registry: ShellRegistryImplementation | None = Field(default=None)
54 experimental: ExperimentalImplementation | None = Field(default=None)
55 submodel_registry: SubmodelRegistryImplementation | None = Field(default=None)
56 _cached_token: TokenData | None = PrivateAttr(default=None)
57
58 def initialize(self):
59 """Initialize the AasHttpClient with the given URL, username and password."""
60 if self.base_urlbase_url.endswith("/"):
62
63 self._session_session = requests.Session()
64
66
67 self._session_session.verify = self.ssl_verify
68 self._session_session.trust_env = self.trust_env
69
70 if self.https_proxy:
71 self._session_session.proxies.update({"https": self.https_proxy})
72 if self.http_proxy:
73 self._session_session.proxies.update({"http": self.http_proxy})
74
75 self._session_session.headers.update(
76 {
77 "Accept": "*/*",
78 "User-Agent": "python-requests/2.32.5",
79 "Connection": "close",
80 }
81 )
82
83 self.shellsshells = ShellRepoImplementation(self)
84 self.submodelssubmodels = SubmodelRepoImplementation(self)
85 self.shell_registryshell_registry = ShellRegistryImplementation(self)
86 self.submodel_registrysubmodel_registry = SubmodelRegistryImplementation(self)
87 self.experimentalexperimental = ExperimentalImplementation(self)
88
89 def get_session(self) -> Session | None:
90 """Get the HTTP session used by the client.
91
92 :return: The requests.Session object used for HTTP communication
93 """
94 return self._session_session
95
96 def _handle_auth_method(self):
97 """Handles the authentication method based on the provided settings."""
98 if self.auth_settings.o_auth.is_active():
99 self._auth_method_auth_method = AuthMethod.o_auth
100 logger.info(
101 f"Authentication method: OAuth | '{self.auth_settings.o_auth.client_id}' | '{self.auth_settings.o_auth.token_url}' | '{self.auth_settings.o_auth.grant_type}'"
102 )
103
104 elif self.auth_settings.basic_auth.is_active():
105 self._auth_method_auth_method = AuthMethod.basic_auth
106 logger.info(f"Authentication method: Basic Auth | '{self.auth_settings.basic_auth.username}'")
107 self._session_session.auth = HTTPBasicAuth(self.auth_settings.basic_auth.username, self.auth_settings.basic_auth.get_password())
108
109 elif self.auth_settings.bearer_auth.is_active():
110 self._auth_method_auth_method = AuthMethod.bearer
111 logger.info("Authentication method: Bearer Token")
112 self._session_session.headers.update({"Authorization": f"Bearer {self.auth_settings.bearer_auth.get_token()}"})
113
114 else:
115 self._auth_method_auth_method = AuthMethod.No
116 logger.info("Authentication method: No Authentication")
117
118 def get_root(self) -> dict | None:
119 """Get the root endpoint of the AAS server API to test connectivity.
120
121 This method calls the '/shells' endpoint to verify that the AAS server is accessible
122 and responding. It automatically handles authentication token setup if service
123 provider authentication is configured.
124
125 :return: Response data as a dictionary containing shell information, or None if an error occurred
126 """
127 if not self._session_session:
128 logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
129 return None
130
131 urls: list[str] = []
132 urls.append(f"{self.base_url}/shells")
133 urls.append(f"{self.base_url}/submodels")
134 urls.append(f"{self.base_url}/shell-descriptors")
135 urls.append(f"{self.base_url}/submodel-descriptors")
136
137 self.set_token()
138
139 for url in urls:
140 logger.debug(f"Testing connectivity with URL: {url}")
141 try:
142 response = self._session_session.get(url, timeout=10)
143 logger.debug(f"Call REST API url '{response.url}'")
144
145 if response.status_code == STATUS_CODE_200:
146 content = response.content.decode("utf-8")
147 return json.loads(content)
148
149 except requests.exceptions.RequestException as e:
150 logger.error(f"Error call REST API: {e}")
151
152 return None
153
154 def set_token(self) -> str | None:
155 """Set authentication token in session headers based on configured authentication method.
156
157 :return: The access token if set, otherwise None
158 """
159 if not self._session_session:
160 logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
161 return None
162
163 if self._auth_method_auth_method != AuthMethod.o_auth:
164 return None
165
166 now = time.time()
167 # Check if cached token exists and is not expired
168 if self._cached_token_cached_token and self._cached_token_cached_token.token_expiry > now:
169 return self._cached_token_cached_token.access_token
170
171 # Obtain new token
172 token_data = get_token(self.auth_settings.o_auth)
173
174 if token_data and token_data.access_token:
175 # Cache the token data
176 self._cached_token_cached_token = token_data
177 # Update session headers with the new token
178 self._session_session.headers.update({"Authorization": f"Bearer {self._cached_token.access_token}"})
179 return self._cached_token_cached_token.access_token
180
181 return None
182
183 def get_endpoint(self, end_point_url: str) -> None | dict:
184 """Generic GET request for endpoint.
185
186 :param end_point_url: The endpoint URL to send the GET request to.
187 :return: The base URL of the AAS server.
188 """
189 if not self._session_session:
190 logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
191 return None
192
193 try:
194 response = self._session_session.get(end_point_url, timeout=self.time_out)
195 logger.debug(f"Call REST API url '{response.url}'")
196
197 if response.status_code == STATUS_CODE_200:
198 content = response.content.decode("utf-8")
199 return json.loads(content)
200
201 except requests.exceptions.RequestException as e:
202 logger.debug(f"Error call REST API: {e}")
203
204 return None
205
206 def put_endpoint(self, end_point_url: str, request_body: dict) -> None | dict:
207 """Generic PUT request for endpoint.
208
209 :param end_point_url: The endpoint URL to send the PUT request to.
210 :param request_body: The request body to send with the PUT request.
211 :return: The base URL of the AAS server.
212 """
213 if not self._session_session:
214 logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
215 return None
216
217 try:
218 response = self._session_session.put(end_point_url, json=request_body, timeout=self.time_out)
219 logger.debug(f"Call REST API url '{response.url}'")
220
221 if response.status_code not in (STATUS_CODE_200, STATUS_CODE_201, STATUS_CODE_204):
222 content = response.content.decode("utf-8")
223 return json.loads(content)
224
225 except requests.exceptions.RequestException as e:
226 logger.debug(f"Error call REST API: {e}")
227
228 return None
229
230 def post_endpoint(self, end_point_url: str, request_body: dict) -> None | dict:
231 """Generic POST request for endpoint.
232
233 :param end_point_url: The endpoint URL to send the POST request to.
234 :param request_body: The request body to send with the POST request.
235 :return: The base URL of the AAS server.
236 """
237 if not self._session_session:
238 logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
239 return None
240
241 try:
242 response = self._session_session.post(end_point_url, json=request_body, timeout=self.time_out)
243 logger.debug(f"Call REST API url '{response.url}'")
244
245 if response.status_code not in (STATUS_CODE_201, STATUS_CODE_200, STATUS_CODE_202):
246 content = response.content.decode("utf-8")
247 return json.loads(content)
248
249 except requests.exceptions.RequestException as e:
250 logger.debug(f"Error call REST API: {e}")
251
252 return None
253
254 def patch_endpoint(self, end_point_url: str, request_body: dict) -> None | dict:
255 """Generic PATCH request for endpoint.
256
257 :param end_point_url: The endpoint URL to send the PATCH request to.
258 :param request_body: The request body to send with the PATCH request.
259 :return: The base URL of the AAS server.
260 """
261 if not self._session_session:
262 logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
263 return None
264
265 try:
266 response = self._session_session.patch(end_point_url, json=request_body, timeout=self.time_out)
267 logger.debug(f"Call REST API url '{response.url}'")
268
269 if response.status_code not in (STATUS_CODE_200, STATUS_CODE_204):
270 content = response.content.decode("utf-8")
271 return json.loads(content)
272
273 except requests.exceptions.RequestException as e:
274 logger.debug(f"Error call REST API: {e}")
275
276 return None
277
278 def delete_endpoint(self, end_point_url: str) -> None | dict:
279 """Generic DELETE request for endpoint.
280
281 :param end_point_url: The endpoint URL to send the DELETE request to.
282 :return: The base URL of the AAS server.
283 """
284 if not self._session_session:
285 logger.error("HTTP session is not initialized. Call 'initialize()' method before making API calls.")
286 return None
287
288 try:
289 response = self._session_session.delete(end_point_url, timeout=self.time_out)
290 logger.debug(f"Call REST API url '{response.url}'")
291
292 if response.status_code not in (STATUS_CODE_200, STATUS_CODE_204, STATUS_CODE_202):
293 content = response.content.decode("utf-8")
294 return json.loads(content)
295
296 except requests.exceptions.RequestException as e:
297 logger.debug(f"Error call REST API: {e}")
298
299 return None
300
301
302def create_client_by_url( # noqa: PLR0913
303 base_url: str,
304 basic_auth_username: str = "",
305 basic_auth_password: str = "",
306 o_auth_client_id: str = "",
307 o_auth_client_secret: str = "",
308 o_auth_token_url: str = "",
309 bearer_auth_token: str = "",
310 http_proxy: str = "",
311 https_proxy: str = "",
312 time_out: int = 200,
313 connection_time_out: int = 60,
314 ssl_verify: bool = True, # noqa: FBT001, FBT002
315 trust_env: bool = True, # noqa: FBT001, FBT002
316 encoded_ids: bool = True, # noqa: FBT001, FBT002
317) -> AasHttpClient | None:
318 """Create a HTTP client for a AAS server connection from the given parameters.
319
320 :param base_url: Base URL of the AAS server, e.g. "http://basyx_python_server:80/"
321 :param basic_auth_username: Username for the AAS server basic authentication, defaults to ""
322 :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
323 :param o_auth_client_id: Client ID for OAuth authentication, defaults to ""
324 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
325 :param o_auth_token_url: Token URL for OAuth authentication, defaults to ""
326 :param bearer_auth_token: Bearer token for authentication, defaults to ""
327 :param http_proxy: HTTP proxy URL, defaults to ""
328 :param https_proxy: HTTPS proxy URL, defaults to ""
329 :param time_out: Timeout for the API calls, defaults to 200
330 :param connection_time_out: Timeout for the connection to the API, defaults to 60
331 :param ssl_verify: Whether to verify SSL certificates, defaults to True
332 :param trust_env: Whether to trust environment variables for proxy settings, defaults to True
333 :param encoded_ids: If enabled, all IDs used in API requests have to be base64-encoded
334 :return: An instance of AasHttpClient initialized with the provided parameters or None if connection fails
335 """
336 logger.info(f"Create AAS server http client from URL '{base_url}'.")
337 config_dict: dict[str, Any] = {}
338 config_dict["BaseUrl"] = base_url
339 config_dict["HttpProxy"] = http_proxy
340 config_dict["HttpsProxy"] = https_proxy
341 config_dict["TimeOut"] = str(time_out)
342 config_dict["ConnectionTimeOut"] = str(connection_time_out)
343 config_dict["SslVerify"] = str(ssl_verify)
344 config_dict["TrustEnv"] = str(trust_env)
345 config_dict["EncodedIds"] = str(encoded_ids)
346
347 config_dict["AuthenticationSettings"] = {
348 "BasicAuth": {"Username": basic_auth_username},
349 "OAuth": {
350 "ClientId": o_auth_client_id,
351 "TokenUrl": o_auth_token_url,
352 },
353 }
354
355 return create_client_by_dict(config_dict, basic_auth_password, o_auth_client_secret, bearer_auth_token)
356
357
358def create_client_by_dict(
359 configuration: dict, basic_auth_password: str = "", o_auth_client_secret: str = "", bearer_auth_token: str = ""
360) -> AasHttpClient | None:
361 """Create a HTTP client for a AAS server connection from the given configuration.
362
363 :param configuration: Dictionary containing the AAS server connection settings
364 :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
365 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
366 :param bearer_auth_token: Bearer token for authentication, defaults to ""
367 :return: An instance of AasHttpClient initialized with the provided parameters or None if validation fails
368 """
369 logger.info("Create AAS server http client from dictionary.")
370 config_string = json.dumps(configuration, indent=4)
371
372 return _create_client(config_string, basic_auth_password, o_auth_client_secret, bearer_auth_token)
373
374
375def create_client_by_config(
376 config_file: Path, basic_auth_password: str = "", o_auth_client_secret: str = "", bearer_auth_token: str = ""
377) -> AasHttpClient | None:
378 """Create a HTTP client for a AAS server connection from a given configuration file.
379
380 :param config_file: Path to the configuration file containing the AAS server connection settings
381 :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
382 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
383 :param bearer_auth_token: Bearer token for authentication, defaults to ""
384 :return: An instance of AasHttpClient initialized with the provided parameters or None if validation fails
385 """
386 config_file = config_file.resolve()
387 logger.info(f"Create AAS server http client from configuration file '{config_file}'.")
388 if not config_file.exists():
389 config_string = "{}"
390 logger.warning(f"Configuration file '{config_file}' not found. Using default configuration.")
391 else:
392 config_string = config_file.read_text(encoding="utf-8")
393 logger.debug(f"Configuration file '{config_file}' found.")
394
395 return _create_client(config_string, basic_auth_password, o_auth_client_secret, bearer_auth_token)
396
397
398def _create_client(config_string: str, basic_auth_password: str, o_auth_client_secret: str, bearer_auth_token: str) -> AasHttpClient | None:
399 """Create and initialize an AAS HTTP client from configuration string.
400
401 This internal method validates the configuration, sets authentication credentials,
402 initializes the client, and tests the connection to the AAS server.
403
404 :param config_string: JSON configuration string containing AAS server settings
405 :param basic_auth_password: Password for basic authentication, defaults to ""
406 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
407 :param bearer_auth_token: Bearer token for authentication, defaults to ""
408 :return: An initialized and connected AasHttpClient instance or None if connection fails
409 :raises ValidationError: If the configuration string is invalid
410 :raises TimeoutError: If connection to the server times out
411 """
412 try:
413 client = AasHttpClient.model_validate_json(config_string)
414 except ValidationError as ve:
415 raise ValidationError(f"Invalid BaSyx server configuration file: {ve}") from ve
416
417 client.auth_settings.basic_auth.set_password(basic_auth_password)
418 client.auth_settings.o_auth.set_client_secret(o_auth_client_secret)
419 client.auth_settings.bearer_auth.set_token(bearer_auth_token)
420
421 logger.info("Using server configuration:")
422 logger.info(f"BaseUrl: '{client.base_url}'")
423 logger.info(f"TimeOut: '{client.time_out}'")
424 logger.info(f"HttpsProxy: '{client.https_proxy}'")
425 logger.info(f"HttpProxy: '{client.http_proxy}'")
426 logger.info(f"ConnectionTimeOut: '{client.connection_time_out}'.")
427 logger.info(f"SSLVerify: '{client.ssl_verify}'.")
428 logger.info(f"TrustEnv: '{client.trust_env}'.")
429 logger.info(f"EncodedIds: '{client.encoded_ids}'.")
430
431 client.initialize()
432
433 # test the connection to the REST API
434 connected = _connect_to_api(client)
435
436 if not connected:
437 return None
438
439 return client
440
441
442def _connect_to_api(client: AasHttpClient) -> bool:
443 """Test the connection to the AAS server API with retry logic.
444
445 This internal method attempts to establish a connection to the AAS server by calling
446 the get_root() method. It retries the connection for the duration specified in the
447 client's connection_time_out setting, sleeping 1 second between attempts.
448
449 :param client: The AasHttpClient instance to test the connection for
450 :return: True if connection is successful, False otherwise
451 :raises TimeoutError: If connection attempts fail for the entire timeout duration
452 """
453 start_time = time.time()
454 logger.info(f"Try to connect to REST API '{client.base_url}' for {client.connection_time_out} seconds.")
455 counter: int = 0
456 while True:
457 try:
458 root = client.get_root()
459 if root:
460 logger.info(f"Connected to server API at '{client.base_url}' successfully.")
461 return True
462
463 logger.error(f"Connection attempt to '{client.base_url}' failed.")
464
465 except requests.exceptions.ConnectionError:
466 pass
467 if time.time() - start_time > client.connection_time_out:
468 raise TimeoutError(f"Connection to server API timed out after {client.connection_time_out} seconds.")
469
470 counter += 1
471 logger.warning(f"Retrying connection (attempt: {counter}).")
472 time.sleep(1)
Represents a AasHttpClient to communicate with a REST API.
Definition aas_client.py:36
Session|None get_session(self)
Get the HTTP session used by the client.
Definition aas_client.py:93
_handle_auth_method(self)
Handles the authentication method based on the provided settings.
Definition aas_client.py:99
SubmodelRepoImplementation submodels
Definition aas_client.py:52
None|dict patch_endpoint(self, str end_point_url, dict request_body)
Generic PATCH request for endpoint.
None|dict delete_endpoint(self, str end_point_url)
Generic DELETE request for endpoint.
None|dict post_endpoint(self, str end_point_url, dict request_body)
Generic POST request for endpoint.
initialize(self)
Initialize the AasHttpClient with the given URL, username and password.
Definition aas_client.py:59
None|dict put_endpoint(self, str end_point_url, dict request_body)
Generic PUT request for endpoint.
ExperimentalImplementation experimental
Definition aas_client.py:54
AuthenticationConfig auth_settings
Definition aas_client.py:39
dict|None get_root(self)
Get the root endpoint of the AAS server API to test connectivity.
str|None set_token(self)
Set authentication token in session headers based on configured authentication method.
None|dict get_endpoint(self, str end_point_url)
Generic GET request for endpoint.
SubmodelRegistryImplementation submodel_registry
Definition aas_client.py:55
ShellRepoImplementation shells
Definition aas_client.py:51
ShellRegistryImplementation shell_registry
Definition aas_client.py:53
bool _connect_to_api(AasHttpClient client)
Test the connection to the AAS server API with retry logic.