AAS HTTP Client Documentation
Loading...
Searching...
No Matches
test_client_reg.py
Go to the documentation of this file.
1import pytest
2from pathlib import Path
3from aas_http_client.classes.client.aas_client import create_by_config, AasHttpClient
4from basyx.aas import model
5from aas_http_client.utilities import encoder, sdk_tools, model_builder
6import logging
7from aas_http_client.demo.logging_handler import initialize_logging
8import random
9
# Module-level logger for this test module.
logger = logging.getLogger(__name__)

# Server port lists (not referenced by the tests visible in this module).
JAVA_SERVER_PORTS = [8075]
PYTHON_SERVER_PORTS = [5080, 80]

# Identifiers of the submodel and the shell created by the tests.
SM_ID = "fluid40/sm_http_client_unit_tests"
SHELL_ID = "fluid40/aas_http_client_unit_tests"

# Client configuration files: server under test, AAS registry, submodel registry.
CONFIG_FILE_ENV = "./tests/server_configs/test_java_server_config.yml"
CONFIG_FILE_AAS_REG_ENV = "./tests/server_configs/test_aas_reg_server_config.yml"
CONFIG_FILE_SM_REG_ENV = "./tests/server_configs/test_sm_reg_server_config.yml"

# Descriptors captured from the registries by earlier tests and re-used
# (via fixtures) by later tests in this module.
shared_shell_descriptor: dict = {}
shared_sm_descriptor: dict = {}
24
@pytest.fixture(scope="module")
def client() -> AasHttpClient:
    """Provide a module-scoped client built from ``CONFIG_FILE_ENV``.

    ``encoded_ids`` is switched on when a random draw from 0..10 is even, so
    different runs exercise both ID modes.

    Returns:
        The configured client.

    Raises:
        RuntimeError: If creating the client fails, or if listing the shells
            returns ``None``.
    """
    try:
        initialize_logging()
        client = create_by_config(Path(CONFIG_FILE_ENV))

        rand = random.randint(0, 10)
        if (rand % 2) == 0:
            client.encoded_ids = True

    except Exception as e:
        # Chain the original error so the real cause appears in the report.
        raise RuntimeError("Unable to connect to server.") from e

    shells = client.shells.get_all_asset_administration_shells()
    if shells is None:
        raise RuntimeError("No shells found on server. Please check the server configuration.")

    return client
43
@pytest.fixture(scope="module")
def client_aas_reg(client: AasHttpClient) -> AasHttpClient:
    """Provide a module-scoped client built from ``CONFIG_FILE_AAS_REG_ENV``.

    The ``encoded_ids`` setting is copied from the ``client`` fixture so both
    clients use the same ID mode.

    Returns:
        The configured registry client.

    Raises:
        RuntimeError: If creating the client fails, or if listing the shell
            descriptors returns ``None``.
    """
    try:
        initialize_logging()
        reg_client = create_by_config(Path(CONFIG_FILE_AAS_REG_ENV))
        reg_client.encoded_ids = client.encoded_ids
    except Exception as e:
        # Chain the original error so the real cause appears in the report.
        raise RuntimeError("Unable to connect to server.") from e

    descriptors = reg_client.shell_registry.get_all_asset_administration_shell_descriptors()
    if descriptors is None:
        raise RuntimeError("No descriptors found on server. Please check the server configuration.")

    return reg_client
58
@pytest.fixture(scope="module")
def client_sm_reg(client: AasHttpClient) -> AasHttpClient:
    """Provide a module-scoped client built from ``CONFIG_FILE_SM_REG_ENV``.

    The ``encoded_ids`` setting is copied from the ``client`` fixture so both
    clients use the same ID mode.

    Returns:
        The configured registry client.

    Raises:
        RuntimeError: If creating the client fails, or if listing the submodel
            descriptors returns ``None``.
    """
    try:
        initialize_logging()
        reg_client = create_by_config(Path(CONFIG_FILE_SM_REG_ENV))
        reg_client.encoded_ids = client.encoded_ids
    except Exception as e:
        # Chain the original error so the real cause appears in the report.
        raise RuntimeError("Unable to connect to server.") from e

    descriptors = reg_client.submodel_registry.get_all_submodel_descriptors()
    if descriptors is None:
        raise RuntimeError("No descriptors found on server. Please check the server configuration.")

    return reg_client
