1"""Client for HTTP API communication with AAS server."""
6from pathlib
import Path
10from pydantic
import BaseModel, Field, PrivateAttr, ValidationError
11from requests
import Session
12from requests.auth
import HTTPBasicAuth
14from aas_http_client.classes.client.implementations
import (
16 ExperimentalImplementation,
17 ShellRegistryImplementation,
18 ShellRepoImplementation,
19 SubmodelRegistryImplementation,
20 SubmodelRepoImplementation,
24from aas_http_client.classes.Configuration.config_classes
import AuthenticationConfig
32logger = logging.getLogger(__name__)
36 """Represents an AasHttpClient to communicate with a REST API."""
38 base_url: str = Field(..., alias=
"BaseUrl", description=
"Base URL of the AAS server.")
39 auth_settings: AuthenticationConfig = Field(
40 default_factory=AuthenticationConfig, alias=
"AuthenticationSettings", description=
"Authentication settings for the AAS server."
42 https_proxy: str |
None = Field(default=
None, alias=
"HttpsProxy", description=
"HTTPS proxy URL.")
43 http_proxy: str |
None = Field(default=
None, alias=
"HttpProxy", description=
"HTTP proxy URL.")
44 time_out: int = Field(default=200, alias=
"TimeOut", description=
"Timeout for HTTP requests.")
45 connection_time_out: int = Field(default=100, alias=
"ConnectionTimeOut", description=
"Connection timeout for HTTP requests.")
46 ssl_verify: bool = Field(default=
True, alias=
"SslVerify", description=
"Enable SSL verification.")
47 trust_env: bool = Field(default=
True, alias=
"TrustEnv", description=
"Trust environment variables.")
48 _session: Session |
None = PrivateAttr(default=
None)
49 _auth_method: AuthMethod = PrivateAttr(default=AuthMethod.basic_auth)
50 encoded_ids: bool = Field(default=
True, alias=
"EncodedIds", description=
"If enabled, all IDs used in API requests have to be base64-encoded.")
51 shells: ShellRepoImplementation |
None = Field(default=
None)
52 submodels: SubmodelRepoImplementation |
None = Field(default=
None)
53 shell_registry: ShellRegistryImplementation |
None = Field(default=
None)
54 experimental: ExperimentalImplementation |
None = Field(default=
None)
55 submodel_registry: SubmodelRegistryImplementation |
None = Field(default=
None)
56 _cached_token: TokenData |
None = PrivateAttr(default=
None)
59 """Initialize the AasHttpClient with the given URL, username and password."""
78 "User-Agent":
"python-requests/2.32.5",
79 "Connection":
"close",
90 """Get the HTTP session used by the client.
92 :return: The requests.Session object used for HTTP communication
97 """Handles the authentication method based on the provided settings."""
101 f
"Authentication method: OAuth | '{self.auth_settings.o_auth.client_id}' | '{self.auth_settings.o_auth.token_url}' | '{self.auth_settings.o_auth.grant_type}'"
106 logger.info(f
"Authentication method: Basic Auth | '{self.auth_settings.basic_auth.username}'")
111 logger.info(
"Authentication method: Bearer Token")
112 self.
_session_session.headers.update({
"Authorization": f
"Bearer {self.auth_settings.bearer_auth.get_token()}"})
116 logger.info(
"Authentication method: No Authentication")
119 """Get the root endpoint of the AAS server API to test connectivity.
121 This method calls the '/shells' endpoint to verify that the AAS server is accessible
122 and responding. It automatically handles authentication token setup if service
123 provider authentication is configured.
125 :return: Response data as a dictionary containing shell information, or None if an error occurred
128 logger.error(
"HTTP session is not initialized. Call 'initialize()' method before making API calls.")
132 urls.append(f
"{self.base_url}/shells")
133 urls.append(f
"{self.base_url}/submodels")
134 urls.append(f
"{self.base_url}/shell-descriptors")
135 urls.append(f
"{self.base_url}/submodel-descriptors")
140 logger.debug(f
"Testing connectivity with URL: {url}")
143 logger.debug(f
"Call REST API url '{response.url}'")
145 if response.status_code == STATUS_CODE_200:
146 content = response.content.decode(
"utf-8")
147 return json.loads(content)
149 except requests.exceptions.RequestException
as e:
150 logger.error(f
"Error call REST API: {e}")
155 """Set authentication token in session headers based on configured authentication method.
157 :return: The access token if set, otherwise None
160 logger.error(
"HTTP session is not initialized. Call 'initialize()' method before making API calls.")
174 if token_data
and token_data.access_token:
178 self.
_session_session.headers.update({
"Authorization": f
"Bearer {self._cached_token.access_token}"})
183 def get_endpoint(self, end_point_url: str) ->
None | dict:
184 """Generic GET request for endpoint.
186 :param end_point_url: The endpoint URL to send the GET request to.
187 :return: The JSON-decoded response body as a dictionary, or None.
190 logger.error(
"HTTP session is not initialized. Call 'initialize()' method before making API calls.")
195 logger.debug(f
"Call REST API url '{response.url}'")
197 if response.status_code == STATUS_CODE_200:
198 content = response.content.decode(
"utf-8")
199 return json.loads(content)
201 except requests.exceptions.RequestException
as e:
202 logger.debug(f
"Error call REST API: {e}")
206 def put_endpoint(self, end_point_url: str, request_body: dict) ->
None | dict:
207 """Generic PUT request for endpoint.
209 :param end_point_url: The endpoint URL to send the PUT request to.
210 :param request_body: The request body to send with the PUT request.
211 :return: The JSON-decoded response body as a dictionary, or None.
214 logger.error(
"HTTP session is not initialized. Call 'initialize()' method before making API calls.")
219 logger.debug(f
"Call REST API url '{response.url}'")
221 if response.status_code
not in (STATUS_CODE_200, STATUS_CODE_201, STATUS_CODE_204):
222 content = response.content.decode(
"utf-8")
223 return json.loads(content)
225 except requests.exceptions.RequestException
as e:
226 logger.debug(f
"Error call REST API: {e}")
230 def post_endpoint(self, end_point_url: str, request_body: dict) ->
None | dict:
231 """Generic POST request for endpoint.
233 :param end_point_url: The endpoint URL to send the POST request to.
234 :param request_body: The request body to send with the POST request.
235 :return: The JSON-decoded response body as a dictionary, or None.
238 logger.error(
"HTTP session is not initialized. Call 'initialize()' method before making API calls.")
243 logger.debug(f
"Call REST API url '{response.url}'")
245 if response.status_code
not in (STATUS_CODE_201, STATUS_CODE_200, STATUS_CODE_202):
246 content = response.content.decode(
"utf-8")
247 return json.loads(content)
249 except requests.exceptions.RequestException
as e:
250 logger.debug(f
"Error call REST API: {e}")
254 def patch_endpoint(self, end_point_url: str, request_body: dict) ->
None | dict:
255 """Generic PATCH request for endpoint.
257 :param end_point_url: The endpoint URL to send the PATCH request to.
258 :param request_body: The request body to send with the PATCH request.
259 :return: The JSON-decoded response body as a dictionary, or None.
262 logger.error(
"HTTP session is not initialized. Call 'initialize()' method before making API calls.")
267 logger.debug(f
"Call REST API url '{response.url}'")
269 if response.status_code
not in (STATUS_CODE_200, STATUS_CODE_204):
270 content = response.content.decode(
"utf-8")
271 return json.loads(content)
273 except requests.exceptions.RequestException
as e:
274 logger.debug(f
"Error call REST API: {e}")
279 """Generic DELETE request for endpoint.
281 :param end_point_url: The endpoint URL to send the DELETE request to.
282 :return: The JSON-decoded response body as a dictionary, or None.
285 logger.error(
"HTTP session is not initialized. Call 'initialize()' method before making API calls.")
290 logger.debug(f
"Call REST API url '{response.url}'")
292 if response.status_code
not in (STATUS_CODE_200, STATUS_CODE_204, STATUS_CODE_202):
293 content = response.content.decode(
"utf-8")
294 return json.loads(content)
296 except requests.exceptions.RequestException
as e:
297 logger.debug(f
"Error call REST API: {e}")
302def create_client_by_url(
304 basic_auth_username: str =
"",
305 basic_auth_password: str =
"",
306 o_auth_client_id: str =
"",
307 o_auth_client_secret: str =
"",
308 o_auth_token_url: str =
"",
309 bearer_auth_token: str =
"",
310 http_proxy: str =
"",
311 https_proxy: str =
"",
313 connection_time_out: int = 60,
314 ssl_verify: bool =
True,
315 trust_env: bool =
True,
316 encoded_ids: bool =
True,
317) -> AasHttpClient |
None:
318 """Create an HTTP client for an AAS server connection from the given parameters.
320 :param base_url: Base URL of the AAS server, e.g. "http://basyx_python_server:80/"
321 :param basic_auth_username: Username for the AAS server basic authentication, defaults to ""
322 :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
323 :param o_auth_client_id: Client ID for OAuth authentication, defaults to ""
324 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
325 :param o_auth_token_url: Token URL for OAuth authentication, defaults to ""
326 :param bearer_auth_token: Bearer token for authentication, defaults to ""
327 :param http_proxy: HTTP proxy URL, defaults to ""
328 :param https_proxy: HTTPS proxy URL, defaults to ""
329 :param time_out: Timeout for the API calls, defaults to 200
330 :param connection_time_out: Timeout for the connection to the API, defaults to 60
331 :param ssl_verify: Whether to verify SSL certificates, defaults to True
332 :param trust_env: Whether to trust environment variables for proxy settings, defaults to True
333 :param encoded_ids: If enabled, all IDs used in API requests have to be base64-encoded
334 :return: An instance of AasHttpClient initialized with the provided parameters or None if connection fails
336 logger.info(f
"Create AAS server http client from URL '{base_url}'.")
337 config_dict: dict[str, Any] = {}
338 config_dict[
"BaseUrl"] = base_url
339 config_dict[
"HttpProxy"] = http_proxy
340 config_dict[
"HttpsProxy"] = https_proxy
341 config_dict[
"TimeOut"] = str(time_out)
342 config_dict[
"ConnectionTimeOut"] = str(connection_time_out)
343 config_dict[
"SslVerify"] = str(ssl_verify)
344 config_dict[
"TrustEnv"] = str(trust_env)
345 config_dict[
"EncodedIds"] = str(encoded_ids)
347 config_dict[
"AuthenticationSettings"] = {
348 "BasicAuth": {
"Username": basic_auth_username},
350 "ClientId": o_auth_client_id,
351 "TokenUrl": o_auth_token_url,
355 return create_client_by_dict(config_dict, basic_auth_password, o_auth_client_secret, bearer_auth_token)
def create_client_by_dict(
    configuration: dict,
    basic_auth_password: str = "",
    o_auth_client_secret: str = "",
    bearer_auth_token: str = "",
) -> AasHttpClient | None:
    """Create an HTTP client for an AAS server connection from a configuration dictionary.

    The dictionary is serialized to a JSON string and passed, together with the
    secrets, to ``_create_client``.

    :param configuration: Dictionary containing the AAS server connection settings
    :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
    :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
    :param bearer_auth_token: Bearer token for authentication, defaults to ""
    :return: The result of ``_create_client``: an AasHttpClient instance or None
    """
    logger.info("Create AAS server http client from dictionary.")

    # The client factory consumes a JSON string, so serialize the dict first.
    serialized_config = json.dumps(configuration, indent=4)
    return _create_client(
        serialized_config,
        basic_auth_password,
        o_auth_client_secret,
        bearer_auth_token,
    )
375def create_client_by_config(
376 config_file: Path, basic_auth_password: str =
"", o_auth_client_secret: str =
"", bearer_auth_token: str =
""
377) -> AasHttpClient |
None:
378 """Create an HTTP client for an AAS server connection from a given configuration file.
380 :param config_file: Path to the configuration file containing the AAS server connection settings
381 :param basic_auth_password: Password for the AAS server basic authentication, defaults to ""
382 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
383 :param bearer_auth_token: Bearer token for authentication, defaults to ""
384 :return: An instance of AasHttpClient initialized with the provided parameters or None if validation fails
386 config_file = config_file.resolve()
387 logger.info(f
"Create AAS server http client from configuration file '{config_file}'.")
388 if not config_file.exists():
390 logger.warning(f
"Configuration file '{config_file}' not found. Using default configuration.")
392 config_string = config_file.read_text(encoding=
"utf-8")
393 logger.debug(f
"Configuration file '{config_file}' found.")
395 return _create_client(config_string, basic_auth_password, o_auth_client_secret, bearer_auth_token)
398def _create_client(config_string: str, basic_auth_password: str, o_auth_client_secret: str, bearer_auth_token: str) -> AasHttpClient |
None:
399 """Create and initialize an AAS HTTP client from configuration string.
401 This internal method validates the configuration, sets authentication credentials,
402 initializes the client, and tests the connection to the AAS server.
404 :param config_string: JSON configuration string containing AAS server settings
405 :param basic_auth_password: Password for basic authentication, defaults to ""
406 :param o_auth_client_secret: Client secret for OAuth authentication, defaults to ""
407 :param bearer_auth_token: Bearer token for authentication, defaults to ""
408 :return: An initialized and connected AasHttpClient instance or None if connection fails
409 :raises ValidationError: If the configuration string is invalid
410 :raises TimeoutError: If connection to the server times out
413 client = AasHttpClient.model_validate_json(config_string)
414 except ValidationError
as ve:
415 raise ValidationError(f
"Invalid BaSyx server configuration file: {ve}")
from ve
417 client.auth_settings.basic_auth.set_password(basic_auth_password)
418 client.auth_settings.o_auth.set_client_secret(o_auth_client_secret)
419 client.auth_settings.bearer_auth.set_token(bearer_auth_token)
421 logger.info(
"Using server configuration:")
422 logger.info(f
"BaseUrl: '{client.base_url}'")
423 logger.info(f
"TimeOut: '{client.time_out}'")
424 logger.info(f
"HttpsProxy: '{client.https_proxy}'")
425 logger.info(f
"HttpProxy: '{client.http_proxy}'")
426 logger.info(f
"ConnectionTimeOut: '{client.connection_time_out}'.")
427 logger.info(f
"SSLVerify: '{client.ssl_verify}'.")
428 logger.info(f
"TrustEnv: '{client.trust_env}'.")
429 logger.info(f
"EncodedIds: '{client.encoded_ids}'.")
443 """Test the connection to the AAS server API with retry logic.
445 This internal method attempts to establish a connection to the AAS server by calling
446 the get_root() method. It retries the connection for the duration specified in the
447 client's connection_time_out setting, sleeping 1 second between attempts.
449 :param client: The AasHttpClient instance to test the connection for
450 :return: True if connection is successful, False otherwise
451 :raises TimeoutError: If connection attempts fail for the entire timeout duration
453 start_time = time.time()
454 logger.info(f
"Try to connect to REST API '{client.base_url}' for {client.connection_time_out} seconds.")
458 root = client.get_root()
460 logger.info(f
"Connected to server API at '{client.base_url}' successfully.")
463 logger.error(f
"Connection attempt to '{client.base_url}' failed.")
465 except requests.exceptions.ConnectionError:
467 if time.time() - start_time > client.connection_time_out:
468 raise TimeoutError(f
"Connection to server API timed out after {client.connection_time_out} seconds.")
471 logger.warning(f
"Retrying connection (attempt: {counter}).")
Represents an AasHttpClient to communicate with a REST API.
Session|None get_session(self)
Get the HTTP session used by the client.
_handle_auth_method(self)
Handles the authentication method based on the provided settings.
SubmodelRepoImplementation submodels
None|dict patch_endpoint(self, str end_point_url, dict request_body)
Generic PATCH request for endpoint.
None|dict delete_endpoint(self, str end_point_url)
Generic DELETE request for endpoint.
None|dict post_endpoint(self, str end_point_url, dict request_body)
Generic POST request for endpoint.
initialize(self)
Initialize the AasHttpClient with the given URL, username and password.
None|dict put_endpoint(self, str end_point_url, dict request_body)
Generic PUT request for endpoint.
ExperimentalImplementation experimental
AuthenticationConfig auth_settings
dict|None get_root(self)
Get the root endpoint of the AAS server API to test connectivity.
str|None set_token(self)
Set authentication token in session headers based on configured authentication method.
None|dict get_endpoint(self, str end_point_url)
Generic GET request for endpoint.
SubmodelRegistryImplementation submodel_registry
ShellRepoImplementation shells
ShellRegistryImplementation shell_registry
bool _connect_to_api(AasHttpClient client)
Test the connection to the AAS server API with retry logic.