2from pathlib
import Path
3from aas_http_client.classes.client.aas_client
import create_by_config, AasHttpClient
4from basyx.aas
import model
7from aas_http_client.demo.logging_handler
import initialize_logging
logger = logging.getLogger(__name__)

# Server ports; not referenced in the visible part of this module.
JAVA_SERVER_PORTS = [8075]
PYTHON_SERVER_PORTS = [5080, 80]

# Identifiers of the submodel and shell that the tests create and look up.
SM_ID = "fluid40/sm_http_client_unit_tests"
SHELL_ID = "fluid40/aas_http_client_unit_tests"

# Configuration files passed to create_by_config() when building the clients.
CONFIG_FILE_ENV = "./tests/server_configs/test_java_server_config.yml"
CONFIG_FILE_AAS_REG_ENV = "./tests/server_configs/test_aas_reg_server_config.yml"
CONFIG_FILE_SM_REG_ENV = "./tests/server_configs/test_sm_reg_server_config.yml"

# Module-wide descriptor caches. Tests fill them in place (clear() + update())
# after reading descriptors from the registries, and module-scoped fixtures
# return these same dict objects to later tests.
shared_shell_descriptor: dict = {}
shared_sm_descriptor: dict = {}
25@pytest.fixture(scope="module")
29 client = create_by_config(Path(CONFIG_FILE_ENV))
31 rand = random.randint(0, 10)
33 client.encoded_ids =
True
35 except Exception
as e:
36 raise RuntimeError(
"Unable to connect to server.")
38 shells = client.shells.get_all_asset_administration_shells()
40 raise RuntimeError(
"No shells found on server. Please check the server configuration.")
44@pytest.fixture(scope="module")
48 reg_client = create_by_config(Path(CONFIG_FILE_AAS_REG_ENV))
49 reg_client.encoded_ids = client.encoded_ids
50 except Exception
as e:
51 raise RuntimeError(
"Unable to connect to server.")
53 descriptors = reg_client.shell_registry.get_all_asset_administration_shell_descriptors()
54 if descriptors
is None:
55 raise RuntimeError(
"No descriptors found on server. Please check the server configuration.")
59@pytest.fixture(scope="module")
63 reg_client = create_by_config(Path(CONFIG_FILE_SM_REG_ENV))
64 reg_client.encoded_ids = client.encoded_ids
65 except Exception
as e:
66 raise RuntimeError(
"Unable to connect to server.")
68 descriptors = reg_client.submodel_registry.get_all_submodel_descriptors()
69 if descriptors
is None:
70 raise RuntimeError(
"No descriptors found on server. Please check the server configuration.")
@pytest.fixture(scope="module")
def shared_sm() -> model.Submodel:
    """Provide the submodel shared by the tests of this module.

    Returns:
        The result of ``model_builder.create_base_submodel`` called with
        ``SM_ID`` as identifier and ``"sm_http_client_unit_tests"`` as
        id_short.
    """
    return model_builder.create_base_submodel(identifier=SM_ID, id_short="sm_http_client_unit_tests")
@pytest.fixture(scope="module")
def shared_aas(shared_sm: model.Submodel) -> model.AssetAdministrationShell:
    """Provide the shell shared by the tests of this module.

    Args:
        shared_sm: The shared submodel fixture; it is attached to the shell
            through ``sdk_tools.add_submodel_to_aas``.

    Returns:
        The shell created by ``model_builder.create_base_aas`` for
        ``SHELL_ID``, after the submodel has been added to it.
    """
    aas = model_builder.create_base_aas(identifier=SHELL_ID, id_short="aas_http_client_unit_tests")
    sdk_tools.add_submodel_to_aas(aas, shared_sm)

    # The fixture value is what tests receive; without an explicit return they
    # would get None instead of the shell they convert and post.
    return aas
@pytest.fixture(scope="module")
def global_shell_descriptor() -> dict:
    """Provide the module-wide shell descriptor dict.

    Returns:
        The ``shared_shell_descriptor`` dict itself (not a copy), so in-place
        changes made by one test are visible to the tests that follow.
    """
    return shared_shell_descriptor
@pytest.fixture(scope="module")
def global_sm_descriptor() -> dict:
    """Provide the module-wide submodel descriptor dict.

    Returns:
        The ``shared_sm_descriptor`` dict itself (not a copy), so in-place
        changes made by one test are visible to the tests that follow.
    """
    return shared_sm_descriptor