73
@pytest.fixture(scope="module")
def shared_sm() -> model.Submodel:
    """Build the submodel with identifier ``SM_ID`` used by this module's tests."""
    submodel = model_builder.create_base_submodel(
        identifier=SM_ID,
        id_short="sm_http_client_unit_tests",
    )
    return submodel
78
@pytest.fixture(scope="module")
def shared_aas(shared_sm: model.Submodel) -> model.AssetAdministrationShell:
    """Build the shell with identifier ``SHELL_ID`` and attach ``shared_sm`` to it."""
    shell = model_builder.create_base_aas(
        identifier=SHELL_ID,
        id_short="aas_http_client_unit_tests",
    )
    # Add the shared submodel to the freshly built shell.
    sdk_tools.add_submodel_to_aas(shell, shared_sm)
    return shell
88
@pytest.fixture(scope="module")
def global_shell_descriptor() -> dict:
    """Expose the module-level ``shared_shell_descriptor`` dict to tests.

    The same dict object is returned every time, so mutations made by one test
    are visible to later tests.
    """
    return shared_shell_descriptor
92
@pytest.fixture(scope="module")
def global_sm_descriptor() -> dict:
    """Expose the module-level ``shared_sm_descriptor`` dict to tests.

    The same dict object is returned every time, so mutations made by one test
    are visible to later tests.
    """
    return shared_sm_descriptor
96
def test_000a_clean_server(client: AasHttpClient, client_aas_reg: AasHttpClient, client_sm_reg: AasHttpClient):
    """Delete all shells, submodels and descriptors, then assert everything is empty."""
    listed_shells = client.shells.get_all_asset_administration_shells()
    for entry in listed_shells.get("result", []):
        raw_id = entry["id"]
        # In encoded-ID mode the Base64 form of the ID is passed to the client.
        target_id = encoder.encode_base_64(raw_id) if client.encoded_ids else raw_id
        client.shells.delete_asset_administration_shell_by_id(target_id)

    listed_submodels = client.submodels.get_all_submodels()
    for entry in listed_submodels.get("result", []):
        raw_id = entry["id"]
        target_id = encoder.encode_base_64(raw_id) if client.encoded_ids else raw_id
        client.submodels.delete_submodel_by_id(target_id)

    client_aas_reg.shell_registry.delete_all_asset_administration_shell_descriptors()
    client_sm_reg.submodel_registry.delete_all_submodel_descriptors()

    # Re-query each service and verify nothing is left.
    remaining_shells = client.shells.get_all_asset_administration_shells()
    assert len(remaining_shells.get("result")) == 0
    remaining_submodels = client.submodels.get_all_submodels()
    assert len(remaining_submodels.get("result")) == 0
    remaining_shell_descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
    assert len(remaining_shell_descriptors.get("result")) == 0
    remaining_sm_descriptors = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
    assert len(remaining_sm_descriptors.get("result")) == 0
130
def test_000b_post_assets(client: AasHttpClient, shared_aas: model.AssetAdministrationShell, shared_sm: model.Submodel):
    """Post the shared submodel and then the shared shell; both calls must return a result."""
    posted_sm = client.submodels.post_submodel(sdk_tools.convert_to_dict(shared_sm))
    assert posted_sm is not None

    posted_shell = client.shells.post_asset_administration_shell(sdk_tools.convert_to_dict(shared_aas))
    assert posted_shell is not None
140
def test_001a_get_self_description_shell(client_aas_reg: AasHttpClient):
    """The shell registry self-description must list exactly two profiles."""
    self_description = client_aas_reg.shell_registry.get_self_description()

    assert self_description is not None
    assert "profiles" in self_description
    profiles = self_description["profiles"]
    assert len(profiles) == 2
147
def test_001b_get_self_description_sm(client_sm_reg: AasHttpClient):
    """The submodel registry self-description must list exactly two profiles."""
    self_description = client_sm_reg.submodel_registry.get_self_description()

    assert self_description is not None
    assert "profiles" in self_description
    profiles = self_description["profiles"]
    assert len(profiles) == 2
