diff --git a/msp/certificate_methods.py b/msp/certificate_methods.py
new file mode 100644
index 0000000..8915b83
--- /dev/null
+++ b/msp/certificate_methods.py
@@ -0,0 +1,314 @@
+import frappe
+from frappe.utils.file_manager import save_file
+import zipfile
+import os
+from frappe.utils import get_files_path
+from cryptography import x509
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import padding
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.backends import default_backend
+from cryptography.x509 import load_pem_x509_certificate
+from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
+import datetime
+
+def process_certficate_data(doctype, method=None):
+
+ if doctype.attach_zip:
+ doctype = create_attach_cert_and_key_from_zip(doctype)
+
+ if doctype.attach_certificate_copy:
+ fill_certificate_data_from_attach_certificate_copy_content(doctype)
+
+ if not doctype.certificate_data:
+ frappe.throw("No Cert Data Available.")
+ else:
+ print(doctype.certificate_data)
+ if not doctype.certificate_data.startswith('-----BEGIN CERTIFICATE-----') or not doctype.certificate_data.strip().endswith('-----END CERTIFICATE-----'):
+ frappe.throw("Missing Start or End Marker in Certificate")
+ else:
+ print("Success")
+
+ cert_data = parse_certificate_content(doctype.certificate_data)
+ pretty_html_with_cert_data = get_pretty_html_from_cert_data(cert_data)
+ print(pretty_html_with_cert_data)
+ doctype.certificate_information = pretty_html_with_cert_data
+ doctype.not_valid_before = cert_data["not_valid_before"]
+ doctype.not_valid_after = cert_data["not_valid_after"]
+
+ if doctype.ca_label_content:
+ ca_data = parse_certificate_content(doctype.ca_label_content)
+ pretty_html_with_ca_data = get_pretty_html_from_cert_data(ca_data)
+ print(pretty_html_with_ca_data)
+ doctype.ca_information = pretty_html_with_ca_data
+ doctype.ca_verification = verify_ca(cert_data, ca_data)
+ else:
+ print("No CA label content")
+
+ cert = doctype.certificate_data
+ key = doctype.private_key
+ doctype.private_key_verification = verify_private_key(doctype.private_key, doctype.certificate_data)
+
+ if not doctype.certificate_name and doctype.certificate_information:
+ set_certificate_name(doctype)
+
+def set_doctype_name(doctype, method=None):
+
+ cert_data = parse_certificate_content(doctype.certificate_data)
+ common_name = cert_data["subject"]["common_name"]
+ not_valid_after = cert_data["not_valid_after"]
+
+ not_valid_after_dt = datetime.datetime.strptime(not_valid_after, "%Y-%m-%d %H:%M:%S")
+ formatted_date = not_valid_after_dt.strftime('%Y-%m-%d-%H-%M')
+
+ certname = f"{common_name}_{formatted_date}"
+
+ frappe.rename_doc("x509_certificate", doctype.name, certname)
+
+ doctype.name = certname
+ doctype.save()
+ frappe.db.commit()
+
+def verify_ca(cert_data, ca_data):
+ if cert_data["issuer"]["common_name"] == ca_data["subject"]["common_name"]:
+ print("CA correct")
+ return "CA is valid"
+ else:
+ print("CA not valid")
+ return "CA not valid"
+
+def verify_private_key(key: str, cert: str):
+ try:
+ # Load the certificate
+ certificate = load_pem_x509_certificate(cert.encode(), default_backend())
+
+ # Load the key
+ private_key = serialization.load_pem_private_key(
+ key.encode(),
+ password=None,
+ backend=default_backend()
+ )
+
+ # Get the public key from the certificate
+ public_key = certificate.public_key()
+
+ # Generate a random message to sign and verify
+ message = b"Cachimba"
+
+ # Sign the message with the private key
+ signature = private_key.sign(
+ message,
+ PKCS1v15(),
+ hashes.SHA256()
+ )
+
+ # Verify the signature with the public key
+ public_key.verify(
+ signature,
+ message,
+ PKCS1v15(),
+ hashes.SHA256()
+ )
+
+ return "Private Key is valid and matches the certificate."
+ except Exception as e:
+ print(f"An error occurred: {e}")
+ return f"Private Key is not valid: {e}"
+
+def create_attach_cert_and_key_from_zip(doctype):
+ docname = doctype.name
+ zip_file = doctype.attach_zip
+ if zip_file:
+ try:
+ # Change the route to absolute if necesary
+ if zip_file.startswith("/private/files/"):
+ zip_file = get_files_path(*zip_file.split("/private/files/", 1)[1].split("/"), is_private=1)
+ elif zip_file.startswith("/files/"):
+ zip_file = get_files_path(*zip_file.split("/files/", 1)[1].split("/"))
+
+ # Verify if the file exists
+ if not os.path.exists(zip_file):
+ return doctype
+
+ with zipfile.ZipFile(zip_file, 'r') as zip_ref:
+ for file_info in zip_ref.infolist():
+ filename = file_info.filename
+
+ # Check if filename ends with crt.pem or key.pem
+ if filename.endswith("crt.pem") or filename.endswith("key.pem") or filename.endswith("ca.pem"):
+ with zip_ref.open(filename) as extracted_file:
+ content = extracted_file.read()
+
+ # Determine the field to attach the file
+ if filename.endswith("crt.pem"):
+ attach_field = "attach_certificate_copy"
+ elif filename.endswith("key.pem"):
+ attach_field = "attach_key"
+ elif filename.endswith("ca.pem"):
+ attach_field = "ca_label_content"
+
+ # Save the file and attach it to the document
+ saved_file = save_file(filename, content, "x509_certificate", docname, df=attach_field, is_private=1)
+ doctype.set(attach_field, saved_file.file_url)
+
+ print(f"Filename: {filename}")
+ print(f"Content:\n{content.decode('utf-8')}\n")
+
+ except Exception as e:
+ print(f"Error extracting and reading zip file: {e}")
+ else:
+ print("No zip file attached or file path is empty.")
+
+ return doctype
+
+def fill_certificate_data_from_attach_certificate_copy_content(doctype):
+ content, content_key, ca_label = None, None, None
+
+ if doctype:
+ if doctype.attach_certificate_copy:
+ certificate_file_path = doctype.attach_certificate_copy
+ content = read_file_content(certificate_file_path)
+ doctype.certificate_data = content
+
+ if doctype.attach_key:
+ key_file_path = doctype.attach_key
+ content_key = read_file_content(key_file_path)
+ doctype.private_key = content_key
+
+ if doctype.ca_label_content:
+ ca_file_path = doctype.ca_label_content
+ ca_label = read_file_content(ca_file_path)
+ doctype.ca_label_content = ca_label
+
+ return doctype
+
+def read_file_content(file_path):
+ if file_path:
+ if file_path.startswith("/private/files/"):
+ file_path = get_files_path(*file_path.split("/private/files/", 1)[1].split("/"), is_private=1)
+ elif file_path.startswith("/files/"):
+ file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/"))
+
+ try:
+ with open(file_path, 'r') as file:
+ content = file.read()
+ if not content:
+ print(f"File {file_path} is empty.")
+ except FileNotFoundError:
+ print(f"File {file_path} not found.")
+ content = None
+ else:
+ content = None
+
+ return content
+
+def parse_certificate_content(certificate_content):
+ print("parse_certificate")
+ try:
+ certificate = x509.load_pem_x509_certificate(certificate_content.encode('utf-8'))
+
+ subject_attrs = {
+ "country": None,
+ "state": None,
+ "city": None,
+ "organization": None,
+ "organization_unit": None,
+ "common_name": None
+ }
+
+ subject = certificate.subject
+ for attr in subject:
+ if attr.oid == x509.NameOID.COUNTRY_NAME:
+ subject_attrs["country"] = attr.value
+ elif attr.oid == x509.NameOID.STATE_OR_PROVINCE_NAME:
+ subject_attrs["state"] = attr.value
+ elif attr.oid == x509.NameOID.LOCALITY_NAME:
+ subject_attrs["city"] = attr.value
+ elif attr.oid == x509.NameOID.ORGANIZATION_NAME:
+ subject_attrs["organization"] = attr.value
+ elif attr.oid == x509.NameOID.ORGANIZATIONAL_UNIT_NAME:
+ subject_attrs["organization_unit"] = attr.value
+ elif attr.oid == x509.NameOID.COMMON_NAME:
+ subject_attrs["common_name"] = attr.value
+
+ # Obtain CN
+ issuer = certificate.issuer
+ issuer_common_name = None
+ for attr in issuer:
+ if attr.oid == x509.NameOID.COMMON_NAME:
+ issuer_common_name = attr.value
+ break
+
+ subject_alt_names = format_subject_alt_names(certificate)
+ data = {
+ "subject": subject_attrs,
+ "issuer": {
+ "common_name": issuer_common_name
+ },
+ "subject_alt_names": subject_alt_names,
+ "not_valid_before": str(certificate.not_valid_before),
+ "not_valid_after": str(certificate.not_valid_after),
+ "serial_number": certificate.serial_number,
+ "signature_algorithm": str(certificate.signature_algorithm_oid)
+ }
+
+ return data
+ except ValueError as e:
+ print(f"Error parsing certificate data: {e}")
+ return {"error": "Invalid certificate format"}
+ except Exception as e:
+ print(f"Error parsing certificate data: {e}")
+ return {"error": str(e)}
+
+
+def format_subject_alt_names(certificate):
+ try:
+ ext = certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName)
+ alt_names = ext.value.get_values_for_type(x509.DNSName)
+ return ', '.join(alt_names) if alt_names else 'No more'
+ except x509.ExtensionNotFound:
+ return 'No more'
+
+def get_pretty_html_from_cert_data(cert_data):
+ subject = cert_data.get("subject", {})
+ issuer = cert_data.get("issuer", "N/A")
+
+ # Format the certificate information
+ certificate_info = (
+ f"Subject:
"
+ f"- Common Name: {subject.get('common_name', 'N/A')}
"
+ f"- Country: {subject.get('country', 'N/A')}
"
+ f"- State/Province: {subject.get('state', 'N/A')}
"
+ f"- City: {subject.get('city', 'N/A')}
"
+ f"- Organization: {subject.get('organization', 'N/A')}
"
+ f"- Organizational Unit: {subject.get('organization_unit', 'N/A')}
"
+ f"Issuer: {issuer}
"
+ f"Subject Alternative Name: {cert_data.get('subject_alt_names', 'N/A')}
"
+ f"Validity Period:
"
+ f"- Not Before: {cert_data.get('not_valid_before', 'N/A')}
"
+ f"- Not After: {cert_data.get('not_valid_after', 'N/A')}
"
+ f"Serial Number: {cert_data.get('serial_number', 'N/A')}
"
+ )
+ print(certificate_info)
+ return certificate_info
+
+def set_certificate_name(doctype):
+ cert_data = parse_certificate_content(doctype.certificate_data)
+
+ common_name = cert_data["subject"]["common_name"]
+ not_valid_after = cert_data["not_valid_after"]
+
+ # Extrat date data
+ not_valid_after_dt = datetime.datetime.strptime(not_valid_after, "%Y-%m-%d %H:%M:%S")
+ year = not_valid_after_dt.year
+ month = not_valid_after_dt.month
+ day = not_valid_after_dt.day
+ hour = not_valid_after_dt.hour
+ minute = not_valid_after_dt.minute
+
+ # Join CN and date
+ certificate_name = f"{common_name}-{year}-{month:02d}-{day:02d}-{hour:02d}-{minute:02d}"
+
+ doctype.certificate_name = certificate_name
+
+ print(f"Certificate Name set to: {certificate_name}")
\ No newline at end of file
diff --git a/msp/hooks.py b/msp/hooks.py
index 48b68d9..9362461 100644
--- a/msp/hooks.py
+++ b/msp/hooks.py
@@ -92,6 +92,10 @@ doctype_js = {"Location" : "public/js/location.js"}
doc_events = {
"Location": {
"before_save": "msp.hooked_methods.build_full_location_path"
+ },
+ "x509_certificate": {
+ "before_save": "msp.certificate_methods.process_certficate_data",
+ "after_insert": "msp.certificate_methods.set_doctype_name"
}
}
diff --git a/msp/msp/doctype/x509_certificate/x509_certificate.js b/msp/msp/doctype/x509_certificate/x509_certificate.js
index fc0a9ec..70fa247 100644
--- a/msp/msp/doctype/x509_certificate/x509_certificate.js
+++ b/msp/msp/doctype/x509_certificate/x509_certificate.js
@@ -5,68 +5,91 @@
//-------------------------------------------------------------------------------------------------------------------------------
frappe.ui.form.on('x509_certificate', {
after_save: function(frm) {
- loadCertificateInformation(frm);
+ //loadCertificateInformation(frm);
}
});
// Function to load certificate information
function loadCertificateInformation(frm) {
+ // Extract and attach ZIP files first
frappe.call({
- method: 'msp.msp.doctype.x509_certificate.x509_certificate.read_cert_data',
+ method: 'msp.msp.doctype.x509_certificate.x509_certificate.extract_and_read_zip_file',
args: {
- certificate_name: frm.doc.name,
- doc: frm.doc
+ docname: frm.doc.name
},
callback: function(response) {
- if (response.message) {
- if (response.message.error) {
- if (response.message.error === 'Invalid certificate format') {
- frappe.msgprint("Invalid Certificate Data");
- } else {
- frappe.msgprint(response.message.error);
- }
- return;
- }
-
- var parsedSubject = response.message.subject;
- var parsedIssuer = parseIssuerInfo(response.message.issuer);
-
- var certificateInfo = "Subject:
" +
- "- Common Name: " + (parsedSubject.common_name || 'N/A') + "
" +
- "- Country: " + (parsedSubject.country || 'N/A') + "
" +
- "- State/Province: " + (parsedSubject.state || 'N/A') + "
" +
- "- City: " + (parsedSubject.city || 'N/A') + "
" +
- "- Organization: " + (parsedSubject.organization || 'N/A') + "
" +
- "- Organizational Unit: " + (parsedSubject.organization_unit || 'N/A') + "
" +
- "Issuer: " + (parsedIssuer.commonName || 'N/A') + "
" +
- "Subject Alternative Name: " + (response.message.subject_alt_names || 'N/A') + "
" +
- "Validity Period:
" +
- "- Not Before: " + response.message.not_valid_before + "
" +
- "- Not After: " + response.message.not_valid_after + "
" +
- "Serial Number: " + response.message.serial_number;
-
- frappe.msgprint("Setting information field with: " + frm.doc.name + "
" + certificateInfo);
- frm.set_value('certificate_information', certificateInfo);
-
- // Check if private key is empty
- if (!frm.doc.private_key) {
- frappe.msgprint("Private Key is empty");
- } else {
- // Check if private key is valid
- frappe.msgprint(response.message.private_key_valid ? "Private Key is correct" : "Private Key is not valid");
- }
-
- // Set the create_date and expiry_date fields
- frm.set_value('not_valid_before', response.message.not_valid_before);
- frm.set_value('not_valid_after', response.message.not_valid_after);
+ if (response.message && response.message.success) {
+ // Refresh the certificate_data and private_key fields
+ frm.set_value('certificate_data');
+ frm.set_value('private_key');
// Refresh the form to show updated values
- frm.refresh_field('not_valid_before');
- frm.refresh_field('not_valid_after');
- frm.refresh_field('certificate_information');
- } else {
- frappe.msgprint("Can't fetch certificate information.");
+ frm.refresh_field('certificate_data');
+ frm.refresh_field('private_key');
+
}
+
+ // Proceed to load certificate information
+ frappe.call({
+ method: 'msp.msp.doctype.x509_certificate.x509_certificate.read_cert_data',
+ args: {
+ certificate_name: frm.doc.name,
+ doc: frm.doc
+ },
+ callback: function(response) {
+ if (response.message) {
+ if (response.message.error) {
+ if (response.message.error === 'Invalid certificate format') {
+ frappe.msgprint("Invalid Certificate Data");
+ } else {
+ frappe.msgprint(response.message.error);
+ }
+ return;
+ }
+
+ var parsedSubject = response.message.subject;
+ var parsedIssuer = parseIssuerInfo(response.message.issuer);
+
+ var certificateInfo = "Subject:
" +
+ "- Common Name: " + (parsedSubject.common_name || 'N/A') + "
" +
+ "- Country: " + (parsedSubject.country || 'N/A') + "
" +
+ "- State/Province: " + (parsedSubject.state || 'N/A') + "
" +
+ "- City: " + (parsedSubject.city || 'N/A') + "
" +
+ "- Organization: " + (parsedSubject.organization || 'N/A') + "
" +
+ "- Organizational Unit: " + (parsedSubject.organization_unit || 'N/A') + "
" +
+ "Issuer: " + (parsedIssuer.commonName || 'N/A') + "
" +
+ "Subject Alternative Name: " + (response.message.subject_alt_names || 'N/A') + "
" +
+ "Validity Period:
" +
+ "- Not Before: " + response.message.not_valid_before + "
" +
+ "- Not After: " + response.message.not_valid_after + "
" +
+ "Serial Number: " + response.message.serial_number;
+
+ frappe.msgprint("Setting information field with: " + frm.doc.name + "
" + certificateInfo);
+ frm.set_value('certificate_information', certificateInfo);
+
+ // Check if private key is empty
+ if (!frm.doc.private_key) {
+ frappe.msgprint("Private Key is empty");
+ } else {
+ // Check if private key is valid
+ frappe.msgprint(response.message.private_key_valid ? "Private Key is correct" : "Private Key is not valid");
+ }
+
+ // Set the create_date and expiry_date fields
+ frm.set_value('not_valid_before', response.message.not_valid_before);
+ frm.set_value('not_valid_after', response.message.not_valid_after);
+
+ // Refresh the form to show updated values
+ frm.refresh_field('not_valid_before');
+ frm.refresh_field('not_valid_after');
+ frm.refresh_field('certificate_information');
+ frm.refresh_field('certificate_data');
+ frm.refresh_field('private_key');
+ } else {
+ frappe.msgprint("Can't fetch certificate information.");
+ }
+ }
+ });
}
});
}
@@ -82,6 +105,7 @@ function parseSubjectInfo(subjectInfo) {
};
}
return null;
+
}
function parseIssuerInfo(issuerInfo) {
@@ -102,4 +126,4 @@ function formatSubjectAltName(subjectAltName) {
return subjectAltName !== 'No more' ? subjectAltName : 'No hay nombres alternativos';
}
return 'No hay nombres alternativos';
-}
\ No newline at end of file
+}
diff --git a/msp/msp/doctype/x509_certificate/x509_certificate.json b/msp/msp/doctype/x509_certificate/x509_certificate.json
index 7982a97..0fa0cd0 100644
--- a/msp/msp/doctype/x509_certificate/x509_certificate.json
+++ b/msp/msp/doctype/x509_certificate/x509_certificate.json
@@ -1,39 +1,44 @@
{
"actions": [],
"allow_rename": 1,
- "autoname": "format:certificate-{certificate_name}-{###}",
"creation": "2024-06-04 14:19:04.168850",
"doctype": "DocType",
"engine": "InnoDB",
"field_order": [
"certificate_name",
- "attach_cert",
+ "attach_zip",
+ "attach_certificate_copy",
"attach_key",
"certificate_data",
"private_key",
+ "private_key_verification",
+ "ca_label_content",
+ "ca_verification",
"not_valid_before",
"not_valid_after",
- "certificate_information"
+ "certificate_information",
+ "ca_information"
],
"fields": [
{
"fieldname": "certificate_name",
"fieldtype": "Data",
"in_list_view": 1,
- "label": "Certificate Name",
- "reqd": 1
+ "label": "Certificate Name"
},
{
"fieldname": "certificate_data",
"fieldtype": "Long Text",
"in_list_view": 1,
- "label": "Certificate Data"
+ "label": "Certificate Data",
+ "read_only_depends_on": "eval:doc.attach_certificate_copy"
},
{
"fieldname": "private_key",
"fieldtype": "Long Text",
"in_list_view": 1,
- "label": "Private Key"
+ "label": "Private Key",
+ "read_only_depends_on": "eval:doc.attach_key"
},
{
"fieldname": "not_valid_before",
@@ -53,22 +58,56 @@
"label": "Certificate Information",
"read_only": 1
},
- {
- "description": "Upload Certificate in PEM or P12 Format",
- "fieldname": "attach_cert",
- "fieldtype": "Attach",
- "label": "Attach Certificate"
- },
{
"description": "Upload Key for PEM format Certificate",
"fieldname": "attach_key",
"fieldtype": "Attach",
- "label": "Attach Key"
+ "label": "Attach Key",
+ "read_only_depends_on": "eval:doc.attach_zip"
+ },
+ {
+ "description": "Upload Certificate in PEM or P12 Format",
+ "fieldname": "attach_certificate_copy",
+ "fieldtype": "Attach",
+ "label": "Attach Certificate Copy",
+ "read_only_depends_on": "eval:doc.attach_zip"
+ },
+ {
+ "fieldname": "attach_zip",
+ "fieldtype": "Attach",
+ "label": "zip"
+ },
+ {
+ "fieldname": "private_key_verification",
+ "fieldtype": "Data",
+ "label": "Private Key Verification",
+ "read_only": 1
+ },
+ {
+ "depends_on": "eval:doc.ca_label_content",
+ "fieldname": "ca_information",
+ "fieldtype": "Long Text",
+ "label": "CA Information",
+ "read_only": 1
+ },
+ {
+ "depends_on": "eval:doc.attach_zip",
+ "fieldname": "ca_label_content",
+ "fieldtype": "Long Text",
+ "label": "Certificate Authority",
+ "read_only_depends_on": "eval:doc.attach_zip"
+ },
+ {
+ "depends_on": "eval:doc.ca_label_content",
+ "fieldname": "ca_verification",
+ "fieldtype": "Data",
+ "label": "CA Verification",
+ "read_only": 1
}
],
"index_web_pages_for_search": 1,
"links": [],
- "modified": "2024-06-05 12:37:40.951738",
+ "modified": "2024-06-14 10:50:44.471107",
"modified_by": "Administrator",
"module": "MSP",
"name": "x509_certificate",
diff --git a/msp/msp/doctype/x509_certificate/x509_certificate.py b/msp/msp/doctype/x509_certificate/x509_certificate.py
index 1995e34..6d67881 100644
--- a/msp/msp/doctype/x509_certificate/x509_certificate.py
+++ b/msp/msp/doctype/x509_certificate/x509_certificate.py
@@ -1,248 +1,5 @@
import frappe
from frappe.model.document import Document
-from cryptography.hazmat.primitives import serialization
-from cryptography.hazmat.backends import default_backend
-from cryptography import x509
-from cryptography.hazmat.primitives.asymmetric import padding
-from cryptography.hazmat.primitives import hashes
-from frappe.utils import get_files_path
-import json
class x509_certificate(Document):
pass
-
-@frappe.whitelist()
-def read_cert_data(certificate_name=None, doc=None):
- """
- Read data of certificado X.509.
-
- Args:
- certificate_name (str): Name.
- doc (str): Documento JSON.
-
- Returns:
- dict: Certificate Data.
- """
-
- # Obtain certificate doc
- certificate_doc = get_certificate_doc(certificate_name)
-
- # Read data
- content, content_key = read_attached_files(doc, certificate_doc)
-
- # Parse and valid certificate
- if content:
- return parse_certificate(content, certificate_doc, content_key)
-
- # Read data from certificate_data field if no attached file
- cert_data_field = read_certificate_data_field(certificate_doc)
- if cert_data_field:
- return parse_certificate(cert_data_field, certificate_doc, content_key)
-
-
-@frappe.whitelist()
-def get_certificate_doc(certificate_name):
- """
- obtain the document.
-
- Args:
- certificate_name (str): Name of the certificate.
-
- Returns:
- Document: Document of certificate.
- """
-
- if isinstance(certificate_name, str):
- try:
- certificate_doc = frappe.get_doc("x509_certificate", certificate_name)
- except frappe.DoesNotExistError:
- print(f"Certificate document {certificate_name} not found.")
- return {"error": f"Certificate document {certificate_name} not found."}
- else:
- certificate_doc = certificate_name
-
- return certificate_doc
-
-
-@frappe.whitelist()
-def read_attached_files(doc, certificate_doc):
- """
- Read the certificate and the private key if it exist.
-
- Args:
- doc (str): JSON document that have extre-info.
-
- """
-
- content, content_key = None, None
-
- if doc and "attach_cert" in json.loads(doc):
- file_path = json.loads(doc)["attach_cert"]
- if file_path:
- content = read_file_content(file_path)
- certificate_doc.certificate_data = content
- certificate_doc.save()
-
- if doc and "attach_key" in json.loads(doc):
- file_path = json.loads(doc)["attach_key"]
- if file_path:
- content_key = read_file_content(file_path)
- certificate_doc.private_key = content_key
- certificate_doc.save()
-
- return content, content_key
-
-
-@frappe.whitelist()
-def read_file_content(file_path):
- """
- Read the file.
-
- Args:
- file_path (str):
-
- Returns:
- str: content
- """
-
- if file_path:
- if file_path.startswith("/private/files/"):
- file_path = get_files_path(*file_path.split("/private/files/", 1)[1].split("/"), is_private=1)
- elif file_path.startswith("/files/"):
- file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/"))
-
- try:
- with open(file_path, 'r') as file:
- content = file.read()
- if not content:
- print(f"File {file_path} is empty.")
- except FileNotFoundError:
- print(f"File {file_path} not found.")
- content = None
- else:
- content = None
-
- return content
-
-@frappe.whitelist()
-def read_certificate_data_field(certificate_doc):
- """
- Read certificate_data field.
-
- Args:
- certificate_doc (Document):
-
- Returns:
- str: Content of certificate_date
- """
- cert_data = getattr(certificate_doc, 'certificate_data', None)
- if cert_data:
- return cert_data
- return None
-
-@frappe.whitelist()
-def parse_certificate(content, certificate_doc, content_key):
- """
- Parse the information
-
- Args:
- content (str):
- certificate_doc (Document):
- content_key (str):
-
- Returns:
- dict: Certificate data.
- """
-
- try:
- certificate = x509.load_pem_x509_certificate(content.encode('utf-8'))
-
- def get_attr(name):
- try:
- return certificate.subject.get_attributes_for_oid(name)[0].value
- except IndexError:
- return None
-
- subject_attrs = {
- "country": get_attr(x509.NameOID.COUNTRY_NAME),
- "state": get_attr(x509.NameOID.STATE_OR_PROVINCE_NAME),
- "city": get_attr(x509.NameOID.LOCALITY_NAME),
- "organization": get_attr(x509.NameOID.ORGANIZATION_NAME),
- "organization_unit": get_attr(x509.NameOID.ORGANIZATIONAL_UNIT_NAME),
- "common_name": get_attr(x509.NameOID.COMMON_NAME)
- }
-
- subject_alt_names = format_subject_alt_names(certificate)
- data = {
- "subject": subject_attrs,
- "issuer": str(certificate.issuer),
- "subject_alt_names": subject_alt_names,
- "not_valid_before": str(certificate.not_valid_before),
- "not_valid_after": str(certificate.not_valid_after),
- "serial_number": certificate.serial_number,
- "signature_algorithm": str(certificate.signature_algorithm_oid)
- }
-
- # Validar clave privada
- private_key_valid = validate_private_key(certificate_doc.private_key, content_key)
- data["private_key_valid"] = private_key_valid
-
- return data
- except ValueError as e:
- print(f"Error parsing certificate data: {e}")
- return {"error": "Invalid certificate format"}
- except Exception as e:
- print(f"Error parsing certificate data: {e}")
- return {"error": str(e)}
-
-
-@frappe.whitelist()
-def format_subject_alt_names(certificate):
- """
- Format alternative names
-
- Args:
- certificate (Certificate): Certificado X.509.
-
- Returns:
- str: Alternative names
- """
-
- try:
- ext = certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName)
- alt_names = ext.value.get_values_for_type(x509.DNSName)
- return ', '.join(alt_names) if alt_names else 'No more'
- except x509.ExtensionNotFound:
- return 'No more'
-
-
-@frappe.whitelist()
-def validate_private_key(private_key, content_key):
- """
- Validate the private key.
-
- Args:
- private_key (str):
- content_key (str): content of private key
-
- Returns:
- bool: True if is correct, False if not.
- """
-
- if not private_key:
- return False # La clave privada está vacía
-
- try:
- private_key = serialization.load_pem_private_key(private_key.encode('utf-8'), password=None, backend=default_backend())
- public_key = private_key.public_key()
- public_key.verify(
- private_key.sign(b'Hello World!', padding.PKCS1v15(), hashes.SHA256()),
- b'Hello World!',
- padding.PKCS1v15(),
- hashes.SHA256()
- )
- return True # La clave privada es válida
- except Exception as e:
- print(f"Error validating private key: {e}")
- return False # La clave privada es inválida
-