98 shells_result = client.shells.get_all_asset_administration_shells()
100 for shell
in shells_result.get(
"result", []):
102 shell_id = shell[
"id"]
104 if client.encoded_ids:
105 shell_id = encoder.encode_base_64(shell_id)
107 client.shells.delete_asset_administration_shell_by_id(shell_id)
109 submodels_result = client.submodels.get_all_submodels()
110 for submodel
in submodels_result.get(
"result", []):
112 sm_id = submodel[
"id"]
114 if client.encoded_ids:
115 sm_id = encoder.encode_base_64(sm_id)
117 client.submodels.delete_submodel_by_id(sm_id)
119 client_aas_reg.shell_registry.delete_all_asset_administration_shell_descriptors()
120 client_sm_reg.submodel_registry.delete_all_submodel_descriptors()
122 shells_result = client.shells.get_all_asset_administration_shells()
123 assert len(shells_result.get(
"result")) == 0
124 submodels_result = client.submodels.get_all_submodels()
125 assert len(submodels_result.get(
"result")) == 0
126 shell_descriptors_result = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
127 assert len(shell_descriptors_result.get(
"result")) == 0
128 sm_descriptors_result = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
129 assert len(sm_descriptors_result.get(
"result")) == 0
def test_000b_post_assets(client: AasHttpClient, shared_aas: model.AssetAdministrationShell, shared_sm: model.Submodel):
    """Post the shared submodel and then the shared shell to the server.

    Each object is converted with ``sdk_tools.convert_to_dict`` before being
    posted; the test only checks that neither post call returns ``None``.
    """
    # Submodel first, then the shell - same order as before.
    posted_submodel = client.submodels.post_submodel(sdk_tools.convert_to_dict(shared_sm))
    assert posted_submodel is not None

    posted_shell = client.shells.post_asset_administration_shell(sdk_tools.convert_to_dict(shared_aas))
    assert posted_shell is not None