154
def test_002_search(client_aas_reg: AasHttpClient):
    """Search the shell registry (first page, size 1, ascending idShort) and expect the test shell."""
    paging = {"index": 0, "size": 1}
    ordering = {"direction": "ASC", "path": ["idShort"]}
    query = {"page": paging, "sortBy": ordering}

    response = client_aas_reg.shell_registry.search(query)

    assert response is not None
    assert "total" in response
    assert response["total"] == 1
    assert "hits" in response
    found = response["hits"]
    assert found is not None
    assert len(found) == 1
    assert found[0]["id"] == SHELL_ID
180
def test_003_get_all_asset_administration_shell_descriptors(client_aas_reg: AasHttpClient):
    """List shell descriptors, expect exactly the test shell, and cache its descriptor.

    The descriptor is copied into the module-level ``shared_shell_descriptor``
    dict so later tests can re-post and modify it.
    """
    descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()

    assert descriptors is not None
    assert "result" in descriptors
    results = descriptors["result"]
    assert results is not None
    assert len(results) == 1
    assert results[0]["id"] == SHELL_ID

    # Mutate in place (no rebinding), so no ``global`` statement is needed and
    # fixtures that already returned this dict see the new content.
    shared_shell_descriptor.clear()
    shared_shell_descriptor.update(results[0])
194
def test_004_get_all_submodel_descriptors(client_sm_reg: AasHttpClient):
    """List submodel descriptors, expect exactly the test submodel, and cache its descriptor."""
    listing = client_sm_reg.submodel_registry.get_all_submodel_descriptors()

    assert listing is not None
    assert "result" in listing
    entries = listing["result"]
    assert entries is not None
    assert len(entries) == 1
    first = entries[0]
    assert first["id"] == SM_ID

    # The module-level dict is updated in place so later tests can re-use it.
    shared_sm_descriptor.clear()
    shared_sm_descriptor.update(first)
208
def test_005a_get_endpoint(client: AasHttpClient, client_sm_reg: AasHttpClient):
    """Resolve the submodel descriptor's HTTP endpoint and fetch the submodel through it."""
    sm_id = SM_ID
    if client.encoded_ids:
        sm_id = encoder.encode_base_64(SM_ID)

    descriptor = client_sm_reg.submodel_registry.get_submodel_descriptor_by_id(sm_id)
    assert descriptor is not None
    assert descriptor.get("id") == SM_ID
    endpoints = descriptor.get("endpoints", [])
    assert len(endpoints) == 1
    endpoint = endpoints[0]
    assert "protocolInformation" in endpoint
    protocol_info: dict = endpoint["protocolInformation"]
    assert protocol_info.get("endpointProtocol") == "http"
    # No "" default here: with one, a missing href would slip past the check.
    href = protocol_info.get("href")
    assert href, "Descriptor endpoint has no href."

    print(f"Testing endpoint href: {href}")

    sm = client.get_endpoint(href)
    assert sm is not None
    assert sm.get("id") == SM_ID
231
def test_005b_delete_assets(client: AasHttpClient, client_aas_reg: AasHttpClient, client_sm_reg: AasHttpClient):
    """Delete the test submodel and shell, then assert all four listings are empty."""
    if client.encoded_ids:
        sm_id, shell_id = encoder.encode_base_64(SM_ID), encoder.encode_base_64(SHELL_ID)
    else:
        sm_id, shell_id = SM_ID, SHELL_ID

    submodel_deleted = client.submodels.delete_submodel_by_id(sm_id)
    assert submodel_deleted

    shell_deleted = client.shells.delete_asset_administration_shell_by_id(shell_id)
    assert shell_deleted

    remaining_shells = client.shells.get_all_asset_administration_shells()
    assert len(remaining_shells.get("result")) == 0
    remaining_submodels = client.submodels.get_all_submodels()
    assert len(remaining_submodels.get("result")) == 0
    remaining_shell_descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
    assert len(remaining_shell_descriptors.get("result")) == 0
    remaining_sm_descriptors = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
    assert len(remaining_sm_descriptors.get("result")) == 0
