Files
PacCrypt-Webapp/paccrypt_algos/pqcrypto_hybrid.py
T
2025-08-06 19:15:05 -10:00

100 lines
3.3 KiB
Python

import os
import base64
import json
import importlib
import sys
from pathlib import Path
from typing import Optional
from pqcrypto.kem.ml_kem_768 import generate_keypair, encrypt as kem_encapsulate, decrypt as kem_decapsulate
# === Allow Hybrid Selector ===
PARENT_DIR = Path(__file__).resolve().parent.parent
if str(PARENT_DIR) not in sys.path:
sys.path.append(str(PARENT_DIR))
# === Constants ===
KEM_ALG = "ML-KEM-768"
AES_KEY_SIZE = 32 # 256-bit
SYMMETRIC_DEFAULT = "aes_gcm"
# === Base64 Helpers ===
def b64encode(data: bytes) -> str:
return base64.b64encode(data).decode("utf-8")
def b64decode(data: str) -> bytes:
return base64.b64decode(data.encode("utf-8"))
# === Dynamic Engine Loader ===
def load_engine(engine_name: str):
try:
return importlib.import_module(f'paccrypt_algos.{engine_name}')
except ModuleNotFoundError:
raise ValueError(f"Encryption engine '{engine_name}' not found.")
# === Encrypt Text ===
def encrypt_text(plaintext: str, public_key: bytes, engine_name: str = SYMMETRIC_DEFAULT) -> str:
engine = load_engine(engine_name)
kem_ciphertext, shared_secret = kem_encapsulate(public_key)
aes_key = shared_secret[:AES_KEY_SIZE]
encrypted_data = engine.encrypt_text(plaintext, aes_key.hex())
header = json.dumps({"alg": engine_name}).encode()
payload = len(kem_ciphertext).to_bytes(2, 'big') + kem_ciphertext + header + b'\0' + encrypted_data.encode()
return b64encode(payload)
# === Decrypt Text ===
def decrypt_text(encrypted_b64: str, private_key: bytes) -> str:
raw = b64decode(encrypted_b64)
kem_len = int.from_bytes(raw[:2], 'big')
kem_ct = raw[2:2 + kem_len]
rest = raw[2 + kem_len:]
header_data, encrypted_data = rest.split(b'\0', 1)
engine_name = json.loads(header_data.decode()).get("alg")
shared_secret = kem_decapsulate(private_key, kem_ct)
aes_key = shared_secret[:AES_KEY_SIZE]
engine = load_engine(engine_name)
return engine.decrypt_text(encrypted_data.decode(), aes_key.hex())
# === Encrypt File ===
def encrypt_file(in_path: str, out_path: str, public_key: bytes, engine_name: str = SYMMETRIC_DEFAULT):
engine = load_engine(engine_name)
kem_ciphertext, shared_secret = kem_encapsulate(public_key)
aes_key = shared_secret[:AES_KEY_SIZE]
with open(in_path, 'rb') as f:
plaintext = f.read()
encrypted = engine.encrypt_file_bytes(plaintext, aes_key.hex())
header = json.dumps({"alg": engine_name}).encode()
payload = len(kem_ciphertext).to_bytes(2, 'big') + kem_ciphertext + header + b'\0' + encrypted
with open(out_path, 'wb') as f:
f.write(payload)
# === Decrypt File ===
def decrypt_file(in_path: str, out_path: str, private_key: bytes):
with open(in_path, 'rb') as f:
raw = f.read()
kem_len = int.from_bytes(raw[:2], 'big')
kem_ct = raw[2:2 + kem_len]
rest = raw[2 + kem_len:]
header_data, encrypted_data = rest.split(b'\0', 1)
engine_name = json.loads(header_data.decode()).get("alg")
shared_secret = kem_decapsulate(private_key, kem_ct)
aes_key = shared_secret[:AES_KEY_SIZE]
engine = load_engine(engine_name)
plaintext = engine.decrypt_file_bytes(encrypted_data, aes_key.hex())
with open(out_path, 'wb') as f:
f.write(plaintext)
# === Engine Name ===
def get_name():
return "PQCrypto Hybrid"