Skip to content
Snippets Groups Projects

8574 get issued by filter

Merged Jan Pašek requested to merge 8574_get_issued_by_filter into master
3 files
+ 57
11
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -29,6 +29,7 @@ USAGE = "usage"
SUBJECT = "subject"
VALIDITY_DAYS = "validityDays"
CA = "CA"
ISSUED_BY = "issuedby"
STATUS = "status"
REASON = "reason"
REASON_UNDEFINED = "unspecified"
@@ -161,7 +162,7 @@ class CertController:
cert = self.certificate_service.get_certificate(v)
if cert is None:
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
else:
return {"success": True, "data": cert.pem_data}, C_SUCCESS
@@ -183,7 +184,7 @@ class CertController:
cert = self.certificate_service.get_certificate(v)
if cert is None:
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
else:
data = self.cert_to_dict_full(cert)
if data is None:
@@ -201,14 +202,15 @@ class CertController:
:rtype: CertificateListResponse
"""
targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} # all targets
issuer_id = -1
# the filtering parameter can be read as URL argument or as a request body
if request.is_json or "filtering" in request.args.keys(): # if the request carries JSON data
if request.is_json or FILTERING in request.args.keys(): # if the request carries JSON data
if request.is_json:
data = request.get_json() # get it
else:
try:
data = {"filtering": json.loads(request.args["filtering"])}
data = {FILTERING: json.loads(request.args[FILTERING])}
except JSONDecodeError:
return E_NOT_JSON_FORMAT, C_BAD_REQUEST
@@ -223,23 +225,33 @@ class CertController:
targets.remove(INTERMEDIATE_CA_ID)
else:
return E_WRONG_PARAMETERS, C_BAD_REQUEST
if ISSUED_BY in data[FILTERING]: # containing 'issuedby'
if isinstance(data[FILTERING][ISSUED_BY], int): # which is an 'int'
issuer_id = data[FILTERING][ISSUED_BY] # then get its children only
else:
return E_WRONG_PARAMETERS, C_BAD_REQUEST
if issuer_id >= 0: # if filtering by an issuer
try:
children = self.certificate_service.get_certificates_issued_by(issuer_id) # get his children
except CertificateNotFoundException: # if id does not exist
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND # throw
if len(targets) == TREE_NODE_TYPE_COUNT: # = 3 -> root node,
certs = [child for child in children if child.type_id in targets]
elif len(targets) == TREE_NODE_TYPE_COUNT: # = 3 -> root node,
# intermediate node,
# or leaf node
# if filtering did not change the
# targets,
certs = self.certificate_service.get_certificates() # fetch everything
else: # otherwise fetch targets only
certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets)))
certs = self.certificate_service.get_certificates() # fetch everything
else: # otherwise fetch targets only
certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets)))
else:
certs = self.certificate_service.get_certificates() # if no params, fetch everything
if certs is None:
return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
elif len(certs) == 0:
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
else:
ret = []
for c in certs:
@@ -296,7 +308,7 @@ class CertController:
cert = self.certificate_service.get_certificate(v)
if cert is None:
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
if cert.parent_id is None:
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
Loading