254
def test_006_post_asset_administration_shell_descriptor(client_aas_reg: AasHttpClient, global_shell_descriptor: dict):
    """Post the cached shell descriptor and verify it is the only one listed."""
    posted = client_aas_reg.shell_registry.post_asset_administration_shell_descriptor(global_shell_descriptor)

    assert posted is not None
    assert "id" in posted
    assert posted["id"] == SHELL_ID

    listing = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
    assert listing is not None
    assert "result" in listing
    entries = listing["result"]
    assert entries is not None
    assert len(entries) == 1
    assert entries[0]["id"] == SHELL_ID
269
def test_007_get_asset_administration_shell_descriptor_by_id(client_aas_reg: AasHttpClient):
    """Fetch the shell descriptor by ID and check the returned ``id``."""
    shell_id = SHELL_ID

    # In encoded-ID mode the Base64 form of the ID is passed to the client.
    if client_aas_reg.encoded_ids:
        shell_id = encoder.encode_base_64(SHELL_ID)

    descriptor = client_aas_reg.shell_registry.get_asset_administration_shell_descriptor_by_id(shell_id)

    assert descriptor is not None
    assert descriptor["id"] == SHELL_ID
280
def test_008_put_asset_administration_shell_descriptor_by_id(client_aas_reg: AasHttpClient, global_shell_descriptor: dict):
    """Change the descriptor's ``idShort``, PUT it, and read the change back."""
    # NOTE(review): the "sm_" prefix on a shell descriptor looks copied from
    # the submodel tests - confirm it is intended.
    global_shell_descriptor["idShort"] = "sm_http_client_unit_tests_updated"

    shell_id = encoder.encode_base_64(SHELL_ID) if client_aas_reg.encoded_ids else SHELL_ID

    updated = client_aas_reg.shell_registry.put_asset_administration_shell_descriptor_by_id(shell_id, global_shell_descriptor)
    assert updated is True

    fetched = client_aas_reg.shell_registry.get_asset_administration_shell_descriptor_by_id(shell_id)
    assert fetched is not None
    assert fetched["id"] == SHELL_ID
    assert fetched["idShort"] == global_shell_descriptor["idShort"]
297
def test_009_delete_asset_administration_shell_descriptor_by_id(client_aas_reg: AasHttpClient, global_shell_descriptor: dict):
    """Delete the shell descriptor by ID, verify the registry is empty, then re-post it."""
    shell_id = encoder.encode_base_64(SHELL_ID) if client_aas_reg.encoded_ids else SHELL_ID

    deleted = client_aas_reg.shell_registry.delete_asset_administration_shell_descriptor_by_id(shell_id)
    assert deleted is True

    listing = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
    assert listing is not None
    assert "result" in listing
    entries = listing["result"]
    assert entries is not None
    assert len(entries) == 0

    # Re-post the descriptor for further tests.
    # NOTE(review): this idShort carries the "sm_" prefix, while the shell was
    # built with "aas_http_client_unit_tests" - confirm which is intended.
    global_shell_descriptor["idShort"] = "sm_http_client_unit_tests"
    client_aas_reg.shell_registry.post_asset_administration_shell_descriptor(global_shell_descriptor)

    listing = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
    assert listing is not None
    assert "result" in listing
    entries = listing["result"]
    assert entries is not None
    assert len(entries) == 1
325
def test_010_post_submodel_descriptor(client_sm_reg: AasHttpClient, global_sm_descriptor: dict):
    """Post the cached submodel descriptor and verify it is the only one listed."""
    posted = client_sm_reg.submodel_registry.post_submodel_descriptor(global_sm_descriptor)

    assert posted is not None
    assert "id" in posted
    assert posted["id"] == SM_ID

    listing = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
    assert listing is not None
    assert "result" in listing
    entries = listing["result"]
    assert entries is not None
    assert len(entries) == 1
    assert entries[0]["id"] == SM_ID
340
def test_011a_delete_all_asset_administration_shell_descriptors(client_aas_reg: AasHttpClient):
    """Delete every shell descriptor and verify the registry listing is empty."""
    result = client_aas_reg.shell_registry.delete_all_asset_administration_shell_descriptors()
    assert result

    descriptors = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
    assert descriptors is not None
    assert "result" in descriptors
    results = descriptors["result"]
    assert results is not None
    assert len(results) == 0