142 description = client_aas_reg.shell_registry.get_self_description()
144 assert description
is not None
145 assert "profiles" in description
146 assert len(description[
"profiles"]) == 2
149 description = client_sm_reg.submodel_registry.get_self_description()
151 assert description
is not None
152 assert "profiles" in description
153 assert len(description[
"profiles"]) == 2
169 search_result = client_aas_reg.shell_registry.search(request_body)
171 assert search_result
is not None
172 assert "total" in search_result
173 total = search_result[
"total"]
175 assert "hits" in search_result
176 hits = search_result[
"hits"]
177 assert hits
is not None
178 assert len(hits) == 1
179 assert hits[0][
"id"] == SHELL_ID
182 descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
184 assert descriptors
is not None
185 assert "result" in descriptors
186 results = descriptors[
"result"]
187 assert results
is not None
188 assert len(results) == 1
189 assert results[0][
"id"] == SHELL_ID
191 global shared_shell_descriptor
192 shared_shell_descriptor.clear()
193 shared_shell_descriptor.update(results[0])
196 descriptors = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
198 assert descriptors
is not None
199 assert "result" in descriptors
200 results = descriptors[
"result"]
201 assert results
is not None
202 assert len(results) == 1
203 assert results[0][
"id"] == SM_ID
205 global shared_sm_descriptor
206 shared_sm_descriptor.clear()
207 shared_sm_descriptor.update(results[0])
211 if client.encoded_ids:
212 sm_id = encoder.encode_base_64(SM_ID)
214 descriptor = client_sm_reg.submodel_registry.get_submodel_descriptor_by_id(sm_id)
215 assert descriptor
is not None
216 assert descriptor.get(
"id") == SM_ID
217 endpoints = descriptor.get(
"endpoints", [])
218 assert len(endpoints) == 1
219 endpoint = endpoints[0]
220 assert "protocolInformation" in endpoint
221 protocol_info: dict = endpoint[
"protocolInformation"]
222 assert protocol_info.get(
"endpointProtocol") ==
"http"
223 href = protocol_info.get(
"href",
"")
224 assert href
is not None
226 print(f
"Testing endpoint href: {href}")
228 sm = client.get_endpoint(href)
229 assert sm
is not None
230 assert sm.get(
"id") == SM_ID
236 if client.encoded_ids:
237 sm_id = encoder.encode_base_64(SM_ID)
238 shell_id = encoder.encode_base_64(SHELL_ID)
240 result = client.submodels.delete_submodel_by_id(sm_id)
243 submodels = client.shells.delete_asset_administration_shell_by_id(shell_id)
246 shells_result = client.shells.get_all_asset_administration_shells()
247 assert len(shells_result.get(
"result")) == 0
248 submodels_result = client.submodels.get_all_submodels()
249 assert len(submodels_result.get(
"result")) == 0
250 shell_descriptors_result = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
251 assert len(shell_descriptors_result.get(
"result")) == 0
252 sm_descriptors_result = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
253 assert len(sm_descriptors_result.get(
"result")) == 0
256 result = client_aas_reg.shell_registry.post_asset_administration_shell_descriptor(global_shell_descriptor)
258 assert result
is not None
259 assert "id" in result
260 assert result[
"id"] == SHELL_ID
262 descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
263 assert descriptors
is not None
264 assert "result" in descriptors
265 results = descriptors[
"result"]
266 assert results
is not None
267 assert len(results) == 1
268 assert results[0][
"id"] == SHELL_ID
273 if client_aas_reg.encoded_ids:
274 shell_id = encoder.encode_base_64(SHELL_ID)
276 descriptor = client_aas_reg.shell_registry.get_asset_administration_shell_descriptor_by_id(shell_id)
278 assert descriptor
is not None
279 assert descriptor[
"id"] == SHELL_ID
282 global_shell_descriptor[
"idShort"] =
"sm_http_client_unit_tests_updated"
286 if client_aas_reg.encoded_ids:
287 shell_id = encoder.encode_base_64(SHELL_ID)
289 result = client_aas_reg.shell_registry.put_asset_administration_shell_descriptor_by_id(shell_id, global_shell_descriptor)
291 assert result
is True
293 descriptor = client_aas_reg.shell_registry.get_asset_administration_shell_descriptor_by_id(shell_id)
294 assert descriptor
is not None
295 assert descriptor[
"id"] == SHELL_ID
296 assert descriptor[
"idShort"] == global_shell_descriptor[
"idShort"]
301 if client_aas_reg.encoded_ids:
302 shell_id = encoder.encode_base_64(SHELL_ID)
304 result = client_aas_reg.shell_registry.delete_asset_administration_shell_descriptor_by_id(shell_id)
306 assert result
is True
308 descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
309 assert descriptors
is not None
310 assert "result" in descriptors
311 results = descriptors[
"result"]
312 assert results
is not None
313 assert len(results) == 0
316 global_shell_descriptor[
"idShort"] =
"sm_http_client_unit_tests"
317 result = client_aas_reg.shell_registry.post_asset_administration_shell_descriptor(global_shell_descriptor)
319 descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
320 assert descriptors
is not None
321 assert "result" in descriptors
322 results = descriptors[
"result"]
323 assert results
is not None
324 assert len(results) == 1
327 result = client_sm_reg.submodel_registry.post_submodel_descriptor(global_sm_descriptor)
329 assert result
is not None
330 assert "id" in result
331 assert result[
"id"] == SM_ID
333 descriptors = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
334 assert descriptors
is not None
335 assert "result" in descriptors
336 results = descriptors[
"result"]
337 assert results
is not None
338 assert len(results) == 1
339 assert results[0][
"id"] == SM_ID
342 result = client_aas_reg.shell_registry.delete_all_asset_administration_shell_descriptors()
345 descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
346 assert descriptors
is not None
347 assert "result" in descriptors
348 results = descriptors[
"result"]
349 assert results
is not None
350 assert len(results) == 0
353 result = client_sm_reg.submodel_registry.delete_all_submodel_descriptors()
356 descriptors = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
357 assert descriptors
is not None
358 assert "result" in descriptors
359 results = descriptors[
"result"]
360 assert results
is not None
361 assert len(results) == 0
364 global_shell_descriptor[
"submodelDescriptors"] = []
365 sm_descriptors: list[dict] = global_shell_descriptor.get(
"submodelDescriptors", [])
366 sm_descriptors.append(global_sm_descriptor)
368 result = client_aas_reg.shell_registry.post_asset_administration_shell_descriptor(global_shell_descriptor)
369 assert result
is not None
370 assert "id" in result
371 assert result[
"id"] == SHELL_ID
372 assert "submodelDescriptors" in result
373 assert len(result[
"submodelDescriptors"]) == 1
377 if client_aas_reg.encoded_ids:
378 shell_id = encoder.encode_base_64(SHELL_ID)
380 get_result = client_aas_reg.shell_registry.get_asset_administration_shell_descriptor_by_id(shell_id)
381 assert get_result
is not None
382 assert "id" in get_result
383 assert get_result[
"id"] == SHELL_ID
384 assert "submodelDescriptors" in get_result
385 assert len(get_result[
"submodelDescriptors"])
391 if client_aas_reg.encoded_ids:
392 shell_id = encoder.encode_base_64(SHELL_ID)
393 sm_id = encoder.encode_base_64(SM_ID)
395 descriptor = client_aas_reg.shell_registry.get_submodel_descriptor_by_id_through_superpath(shell_id, sm_id)
396 assert descriptor
is not None
397 assert descriptor[
"id"] == SM_ID
400 global_sm_descriptor[
"idShort"] =
"sm_http_client_unit_tests_updated"
405 if client_aas_reg.encoded_ids:
406 shell_id = encoder.encode_base_64(SHELL_ID)
407 sm_id = encoder.encode_base_64(SM_ID)
409 result = client_aas_reg.shell_registry.put_submodel_descriptor_by_id_through_superpath(shell_id, sm_id, global_sm_descriptor)
410 assert result
is True
412 descriptor = client_aas_reg.shell_registry.get_submodel_descriptor_by_id_through_superpath(shell_id, sm_id)
413 assert descriptor
is not None
414 assert descriptor[
"id"] == SM_ID
415 assert descriptor[
"idShort"] ==
"sm_http_client_unit_tests_updated"
421 if client_aas_reg.encoded_ids:
422 shell_id = encoder.encode_base_64(SHELL_ID)
423 sm_id = encoder.encode_base_64(SM_ID)
425 result = client_aas_reg.shell_registry.delete_submodel_descriptor_by_id_through_superpath(shell_id, sm_id)
426 assert result
is True
428 sm_descriptors = client_aas_reg.shell_registry.get_submodel_descriptor_by_id_through_superpath(shell_id, sm_id)
429 assert sm_descriptors
is None
431 shell_descriptor = client_aas_reg.shell_registry.get_asset_administration_shell_descriptor_by_id(shell_id)
432 assert shell_descriptor
is not None
433 assert "submodelDescriptors" in shell_descriptor
434 assert len(shell_descriptor[
"submodelDescriptors"]) == 0
439 if client_aas_reg.encoded_ids:
440 shell_id = encoder.encode_base_64(SHELL_ID)
442 descriptor = client_aas_reg.shell_registry.post_submodel_descriptor_through_superpath(shell_id, global_sm_descriptor)
443 assert descriptor
is not None
444 assert "id" in descriptor
445 assert descriptor[
"id"] == SM_ID
450 if client_aas_reg.encoded_ids:
451 shell_id = encoder.encode_base_64(SHELL_ID)
453 descriptors = client_aas_reg.shell_registry.get_all_submodel_descriptors_through_superpath(shell_id)
454 assert descriptors
is not None
455 assert "result" in descriptors
456 results = descriptors[
"result"]
457 assert results
is not None
458 assert len(results) == 1
459 assert results[0][
"id"] == SM_ID
462 result = client_sm_reg.submodel_registry.post_submodel_descriptor(global_sm_descriptor)
463 assert result
is not None
464 assert "id" in result
465 assert result[
"id"] == SM_ID
470 if client_sm_reg.encoded_ids:
471 sm_id = encoder.encode_base_64(SM_ID)
473 descriptor = client_sm_reg.submodel_registry.get_submodel_descriptor_by_id(sm_id)
474 assert descriptor
is not None
475 assert descriptor[
"id"] == SM_ID
478 global_sm_descriptor[
"idShort"] =
"sm_http_client_unit_tests_updated"
482 if client_sm_reg.encoded_ids:
483 sm_id = encoder.encode_base_64(SM_ID)
485 result = client_sm_reg.submodel_registry.put_submodel_descriptor_by_id(sm_id, global_sm_descriptor)
486 assert result
is True
488 descriptor = client_sm_reg.submodel_registry.get_submodel_descriptor_by_id(sm_id)
489 assert descriptor
is not None
490 assert descriptor[
"id"] == SM_ID
491 assert descriptor[
"idShort"] ==
"sm_http_client_unit_tests_updated"
496 if client_sm_reg.encoded_ids:
497 sm_id = encoder.encode_base_64(SM_ID)
499 result = client_sm_reg.submodel_registry.delete_submodel_descriptor_by_id(sm_id)
500 assert result
is True
502 sm_descriptors = client_sm_reg.submodel_registry.get_submodel_descriptor_by_id(sm_id)
503 assert sm_descriptors
is None
def test_099_cleanup(client: AasHttpClient, client_aas_reg: AasHttpClient, client_sm_reg: AasHttpClient):
    """Remove everything the tests left behind and verify the servers are empty.

    Deletes every shell and submodel reported by the repository client, wipes
    both registries, then asserts that all four "get all" calls return an
    empty ``result`` list.

    Args:
        client: Client for the shell/submodel repository.
        client_aas_reg: Client for the shell registry.
        client_sm_reg: Client for the submodel registry.
    """
    shells_result = client.shells.get_all_asset_administration_shells()
    for shell in shells_result.get("result", []):
        shell_id = shell["id"]
        # Other delete calls in this module pass base64-encoded IDs when the
        # client has encoded_ids set; do the same so leftovers are removed.
        if client.encoded_ids:
            shell_id = encoder.encode_base_64(shell_id)
        client.shells.delete_asset_administration_shell_by_id(shell_id)

    submodels_result = client.submodels.get_all_submodels()
    for submodel in submodels_result.get("result", []):
        sm_id = submodel["id"]
        if client.encoded_ids:
            sm_id = encoder.encode_base_64(sm_id)
        client.submodels.delete_submodel_by_id(sm_id)

    client_aas_reg.shell_registry.delete_all_asset_administration_shell_descriptors()
    client_sm_reg.submodel_registry.delete_all_submodel_descriptors()

    # Re-read everything: the cleanup only counts if nothing is left.
    shells_result = client.shells.get_all_asset_administration_shells()
    assert len(shells_result.get("result")) == 0
    submodels_result = client.submodels.get_all_submodels()
    assert len(submodels_result.get("result")) == 0
    shell_descriptors_result = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
    assert len(shell_descriptors_result.get("result")) == 0
    sm_descriptors_result = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
    assert len(sm_descriptors_result.get("result")) == 0
