diff --git a/src/services/certificate_service.py b/src/services/certificate_service.py index 4e5bcb626ab718c3f0f5f4b677613edcef1c1b39..d9cebdab138eaffadc37c7d8812bd8aa4f9005d4 100644 --- a/src/services/certificate_service.py +++ b/src/services/certificate_service.py @@ -15,7 +15,7 @@ from src.services.cryptography import CryptographyService import time -DATE_FORMAT = "%d.%m.%Y %H:%M:%S" +VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S" CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE" CRL_EXTENSION = "crlDistributionPoints=URI:" OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:" @@ -55,7 +55,7 @@ class CertificateService: # create a new self signed certificate cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password, - extensions=extensions, config=config, days=days) + extensions=extensions, config=config, days=days, sn=cert_id) # specify CA usage usages[CA_ID] = True @@ -86,8 +86,8 @@ class CertificateService: # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem) # format the parsed date - not_before_formatted = time.strftime(DATE_FORMAT, not_before) - not_after_formatted = time.strftime(DATE_FORMAT, not_after) + not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before) + not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after) # create a certificate wrapper certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem, @@ -123,7 +123,8 @@ class CertificateService: issuer_key.private_key, subject_key_pass=subject_key.password, issuer_key_pass=issuer_key.password, extensions=extensions, - days=days) + days=days, + sn=cert_id) # specify CA usage usages[CA_ID] = True @@ -136,8 +137,8 @@ class CertificateService: subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem) # format the parsed date - not_before_formatted = time.strftime(DATE_FORMAT, not_before) - not_after_formatted = time.strftime(DATE_FORMAT, not_after) + not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before) + not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after) # specify CA usage usages[CA_ID] = True @@ -171,12 +172,17 @@ class CertificateService: if usages is None: usages = {} + # get the next certificate ID in order to be able to specify the serial number + cert_id = self.certificate_repository.get_next_id() + # generate a new certificate cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data, issuer_key.private_key, subject_key_pass=subject_key.password, issuer_key_pass=issuer_key.password, extensions=extensions, - days=days) + days=days, + sn=cert_id + ) # wrap the generated certificate using Certificate class certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages, diff --git a/src/services/crl/__init__.py b/src/services/crl/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/services/crl/ca_index_file_line_generator.py b/src/services/crl/ca_index_file_line_generator.py new file mode 100644 index 0000000000000000000000000000000000000000..c1134cfb6de76492df1cd0a81f493d42b56e8a40 --- /dev/null +++ b/src/services/crl/ca_index_file_line_generator.py @@ -0,0 +1,40 @@ +import time +from time import struct_time + +from src.model.certificate import Certificate +from src.model.subject import Subject + +TAB_CHAR = "\t" +INDEX_FILE_DATE_ENTRY_FORMAT = "%y%m%d%H%M%SZ" + + +def get_index_file_time_entry(date: struct_time): + # convert the time to the format of openssl CA index file + return time.strftime(INDEX_FILE_DATE_ENTRY_FORMAT, date) + + +def get_distinguished_name(subject: Subject): + # convert subject class instance to the distinguished name in the openssl CA index file format + return "".join([f"/{key}={value}" if value is not None else "" for key, value in subject.to_dict().items()]) + + +def create_index_file_revoked_line(revoked_certificate: Certificate, subject: Subject, revocation_date: struct_time, + valid_to: struct_time) -> str: + # converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA + # index file format + items = [ + # certificate status flag (R stands for revoked) + "R", + # followed by the expiration date field + f"{get_index_file_time_entry(valid_to)}", + # followed by the revocation date field + f"{get_index_file_time_entry(revocation_date)},{revoked_certificate.revocation_reason}", + # followed by the serial number of the certificate in hex format + hex(revoked_certificate.certificate_id).replace("x", "").upper(), + # certificate filename ("unknown" literal used for unknown file names) + "unknown", + # certificate distinguished name + get_distinguished_name(subject) + ] + + return TAB_CHAR.join(items) diff --git a/src/services/crl/crl_service.py b/src/services/crl/crl_service.py new file mode 100644 index 0000000000000000000000000000000000000000..01f2c640008f34c3a0b97d8b95e291f1237f980c --- /dev/null +++ b/src/services/crl/crl_service.py @@ -0,0 +1,44 @@ +from datetime import datetime + +from injector import inject + +from src.dao.certificate_repository import CertificateRepository +from src.services.crl.ca_index_file_line_generator import create_index_file_revoked_line +from src.services.cryptography import CryptographyService + + +class CrlService: + @inject + def __init__(self, + certificate_repository: CertificateRepository, + cryptography_service: CryptographyService + ): + self.certificate_repository = certificate_repository + self.cryptography_service = cryptography_service + + def create_revoked_index(self, ca_id) -> str: + """ + Queries the certificate repository and looks for all certificates revoked by the certificate authority given + by the passed ID. Found certificates are then put into a string representing the CA's database index file. + + :param ca_id: ID of the CA whose revoked certificates should be put into the index file + :return: a str representing the content of a CA index file + """ + + index_lines = [] + # iterate over revoked certificates of the CA given by an ID + for certificate in self.certificate_repository.get_all_revoked_by(ca_id): + # extract the complete subject information and not_after date field + subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data) + + line = create_index_file_revoked_line(certificate, + subject, + # parse revocation date from unix timestamp to struct_time + datetime.utcfromtimestamp(int(certificate.revocation_date)).timetuple(), + not_after) + + # append it to the list of lines + index_lines.append(line) + + # join all lines with a new line + return "\n".join(index_lines) diff --git a/src/services/cryptography.py b/src/services/cryptography.py index 886577edadd6b5f09dad06aa24ae3704606b8e79..26d6f2c46c311ca154807ec55cbf33543789ecf0 100644 --- a/src/services/cryptography.py +++ b/src/services/cryptography.py @@ -1,6 +1,7 @@ import re import subprocess import time +import random from src.model.subject import Subject from src.utils.temporary_file import TemporaryFile @@ -24,6 +25,9 @@ SSCRT_SECTION = "sscrt_ext" CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE" +# upper bound of the range of random serial numbers to be generated +MAX_SN = 4294967296 + class CryptographyService: @@ -102,7 +106,7 @@ class CryptographyService: return self.__run_for_output( ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode() - def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30): + def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30, sn: int = None): """ Creates a root CA @@ -112,6 +116,7 @@ class CryptographyService: :param extensions: name of the section in the configuration representing extensions :param key_pass: passphrase of the private key :param days: number of days for which the certificate will be valid + :param sn: serial number to be set, when "None" is set a random serial number is generated :return: string containing the generated certificate in PEM format """ @@ -133,6 +138,11 @@ class CryptographyService: args = ["req", "-x509", "-new", "-subj", subj, "-days", f"{days}", "-key", "-"] + # serial number passed, use it when generating the certificate, + # without passing it openssl generates a random one + if sn is not None: + args.extend(["-set_serial", str(sn)]) + if len(config) > 0: args.extend(["-config", conf_path]) if len(extensions) > 0: @@ -167,7 +177,7 @@ class CryptographyService: return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode() - def __sign_csr(self, csr, issuer_pem, issuer_key, issuer_key_pass=None, extensions="", days=30): + def __sign_csr(self, csr, issuer_pem, issuer_key, issuer_key_pass=None, extensions="", days=30, sn: int = None): """ Signs the given CSR by the given issuer CA @@ -178,17 +188,22 @@ class CryptographyService: format :param extensions: extensions to be applied when signing the CSR :param days: number of days for which the certificate will be valid + :param sn: serial number to be set, when "None" is set a random serial number is generated :return: string containing the generated and signed certificate in PEM format """ # concatenate CSR, issuer certificate and issuer's key (will be used in the openssl call) proc_input = csr + issuer_pem + issuer_key + # TODO find a better way to generate a random serial number or let openssl generate a .srl file + # when serial number is not passed generate a random one + if sn is None: + sn = random.randint(0, MAX_SN) + # prepare openssl parameters... # CSR, CA and CA's private key will be passed via stdin (that's the meaning of the '-' symbol) - params = ["x509", "-req", "-in", "-", "-CA", "-", "-CAkey", "-", "-CAcreateserial", "-days", str(days)] - - # TODO delete created -.srl file + params = ["x509", "-req", "-in", "-", "-CA", "-", "-CAkey", "-", "-CAcreateserial", "-days", str(days), + "-set_serial", str(sn)] with TemporaryFile("extensions.conf", extensions) as ext_path: # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze @@ -202,7 +217,8 @@ class CryptographyService: def create_crt(self, subject, subject_key, issuer_pem, issuer_key, subject_key_pass=None, issuer_key_pass=None, extensions="", - days=30): + days=30, + sn: int = None): """ Creates a certificate by using the given subject, subject's key, issuer and its key. @@ -216,11 +232,12 @@ class CryptographyService: format :param extensions: extensions to be applied when creating the certificate :param days: number of days for which the certificate will be valid + :param sn: serial number to be set, when "None" is set a random serial number is generated :return: string containing the generated certificate in PEM format """ csr = self.__create_csr(subject, subject_key, key_pass=subject_key_pass) return self.__sign_csr(csr, issuer_pem, issuer_key, issuer_key_pass=issuer_key_pass, extensions=extensions, - days=days) + days=days, sn=sn) @staticmethod def verify_cert(certificate): diff --git a/tests/integration_tests/services/conftest.py b/tests/integration_tests/services/conftest.py index 8f57a538ae9c49be91c7c194b2e97fc5dde4be11..4b05785c022c9e9cedfdfb01f72af3e2d6476055 100644 --- a/tests/integration_tests/services/conftest.py +++ b/tests/integration_tests/services/conftest.py @@ -10,6 +10,7 @@ from src.dao.certificate_repository import CertificateRepository from src.dao.private_key_repository import PrivateKeyRepository from src.db.init_queries import SCHEMA_SQL, DEFAULT_VALUES_SQL from src.services.certificate_service import CertificateService +from src.services.crl.crl_service import CrlService from src.services.cryptography import CryptographyService from src.services.key_service import KeyService @@ -68,15 +69,15 @@ def private_key_repository_unique(connection_unique): @pytest.fixture -def cryptography_service_unique(): - return CryptographyService() +def private_key_service_unique(private_key_repository_unique, cryptography_service): + return KeyService(cryptography_service, private_key_repository_unique) @pytest.fixture -def private_key_service_unique(private_key_repository_unique, cryptography_service_unique): - return KeyService(cryptography_service_unique, private_key_repository_unique) +def certificate_service_unique(certificate_repository_unique, cryptography_service, configuration): + return CertificateService(cryptography_service, certificate_repository_unique, configuration) @pytest.fixture -def certificate_service_unique(certificate_repository_unique, cryptography_service_unique, configuration): - return CertificateService(cryptography_service_unique, certificate_repository_unique, configuration) +def crl_service_unique(certificate_repository_unique, cryptography_service): + return CrlService(certificate_repository_unique, cryptography_service) diff --git a/tests/integration_tests/services/crl_service_test.py b/tests/integration_tests/services/crl_service_test.py new file mode 100644 index 0000000000000000000000000000000000000000..e9190dbb50d33327f2e50e62489a1967502d8ff5 --- /dev/null +++ b/tests/integration_tests/services/crl_service_test.py @@ -0,0 +1,86 @@ +import time +from datetime import datetime + +from src.model.subject import Subject +from src.services.certificate_service import VALID_FROM_TO_DATE_FORMAT + +CA_INDEX_DATE_FORMAT = "%y%m%d%H%M%SZ" + + +def convert_valid_to_date_to_ca_index_format(date): + return convert_date_to_ca_index_format(time.strptime(date, VALID_FROM_TO_DATE_FORMAT)) + + +def convert_date_to_ca_index_format(date): + return time.strftime(CA_INDEX_DATE_FORMAT, date) + + +def test_set_certificate_revoked(certificate_service_unique, private_key_service_unique, crl_service_unique, + cryptography_service): + root_ca_private_key = private_key_service_unique.create_new_key(passphrase="foobar") + inter_ca_private_key = private_key_service_unique.create_new_key(passphrase="barfoo") + + root_ca_cert = certificate_service_unique.create_root_ca(root_ca_private_key, + Subject(common_name="RootFoo", + organization_unit="Department of Foo")) + + # create a CA + foo_ca = certificate_service_unique.create_ca(inter_ca_private_key, Subject(common_name="Foo CA", locality="Brno"), + root_ca_cert, + root_ca_private_key) + + # create another CA + bar_ca = certificate_service_unique.create_ca(inter_ca_private_key, + Subject(common_name="Bar CA", country="CZ", locality="Pilsen"), + root_ca_cert, + root_ca_private_key) + + # create another CA + certificate_service_unique.create_ca(inter_ca_private_key, Subject(common_name="BarBaz CA"), + root_ca_cert, + root_ca_private_key) + + # create a certificate + baz_cert = certificate_service_unique.create_end_cert(inter_ca_private_key, + Subject(common_name="Baz CA", state="ST"), + root_ca_cert, + root_ca_private_key) + + # revoke first created intermediate CA + certificate_service_unique.set_certificate_revocation_status(foo_ca.certificate_id, "revoked", "unspecified") + # revoke second created intermediate CA + certificate_service_unique.set_certificate_revocation_status(bar_ca.certificate_id, "revoked", "keyCompromise") + # revoke the created end certificate (non-CA) + certificate_service_unique.set_certificate_revocation_status(baz_cert.certificate_id, "revoked", + "privilegeWithdrawn") + + # create index of revoked certificates + out = crl_service_unique.create_revoked_index(root_ca_cert.certificate_id) + + # fetch certificates in order to update the revocation_date fields + foo_ca = certificate_service_unique.get_certificate(foo_ca.certificate_id) + bar_ca = certificate_service_unique.get_certificate(bar_ca.certificate_id) + baz_cert = certificate_service_unique.get_certificate(baz_cert.certificate_id) + + # convert revoked date fields (revoked date fields change with each test run) + revoked_dates = [ + convert_date_to_ca_index_format(datetime.utcfromtimestamp(int(foo_ca.revocation_date)).timetuple()), + convert_date_to_ca_index_format(datetime.utcfromtimestamp(int(bar_ca.revocation_date)).timetuple()), + convert_date_to_ca_index_format(datetime.utcfromtimestamp(int(baz_cert.revocation_date)).timetuple()), + ] + + # convert valid_to date fields (expiration date fields change with each test run) + valid_to_dates = [ + convert_valid_to_date_to_ca_index_format(foo_ca.valid_to), + convert_valid_to_date_to_ca_index_format(bar_ca.valid_to), + convert_valid_to_date_to_ca_index_format(baz_cert.valid_to), + ] + + # arrange expected lines + expected_lines = [ + f"R {valid_to_dates[0]} {revoked_dates[0]},unspecified 02 unknown /CN=Foo CA/L=Brno", + f"R {valid_to_dates[1]} {revoked_dates[1]},keyCompromise 03 unknown /CN=Bar CA/C=CZ/L=Pilsen", + f"R {valid_to_dates[2]} {revoked_dates[2]},privilegeWithdrawn 05 unknown /CN=Baz CA/ST=ST" + ] + + assert out == "\n".join(expected_lines) diff --git a/tests/unit_tests/services/crl/__init__.py b/tests/unit_tests/services/crl/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/unit_tests/services/crl/ca_index_file_line_generator_test.py b/tests/unit_tests/services/crl/ca_index_file_line_generator_test.py new file mode 100644 index 0000000000000000000000000000000000000000..6514b23915afa4809ebfaf988217dcb468aa93aa --- /dev/null +++ b/tests/unit_tests/services/crl/ca_index_file_line_generator_test.py @@ -0,0 +1,66 @@ +import time + +from src.model.certificate import Certificate +from src.model.subject import Subject +from src.services.crl.ca_index_file_line_generator import create_index_file_revoked_line + + +def test_get_index_file_time_entry_valid(): + cert = Certificate( + 1, "Foo", "21.03.2020", "21.03.2021", "", -1, -1, -1, {}, "02.02.2021", "keyCompromise" + ) + + date_format = "%d.%m.%Y" + + expiration_date = time.strptime(cert.valid_to, date_format) + revocation_date = time.strptime(cert.revocation_date, date_format) + + index_line = create_index_file_revoked_line(cert, Subject( + "Foo", "CZ", "Pilsen", email_address="bar@foo.cz" + ), revocation_date, expiration_date) + + assert "R 210321000000Z 210202000000Z,keyCompromise 01 unknown " \ + "/CN=Foo/C=CZ/L=Pilsen/emailAddress=bar@foo.cz" \ + == index_line + + +def test_get_index_file_time_entry_valid_2(): + cert = Certificate( + 101024, "Bar", "01.01.2019", "06.10.2021", "", -1, -1, -1, {}, "03.09.2021", "affiliationChanged" + ) + + date_format = "%d.%m.%Y" + + expiration_date = time.strptime(cert.valid_to, date_format) + revocation_date = time.strptime(cert.revocation_date, date_format) + + index_line = create_index_file_revoked_line(cert, Subject( + "Bar", "SK", "Foosen", organization="Bar Org." + ), revocation_date, expiration_date) + + assert "R 211006000000Z 210903000000Z,affiliationChanged 018AA0 unknown " \ + "/CN=Bar/C=SK/L=Foosen/O=Bar Org." \ + == index_line + + +def test_get_index_file_time_entry_valid_3(): + cert = Certificate( + 1, "IA CA", "", "01.04.2023 15:01:11", "", -1, -1, -1, {}, "01.04.2021 15:12:00", "keyCompromise" + ) + + date_format = "%d.%m.%Y %H:%M:%S" + + expiration_date = time.strptime(cert.valid_to, date_format) + revocation_date = time.strptime(cert.revocation_date, date_format) + + index_line = create_index_file_revoked_line(cert, Subject( + common_name=cert.common_name, + state="Some-State", + country="AU", + organization="Internet Widgits Pty Ltd " + ), revocation_date, expiration_date) + + expected = "R 230401150111Z 210401151200Z,keyCompromise 01 unknown /CN=IA CA/C=AU/ST=Some-State/O=Internet " \ + "Widgits Pty Ltd " + + assert expected == index_line diff --git a/tests/unit_tests/services/cryptography/create_crt_test.py b/tests/unit_tests/services/cryptography/create_crt_test.py index babda35f7d797a6d0301a1db2831bc3a09667874..abca761f268a5f490f7dcaa534bfda7801953376 100644 --- a/tests/unit_tests/services/cryptography/create_crt_test.py +++ b/tests/unit_tests/services/cryptography/create_crt_test.py @@ -11,48 +11,48 @@ def export_crt(crt): input=bytes(crt, encoding="utf-8"), stderr=subprocess.STDOUT).decode() -def test_sign_cst(service): +def test_create_crt(service): # create root CA root_key = service.create_private_key() root_ca = service.create_sscrt(Subject(common_name="foo"), root_key) - # create a private key to be used to make a CSR for the intermediate CA - inter_key = service.create_private_key() + # create a private key to be used to make a CSR for the child certificate + cert_key = service.create_private_key() - # create a CA using the root CA - inter_ca = service.create_crt(Subject(common_name="bar", country="CZ"), inter_key, root_ca, root_key) + # create a cert using the root CA + cert = service.create_crt(Subject(common_name="bar", country="CZ"), cert_key, root_ca, root_key) - inter_ca_printed = export_crt(inter_ca) + cert_printed = export_crt(cert) # assert fields - assert "Issuer: CN = foo" in inter_ca_printed - assert "Subject: CN = bar, C = CZ" in inter_ca_printed + assert "Issuer: CN = foo" in cert_printed + assert "Subject: CN = bar, C = CZ" in cert_printed -def test_sign_crt_passphrase(service): +def test_create_crt_passphrase(service): # create root CA and encrypt the private key of the root CA root_key_passphrase = "barbaz" root_key = service.create_private_key(passphrase=root_key_passphrase) root_ca = service.create_sscrt(Subject(common_name="foo"), root_key, key_pass=root_key_passphrase) - # create a private key to be used to make a CSR for the intermediate CA - inter_key_passphrase = "foobazbar" - inter_key = service.create_private_key(passphrase=inter_key_passphrase) + # create a private key to be used to make a CSR for the child certificate + cert_key_passphrase = "foobazbar" + cert_key = service.create_private_key(passphrase=cert_key_passphrase) - # create a CA using the root CA - inter_ca = service.create_crt(Subject(common_name="bar", country="CZ"), inter_key, root_ca, root_key, - subject_key_pass=inter_key_passphrase, issuer_key_pass=root_key_passphrase) + # create a certificate using the root CA + cert = service.create_crt(Subject(common_name="bar", country="CZ"), cert_key, root_ca, root_key, + subject_key_pass=cert_key_passphrase, issuer_key_pass=root_key_passphrase) - inter_ca_printed = export_crt(inter_ca) + cert_printed = export_crt(cert) # assert fields - assert "Issuer: CN = foo" in inter_ca_printed - assert "Subject: CN = bar, C = CZ" in inter_ca_printed + assert "Issuer: CN = foo" in cert_printed + assert "Subject: CN = bar, C = CZ" in cert_printed # some basic incorrect passphrase combinations passphrases = [ - (inter_key, None), - (inter_key, "foofoobarbar"), + (cert_key, None), + (cert_key, "foofoobarbar"), (None, root_key), ("foofoobarbar", root_key), ("foofoobarbar", "foofoobarbar"), @@ -62,38 +62,38 @@ def test_sign_crt_passphrase(service): for (key_pass, issuer_key_pass) in passphrases: # try to create it using a wrong issuer passphrase with pytest.raises(CryptographyException) as e: - inter_ca = service.create_crt(Subject(common_name="bar", country="CZ"), inter_key, root_ca, root_key, - subject_key_pass=key_pass, issuer_key_pass=issuer_key_pass) + service.create_crt(Subject(common_name="bar", country="CZ"), cert_key, root_ca, root_key, + subject_key_pass=key_pass, issuer_key_pass=issuer_key_pass) assert "bad decrypt" in e.value.message -def test_sign_crt_extensions(service): +def test_create_crt_extensions(service): # create root CA and encrypt the private key of the root CA root_key_passphrase = "barbaz" root_key = service.create_private_key(passphrase=root_key_passphrase) root_ca = service.create_sscrt(Subject(common_name="foo"), root_key, key_pass=root_key_passphrase) # create a private key to be used to make a CSR for the intermediate CA - inter_key_passphrase = "foofoo" - inter_key = service.create_private_key() + cert_key_passphrase = "foofoo" + cert_key = service.create_private_key() # create a CA using the root CA - inter_ca = service.create_crt(Subject(common_name="bar", country="CZ"), inter_key, root_ca, root_key, - subject_key_pass=inter_key_passphrase, issuer_key_pass=root_key_passphrase, - extensions="authorityInfoAccess=caIssuers;URI:bar.cz/baz/cert\n" - "basicConstraints=critical,CA:TRUE\n" - "crlDistributionPoints=URI:http://localhost/api/crl/0\n" - "authorityInfoAccess=OCSP;URI:http://localhost/api/ocsp/0\n") + cert = service.create_crt(Subject(common_name="bar", country="CZ"), cert_key, root_ca, root_key, + subject_key_pass=cert_key_passphrase, issuer_key_pass=root_key_passphrase, + extensions="authorityInfoAccess=caIssuers;URI:bar.cz/baz/cert\n" + "basicConstraints=critical,CA:TRUE\n" + "crlDistributionPoints=URI:http://localhost/api/crl/0\n" + "authorityInfoAccess=OCSP;URI:http://localhost/api/ocsp/0\n") - inter_ca_printed = export_crt(inter_ca) + cert_printed = export_crt(cert) # assert fields - assert "Issuer: CN = foo" in inter_ca_printed - assert "Subject: CN = bar, C = CZ" in inter_ca_printed - assert "X509v3 CRL Distribution Points:" in inter_ca_printed - assert "URI:http://localhost/api/crl/0" in inter_ca_printed - assert "Authority Information Access:" in inter_ca_printed - assert "OCSP - URI:http://localhost/api/ocsp/0" in inter_ca_printed + assert "Issuer: CN = foo" in cert_printed + assert "Subject: CN = bar, C = CZ" in cert_printed + assert "X509v3 CRL Distribution Points:" in cert_printed + assert "URI:http://localhost/api/crl/0" in cert_printed + assert "Authority Information Access:" in cert_printed + assert "OCSP - URI:http://localhost/api/ocsp/0" in cert_printed # assert extensions expected_extensions = """ X509v3 extensions: @@ -107,5 +107,49 @@ def test_sign_crt_extensions(service): Authority Information Access: OCSP - URI:http://localhost/api/ocsp/0""" expected_extensions = expected_extensions.replace("\n", "").replace("\r", "") - inter_ca_printed = inter_ca_printed.replace("\n", "").replace("\r", "") - assert expected_extensions in inter_ca_printed + cert_printed = cert_printed.replace("\n", "").replace("\r", "") + assert expected_extensions in cert_printed + + +def test_create_crt_serial_number(service): + # create root CA + root_key = service.create_private_key() + root_ca = service.create_sscrt(Subject(common_name="foo"), root_key) + + # create a private key to be used to make a new certificate + cert_key = service.create_private_key() + + # define a serial number to be used + serial_number = 1 + serial_number_hex = hex(serial_number).replace("x", "") + + # create a certificate using the root CA + cert = service.create_crt(Subject(common_name="bar", country="CZ"), cert_key, root_ca, root_key, + sn=serial_number) + + cert_printed = subprocess.check_output(["openssl", "x509", "-noout", "-in", "-", "-serial"], + input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode() + + assert f"serial={serial_number_hex}" in cert_printed + + +def test_create_crt_negative_serial_number(service): + # create root CA + root_key = service.create_private_key() + root_ca = service.create_sscrt(Subject(common_name="foo"), root_key) + + # create a private key to be used to make a new certificate + cert_key = service.create_private_key() + + # define a serial number to be used + serial_number = -1 + serial_number_hex = hex(serial_number).replace("x", "") + + # create a certificate using the root CA + cert = service.create_crt(Subject(common_name="bar", country="CZ"), cert_key, root_ca, root_key, + sn=serial_number) + + cert_printed = subprocess.check_output(["openssl", "x509", "-noout", "-in", "-", "-serial"], + input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode() + + assert f"serial={serial_number_hex}" in cert_printed diff --git a/tests/unit_tests/services/cryptography/self_signed_cert_test.py b/tests/unit_tests/services/cryptography/self_signed_cert_test.py index 9ceedde56a3029b615d053b98cb68cbbef7f8261..e9d1ac11e1e7bf7228819deb41a8abe52dbf3f01 100644 --- a/tests/unit_tests/services/cryptography/self_signed_cert_test.py +++ b/tests/unit_tests/services/cryptography/self_signed_cert_test.py @@ -176,3 +176,34 @@ def test_create_sscrt_days(service): assert cert_printed != cert_printed_2 assert cert_printed_2 != cert_printed_3 assert cert_printed != cert_printed_3 + + +def test_create_sscrt_with_serial_number(service): + # create a self signed certificate with a PK that is protected by a passphrase + private_key = service.create_private_key() + + serial_number = 1024 + serial_number_hex = hex(serial_number).replace("x", "") + + # specify a serial number + cert = service.create_sscrt(Subject(common_name="Topnax", country="CZ"), private_key, sn=serial_number) + out = subprocess.check_output(["openssl", "x509", "-noout", "-in", "-", "-serial"], + input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode() + + assert f"serial={serial_number_hex}" in out + + +def test_create_sscrt_with_negative_serial_number(service): + # create a self signed certificate with a PK that is protected by a passphrase + private_key = service.create_private_key() + + # check whether negative serial number does not raise an exception + serial_number = -1024 + serial_number_hex = hex(serial_number).replace("x", "") + + # specify a serial number + cert = service.create_sscrt(Subject(common_name="Topnax", country="CZ"), private_key, sn=serial_number) + out = subprocess.check_output(["openssl", "x509", "-noout", "-in", "-", "-serial"], + input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode() + + assert f"serial={serial_number_hex}" in out