351
def test_011b_delete_all_submodel_descriptors(client_sm_reg: AasHttpClient):
    """Delete every submodel descriptor and verify the registry listing is empty."""
    registry = client_sm_reg.submodel_registry

    deleted = registry.delete_all_submodel_descriptors()
    assert deleted

    listing = registry.get_all_submodel_descriptors()
    assert listing is not None
    assert "result" in listing
    entries = listing["result"]
    assert entries is not None
    assert len(entries) == 0
362
def test_011c_post_aas_descriptor(client_aas_reg: AasHttpClient, global_shell_descriptor: dict, global_sm_descriptor: dict):
    """Post the shell descriptor with one embedded submodel descriptor and read it back."""
    # Replace any previous list so exactly one submodel descriptor is embedded.
    global_shell_descriptor["submodelDescriptors"] = [global_sm_descriptor]

    result = client_aas_reg.shell_registry.post_asset_administration_shell_descriptor(global_shell_descriptor)
    assert result is not None
    assert "id" in result
    assert result["id"] == SHELL_ID
    assert "submodelDescriptors" in result
    assert len(result["submodelDescriptors"]) == 1

    shell_id = SHELL_ID

    if client_aas_reg.encoded_ids:
        shell_id = encoder.encode_base_64(SHELL_ID)

    get_result = client_aas_reg.shell_registry.get_asset_administration_shell_descriptor_by_id(shell_id)
    assert get_result is not None
    assert "id" in get_result
    assert get_result["id"] == SHELL_ID
    assert "submodelDescriptors" in get_result
    # Check the exact count, matching the assertion on the POST response above.
    assert len(get_result["submodelDescriptors"]) == 1
386
def test_012_get_submodel_descriptor_by_id_through_superpath(client_aas_reg: AasHttpClient):
    """Fetch the embedded submodel descriptor via the shell registry and check its ``id``."""
    shell_id = SHELL_ID
    sm_id = SM_ID

    # In encoded-ID mode the Base64 forms of both IDs are passed to the client.
    if client_aas_reg.encoded_ids:
        shell_id = encoder.encode_base_64(SHELL_ID)
        sm_id = encoder.encode_base_64(SM_ID)

    descriptor = client_aas_reg.shell_registry.get_submodel_descriptor_by_id_through_superpath(shell_id, sm_id)
    assert descriptor is not None
    assert descriptor["id"] == SM_ID
398
def test_013_put_submodel_descriptor_by_id_through_superpath(client_aas_reg: AasHttpClient, global_sm_descriptor: dict):
    """Rename the embedded submodel descriptor via PUT and read the new ``idShort`` back."""
    global_sm_descriptor["idShort"] = "sm_http_client_unit_tests_updated"

    if client_aas_reg.encoded_ids:
        shell_id, sm_id = encoder.encode_base_64(SHELL_ID), encoder.encode_base_64(SM_ID)
    else:
        shell_id, sm_id = SHELL_ID, SM_ID

    registry = client_aas_reg.shell_registry

    updated = registry.put_submodel_descriptor_by_id_through_superpath(shell_id, sm_id, global_sm_descriptor)
    assert updated is True

    fetched = registry.get_submodel_descriptor_by_id_through_superpath(shell_id, sm_id)
    assert fetched is not None
    assert fetched["id"] == SM_ID
    assert fetched["idShort"] == "sm_http_client_unit_tests_updated"
416
def test_014_delete_submodel_descriptor_by_id_through_superpath(client_aas_reg: AasHttpClient):
    """Delete the embedded submodel descriptor and verify the shell descriptor has none left."""
    shell_id = SHELL_ID
    sm_id = SM_ID

    # In encoded-ID mode the Base64 forms of both IDs are passed to the client.
    if client_aas_reg.encoded_ids:
        shell_id = encoder.encode_base_64(SHELL_ID)
        sm_id = encoder.encode_base_64(SM_ID)

    result = client_aas_reg.shell_registry.delete_submodel_descriptor_by_id_through_superpath(shell_id, sm_id)
    assert result is True

    # The deleted descriptor is expected to come back as None.
    sm_descriptors = client_aas_reg.shell_registry.get_submodel_descriptor_by_id_through_superpath(shell_id, sm_id)
    assert sm_descriptors is None

    shell_descriptor = client_aas_reg.shell_registry.get_asset_administration_shell_descriptor_by_id(shell_id)
    assert shell_descriptor is not None
    assert "submodelDescriptors" in shell_descriptor
    assert len(shell_descriptor["submodelDescriptors"]) == 0
