diff --git a/src/dao/certificate_repository.py b/src/dao/certificate_repository.py index 87a199986ba46b9e94c9eed9cab6186584272223..3b17f2ad72d8fdd9b5ba2f0f1915ac5a3a7acf03 100644 --- a/src/dao/certificate_repository.py +++ b/src/dao/certificate_repository.py @@ -50,11 +50,12 @@ class CertificateRepository: last_id: int = self.cursor.lastrowid - if certificate.usages[ROOT_CA_ID - 1]: + # TODO assure that this is correct + if certificate.type_id == ROOT_CA_ID: certificate.parent_id = last_id - return self.update(last_id, certificate) + self.update(last_id, certificate) else: - for usage_id, usage_value in certificate.usages: + for usage_id, usage_value in certificate.usages.items(): if usage_value: sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} " f"({COL_CERTIFICATE_ID}," @@ -83,7 +84,7 @@ class CertificateRepository: f"WHERE {COL_ID} = ?") values = [certificate_id] self.cursor.execute(sql, values) - certificate_row = self.cursor.fetchall() + certificate_row = self.cursor.fetchone() sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} " f"WHERE {COL_CERTIFICATE_ID} = ?") @@ -112,7 +113,7 @@ class CertificateRepository: else: return None - def read_all(self, filter_type: int = None): + def read_all(self, filter_type: int = None) -> List[Certificate]: """ Reads (selects) all certificates (with type). @@ -126,7 +127,7 @@ class CertificateRepository: values = [] if filter_type is not None: sql_extension = (f" WHERE {COL_TYPE_ID} = (" - f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?") + f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)") values = [filter_type] sql = (f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}") @@ -158,10 +159,7 @@ class CertificateRepository: print(e) return None - if len(certificates) > 0: - return certificates - else: - return None + return certificates def update(self, certificate_id: int, certificate: Certificate) -> bool: """ @@ -202,7 +200,8 @@ class CertificateRepository: self.cursor.execute(sql, values) self.connection.commit() - for usage_id, usage_value in certificate.usages: + # iterate over usage pairs + for usage_id, usage_value in certificate.usages.items(): if usage_value: sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} " f"({COL_CERTIFICATE_ID}," @@ -236,4 +235,4 @@ class CertificateRepository: print(e) return False - return True + return self.cursor.rowcount > 0 diff --git a/src/dao/private_key_repository.py b/src/dao/private_key_repository.py index f20371db476755ea905181ad48d75605479dcc89..d3ba351b0ba32a48ff80c4f065bb21a98626a950 100644 --- a/src/dao/private_key_repository.py +++ b/src/dao/private_key_repository.py @@ -57,7 +57,7 @@ class PrivateKeyRepository: f"WHERE {COL_ID} = ?") values = [private_key_id] self.cursor.execute(sql, values) - private_key_row = self.cursor.fetchall() + private_key_row = self.cursor.fetchone() private_key: PrivateKey = PrivateKey(private_key_row[0], private_key_row[1], @@ -142,4 +142,4 @@ class PrivateKeyRepository: print(e) return False - return True + return self.cursor.rowcount > 0 diff --git a/tests/dao/__init__.py b/src/db/__init__.py similarity index 100% rename from tests/dao/__init__.py rename to src/db/__init__.py diff --git a/src/db/init_queries.py b/src/db/init_queries.py new file mode 100644 index 0000000000000000000000000000000000000000..f7a5df245d35cf649d244e8e042bf2681f0c0817 --- /dev/null +++ b/src/db/init_queries.py @@ -0,0 +1,95 @@ +SCHEMA_SQL = """ +/* ---------------------------------------------------- */ +/* Generated by Enterprise Architect Version 13.5 */ +/* Created On : 01-dub-2021 15:16:53 */ +/* DBMS : SQLite */ +/* ---------------------------------------------------- */ + +/* Drop Tables */ + +DROP TABLE IF EXISTS 'PrivateKeys' +; + +DROP TABLE IF EXISTS 'CertificateTypes' +; + +DROP TABLE IF EXISTS 'UsageTypes' +; + +DROP TABLE IF EXISTS 'Certificates' +; + +DROP TABLE IF EXISTS 'CertificateUsages' +; + +/* Create Tables with Primary and Foreign Keys, Check and Unique Constraints */ + +CREATE TABLE 'PrivateKeys' +( + 'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + 'private_key' TEXT NOT NULL, + 'password' TEXT NULL +) +; + +CREATE TABLE 'CertificateTypes' +( + 'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + 'certificate_type' TEXT NOT NULL +) +; + +CREATE TABLE 'UsageTypes' +( + 'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + 'usage_type' TEXT NOT NULL +) +; + +CREATE TABLE 'Certificates' +( + 'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + 'common_name' TEXT NOT NULL, + 'valid_from' TEXT NOT NULL, + 'valid_to' TEXT NOT NULL, + 'pem_data' TEXT NOT NULL, + 'private_key_id' INTEGER NOT NULL, + 'certificate_type_id' INTEGER NOT NULL, + 'parent_certificate_id' INTEGER NOT NULL, + CONSTRAINT 'FK_Certificates' FOREIGN KEY ('parent_certificate_id') REFERENCES 'Certificates' ('id') ON DELETE No Action ON UPDATE No Action, + CONSTRAINT 'FK_CertificateTypes' FOREIGN KEY ('certificate_type_id') REFERENCES 'CertificateTypes' ('id') ON DELETE No Action ON UPDATE No Action, + CONSTRAINT 'FK_PrivateKeys' FOREIGN KEY ('private_key_id') REFERENCES 'PrivateKeys' ('id') ON DELETE No Action ON UPDATE No Action +) +; + +CREATE TABLE 'CertificateUsages' +( + 'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + 'certificate_id' INTEGER NOT NULL, + 'usage_type_id' INTEGER NOT NULL, + CONSTRAINT 'FK_Certificates' FOREIGN KEY ('certificate_id') REFERENCES 'Certificates' ('id') ON DELETE Cascade ON UPDATE No Action, + CONSTRAINT 'FK_UsageTypes' FOREIGN KEY ('usage_type_id') REFERENCES 'UsageTypes' ('id') ON DELETE No Action ON UPDATE No Action +) +; + +""" + +DEFAULT_VALUES_SQL = """ +/* ---------------------------------------------------- */ +/* Generated by Enterprise Architect Version 13.5 */ +/* Created On : 26-bře-2021 13:33:05 */ +/* DBMS : SQLite */ +/* ---------------------------------------------------- */ + +/* Insert default values */ + +INSERT INTO CertificateTypes(certificate_type) VALUES('ROOT_CA'); +INSERT INTO CertificateTypes(certificate_type) VALUES('INTERMEDIATE_CA'); +INSERT INTO CertificateTypes(certificate_type) VALUES('CERTIFICATE'); + +INSERT INTO UsageTypes(usage_type) VALUES('CA'); +INSERT INTO UsageTypes(usage_type) VALUES('SSL'); +INSERT INTO UsageTypes(usage_type) VALUES('SIGNATURE'); +INSERT INTO UsageTypes(usage_type) VALUES('AUTHENTICATION'); + +""" \ No newline at end of file diff --git a/src/services/certificate_service.py b/src/services/certificate_service.py new file mode 100644 index 0000000000000000000000000000000000000000..33858611721536c69f4b48cf21e00d77e9451922 --- /dev/null +++ b/src/services/certificate_service.py @@ -0,0 +1,255 @@ +from typing import List + +from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID +from src.dao.certificate_repository import CertificateRepository +from src.model.certificate import Certificate +from src.model.private_key import PrivateKey +from src.model.subject import Subject +from src.services.cryptography import CryptographyService + +import time + +DATE_FORMAT = "%d.%m.%Y %H:%M:%S" +CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE" + + +class CertificateService: + + def __init__(self, cryptography_service: CryptographyService, certificate_repository: CertificateRepository): + self.cryptography_service = cryptography_service + self.certificate_repository = certificate_repository + + # TODO usages present in method parameters but not in class diagram + def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "", + usages=None, days=30): + """ + Creates a root CA certificate based on the given parameters. + :param key: Private key to be used when generating the certificate + :param subject: Subject to be used put into the certificate + :param config: String containing the configuration to be used + :param extensions: Name of the section in the configuration representing extensions + :param usages: A dictionary containing usages of the certificate to be generated (see constants.py) + :param days: Number of days for which the generated cert. will be considered valid + :return: An instance of Certificate class representing the generated root CA cert + """ + if usages is None: + usages = {} + + # create a new self signed certificate + cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password, + extensions=extensions, config=config, days=days) + # specify CA usage + usages[CA_ID] = True + + # wrap into Certificate class + certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0, + ROOT_CA_ID) + + # store the wrapper into the repository + created_id = self.certificate_repository.create(certificate) + + # assign the generated ID to the inserted certificate + certificate.certificate_id = created_id + + return certificate + + def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type): + """ + Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and + notAfter fields. + :param cert_pem: PEM of the cert. to be wrapped + :param private_key_id: ID of the private key used to create the given certificate + :param usages: A dictionary containing usages of the generated certificate generated (see constants.py) + :param parent_id: ID of the CA that issued this certificate + :param cert_type: Type of this certificate (see constants.py) + :return: An instance of the Certificate class wrapping the values passed via method parameters + """ + # parse the generated pem for subject and notBefore/notAfter fields + # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates + subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem) + # format the parsed date + not_before_formatted = time.strftime(DATE_FORMAT, not_before) + not_after_formatted = time.strftime(DATE_FORMAT, not_after) + + # create a certificate wrapper + certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem, + private_key_id, cert_type, parent_id, usages) + + return certificate + + # TODO config parameter present in class diagram but not here (unused) + def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey, + extensions: str = "", days: int = 30, usages=None): + """ + Creates an intermediate CA certificate issued by the given parent CA. + :param subject_key: Private key to be used when generating the certificate + :param subject: Subject to be used put into the certificate + :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA + :param issuer_key: PK used to generate the issuer certificate + :param extensions: Extensions to be used when generating the certificate + :param usages: A dictionary containing usages of the certificate to be generated (see constants.py) + :param days: Number of days for which the generated cert. will be considered valid + :return: An instance of Certificate class representing the generated intermediate CA cert + """ + if usages is None: + usages = {} + + extensions = extensions + "\n" + CA_EXTENSIONS + # TODO implement AIA URI via extensions + cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data, + issuer_key.private_key, + subject_key_pass=subject_key.password, + issuer_key_pass=issuer_key.password, extensions=extensions, + days=days) + + # specify CA usage + usages[CA_ID] = True + + # wrap into Certificate class + self.__create_wrapper(cert_pem, subject_key.private_key_id, usages, + issuer_cert.certificate_id, INTERMEDIATE_CA_ID) + + # parse the generated pem for subject and notBefore/notAfter fields + subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem) + + # format the parsed date + not_before_formatted = time.strftime(DATE_FORMAT, not_before) + not_after_formatted = time.strftime(DATE_FORMAT, not_after) + + # specify CA usage + usages[CA_ID] = True + + # create a certificate wrapper + certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem, + subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages) + + # store the wrapper into the repository + created_id = self.certificate_repository.create(certificate) + + # assign the generated ID to the inserted certificate + certificate.certificate_id = created_id + + return certificate + + def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, + issuer_key: PrivateKey, + extensions: str = "", days: int = 30, usages=None): + """ + Creates an end certificate issued by the given parent CA. + :param subject_key: Private key to be used when generating the certificate + :param subject: Subject to be used put into the certificate + :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA + :param issuer_key: PK used to generate the issuer certificate + :param extensions: Extensions to be used when generating the certificate + :param usages: A dictionary containing usages of the certificate to be generated (see constants.py) + :param days: Number of days for which the generated cert. will be considered valid + :return: An instance of Certificate class representing the generated cert + """ + if usages is None: + usages = {} + + # generate a new certificate + cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data, + issuer_key.private_key, + subject_key_pass=subject_key.password, + issuer_key_pass=issuer_key.password, extensions=extensions, + days=days) + + # wrap the generated certificate using Certificate class + certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages, + issuer_cert.certificate_id, CERTIFICATE_ID) + + created_id = self.certificate_repository.create(certificate) + + certificate.certificate_id = created_id + + return certificate + + def get_certificate(self, unique_id: int) -> Certificate: + """ + Tries to fetch a certificate from the certificate repository using a given id. + :param unique_id: ID of the certificate to be fetched + :return: Instance of the Certificate class containing a certificate with the given id or `None` if such + certificate is not found + """ + return self.certificate_repository.read(unique_id) + + def get_certificates(self, cert_type=None) -> List[Certificate]: + """ + Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only + certificates of the given type can be returned. + :param cert_type: Type of certificates to be returned + :return: List of instances of the Certificate class representing all certificates present in the certificate + repository. An empty list is returned when no certificates are found. + """ + return self.certificate_repository.read_all(cert_type) + + def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]: + """ + Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a + root CA certificate is found. Root certificates are excluded from the chain by default. + :param from_id: ID of the first certificate to be included in the chain of trust + :param to_id: ID of the last certificate to be included in the chain of trust + :param exclude_root: a flag indicating whether root CA certificate should be excluded + :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id` + ID + """ + # read the first certificate of the chain + start_cert = self.certificate_repository.read(from_id) + + # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list + if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root): + return [] + + current_cert = start_cert + chain_of_trust = [current_cert] + + # TODO could possibly be simplified + if start_cert.type_id == ROOT_CA_ID: + # the first cert found is a root ca + return chain_of_trust + + while True: + parent_cert = self.certificate_repository.read(current_cert.parent_id) + + # check whether parent certificate exists + if parent_cert is None: + break + + # check whether the found certificate is a root certificate + if parent_cert.type_id == ROOT_CA_ID: + if not exclude_root: + # append the found root cert only if root certificates should not be excluded from the CoT + chain_of_trust.append(parent_cert) + break + + # append the certificate + chain_of_trust.append(parent_cert) + + # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter + if parent_cert.certificate_id == to_id: + break + + current_cert = parent_cert + + return chain_of_trust + + def delete_certificate(self, unique_id) -> bool: + """ + Deletes a certificate + + :param unique_id: ID of specific certificate + + :return: `True` when the deletion was successful. `False` in other case + """ + # TODO delete children? + return self.certificate_repository.delete(unique_id) + + def get_subject_from_certificate(self, certificate: Certificate) -> Subject: + """ + Get Subject distinguished name from a Certificate + :param certificate: certificate instance whose Subject shall be parsed + :return: instance of Subject class containing resulting distinguished name + """ + (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data) + return subject diff --git a/src/services/cryptography.py b/src/services/cryptography.py index 27dcf502907444c6e0918da4da670854f897beb9..dad7e5cae08b05e748eaf99e2c2012e8f8d0fbaa 100644 --- a/src/services/cryptography.py +++ b/src/services/cryptography.py @@ -1,13 +1,19 @@ +import re import subprocess +import time -# encryption method to be used when generating private keys +from src.model.subject import Subject from src.utils.temporary_file import TemporaryFile +# encryption method to be used when generating private keys PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256" # openssl executable name OPENSSL_EXECUTABLE = "openssl" +# format of NOT_BEFORE NOT_AFTER date fields +NOT_AFTER_BEFORE_DATE_FORMAT = "%b %d %H:%M:%S %Y %Z" + class CryptographyService: @@ -86,7 +92,7 @@ class CryptographyService: return self.__run_for_output( ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode() - def create_sscrt(self, subject, key, config="", extensions="", key_pass=None): + def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30): """ Creates a root CA @@ -95,6 +101,7 @@ class CryptographyService: :param config: string containing the configuration to be used :param extensions: name of the section in the configuration representing extensions :param key_pass: passphrase of the private key + :param days: number of days for which the certificate will be valid :return: string containing the generated certificate in PEM format """ @@ -104,7 +111,7 @@ class CryptographyService: subj = self.__subject_to_param_format(subject) with TemporaryFile("openssl.conf", config) as conf_path: - args = ["req", "-x509", "-new", "-subj", subj, + args = ["req", "-x509", "-new", "-subj", subj, "-days", f"{days}", "-key", "-"] if len(config) > 0: args.extend(["-config", conf_path]) @@ -184,9 +191,8 @@ class CryptographyService: :param subject_key: string containing the private key to be used when creating the certificate in PEM format :param issuer_key: string containing the private key of the issuer's certificate in PEM format :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format - :param issuer_key: string containing the private key of the issuer's certificate in PEM format - :param subject_key_pass: string containing the passphrase of the private key used when creating the certificate in PEM - format + :param subject_key_pass: string containing the passphrase of the private key used when creating the certificate + in PEM format :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM format :param extensions: extensions to be applied when creating the certificate @@ -225,6 +231,64 @@ class CryptographyService: # the process failed because of some other reason (incorrect cert format) raise CryptographyException(OPENSSL_EXECUTABLE, args, err.decode()) + def parse_cert_pem(self, cert_pem): + """ + Parses the given certificate in PEM format and returns the subject of the certificate and it's NOT_BEFORE + and NOT_AFTER field + :param cert_pem: a certificated in a PEM format to be parsed + :return: a tuple containing a subject, NOT_BEFORE and NOT_AFTER dates + """ + # run openssl x509 to view certificate content + args = ["x509", "-noout", "-subject", "-startdate", "-enddate", "-in", "-"] + + cert_info_raw = self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode() + + # split lines + results = re.split("\n", cert_info_raw) + subj_line = results[0] + not_before_line = results[1] + not_after_line = results[2] + + # attempt to extract subject via regex + match = re.search(r"subject=(.*)", subj_line) + if match is None: + # TODO use logger + print(f"Could not find subject to parse: {subj_line}") + return None + else: + # find all attributes (key = value) + found = re.findall(r"\s?([^c=\s]+)\s?=\s?([^,\n]+)", match.group(1)) + subj = Subject() + for key, value in found: + if key == "C": + subj.country = value + elif key == "ST": + subj.state = value + elif key == "L": + subj.locality = value + elif key == "O": + subj.organization = value + elif key == "OU": + subj.organization_unit = value + elif key == "CN": + subj.common_name = value + elif key == "emailAddress": + subj.email_address = value + + # extract notBefore and notAfter date fields + not_before = re.search(r"notBefore=(.*)", not_before_line) + not_after = re.search(r"notAfter=(.*)", not_after_line) + + # if date fields are found parse them into date objects + if not_before is not None: + not_before = time.strptime(not_before.group(1), NOT_AFTER_BEFORE_DATE_FORMAT) + if not_after is not None: + not_after = time.strptime(not_after.group(1), NOT_AFTER_BEFORE_DATE_FORMAT) + + # TODO wrapper class? + # return it as a tuple + return subj, not_before, not_after + class CryptographyException(Exception): diff --git a/src/services/key_service.py b/src/services/key_service.py new file mode 100644 index 0000000000000000000000000000000000000000..1b81239973296324256e2dfcb5ecd4963c251fa8 --- /dev/null +++ b/src/services/key_service.py @@ -0,0 +1,60 @@ +from src.dao.private_key_repository import PrivateKeyRepository +from src.model.private_key import PrivateKey +from src.services.cryptography import CryptographyService + + +class KeyService: + + def __init__(self, cryptography_service: CryptographyService, private_key_repository: PrivateKeyRepository): + self.cryptography_service = cryptography_service + self.private_key_repository = private_key_repository + + def create_new_key(self, passphrase="") -> PrivateKey: + """ + Creates a new private key using the given passphrase. + :param passphrase: Passphrase to be used when encrypting the PK + :return: An instance of the <PrivateKey> class representing the generated PK + """ + # generate a new private key + private_key_pem = self.cryptography_service.create_private_key(passphrase) + + # store generated PK and the passphrase in a wrapper + private_key = PrivateKey(-1, private_key_pem, passphrase) + + # store the wrapper in the PK repository + private_key_id = self.private_key_repository.create(private_key) + + # assign the generated ID to the wrapper + private_key.private_key_id = private_key_id + + return private_key + + def get_key(self, unique_id): + """ + Tries to fetch a PK using the given ID. + :param unique_id: ID of the PK to be found + :return:An instance of the required PK or `None` + """ + return self.private_key_repository.read(unique_id) + + def get_keys(self, unique_ids=None): + """ + Tries to fetch all PKs in the repository. Exact PKs to be fetched can be specified using the `unique_ids` + parameter. If `unique_ids` parameter is not passed then all PKs in the repository are returned. + :param unique_ids: An array containing IDs of PKs to be fetched from the repository. + :return: A list of instances of the PrivateKey class representing the PKs found + """ + if unique_ids is None: + return self.private_key_repository.read_all() + else: + # TODO this is very inefficient + return [self.private_key_repository.read(identifier) for identifier in unique_ids] + + def delete_key(self, unique_id): + """ + Deletes a private key + + :param unique_id: ID of specific certificate to be deleted + :return: `True` when the deletion was successful. `False` in other case + """ + return self.private_key_repository.delete(unique_id) diff --git a/tests/dao/certificate_repository/__init__.py b/tests/integration_tests/__init__.py similarity index 100% rename from tests/dao/certificate_repository/__init__.py rename to tests/integration_tests/__init__.py diff --git a/tests/services/__init__.py b/tests/integration_tests/dao/__init__.py similarity index 100% rename from tests/services/__init__.py rename to tests/integration_tests/dao/__init__.py diff --git a/tests/services/cryptography/__init__.py b/tests/integration_tests/dao/certificate_repository/__init__.py similarity index 100% rename from tests/services/cryptography/__init__.py rename to tests/integration_tests/dao/certificate_repository/__init__.py diff --git a/tests/dao/certificate_repository/conftest.py b/tests/integration_tests/dao/certificate_repository/conftest.py similarity index 100% rename from tests/dao/certificate_repository/conftest.py rename to tests/integration_tests/dao/certificate_repository/conftest.py diff --git a/tests/dao/certificate_repository/create_certificate.py b/tests/integration_tests/dao/certificate_repository/create_certificate.py similarity index 100% rename from tests/dao/certificate_repository/create_certificate.py rename to tests/integration_tests/dao/certificate_repository/create_certificate.py diff --git a/tests/integration_tests/services/__init__.py b/tests/integration_tests/services/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/integration_tests/services/certificate_service_test.py b/tests/integration_tests/services/certificate_service_test.py new file mode 100644 index 0000000000000000000000000000000000000000..aa812fbf2837b696cb8a815c688d8338f502e4fd --- /dev/null +++ b/tests/integration_tests/services/certificate_service_test.py @@ -0,0 +1,245 @@ +import subprocess + +from src.constants import SSL_ID, CA_ID, AUTHENTICATION_ID, INTERMEDIATE_CA_ID, ROOT_CA_ID, CERTIFICATE_ID, SIGNATURE_ID +from src.model.subject import Subject + + +def export_crt(crt): + return subprocess.check_output(["openssl", "x509", "-noout", "-text", "-in", "-"], + input=bytes(crt, encoding="utf-8"), stderr=subprocess.STDOUT).decode() + + +def test_create_and_get_root_ca(private_key_service, certificate_service): + private_key = private_key_service.create_new_key(passphrase="foobar") + + cert = certificate_service.create_root_ca(private_key, + Subject(common_name="FooName", organization_unit="Department of Foo"), + usages={SSL_ID: True}) + + assert ROOT_CA_ID == cert.type_id + + assert cert.usages[CA_ID] + assert cert.usages[SSL_ID] + assert cert.usages[AUTHENTICATION_ID] is False + assert cert.usages[SIGNATURE_ID] is False + + cert_loaded = certificate_service.get_certificate(cert.certificate_id) + + # verify that the loaded certificate is a CA + cert_loaded_printed = export_crt(cert_loaded.pem_data) + assert """ X509v3 Basic Constraints: critical + CA:TRUE""" in cert_loaded_printed + + assert cert.certificate_id == cert_loaded.certificate_id + assert cert.common_name == cert_loaded.common_name + assert cert.valid_from == cert_loaded.valid_from + assert cert.valid_to == cert_loaded.valid_to + assert cert.pem_data == cert_loaded.pem_data + assert cert.private_key_id == cert_loaded.private_key_id + assert cert.type_id == cert_loaded.type_id + assert cert.parent_id == cert_loaded.parent_id + assert cert.usages == cert_loaded.usages + + +def test_create_and_get_inter_cert(private_key_service, certificate_service): + root_ca_private_key = private_key_service.create_new_key(passphrase="foobar") + inter_ca_private_key = private_key_service.create_new_key() + + root_ca = certificate_service.create_root_ca(root_ca_private_key, + Subject(common_name="RootFoo", organization_unit="Department of Foo")) + + inter_cert = certificate_service.create_ca(inter_ca_private_key, Subject(common_name="Intermediate CA"), root_ca, + root_ca_private_key, usages={SSL_ID: True}) + + assert INTERMEDIATE_CA_ID == inter_cert.type_id + + assert inter_cert.usages[CA_ID] + assert inter_cert.usages[SSL_ID] + assert inter_cert.usages[AUTHENTICATION_ID] is False + assert inter_cert.usages[SIGNATURE_ID] is False + + inter_cert_loaded = certificate_service.get_certificate(inter_cert.certificate_id) + + # verify that the loaded certificate is a CA + cert_loaded_printed = export_crt(inter_cert_loaded.pem_data) + assert """ X509v3 Basic Constraints: critical + CA:TRUE""" in cert_loaded_printed + + assert inter_cert.certificate_id == inter_cert_loaded.certificate_id + assert inter_cert.common_name == inter_cert_loaded.common_name + assert inter_cert.valid_from == inter_cert_loaded.valid_from + assert inter_cert.valid_to == inter_cert_loaded.valid_to + assert inter_cert.pem_data == inter_cert_loaded.pem_data + assert inter_cert.private_key_id == inter_cert_loaded.private_key_id + assert inter_cert.type_id == inter_cert_loaded.type_id + assert inter_cert.parent_id == root_ca.certificate_id + assert inter_cert_loaded.parent_id == root_ca.certificate_id + assert inter_cert.usages == inter_cert_loaded.usages + + +def test_create_and_get_cert(private_key_service, certificate_service): + root_ca_private_key = private_key_service.create_new_key(passphrase="foobar") + inter_ca_private_key = private_key_service.create_new_key(passphrase="barfoo") + end_cert_private_key = private_key_service.create_new_key(passphrase="foofoo") + + root_ca_cert = certificate_service.create_root_ca(root_ca_private_key, + Subject(common_name="RootFoo", + organization_unit="Department of Foo")) + + inter_ca_cert = certificate_service.create_ca(inter_ca_private_key, Subject(common_name="Intermediate CA"), + root_ca_cert, + root_ca_private_key, usages={SSL_ID: True}) + + cert = certificate_service.create_end_cert(end_cert_private_key, + Subject("Foo Child", email_address="foo@bar.cz"), inter_ca_cert, + inter_ca_private_key, usages={AUTHENTICATION_ID: True}) + assert CERTIFICATE_ID == cert.type_id + + assert cert.usages[AUTHENTICATION_ID] + assert cert.usages[SSL_ID] is False + assert cert.usages[SIGNATURE_ID] is False + assert cert.usages[CA_ID] is False + + cert_loaded = certificate_service.get_certificate(cert.certificate_id) + + assert cert.certificate_id == cert_loaded.certificate_id + assert cert.common_name == cert_loaded.common_name + assert cert.valid_from == cert_loaded.valid_from + assert cert.valid_to == cert_loaded.valid_to + assert cert.pem_data == cert_loaded.pem_data + assert cert.private_key_id == cert_loaded.private_key_id + assert cert.type_id == cert_loaded.type_id + assert cert.parent_id == inter_ca_cert.certificate_id + assert cert_loaded.parent_id == inter_ca_cert.certificate_id + assert cert.usages == cert_loaded.usages + + +def test_get_certificates(private_key_service_unique, certificate_service_unique): + root_ca_private_key = private_key_service_unique.create_new_key(passphrase="foobar") + inter_ca_private_key = private_key_service_unique.create_new_key(passphrase="barfoo") + end_cert_private_key = private_key_service_unique.create_new_key(passphrase="foofoo") + + root_ca_cert = certificate_service_unique.create_root_ca(root_ca_private_key, + Subject(common_name="RootFoo", + organization_unit="Department of Foo")) + + inter_ca_cert = certificate_service_unique.create_ca(inter_ca_private_key, Subject(common_name="Intermediate CA"), + root_ca_cert, + root_ca_private_key, usages={SSL_ID: True}) + + cert = certificate_service_unique.create_end_cert(end_cert_private_key, + Subject("Foo Child", email_address="foo@bar.cz"), inter_ca_cert, + inter_ca_private_key, usages={AUTHENTICATION_ID: True}) + + all_certs = certificate_service_unique.get_certificates() + assert 3 == len(all_certs) + assert "RootFoo" == all_certs[0].common_name + assert "Intermediate CA" == all_certs[1].common_name + assert "Foo Child" == all_certs[2].common_name + + assert 1 == len(certificate_service_unique.get_certificates(cert_type=ROOT_CA_ID)) + assert 1 == len(certificate_service_unique.get_certificates(cert_type=INTERMEDIATE_CA_ID)) + assert 1 == len(certificate_service_unique.get_certificates(cert_type=CERTIFICATE_ID)) + + +def test_get_chain_of_trust(private_key_service, certificate_service): + root_ca_private_key = private_key_service.create_new_key(passphrase="foobar") + inter_ca_private_key = private_key_service.create_new_key(passphrase="barfoo") + end_cert_private_key = private_key_service.create_new_key(passphrase="foofoo") + + root_ca_cert = certificate_service.create_root_ca(root_ca_private_key, + Subject(common_name="RootFoo", + organization_unit="Department of Foo")) + + inter_ca_cert = certificate_service.create_ca(inter_ca_private_key, Subject(common_name="Intermediate CA"), + root_ca_cert, + root_ca_private_key, usages={SSL_ID: True}) + + cert = certificate_service.create_end_cert(end_cert_private_key, + Subject("Foo Child", email_address="foo@bar.cz"), inter_ca_cert, + inter_ca_private_key, usages={AUTHENTICATION_ID: True}) + + cot = certificate_service.get_chain_of_trust(cert.certificate_id) + assert len(cot) == 2 + assert [cert.certificate_id, inter_ca_cert.certificate_id] == [cot[0].certificate_id, cot[1].certificate_id] + + cot = certificate_service.get_chain_of_trust(cert.certificate_id, root_ca_cert.private_key_id) + assert len(cot) == 2 + assert [cert.certificate_id, inter_ca_cert.certificate_id] == [cot[0].certificate_id, cot[1].certificate_id] + + cot = certificate_service.get_chain_of_trust(cert.certificate_id, inter_ca_cert.private_key_id) + assert len(cot) == 2 + assert [cert.certificate_id, inter_ca_cert.certificate_id] == [cot[0].certificate_id, cot[1].certificate_id] + + cot = certificate_service.get_chain_of_trust(cert.certificate_id, exclude_root=False) + assert len(cot) == 3 + assert [cert.certificate_id, inter_ca_cert.certificate_id, root_ca_cert.certificate_id] == [cot[0].certificate_id, + cot[1].certificate_id, + cot[2].certificate_id] + + # starting from intermediate certificate + cot = certificate_service.get_chain_of_trust(inter_ca_cert.certificate_id) + assert len(cot) == 1 + assert [inter_ca_cert.certificate_id] == [cot[0].certificate_id] + + cot = certificate_service.get_chain_of_trust(inter_ca_cert.certificate_id, root_ca_cert.private_key_id) + assert len(cot) == 1 + assert [inter_ca_cert.certificate_id] == [cot[0].certificate_id] + + cot = certificate_service.get_chain_of_trust(inter_ca_cert.certificate_id, exclude_root=False) + assert len(cot) == 2 + assert [inter_ca_cert.certificate_id, root_ca_cert.certificate_id] == [cot[0].certificate_id, + cot[1].certificate_id] + + # starting from intermediate certificate + cot = certificate_service.get_chain_of_trust(root_ca_cert.certificate_id) + assert len(cot) == 0 + + cot = certificate_service.get_chain_of_trust(root_ca_cert.certificate_id, root_ca_cert.private_key_id) + assert len(cot) == 0 + + cot = certificate_service.get_chain_of_trust(root_ca_cert.certificate_id, exclude_root=False) + assert len(cot) == 1 + assert [root_ca_cert.certificate_id] == [cot[0].certificate_id] + + +def test_delete_cert(private_key_service, certificate_service): + assert not certificate_service.delete_certificate(-1) + + root_ca_private_key = private_key_service.create_new_key(passphrase="foobar") + + original_len = len(certificate_service.get_certificates()) + + root_ca_cert = certificate_service.create_root_ca(root_ca_private_key, + Subject(common_name="RootFoo", + organization_unit="Department of Foo")) + + len_inserted = len(certificate_service.get_certificates()) + + assert original_len + 1 == len_inserted + + # TODO delete should delete all children? + assert certificate_service.delete_certificate(root_ca_cert.certificate_id) + assert not certificate_service.delete_certificate(root_ca_cert.certificate_id) + + assert len_inserted - 1 == len(certificate_service.get_certificates()) + + +def test_get_subject_from_certificate(private_key_service, certificate_service): + root_ca_private_key = private_key_service.create_new_key(passphrase="foobar") + + root_ca_cert = certificate_service.create_root_ca(root_ca_private_key, + Subject(common_name="RootFoo", + organization_unit="Department of Foo", + email_address="root@ca.com", + country="CZ")) + + subject = certificate_service.get_subject_from_certificate(root_ca_cert) + + assert subject.common_name == "RootFoo" + assert subject.organization_unit == "Department of Foo" + assert subject.email_address == "root@ca.com" + assert subject.organization is None + assert subject.locality is None + assert subject.state is None + assert subject.country == "CZ" diff --git a/tests/integration_tests/services/conftest.py b/tests/integration_tests/services/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..c9957ff9ca11f8a3ef5b5173f9c61389fffa6699 --- /dev/null +++ b/tests/integration_tests/services/conftest.py @@ -0,0 +1,128 @@ +import os +import sqlite3 +from sqlite3 import Connection + +import pytest + +from src.dao.certificate_repository import CertificateRepository +from src.dao.private_key_repository import PrivateKeyRepository +from src.db.init_queries import SCHEMA_SQL, DEFAULT_VALUES_SQL +from src.services.certificate_service import CertificateService +from src.services.cryptography import CryptographyService +from src.services.key_service import KeyService + + +# scope="module" means that this fixture is run once per module +@pytest.fixture(scope="module") +def connection(): + print("Creating a new SQLITE connection to the test DB") + test_db_file = "test.sqlite" + connection: Connection = sqlite3.connect(test_db_file) + + # yield the created connection + yield connection + + # after tests have finished delete the created db file + try: + print("Deleting the test DB") + os.unlink(test_db_file) + except FileNotFoundError: + print(f"Could not delete {test_db_file} file containing the test DB") + pass + + +@pytest.fixture(scope="module") +def cursor(connection): + cursor = connection.cursor() + + # execute db initialisation script + cursor.executescript(SCHEMA_SQL) + + # insert default values + cursor.executescript(DEFAULT_VALUES_SQL) + + return cursor + + +# scope defaults to "function" which means that the fixture is run once per test (function) +@pytest.fixture +def certificate_repository(connection, cursor): + return CertificateRepository(connection, cursor) + + +@pytest.fixture +def private_key_repository(connection, cursor): + return PrivateKeyRepository(connection, cursor) + + +@pytest.fixture +def cryptography_service(): + return CryptographyService() + + +@pytest.fixture +def private_key_service(private_key_repository, cryptography_service): + return KeyService(cryptography_service, private_key_repository) + + +@pytest.fixture +def certificate_service(certificate_repository, cryptography_service): + return CertificateService(cryptography_service, certificate_repository) + + +# TODO improve this (lots of duplicated code) - some test cases need a DB connection that is not shared with other tests +@pytest.fixture +def connection_unique(): + print("Creating a new unique SQLITE connection to the test DB") + test_db_file = "test_unique.sqlite" + connection: Connection = sqlite3.connect(test_db_file) + + # yield the created connection + yield connection + + # after tests have finished delete the created db file + try: + print("Deleting the unique test DB") + os.unlink(test_db_file) + except FileNotFoundError: + print(f"Could not delete {test_db_file} file containing the unique test DB") + pass + + +@pytest.fixture +def cursor_unique(connection_unique): + cursor = connection_unique.cursor() + + # execute db initialisation script + cursor.executescript(SCHEMA_SQL) + + # insert default values + cursor.executescript(DEFAULT_VALUES_SQL) + + return cursor + + +# scope defaults to "function" which means that the fixture is run once per test (function) +@pytest.fixture +def certificate_repository_unique(connection_unique, cursor_unique): + return CertificateRepository(connection_unique, cursor_unique) + + +@pytest.fixture +def private_key_repository_unique(connection_unique, cursor_unique): + return PrivateKeyRepository(connection_unique, cursor_unique) + + +@pytest.fixture +def cryptography_service_unique(): + return CryptographyService() + + +@pytest.fixture +def private_key_service_unique(private_key_repository_unique, cryptography_service_unique): + return KeyService(cryptography_service_unique, private_key_repository_unique) + + +@pytest.fixture +def certificate_service_unique(certificate_repository_unique, cryptography_service_unique): + return CertificateService(cryptography_service_unique, certificate_repository_unique) diff --git a/tests/integration_tests/services/private_key_service_test.py b/tests/integration_tests/services/private_key_service_test.py new file mode 100644 index 0000000000000000000000000000000000000000..6e429886d1e7eed0c878c50f8233268511485ea0 --- /dev/null +++ b/tests/integration_tests/services/private_key_service_test.py @@ -0,0 +1,38 @@ +def test_create_and_get_pk(private_key_service, certificate_service): + private_key = private_key_service.create_new_key(passphrase="foobar") + private_key_loaded = private_key_service.get_key(private_key.private_key_id) + + assert private_key.private_key_id == private_key_loaded.private_key_id + assert private_key.private_key == private_key_loaded.private_key + assert private_key.password == private_key_loaded.password + + +def test_delete_pk(private_key_service, certificate_service): + assert not private_key_service.delete_key(-1) + + private_key = private_key_service.create_new_key(passphrase="foobar") + + assert private_key_service.delete_key(private_key.private_key_id) is True + assert not private_key_service.delete_key(private_key.private_key_id) + + +def test_get_pks(private_key_service_unique): + pk_1 = private_key_service_unique.create_new_key(passphrase="foobar") + assert 1 == len(private_key_service_unique.get_keys()) + pk_2 = private_key_service_unique.create_new_key(passphrase="foobarbaz") + assert 2 == len(private_key_service_unique.get_keys()) + pk_3 = private_key_service_unique.create_new_key(passphrase="foobaz") + assert 3 == len(private_key_service_unique.get_keys()) + + keys = private_key_service_unique.get_keys() + + assert [pk_1.private_key_id, pk_2.private_key_id, pk_3.private_key_id] == [keys[0].private_key_id, + keys[1].private_key_id, + keys[2].private_key_id] + + assert private_key_service_unique.delete_key(pk_3.private_key_id) + assert 2 == len(private_key_service_unique.get_keys()) + + keys = private_key_service_unique.get_keys([pk_1.private_key_id, pk_2.private_key_id]) + assert [pk_1.private_key_id, pk_2.private_key_id] == [keys[0].private_key_id, + keys[1].private_key_id] diff --git a/tests/unit_tests/__init__.py b/tests/unit_tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/unit_tests/services/__init__.py b/tests/unit_tests/services/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/unit_tests/services/cryptography/__init__.py b/tests/unit_tests/services/cryptography/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/services/cryptography/conftest.py b/tests/unit_tests/services/cryptography/conftest.py similarity index 100% rename from tests/services/cryptography/conftest.py rename to tests/unit_tests/services/cryptography/conftest.py diff --git a/tests/services/cryptography/create_crt_test.py b/tests/unit_tests/services/cryptography/create_crt_test.py similarity index 100% rename from tests/services/cryptography/create_crt_test.py rename to tests/unit_tests/services/cryptography/create_crt_test.py diff --git a/tests/services/cryptography/create_csr_test.py b/tests/unit_tests/services/cryptography/create_csr_test.py similarity index 100% rename from tests/services/cryptography/create_csr_test.py rename to tests/unit_tests/services/cryptography/create_csr_test.py diff --git a/tests/unit_tests/services/cryptography/parse_cert_pem_test.py b/tests/unit_tests/services/cryptography/parse_cert_pem_test.py new file mode 100644 index 0000000000000000000000000000000000000000..eb5e2bf524090cf684ccd23af349f47b240600d4 --- /dev/null +++ b/tests/unit_tests/services/cryptography/parse_cert_pem_test.py @@ -0,0 +1,195 @@ +from src.model.subject import Subject + + +def test_parse_cert_pem(service): + cert_pem = """ +-----BEGIN CERTIFICATE----- + +MIIGITCCBAmgAwIBAgIUb7xAdXd6AkevhmeQqy2BASDqv/IwDQYJKoZIhvcNAQEL +BQAwgZ8xCzAJBgNVBAYTAkNaMRYwFAYDVQQIDA1QaWxzZW4gUmVnaW9uMQ8wDQYD +VQQHDAZQaWxzZW4xFjAUBgNVBAoMDVJvb3RpbmcgUm9vdHMxHDAaBgNVBAsME0Rl +cGFydG1lbnQgb2YgUk9vdHMxFDASBgNVBAMMC01haW4gUm9vdGVyMRswGQYJKoZI +hvcNAQkBFgxyb290QHJvb3QuY3owHhcNMjEwMzIxMTAwMTUyWhcNMjYwMzIxMTAw +MTUyWjCBnzELMAkGA1UEBhMCQ1oxFjAUBgNVBAgMDVBpbHNlbiBSZWdpb24xDzAN +BgNVBAcMBlBpbHNlbjEWMBQGA1UECgwNUm9vdGluZyBSb290czEcMBoGA1UECwwT +RGVwYXJ0bWVudCBvZiBST290czEUMBIGA1UEAwwLTWFpbiBSb290ZXIxGzAZBgkq +hkiG9w0BCQEWDHJvb3RAcm9vdC5jejCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCC +AgoCggIBAMKozynv+ja1VkNWpldsrl6tEGYrkNuG9umyqF0ZOZmzWzR7PiszV8DW +o+OQ3SY7MQ7o3qoE/pSiaApmNFxgarWvGxnVgouncrai1AKB92tFY1VnVfQYICD3 +gdjSzo4Lbfc8+67DHTPc0N70oBZuMueQ6ifUQhrjuVaONwAOsZBdal+VWvctJcrf +fd+s6Jkgb/qWuld21Bzea36PLmgwoe8/RNyS9yzspC8jwdU68BemAPy9NBf9Q8Is +0R7aZ0YwKPsdln3lR5GixrNy+sQl0qwy0NgklWIbqpGbMAInJBbTBmBGIbS0zV3t +Nwi+g1u2WaFn63NeoUswAoDtHDm6FXBFI2BabG5tFVRNdfzGU1PEbILprqk214rt +5+j5xTtpaI07akjozYJfal8c6igKXmNJf+xxtASq5EESNLT0YHwVPlT1S/odGvkN +Hk6OJv2dmcH6nHCgT72aUhaVPP9aUIxlnchPD/iprMqkOkfm/k/LZLmPTsZbfmax +VB1PWRFSWozAR4R562QFNRLLzZBlqiN++XMRBnjX4rRNTjZZyrYG3rIv8SytY8N7 +UU0Ya/k+iYs5inbbHBkC3vI2DT6evxlfaXw8b1QTL4mNwR0aK0HjmVU6XdNcmGYr +/PAxyZNNDM+k9wkcj+Xf4iqVrmk9pHEfkRHHjRpOXvFaLogmx/drAgMBAAGjUzBR +MB0GA1UdDgQWBBQSP3MTbRoAP80MfEriCKa9qoqlFDAfBgNVHSMEGDAWgBQSP3MT +bRoAP80MfEriCKa9qoqlFDAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUA +A4ICAQCXV3PxhN6U/vhRaXriAOr4RNhvGjdT7XnAC7r21GsfyH3omXPqD/RrrUov +9ZWinxTiQ4xg3f+Iz9DCLXOmwmWoEpPU/LPa2UMENey2XOloQSO4JfdrbVVItWm6 +F0W0aqdMxR9lzt7xoOwT/5wkAEJtHkUyCHB0xv6ZVRJYt07FGt8oipaJl3SlkyhH +onKiCPsjwfcZ7W/lJ4PAFRY1DOLL+2CsLQjE9N2TAViY1HBpI3BfzfsDnXKEV2hS +bNS25bpXbyLKGHqhcD9Y/wQID3fmKQilSSKezEn0nnPfnnb2WF32rWFR2pzgeym/ +Q5vWcJRGSKcD0W58Ob1eLF8pG/FOijgjvHxWiotl2bB2rdEAR8BDJrzhRVxYavft +zpLWb5NGJSjPO29cJ170OyBhXYS+/kpgFf3sxDtOacS6k7LOXcydlckAAHGFwllb +0jkyZ0A2q+RGHIKirs1hWQpOb1O6Pvw+mNtxfghZsq8lnceHIUG9BduTXzWm0MEc +Gh+KpX/I0JzuOc91ydNtvMEOjfIAp8mjLAqDCWRd0OzvE45rPbBAHJXPc4P76B1A +XXwUYr8GuSFQZb1Q4BpCayCYvTLj+7q3z72BCqAA+jMJYV/qU0EpsuFjPvzU8apg +7l9NhB7vf/qhW0XHDa4pv5+d+CXUiHPlW+UTIlni1AfgAel1Ww== +-----END CERTIFICATE----- + """ + + # parse a certificate supplied in a PEM format + subj, n_before, n_after = service.parse_cert_pem(cert_pem) + + assert 3 == n_before.tm_mon + assert 21 == n_before.tm_mday + assert 10 == n_before.tm_hour + assert 1 == n_before.tm_min + assert 52 == n_before.tm_sec + assert 2021 == n_before.tm_year + + assert 3 == n_after.tm_mon + assert 21 == n_after.tm_mday + assert 10 == n_after.tm_hour + assert 1 == n_after.tm_min + assert 52 == n_after.tm_sec + assert 2026 == n_after.tm_year + + assert "CZ" == subj.country + assert "Pilsen Region" == subj.state + assert "Pilsen" == subj.locality + assert "Rooting Roots" == subj.organization + assert "Department of ROots" == subj.organization_unit + assert "Main Rooter" == subj.common_name + assert "root@root.cz" == subj.email_address + + +def test_parse_cert_pen_2(service): + cert_pem = """ +-----BEGIN CERTIFICATE----- +MIIFjTCCA3WgAwIBAgIUIuCWtR9ae01+4iLbyoRT8I+l/EIwDQYJKoZIhvcNAQEL +BQAwWTELMAkGA1UEBhMCQ1oxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJQkxJTlRFUl8yMB4X +DTIxMDMyMzIxMzI1OVoXDTI0MDMyMzIxMzI1OVowWDELMAkGA1UEBhMCQVUxEzAR +BgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5 +IEx0ZDERMA8GA1UEAwwITkNISUxEXzIwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAw +ggIKAoICAQCwJDvJ9nRxsdTeCLRzWuiYgRq4rwVMraA9sII9ZJhJ+Q7wM2Qf59bx +maMuvZwlpx1H98zbjSwwm0ft7QVzJ4bGF++JG04XcUwaaJWMgiHqwUmrm6GYjyUf +mv1/iG2GGpUHmkCbYGqU+1uYqegHadw/WBwM8Rggo5cyujQewrRBHvGLdNqAIL33 +tVdYuubocV//xg5YwHpM0WzKx5G6Rhat72BfMjTJlpkfIZbUCVRSSphjbHqGhYVO +d6hQ/aCHNBLw2gWxwBFLQDbc2kxKMm81x8p6vBrYBRXINcd3kVVNw6xEYViWfJ6K +FjNPNhvoHNjKhauKKPJHd/MmG0zTUxq3sHZyOkuoq/jxwM6ugYHhHz7z23n/6KPV +44GPZrdi7Xk3xRs3e/EOm2IoyQHfm7QVgAc0ydnVz3XDyvRmnI+Coa5X3mNXWWiC +ikmsOU6wbOGyL8zgFL32Uc1qCMmc2039+xp/NYTs83B0rUoefjBrfLJb8y/mwEck +1713V5TDATCI6dQWyqF83Gybuhaw4w7m3oaMXvALX7GmyjD6A7FG+AMaB4uWPeHf +ZSzWI1yqe4ZzLn4CTnKd6G6gdqMjVwcTr1f8GCjcl6TTbyStkKDypDrZbES8e06p +YTg38DWaY+WtmUEtfX9kQ27q26vePZN0ibU4y990367pecU3nUG0JQIDAQABo04w +TDBKBggrBgEFBQcBAQQ+MDwwOgYIKwYBBQUHMAKGLmh0dHBzOi8vbG9jYWxob3N0 +OjUwMDAvc3RhdGljL2ludGVybWVkaWF0ZS5jcnQwDQYJKoZIhvcNAQELBQADggIB +AG7DMCyAphSYHmSxW0CChrMV0xJ+vNvsFHPtToxykCXZ95aZUm000zPqAVSjTWt4 +/048rzDXGSlCwyt+6eALcwYHQZrVWH0pG6jRyPruhiAlbzGgbS/fjEsn5IvGl+IP +5wNki0iRqo9dHYWxbmSSWsrLwLD4GpvipfB1rJsqRy34j4vwoBc3LjvC+VMhd0/3 +ZFQRrXLt/t6+oQYgIkBeL3mhRI+NHWMERvXM9Z6xLm4afLFyPdxmG/sTmfOSghB7 +EoqLbfNTDFRsJj6tKKosFbqmqrtEx5kL6RXNtMjp/CdwL9olnad96G4+m9X+w2K8 +uyqmVLiTXoe69JHguhiu/nrEEqn9yAlpILCDD8X2FWWt16GhUkdPII38YmZZqbCR +dJ/iuEiC0VhxOsenWI1b18Mm06eFgjHVzjBMZpzOMBvQPhhktmHW/G0NCKpCdCQA +6znlT0o3hQPImW3ZMGAnVfbxwCCvQ45qP6N2dZAV9Z9Fw2XQ2ZTigtmPlieJ4Vpq +/ZkvQVA3c5Ugu+eRdQ7rvR7LPpo7CUJtlZRrs+z7EzSOCzBgtK0eXoBGlunJH9b2 +Oj4NKr8Wp/0oBfE9/x/2JXBa9N9pjd8tOU7wDD0+w90NoK/D2+rCpCYQPa/MNAVP +gug7Na3ya2fwlerj6YM9w+i8Csf8lUFe0gww7NLkbv54 +-----END CERTIFICATE----- + """ + + # parse a certificate supplied in a PEM format + subj, n_before, n_after = service.parse_cert_pem(cert_pem) + + assert 3 == n_before.tm_mon + assert 23 == n_before.tm_mday + assert 21 == n_before.tm_hour + assert 32 == n_before.tm_min + assert 59 == n_before.tm_sec + assert 2021 == n_before.tm_year + + assert 3 == n_after.tm_mon + assert 23 == n_after.tm_mday + assert 21 == n_after.tm_hour + assert 32 == n_after.tm_min + assert 59 == n_after.tm_sec + assert 2024 == n_after.tm_year + + assert "AU" == subj.country + assert "Some-State" == subj.state + assert "Internet Widgits Pty Ltd" == subj.organization + assert "NCHILD_2" == subj.common_name + assert None is subj.locality + assert None is subj.organization_unit + assert None is subj.email_address + + +def test_parse_cert_pen_empty(service): + cert_pem = """ +-----BEGIN CERTIFICATE----- +MIIDczCCAlugAwIBAgIUPM++Jj33iag4uaOMIzED4/rMTB4wDQYJKoZIhvcNAQEL +BQAwSTELMAkGA1UEBhMCICAxCzAJBgNVBAgMAiAgMQowCAYDVQQKDAEgMQswCQYD +VQQDDAIgIDEUMBIGCSqGSIb3DQEJARYFIGZvbyAwHhcNMjEwNDAzMjMzMDEwWhcN +MjEwNTAzMjMzMDEwWjBJMQswCQYDVQQGEwIgIDELMAkGA1UECAwCICAxCjAIBgNV +BAoMASAxCzAJBgNVBAMMAiAgMRQwEgYJKoZIhvcNAQkBFgUgZm9vIDCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBALI9Ksw85aFLBw2wAeRUoxMQarXkWWbw +FyvGCb426EcdKYEiax4BYsK+VLxJpJsIo4DnSM1c0EKNJmN4w+l93CBVhHvmA+qo +3LYShf/DgNeKZD7KJgAWwPHBnA1eOA/8kUX0YT9Z76JpJN46KFfqaY9Scb9GBU/m +Kr/Lm2Rkg/LehMObPfNQm3XGOvcRjHON9VoB7hZW8zt2lvWTkhia9t46p/kY90eg +3iw5JRR/MeYBiYeikjT4g5pMZDkymWUp7eahOsoR4kGYGLkpdXVN66evWzTikUKV +QSHdzUZOiTJ7GFJ70qqh+gAEMCf/Lx8EDbDcuz7ZH40Lr6knY2+9xe8CAwEAAaNT +MFEwHQYDVR0OBBYEFChHMZUZ2fyOrclVGjtopKn7f/mSMB8GA1UdIwQYMBaAFChH +MZUZ2fyOrclVGjtopKn7f/mSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAETfyBYSS6drAyGY1/+z7fWKV3aS1Ocd8c/7oj1seFZ8AH+b0zktTynv +khprZhxRGRR6cHhyVmMexSWucWb7zlJZNcO9F0/FIgoqcKODtdNczTJyrC9raeuf +8pAqhaxXcNXXUSB8vNQKHLRtRnPCB3nZE7xSl5RRmSPyPGZyyAYygxRnLjMFgJEU +4c1FOpvRcfRS5yWviOS6dFv+cGA8hoUMXkpIW88GfwgdO6nMSQB1wUdqKoPnaIFc +3vjtLMWkuVZFYqvp3NN6GtyI5pw1O0FzjkLZsAeuHZyIkwpKkMsnGlGW8lz1svZ+ +7AQMsDl5rA4ZVlnLXSQlq3YXVuXZlAI= +-----END CERTIFICATE----- + """ + + # parse a certificate supplied in a PEM format + subj, n_before, n_after = service.parse_cert_pem(cert_pem) + + assert 4 == n_before.tm_mon + assert 3 == n_before.tm_mday + assert 23 == n_before.tm_hour + assert 30 == n_before.tm_min + assert 10 == n_before.tm_sec + assert 2021 == n_before.tm_year + + assert 5 == n_after.tm_mon + assert 3 == n_after.tm_mday + assert 23 == n_after.tm_hour + assert 30 == n_after.tm_min + assert 10 == n_after.tm_sec + assert 2021 == n_after.tm_year + + # TODO improve parsing of fields within quotes + assert "\" \"" == subj.country + assert "\" \"" == subj.state + assert "\" \"" == subj.organization + assert "\" \"" == subj.common_name + assert None is subj.locality + assert None is subj.organization_unit + assert "\" foo \"" == subj.email_address + + +def test_create_and_parse_cert(service): + # create a private key + key = service.create_private_key(passphrase="foobar") + + # create a certificate + cert = service.create_sscrt(Subject(common_name="Foo CN", email_address="foo@bar.cz"), key, key_pass="foobar") + + # parse the subject + parsed_subj, n_before, n_after = service.parse_cert_pem(cert) + + assert "Foo CN" == parsed_subj.common_name + assert "foo@bar.cz" == parsed_subj.email_address diff --git a/tests/services/cryptography/private_keys_test.py b/tests/unit_tests/services/cryptography/private_keys_test.py similarity index 100% rename from tests/services/cryptography/private_keys_test.py rename to tests/unit_tests/services/cryptography/private_keys_test.py diff --git a/tests/services/cryptography/run_for_output_test.py b/tests/unit_tests/services/cryptography/run_for_output_test.py similarity index 100% rename from tests/services/cryptography/run_for_output_test.py rename to tests/unit_tests/services/cryptography/run_for_output_test.py diff --git a/tests/services/cryptography/self_signed_cert_test.py b/tests/unit_tests/services/cryptography/self_signed_cert_test.py similarity index 77% rename from tests/services/cryptography/self_signed_cert_test.py rename to tests/unit_tests/services/cryptography/self_signed_cert_test.py index 83a7b4699a85ec92c01e38f1d462acdd9b2ee652..67585ca71d48b5d5de255d6369b2cea5bba50a52 100644 --- a/tests/services/cryptography/self_signed_cert_test.py +++ b/tests/unit_tests/services/cryptography/self_signed_cert_test.py @@ -110,3 +110,33 @@ def test_create_sscrt_incorrect_passphrase(service): with pytest.raises(CryptographyException) as e: service.create_sscrt(Subject(common_name="Topnax", country="CZ"), private_key) assert "bad decrypt" in e.value.message + + +def test_create_sscrt_days(service): + # create a self signed certificate using configuration and extensions + private_key = service.create_private_key(passphrase="foobar") + + cert = service.create_sscrt(Subject(common_name="Topnax"), private_key, key_pass="foobar", + days=30) + + cert_2 = service.create_sscrt(Subject(common_name="Topnax"), private_key, + key_pass="foobar", + days=31) + + cert_3 = service.create_sscrt(Subject(common_name="Topnax"), private_key, + key_pass="foobar", + days=32) + + args = ["openssl", "x509", "-noout", "-enddate", "-in", "-"] + cert_printed = subprocess.check_output(args, + input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode() + cert_printed_2 = subprocess.check_output(args, + input=bytes(cert_2, encoding="utf-8"), stderr=subprocess.STDOUT).decode() + + cert_printed_3 = subprocess.check_output(args, + input=bytes(cert_3, encoding="utf-8"), stderr=subprocess.STDOUT).decode() + + # TODO improve this test by parsing the date + assert cert_printed != cert_printed_2 + assert cert_printed_2 != cert_printed_3 + assert cert_printed != cert_printed_3 diff --git a/tests/services/cryptography/sign_csr_test.py b/tests/unit_tests/services/cryptography/sign_csr_test.py similarity index 100% rename from tests/services/cryptography/sign_csr_test.py rename to tests/unit_tests/services/cryptography/sign_csr_test.py diff --git a/tests/services/cryptography/verify_ca_test.py b/tests/unit_tests/services/cryptography/verify_ca_test.py similarity index 100% rename from tests/services/cryptography/verify_ca_test.py rename to tests/unit_tests/services/cryptography/verify_ca_test.py