test_014_delete_submodel_descriptor_by_id_through_superpath(AasHttpClient client_aas_reg)
test_012_get_submodel_descriptor_by_id_through_superpath(AasHttpClient client_aas_reg)
test_009_delete_asset_administration_shell_descriptor_by_id(AasHttpClient client_aas_reg, dict global_shell_descriptor)
model.Submodel shared_sm()
test_011c_post_aas_descriptor(AasHttpClient client_aas_reg, dict global_shell_descriptor, dict global_sm_descriptor)
test_006_post_asset_administration_shell_descriptor(AasHttpClient client_aas_reg, dict global_shell_descriptor)
test_008_put_asset_administration_shell_descriptor_by_id(AasHttpClient client_aas_reg, dict global_shell_descriptor)
test_013_put_submodel_descriptor_by_id_through_superpath(AasHttpClient client_aas_reg, dict global_sm_descriptor)
dict global_sm_descriptor()
test_005b_delete_assets(AasHttpClient client, AasHttpClient client_aas_reg, AasHttpClient client_sm_reg)
test_099_cleanup(AasHttpClient client, AasHttpClient client_aas_reg, AasHttpClient client_sm_reg)
test_017_post_submodel_descriptor(AasHttpClient client_sm_reg, dict global_sm_descriptor)
test_007_get_asset_administration_shell_descriptor_by_id(AasHttpClient client_aas_reg)
test_000a_clean_server(AasHttpClient client, AasHttpClient client_aas_reg, AasHttpClient client_sm_reg)
test_003_get_all_asset_administration_shell_descriptors(AasHttpClient client_aas_reg)
test_005a_get_endpoint(AasHttpClient client, AasHttpClient client_sm_reg)
AasHttpClient client_aas_reg(AasHttpClient client)
test_002_search(AasHttpClient client_aas_reg)
AasHttpClient client_sm_reg(AasHttpClient client)
test_019_put_submodel_descriptor_by_id(AasHttpClient client_sm_reg, dict global_sm_descriptor)
test_018_get_submodel_descriptor_by_id(AasHttpClient client_sm_reg)
test_001a_get_self_description_shell(AasHttpClient client_aas_reg)
test_011a_delete_all_asset_administration_shell_descriptors(AasHttpClient client_aas_reg)
test_016_get_all_submodel_descriptors_through_superpath(AasHttpClient client_aas_reg)
dict global_shell_descriptor()
test_001b_get_self_description_sm(AasHttpClient client_sm_reg)
test_015_post_submodel_descriptor_through_superpath(AasHttpClient client_aas_reg, dict global_sm_descriptor)
test_011b_delete_all_submodel_descriptors(AasHttpClient client_sm_reg)
test_020_delete_submodel_descriptor_by_id(AasHttpClient client_sm_reg)
test_010_post_submodel_descriptor(AasHttpClient client_sm_reg, dict global_sm_descriptor)
model.AssetAdministrationShell shared_aas(model.Submodel shared_sm)
test_000b_post_assets(AasHttpClient client, model.AssetAdministrationShell shared_aas, model.Submodel shared_sm)
test_004_get_all_submodel_descriptors(AasHttpClient client_sm_reg)