435
def test_015_post_submodel_descriptor_through_superpath(client_aas_reg: AasHttpClient, global_sm_descriptor: dict):
    """Post the cached submodel descriptor under the test shell via the shell registry."""
    shell_id = encoder.encode_base_64(SHELL_ID) if client_aas_reg.encoded_ids else SHELL_ID

    posted = client_aas_reg.shell_registry.post_submodel_descriptor_through_superpath(shell_id, global_sm_descriptor)
    assert posted is not None
    assert "id" in posted
    assert posted["id"] == SM_ID
446
def test_016_get_all_submodel_descriptors_through_superpath(client_aas_reg: AasHttpClient):
    """List the test shell's submodel descriptors and expect exactly the test submodel."""
    shell_id = SHELL_ID

    # In encoded-ID mode the Base64 form of the ID is passed to the client.
    if client_aas_reg.encoded_ids:
        shell_id = encoder.encode_base_64(SHELL_ID)

    descriptors = client_aas_reg.shell_registry.get_all_submodel_descriptors_through_superpath(shell_id)
    assert descriptors is not None
    assert "result" in descriptors
    results = descriptors["result"]
    assert results is not None
    assert len(results) == 1
    assert results[0]["id"] == SM_ID
460
def test_017_post_submodel_descriptor(client_sm_reg: AasHttpClient, global_sm_descriptor: dict):
    """Post the cached submodel descriptor to the submodel registry and check the echoed ``id``."""
    posted = client_sm_reg.submodel_registry.post_submodel_descriptor(global_sm_descriptor)
    assert posted is not None
    assert "id" in posted
    assert posted["id"] == SM_ID
466
def test_018_get_submodel_descriptor_by_id(client_sm_reg: AasHttpClient):
    """Fetch the submodel descriptor by ID and check the returned ``id``."""
    lookup_id = encoder.encode_base_64(SM_ID) if client_sm_reg.encoded_ids else SM_ID

    fetched = client_sm_reg.submodel_registry.get_submodel_descriptor_by_id(lookup_id)
    assert fetched is not None
    assert fetched["id"] == SM_ID
476
def test_019_put_submodel_descriptor_by_id(client_sm_reg: AasHttpClient, global_sm_descriptor: dict):
    """Set a new ``idShort`` on the submodel descriptor, PUT it, and read it back."""
    global_sm_descriptor["idShort"] = "sm_http_client_unit_tests_updated"

    lookup_id = encoder.encode_base_64(SM_ID) if client_sm_reg.encoded_ids else SM_ID
    registry = client_sm_reg.submodel_registry

    updated = registry.put_submodel_descriptor_by_id(lookup_id, global_sm_descriptor)
    assert updated is True

    fetched = registry.get_submodel_descriptor_by_id(lookup_id)
    assert fetched is not None
    assert fetched["id"] == SM_ID
    assert fetched["idShort"] == "sm_http_client_unit_tests_updated"
492
def test_020_delete_submodel_descriptor_by_id(client_sm_reg: AasHttpClient):
    """Delete the submodel descriptor by ID and expect a later lookup to return ``None``."""
    lookup_id = encoder.encode_base_64(SM_ID) if client_sm_reg.encoded_ids else SM_ID
    registry = client_sm_reg.submodel_registry

    deleted = registry.delete_submodel_descriptor_by_id(lookup_id)
    assert deleted is True

    after_delete = registry.get_submodel_descriptor_by_id(lookup_id)
    assert after_delete is None
