From f808fd93107b44f115dfbd48b4068d80c050729c Mon Sep 17 00:00:00 2001 From: Captain_Trojan <sejakm@students.zcu.cz> Date: Fri, 16 Apr 2021 19:50:51 +0200 Subject: [PATCH 1/6] Re #8572 - Created endpoint DELETE /api/certificates/<id>, minor reformatting --- app.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/app.py b/app.py index b5aeeb0..7e016b3 100644 --- a/app.py +++ b/app.py @@ -37,6 +37,11 @@ def set_certificate_status(id, certificate_controller: CertController): return certificate_controller.set_certificate_status(id) +@app.route('/api/certificates/<id>', methods=["DELETE"]) +def delete_certificate(id, certificate_controller: CertController): + return certificate_controller.delete_certificate(id) + + @app.route('/api/certificates/<id>/details', methods=["GET"]) def get_cert_details(id, certificate_controller: CertController): return certificate_controller.get_certificate_details_by_id(id) @@ -51,14 +56,17 @@ def get_cert_root(id, certificate_controller: CertController): def get_cert_chain(id, certificate_controller: CertController): return certificate_controller.get_certificate_trust_chain_by_id(id) + @app.route('/api/certificates/<id>/private_key', methods=["GET"]) def get_private_key_of_a_certificate(id, certificate_controller: CertController): return certificate_controller.get_private_key_of_a_certificate(id) + @app.route('/api/certificates/<id>/public_key', methods=["GET"]) def get_public_key_of_a_certificate(id, certificate_controller: CertController): return certificate_controller.get_public_key_of_a_certificate(id) + def initialize_app(application) -> bool: """ Initializes the application -- GitLab From 9cf9a19d10dd0efd16f18e3e8a9fce7e9f091c58 Mon Sep 17 00:00:00 2001 From: Captain_Trojan <sejakm@students.zcu.cz> Date: Fri, 16 Apr 2021 19:52:10 +0200 Subject: [PATCH 2/6] Re #8572 - Added the `delete_certificate(id)` method to the Controller. --- src/controllers/certificates_controller.py | 31 +++++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/src/controllers/certificates_controller.py b/src/controllers/certificates_controller.py index 932c745..87aa740 100644 --- a/src/controllers/certificates_controller.py +++ b/src/controllers/certificates_controller.py @@ -10,6 +10,7 @@ from src.constants import CA_ID, \ SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \ DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID # TODO DATABASE_FILE - not the Controller's from src.exceptions.database_exception import DatabaseException +from src.exceptions.unknown_exception import UnknownException from src.model.subject import Subject from src.services.certificate_service import CertificateService, RevocationReasonInvalidException, \ CertificateStatusInvalidException, CertificateNotFoundException, CertificateAlreadyRevokedException @@ -161,7 +162,7 @@ class CertController: cert = self.certificate_service.get_certificate(v) if cert is None: - return E_NO_CERTIFICATES_FOUND, C_NO_DATA + return E_NO_CERTIFICATES_FOUND, C_NO_DATA else: return {"success": True, "data": cert.pem_data}, C_SUCCESS @@ -183,7 +184,7 @@ class CertController: cert = self.certificate_service.get_certificate(v) if cert is None: - return E_NO_CERTIFICATES_FOUND, C_NO_DATA + return E_NO_CERTIFICATES_FOUND, C_NO_DATA else: data = self.cert_to_dict_full(cert) if data is None: @@ -239,7 +240,7 @@ class CertController: if certs is None: return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR elif len(certs) == 0: - return E_NO_CERTIFICATES_FOUND, C_NO_DATA + return E_NO_CERTIFICATES_FOUND, C_NO_DATA else: ret = [] for c in certs: @@ -296,7 +297,7 @@ class CertController: cert = self.certificate_service.get_certificate(v) if cert is None: - return E_NO_CERTIFICATES_FOUND, C_NO_DATA + return E_NO_CERTIFICATES_FOUND, C_NO_DATA if cert.parent_id is None: return E_NO_CERTIFICATES_FOUND, C_NO_DATA @@ -448,3 +449,25 @@ class CertController: return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND else: return {"success": True, "data": self.certificate_service.get_public_key_from_certificate(cert)}, C_SUCCESS + + def delete_certificate(self, id): + """ + Deletes a certificate identified by ID, including its corresponding subtree (all descendants). + :param id: target certificate ID + :rtype: DeleteResponse + """ + try: + v = int(id) + except ValueError: + return E_WRONG_PARAMETERS, C_BAD_REQUEST + + try: + self.certificate_service.delete_certificate(v) + except CertificateNotFoundException: + return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND + except DatabaseException: + return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR + except CertificateStatusInvalidException or RevocationReasonInvalidException or UnknownException: + return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR + + return {"success": True, "data": "The certificate and its descendants have been successfully deleted."} \ No newline at end of file -- GitLab From 5f8a2c074ade26d1bf810fff13ac36092dc41e72 Mon Sep 17 00:00:00 2001 From: Captain_Trojan <sejakm@students.zcu.cz> Date: Fri, 16 Apr 2021 19:53:36 +0200 Subject: [PATCH 3/6] Re #8572 - Extended the method in the CertificateService. --- src/services/certificate_service.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/services/certificate_service.py b/src/services/certificate_service.py index a609443..2130bf1 100644 --- a/src/services/certificate_service.py +++ b/src/services/certificate_service.py @@ -256,16 +256,26 @@ class CertificateService: return chain_of_trust - def delete_certificate(self, unique_id) -> bool: + def delete_certificate(self, unique_id): """ - Deletes a certificate + Deletes a certificate. Raises an Exception should any unexpected behavior occur. :param unique_id: ID of specific certificate - - :return: `True` when the deletion was successful. `False` in other case """ - # TODO delete children? - return self.certificate_repository.delete(unique_id) + + to_delete = self.certificate_repository.get_all_descendants_of(unique_id) + if to_delete is None: + raise CertificateNotFoundException(unique_id) + + for cert in to_delete: + try: + self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED) + except CertificateAlreadyRevokedException: + # TODO log as an info/debug, not warning or above <-- perfectly legal + continue + + self.certificate_repository.delete(cert.certificate_id) + # TODO log if not successfully deleted def set_certificate_revocation_status(self, id, status, reason="unspecified"): """ -- GitLab From 850031845d40792f417927a352cb477a07be808a Mon Sep 17 00:00:00 2001 From: Captain_Trojan <sejakm@students.zcu.cz> Date: Fri, 16 Apr 2021 19:55:37 +0200 Subject: [PATCH 4/6] Re #8572 - Added a `get_all_descendants_of(id)` (meaning descendants in general) method to the CertificateRepository. --- src/dao/certificate_repository.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/dao/certificate_repository.py b/src/dao/certificate_repository.py index 5b6c0ab..f1de2ac 100644 --- a/src/dao/certificate_repository.py +++ b/src/dao/certificate_repository.py @@ -389,6 +389,26 @@ class CertificateRepository: return certificates + def get_all_descendants_of(self, certificate_id: int): + """ + Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain + between C and its root certificate authority (i.e. is an ancestor of C). + :param certificate_id: target certificate ID + :return: list of all descendants + """ + def dfs(children_of, this, collection: list): + for child in children_of(this.certificate_id): + dfs(children_of, child, collection) + collection.append(this) + + subtree_root = self.read(certificate_id) + if subtree_root is None: + return None + + all_certs = [] + dfs(self.get_all_issued_by, subtree_root, all_certs) + return all_certs + def get_next_id(self) -> int: """ Get identifier of the next certificate that will be inserted into the database -- GitLab From fc874c36c93519d1197aa4030f63c7fe4047f76c Mon Sep 17 00:00:00 2001 From: Captain_Trojan <sejakm@students.zcu.cz> Date: Fri, 16 Apr 2021 19:58:45 +0200 Subject: [PATCH 5/6] Re #8572 - Added tests for certificate deletion. --- .../rest_api/certificates_test.py | 140 ++++++++++++++++-- 1 file changed, 128 insertions(+), 12 deletions(-) diff --git a/tests/integration_tests/rest_api/certificates_test.py b/tests/integration_tests/rest_api/certificates_test.py index a63dad1..de937a5 100644 --- a/tests/integration_tests/rest_api/certificates_test.py +++ b/tests/integration_tests/rest_api/certificates_test.py @@ -311,11 +311,11 @@ def test_list_of_certificates(server): def test_sign_by_non_ca(server): ret = make_end_cert(server, 2, "Fake intermediate cert s.r.o.", usage={ - "CA": False, - "SSL": True, - "authentication": True, - "digitalSignature": True - }) + "CA": False, + "SSL": True, + "authentication": True, + "digitalSignature": True + }) assert ret.status_code == 201 @@ -327,11 +327,11 @@ def test_sign_by_non_ca(server): assert d["success"] ret = make_end_cert(server, 8, "End certificate signed by end certificate s.r.o.", usage={ - "CA": False, - "SSL": True, - "authentication": False, - "digitalSignature": False - }) + "CA": False, + "SSL": True, + "authentication": False, + "digitalSignature": False + }) # TODO discussion -> assert ret.status_code == 400 assert ret.status_code == 201 @@ -592,6 +592,7 @@ def test_get_cert_public_key_incorrect_id(server): assert not ret.json["success"] assert "No such certificate found." == ret.json["data"] + def test_set_certificate_status(server): # Create certificate to be revoked later certificate = { @@ -624,7 +625,7 @@ def test_set_certificate_status(server): # revoke the certificate revocation_body = { - "status": "revoked", + "status": "revoked", "reason": "keyCompromise" } revoke_ret = server.patch(f"/api/certificates/{cert_id}", content_type="application/json", json=revocation_body) @@ -643,7 +644,7 @@ def test_set_certificate_status(server): # set to valid again valid_body = { - "status": "valid" + "status": "valid" } valid_ret = server.patch(f"/api/certificates/{cert_id}", content_type="application/json", json=valid_body) @@ -698,3 +699,118 @@ def test_set_certificate_status(server): assert "data" in revoke_ret.json assert "success" in revoke_ret.json assert not revoke_ret.json["success"] + + +def test_certificate_deletion(server): + + # custom tree for checking node bijection w/ the actual certificate tree + all_certs = {} + parent_of = {} + + ret = make_root_ca(server) + assert ret.json["success"] + root_id = ret.json["data"] + all_certs[root_id] = [] + + def add(add_fn, parent): + """ + Add a node to the global tree (represented by `all_certs` and `parent_of`). + :param add_fn: node type + :param parent: parent node id + :return: new node id + """ + json = add_fn(server, parent).json + assert json["success"] + new_id = json["data"] + all_certs[parent].append(new_id) + all_certs[new_id] = [] + parent_of[new_id] = parent + return new_id + + def remove(id): + """ + Remove a node from the global tree (represented by `all_certs` and `parent_of`). + :param id: + :return: + """ + removed = [] + + def dfs(t): + removed.append(t) + for child in all_certs[t]: + dfs(child) + del all_certs[t] + + dfs(id) + if id != root_id: + all_certs[parent_of[id]].remove(id) + return removed + + # create a chain of certificates + for i in range(10): + add(make_inter_ca, root_id + i) + + # create a branch from the middle + last = add(make_inter_ca, root_id + 5) + + for i in range(5): + add(make_inter_ca, last + i) + + # add end certificates for bonus spice + add(make_end_cert, root_id + 10) + add(make_end_cert, root_id + 9) + last_end_id = add(make_end_cert, root_id + 16) + + # check that all certificates were created successfully + assert last_end_id == root_id + 19 + + # for four pseudorandom nodes in the created tree (descending order, a is not an ancestor of b if a > b) + for target in (root_id + 14, root_id + 10, root_id + 3, root_id): + # try delete + ret = server.delete(f"/api/certificates/{target}") + assert ret.status_code == 200 + json = ret.json + assert "data" in json + assert "success" in json + assert json["success"] + + # for everything that should be removed + currently_removed = remove(target) + for cert_id in currently_removed: + # check if it actually is + ret = server.delete(f"/api/certificates/{cert_id}") + assert ret.status_code == 404 + json = ret.json + assert "data" in json + assert "success" in json + assert not json["success"] + + # twice, just in case + ret = server.get(f"/api/certificates/{cert_id}") + assert ret.status_code == 205 # TODO change to 404 after someone gets the issue assigned and resolves it + json = ret.json + assert "data" in json + assert "success" in json + assert not json["success"] + + # for everything that should not be removed + for cert_id in all_certs: + # check that it exists + ret = server.get(f"/api/certificates/{cert_id}") + assert ret.status_code == 200 + json = ret.json + assert "data" in json + assert "success" in json + assert json["success"] + + +def test_certificate_deletion_invalid_params_1(server): + ret = server.delete("/api/certificates/120938") + assert ret.status_code == 404 + assert not ret.json["success"] + + +def test_certificate_deletion_invalid_params_2(server): + ret = server.delete("/api/certificates/a_big_number") + assert ret.status_code == 400 + assert not ret.json["success"] \ No newline at end of file -- GitLab From 970c303de57314548bf0c1bc495fe2b975204f34 Mon Sep 17 00:00:00 2001 From: Captain_Trojan <sejakm@students.zcu.cz> Date: Fri, 16 Apr 2021 19:59:29 +0200 Subject: [PATCH 6/6] Re #8572 - Adjusted CertificateService unit tests according to the new `delete_certificate` specification. --- .../services/certificate_service_test.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/services/certificate_service_test.py b/tests/integration_tests/services/certificate_service_test.py index 08e022f..97d9135 100644 --- a/tests/integration_tests/services/certificate_service_test.py +++ b/tests/integration_tests/services/certificate_service_test.py @@ -210,7 +210,8 @@ def test_get_chain_of_trust(private_key_service, certificate_service): def test_delete_cert(private_key_service, certificate_service): - assert not certificate_service.delete_certificate(-1) + with pytest.raises(CertificateNotFoundException): + certificate_service.delete_certificate(-1) root_ca_private_key = private_key_service.create_new_key(passphrase="foobar") @@ -225,8 +226,9 @@ def test_delete_cert(private_key_service, certificate_service): assert original_len + 1 == len_inserted # TODO delete should delete all children? - assert certificate_service.delete_certificate(root_ca_cert.certificate_id) - assert not certificate_service.delete_certificate(root_ca_cert.certificate_id) + certificate_service.delete_certificate(root_ca_cert.certificate_id) + with pytest.raises(CertificateNotFoundException): + certificate_service.delete_certificate(root_ca_cert.certificate_id) assert len_inserted - 1 == len(certificate_service.get_certificates()) -- GitLab