From 10d71f747fc34d677b16b769d941ef5aff0a25a6 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 7 Jun 2024 12:33:26 +0200 Subject: [PATCH] certificate management --- msp/msp/doctype/x509_certificate/__init__.py | 0 .../x509_certificate/test_x509_certificate.py | 9 + .../x509_certificate/x509_certificate.js | 105 ++++++++ .../x509_certificate/x509_certificate.json | 94 +++++++ .../x509_certificate/x509_certificate.py | 248 ++++++++++++++++++ 5 files changed, 456 insertions(+) create mode 100644 msp/msp/doctype/x509_certificate/__init__.py create mode 100644 msp/msp/doctype/x509_certificate/test_x509_certificate.py create mode 100644 msp/msp/doctype/x509_certificate/x509_certificate.js create mode 100644 msp/msp/doctype/x509_certificate/x509_certificate.json create mode 100644 msp/msp/doctype/x509_certificate/x509_certificate.py diff --git a/msp/msp/doctype/x509_certificate/__init__.py b/msp/msp/doctype/x509_certificate/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/msp/msp/doctype/x509_certificate/test_x509_certificate.py b/msp/msp/doctype/x509_certificate/test_x509_certificate.py new file mode 100644 index 0000000..2eaa2af --- /dev/null +++ b/msp/msp/doctype/x509_certificate/test_x509_certificate.py @@ -0,0 +1,9 @@ +# Copyright (c) 2024, itsdave GmbH and Contributors +# See license.txt + +# import frappe +from frappe.tests.utils import FrappeTestCase + + +class Testx509_certificate(FrappeTestCase): + pass diff --git a/msp/msp/doctype/x509_certificate/x509_certificate.js b/msp/msp/doctype/x509_certificate/x509_certificate.js new file mode 100644 index 0000000..fc0a9ec --- /dev/null +++ b/msp/msp/doctype/x509_certificate/x509_certificate.js @@ -0,0 +1,105 @@ +// Copyright (c) 2024, itsdave GmbH and Contributors +// For license information, please see license.txt + + +//------------------------------------------------------------------------------------------------------------------------------- +frappe.ui.form.on('x509_certificate', { + after_save: function(frm) { + loadCertificateInformation(frm); + } +}); + +// Function to load certificate information +function loadCertificateInformation(frm) { + frappe.call({ + method: 'msp.msp.doctype.x509_certificate.x509_certificate.read_cert_data', + args: { + certificate_name: frm.doc.name, + doc: frm.doc + }, + callback: function(response) { + if (response.message) { + if (response.message.error) { + if (response.message.error === 'Invalid certificate format') { + frappe.msgprint("Invalid Certificate Data"); + } else { + frappe.msgprint(response.message.error); + } + return; + } + + var parsedSubject = response.message.subject; + var parsedIssuer = parseIssuerInfo(response.message.issuer); + + var certificateInfo = "Subject:
" + + "- Common Name: " + (parsedSubject.common_name || 'N/A') + "
" + + "- Country: " + (parsedSubject.country || 'N/A') + "
" + + "- State/Province: " + (parsedSubject.state || 'N/A') + "
" + + "- City: " + (parsedSubject.city || 'N/A') + "
" + + "- Organization: " + (parsedSubject.organization || 'N/A') + "
" + + "- Organizational Unit: " + (parsedSubject.organization_unit || 'N/A') + "
" + + "Issuer: " + (parsedIssuer.commonName || 'N/A') + "
" + + "Subject Alternative Name: " + (response.message.subject_alt_names || 'N/A') + "
" + + "Validity Period:
" + + "- Not Before: " + response.message.not_valid_before + "
" + + "- Not After: " + response.message.not_valid_after + "
" + + "Serial Number: " + response.message.serial_number; + + frappe.msgprint("Setting information field with: " + frm.doc.name + "
" + certificateInfo); + frm.set_value('certificate_information', certificateInfo); + + // Check if private key is empty + if (!frm.doc.private_key) { + frappe.msgprint("Private Key is empty"); + } else { + // Check if private key is valid + frappe.msgprint(response.message.private_key_valid ? "Private Key is correct" : "Private Key is not valid"); + } + + // Set the create_date and expiry_date fields + frm.set_value('not_valid_before', response.message.not_valid_before); + frm.set_value('not_valid_after', response.message.not_valid_after); + + // Refresh the form to show updated values + frm.refresh_field('not_valid_before'); + frm.refresh_field('not_valid_after'); + frm.refresh_field('certificate_information'); + } else { + frappe.msgprint("Can't fetch certificate information."); + } + } + }); +} + +function parseSubjectInfo(subjectInfo) { + if (subjectInfo) { + // Regular expression to extract Common Name (CN) from subject + var parenthesesPattern = /CN=([^,]+)/; + var cnMatch = subjectInfo.match(parenthesesPattern); + var commonName = cnMatch ? cnMatch[1] : null; + return { + commonName: commonName + }; + } + return null; +} + +function parseIssuerInfo(issuerInfo) { + if (issuerInfo) { + var parenthesesPattern = /CN=([^,]+)/; + var cnMatch = issuerInfo.match(parenthesesPattern); + var commonName = cnMatch ? cnMatch[1] : null; + return { + commonName: commonName + }; + } + return null; +} + +// Function to format Subject Alternative Name +function formatSubjectAltName(subjectAltName) { + if (subjectAltName) { + return subjectAltName !== 'No more' ? subjectAltName : 'No hay nombres alternativos'; + } + return 'No hay nombres alternativos'; +} \ No newline at end of file diff --git a/msp/msp/doctype/x509_certificate/x509_certificate.json b/msp/msp/doctype/x509_certificate/x509_certificate.json new file mode 100644 index 0000000..7982a97 --- /dev/null +++ b/msp/msp/doctype/x509_certificate/x509_certificate.json @@ -0,0 +1,94 @@ +{ + "actions": [], + "allow_rename": 1, + "autoname": "format:certificate-{certificate_name}-{###}", + "creation": "2024-06-04 14:19:04.168850", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "certificate_name", + "attach_cert", + "attach_key", + "certificate_data", + "private_key", + "not_valid_before", + "not_valid_after", + "certificate_information" + ], + "fields": [ + { + "fieldname": "certificate_name", + "fieldtype": "Data", + "in_list_view": 1, + "label": "Certificate Name", + "reqd": 1 + }, + { + "fieldname": "certificate_data", + "fieldtype": "Long Text", + "in_list_view": 1, + "label": "Certificate Data" + }, + { + "fieldname": "private_key", + "fieldtype": "Long Text", + "in_list_view": 1, + "label": "Private Key" + }, + { + "fieldname": "not_valid_before", + "fieldtype": "Datetime", + "label": "Not Valid Before", + "read_only": 1 + }, + { + "fieldname": "not_valid_after", + "fieldtype": "Datetime", + "label": "Not Valid After", + "read_only": 1 + }, + { + "fieldname": "certificate_information", + "fieldtype": "Long Text", + "label": "Certificate Information", + "read_only": 1 + }, + { + "description": "Upload Certificate in PEM or P12 Format", + "fieldname": "attach_cert", + "fieldtype": "Attach", + "label": "Attach Certificate" + }, + { + "description": "Upload Key for PEM format Certificate", + "fieldname": "attach_key", + "fieldtype": "Attach", + "label": "Attach Key" + } + ], + "index_web_pages_for_search": 1, + "links": [], + "modified": "2024-06-05 12:37:40.951738", + "modified_by": "Administrator", + "module": "MSP", + "name": "x509_certificate", + "naming_rule": "Expression (old style)", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} \ No newline at end of file diff --git a/msp/msp/doctype/x509_certificate/x509_certificate.py b/msp/msp/doctype/x509_certificate/x509_certificate.py new file mode 100644 index 0000000..1995e34 --- /dev/null +++ b/msp/msp/doctype/x509_certificate/x509_certificate.py @@ -0,0 +1,248 @@ +import frappe +from frappe.model.document import Document +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.backends import default_backend +from cryptography import x509 +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives import hashes +from frappe.utils import get_files_path +import json + +class x509_certificate(Document): + pass + +@frappe.whitelist() +def read_cert_data(certificate_name=None, doc=None): + """ + Read data of certificado X.509. + + Args: + certificate_name (str): Name. + doc (str): Documento JSON. + + Returns: + dict: Certificate Data. + """ + + # Obtain certificate doc + certificate_doc = get_certificate_doc(certificate_name) + + # Read data + content, content_key = read_attached_files(doc, certificate_doc) + + # Parse and valid certificate + if content: + return parse_certificate(content, certificate_doc, content_key) + + # Read data from certificate_data field if no attached file + cert_data_field = read_certificate_data_field(certificate_doc) + if cert_data_field: + return parse_certificate(cert_data_field, certificate_doc, content_key) + + +@frappe.whitelist() +def get_certificate_doc(certificate_name): + """ + obtain the document. + + Args: + certificate_name (str): Name of the certificate. + + Returns: + Document: Document of certificate. + """ + + if isinstance(certificate_name, str): + try: + certificate_doc = frappe.get_doc("x509_certificate", certificate_name) + except frappe.DoesNotExistError: + print(f"Certificate document {certificate_name} not found.") + return {"error": f"Certificate document {certificate_name} not found."} + else: + certificate_doc = certificate_name + + return certificate_doc + + +@frappe.whitelist() +def read_attached_files(doc, certificate_doc): + """ + Read the certificate and the private key if it exist. + + Args: + doc (str): JSON document that have extre-info. + + """ + + content, content_key = None, None + + if doc and "attach_cert" in json.loads(doc): + file_path = json.loads(doc)["attach_cert"] + if file_path: + content = read_file_content(file_path) + certificate_doc.certificate_data = content + certificate_doc.save() + + if doc and "attach_key" in json.loads(doc): + file_path = json.loads(doc)["attach_key"] + if file_path: + content_key = read_file_content(file_path) + certificate_doc.private_key = content_key + certificate_doc.save() + + return content, content_key + + +@frappe.whitelist() +def read_file_content(file_path): + """ + Read the file. + + Args: + file_path (str): + + Returns: + str: content + """ + + if file_path: + if file_path.startswith("/private/files/"): + file_path = get_files_path(*file_path.split("/private/files/", 1)[1].split("/"), is_private=1) + elif file_path.startswith("/files/"): + file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/")) + + try: + with open(file_path, 'r') as file: + content = file.read() + if not content: + print(f"File {file_path} is empty.") + except FileNotFoundError: + print(f"File {file_path} not found.") + content = None + else: + content = None + + return content + +@frappe.whitelist() +def read_certificate_data_field(certificate_doc): + """ + Read certificate_data field. + + Args: + certificate_doc (Document): + + Returns: + str: Content of certificate_date + """ + cert_data = getattr(certificate_doc, 'certificate_data', None) + if cert_data: + return cert_data + return None + +@frappe.whitelist() +def parse_certificate(content, certificate_doc, content_key): + """ + Parse the information + + Args: + content (str): + certificate_doc (Document): + content_key (str): + + Returns: + dict: Certificate data. + """ + + try: + certificate = x509.load_pem_x509_certificate(content.encode('utf-8')) + + def get_attr(name): + try: + return certificate.subject.get_attributes_for_oid(name)[0].value + except IndexError: + return None + + subject_attrs = { + "country": get_attr(x509.NameOID.COUNTRY_NAME), + "state": get_attr(x509.NameOID.STATE_OR_PROVINCE_NAME), + "city": get_attr(x509.NameOID.LOCALITY_NAME), + "organization": get_attr(x509.NameOID.ORGANIZATION_NAME), + "organization_unit": get_attr(x509.NameOID.ORGANIZATIONAL_UNIT_NAME), + "common_name": get_attr(x509.NameOID.COMMON_NAME) + } + + subject_alt_names = format_subject_alt_names(certificate) + data = { + "subject": subject_attrs, + "issuer": str(certificate.issuer), + "subject_alt_names": subject_alt_names, + "not_valid_before": str(certificate.not_valid_before), + "not_valid_after": str(certificate.not_valid_after), + "serial_number": certificate.serial_number, + "signature_algorithm": str(certificate.signature_algorithm_oid) + } + + # Validar clave privada + private_key_valid = validate_private_key(certificate_doc.private_key, content_key) + data["private_key_valid"] = private_key_valid + + return data + except ValueError as e: + print(f"Error parsing certificate data: {e}") + return {"error": "Invalid certificate format"} + except Exception as e: + print(f"Error parsing certificate data: {e}") + return {"error": str(e)} + + +@frappe.whitelist() +def format_subject_alt_names(certificate): + """ + Format alternative names + + Args: + certificate (Certificate): Certificado X.509. + + Returns: + str: Alternative names + """ + + try: + ext = certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName) + alt_names = ext.value.get_values_for_type(x509.DNSName) + return ', '.join(alt_names) if alt_names else 'No more' + except x509.ExtensionNotFound: + return 'No more' + + +@frappe.whitelist() +def validate_private_key(private_key, content_key): + """ + Validate the private key. + + Args: + private_key (str): + content_key (str): content of private key + + Returns: + bool: True if is correct, False if not. + """ + + if not private_key: + return False # La clave privada está vacía + + try: + private_key = serialization.load_pem_private_key(private_key.encode('utf-8'), password=None, backend=default_backend()) + public_key = private_key.public_key() + public_key.verify( + private_key.sign(b'Hello World!', padding.PKCS1v15(), hashes.SHA256()), + b'Hello World!', + padding.PKCS1v15(), + hashes.SHA256() + ) + return True # La clave privada es válida + except Exception as e: + print(f"Error validating private key: {e}") + return False # La clave privada es inválida +