504
def test_099_cleanup(client: AasHttpClient, client_aas_reg: AasHttpClient, client_sm_reg: AasHttpClient):
    """Delete all shells, submodels and descriptors left over, then assert everything is empty."""
    shells_result = client.shells.get_all_asset_administration_shells()

    for shell in shells_result.get("result", []):
        shell_id = shell["id"]

        # Encode the ID in encoded-ID mode, as test_000a_clean_server does;
        # previously the raw ID was always passed here.
        if client.encoded_ids:
            shell_id = encoder.encode_base_64(shell_id)

        client.shells.delete_asset_administration_shell_by_id(shell_id)

    submodels_result = client.submodels.get_all_submodels()
    for submodel in submodels_result.get("result", []):
        sm_id = submodel["id"]

        if client.encoded_ids:
            sm_id = encoder.encode_base_64(sm_id)

        client.submodels.delete_submodel_by_id(sm_id)

    client_aas_reg.shell_registry.delete_all_asset_administration_shell_descriptors()
    client_sm_reg.submodel_registry.delete_all_submodel_descriptors()

    # Re-query each service and verify nothing is left.
    shells_result = client.shells.get_all_asset_administration_shells()
    assert len(shells_result.get("result")) == 0
    submodels_result = client.submodels.get_all_submodels()
    assert len(submodels_result.get("result")) == 0
    shell_descriptors_result = client_aas_reg.shell_registry.get_all_asset_administration_shell_descriptors()
    assert len(shell_descriptors_result.get("result")) == 0
    sm_descriptors_result = client_sm_reg.submodel_registry.get_all_submodel_descriptors()
    assert len(sm_descriptors_result.get("result")) == 0
test_014_delete_submodel_descriptor_by_id_through_superpath(AasHttpClient client_aas_reg)
test_012_get_submodel_descriptor_by_id_through_superpath(AasHttpClient client_aas_reg)
AasHttpClient client()
test_009_delete_asset_administration_shell_descriptor_by_id(AasHttpClient client_aas_reg, dict global_shell_descriptor)
model.Submodel shared_sm()
test_011c_post_aas_descriptor(AasHttpClient client_aas_reg, dict global_shell_descriptor, dict global_sm_descriptor)
test_006_post_asset_administration_shell_descriptor(AasHttpClient client_aas_reg, dict global_shell_descriptor)
test_008_put_asset_administration_shell_descriptor_by_id(AasHttpClient client_aas_reg, dict global_shell_descriptor)
test_013_put_submodel_descriptor_by_id_through_superpath(AasHttpClient client_aas_reg, dict global_sm_descriptor)
test_005b_delete_assets(AasHttpClient client, AasHttpClient client_aas_reg, AasHttpClient client_sm_reg)
test_099_cleanup(AasHttpClient client, AasHttpClient client_aas_reg, AasHttpClient client_sm_reg)
test_017_post_submodel_descriptor(AasHttpClient client_sm_reg, dict global_sm_descriptor)
test_007_get_asset_administration_shell_descriptor_by_id(AasHttpClient client_aas_reg)
test_000a_clean_server(AasHttpClient client, AasHttpClient client_aas_reg, AasHttpClient client_sm_reg)
test_003_get_all_asset_administration_shell_descriptors(AasHttpClient client_aas_reg)
test_005a_get_endpoint(AasHttpClient client, AasHttpClient client_sm_reg)
AasHttpClient client_aas_reg(AasHttpClient client)
test_002_search(AasHttpClient client_aas_reg)
AasHttpClient client_sm_reg(AasHttpClient client)
test_019_put_submodel_descriptor_by_id(AasHttpClient client_sm_reg, dict global_sm_descriptor)
test_018_get_submodel_descriptor_by_id(AasHttpClient client_sm_reg)
test_001a_get_self_description_shell(AasHttpClient client_aas_reg)
test_011a_delete_all_asset_administration_shell_descriptors(AasHttpClient client_aas_reg)
test_016_get_all_submodel_descriptors_through_superpath(AasHttpClient client_aas_reg)
test_001b_get_self_description_sm(AasHttpClient client_sm_reg)
test_015_post_submodel_descriptor_through_superpath(AasHttpClient client_aas_reg, dict global_sm_descriptor)
test_011b_delete_all_submodel_descriptors(AasHttpClient client_sm_reg)
test_020_delete_submodel_descriptor_by_id(AasHttpClient client_sm_reg)
test_010_post_submodel_descriptor(AasHttpClient client_sm_reg, dict global_sm_descriptor)
model.AssetAdministrationShell shared_aas(model.Submodel shared_sm)
test_000b_post_assets(AasHttpClient client, model.AssetAdministrationShell shared_aas, model.Submodel shared_sm)
test_004_get_all_submodel_descriptors(AasHttpClient client_sm_reg)