# Python source file (100 lines, 3.3 KiB)
import os
|
|
import base64
|
|
import json
|
|
import importlib
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from pqcrypto.kem.ml_kem_768 import generate_keypair, encrypt as kem_encapsulate, decrypt as kem_decapsulate
|
|
|
|
# === Allow Hybrid Selector ===
|
|
# Directory two levels above this file; it is appended to the import search
# path once (presumably so the sibling ``paccrypt_algos`` package resolves --
# confirm against the project layout).
PARENT_DIR = Path(__file__).resolve().parent.parent
if sys.path.count(str(PARENT_DIR)) == 0:
    sys.path.append(str(PARENT_DIR))
|
|
|
|
# === Constants ===
|
|
# Name of the key-encapsulation mechanism this module is built around.
KEM_ALG = "ML-KEM-768"
# Number of shared-secret bytes used as the symmetric key (256 bits).
AES_KEY_SIZE = 256 // 8
# Symmetric engine module used when the caller does not name one.
SYMMETRIC_DEFAULT = "aes_gcm"
|
|
|
|
# === Base64 Helpers ===
|
|
def b64encode(data: bytes) -> str:
    """Return the standard Base64 encoding of *data* as a text string."""
    encoded = base64.b64encode(data)
    return encoded.decode("utf-8")
|
|
|
|
def b64decode(data: str) -> bytes:
    """Decode the Base64 text *data* and return the raw bytes it represents."""
    as_bytes = data.encode("utf-8")
    return base64.b64decode(as_bytes)
|
|
|
|
# === Dynamic Engine Loader ===
|
|
def load_engine(engine_name: str):
    """Import and return the engine module ``paccrypt_algos.<engine_name>``.

    Args:
        engine_name: Bare module name of the engine inside the
            ``paccrypt_algos`` package, e.g. ``"aes_gcm"``.

    Returns:
        The imported engine module.

    Raises:
        ValueError: If ``engine_name`` is not a single plain identifier, or
            if importing the module fails with ``ModuleNotFoundError`` (the
            import error is attached as ``__cause__``).
    """
    # The decrypt functions take this name from the container header, i.e.
    # from data the caller does not control. Accept only one plain identifier
    # so dots, separators or empty names never reach the import machinery.
    if not isinstance(engine_name, str) or not engine_name.isidentifier():
        raise ValueError(f"Encryption engine '{engine_name}' not found.")

    try:
        return importlib.import_module(f'paccrypt_algos.{engine_name}')
    except ModuleNotFoundError as exc:
        # Chain explicitly so a missing dependency *inside* an engine is
        # still visible as the cause instead of looking like a bad name.
        raise ValueError(f"Encryption engine '{engine_name}' not found.") from exc
|
|
|
|
# === Encrypt Text ===
|
|
def encrypt_text(plaintext: str, public_key: bytes, engine_name: str = SYMMETRIC_DEFAULT) -> str:
    """Encrypt *plaintext* for the holder of *public_key*; return Base64 text.

    A fresh shared secret is encapsulated against ``public_key`` and its first
    ``AES_KEY_SIZE`` bytes (hex-encoded) are handed to the selected engine's
    ``encrypt_text``. The Base64-encoded container is laid out as::

        2-byte big-endian KEM ciphertext length | KEM ciphertext |
        JSON header {"alg": engine_name} | NUL byte | engine output

    Args:
        plaintext: Text to encrypt.
        public_key: KEM public key of the recipient.
        engine_name: Name of the symmetric engine module to use.

    Returns:
        The container described above, Base64-encoded.

    Raises:
        ValueError: If the engine cannot be loaded (raised by ``load_engine``).
    """
    cipher_module = load_engine(engine_name)
    encapsulated, secret = kem_encapsulate(public_key)
    key_hex = secret[:AES_KEY_SIZE].hex()

    # The engine output is encoded to bytes below, so it is presumably a str.
    sealed_text = cipher_module.encrypt_text(plaintext, key_hex)
    header_bytes = json.dumps({"alg": engine_name}).encode()

    container = b"".join((
        len(encapsulated).to_bytes(2, 'big'),
        encapsulated,
        header_bytes,
        b'\0',
        sealed_text.encode(),
    ))
    return b64encode(container)
|
|
|
|
# === Decrypt Text ===
|
|
def decrypt_text(encrypted_b64: str, private_key: bytes) -> str:
    """Decrypt a Base64 container as produced by ``encrypt_text``.

    Expected layout of the decoded container::

        2-byte big-endian KEM ciphertext length | KEM ciphertext |
        JSON header {"alg": <engine name>} | NUL byte | engine ciphertext

    The structure is validated before any key material is touched, so a
    truncated or malformed container fails with a descriptive ``ValueError``.

    Args:
        encrypted_b64: Base64 text of the container.
        private_key: KEM private key matching the key used for encryption.

    Returns:
        Whatever the selected engine's ``decrypt_text`` returns (the
        recovered text).

    Raises:
        ValueError: If the container is too short, the KEM ciphertext is
            truncated, the header terminator is missing, the header is not
            valid JSON naming an engine, or the engine cannot be loaded.
    """
    raw = b64decode(encrypted_b64)

    if len(raw) < 2:
        raise ValueError("Encrypted payload is too short to hold the KEM length prefix.")
    kem_len = int.from_bytes(raw[:2], 'big')
    kem_end = 2 + kem_len
    if len(raw) < kem_end:
        # Slicing would silently hand a shortened ciphertext to the KEM.
        raise ValueError("Encrypted payload is truncated: incomplete KEM ciphertext.")
    kem_ct = raw[2:kem_end]

    # The header is plain JSON text, so the first NUL after it is the delimiter.
    header_data, delimiter, encrypted_data = raw[kem_end:].partition(b'\0')
    if not delimiter:
        raise ValueError("Encrypted payload is malformed: header terminator not found.")

    header = json.loads(header_data.decode())
    if not isinstance(header, dict) or not isinstance(header.get("alg"), str):
        raise ValueError("Encrypted payload header does not name an encryption engine.")
    engine_name = header["alg"]

    shared_secret = kem_decapsulate(private_key, kem_ct)
    aes_key = shared_secret[:AES_KEY_SIZE]

    engine = load_engine(engine_name)
    return engine.decrypt_text(encrypted_data.decode(), aes_key.hex())
|
|
|
|
# === Encrypt File ===
|
|
def encrypt_file(in_path: str, out_path: str, public_key: bytes, engine_name: str = SYMMETRIC_DEFAULT):
    """Encrypt the file at *in_path* and write the container to *out_path*.

    A fresh shared secret is encapsulated against ``public_key`` and its first
    ``AES_KEY_SIZE`` bytes (hex-encoded) are handed to the selected engine's
    ``encrypt_file_bytes`` together with the whole file content. The output
    file is laid out as::

        2-byte big-endian KEM ciphertext length | KEM ciphertext |
        JSON header {"alg": engine_name} | NUL byte | engine output

    Args:
        in_path: Path of the file to encrypt; it is read fully into memory.
        out_path: Path the container is written to (binary mode).
        public_key: KEM public key of the recipient.
        engine_name: Name of the symmetric engine module to use.

    Raises:
        ValueError: If the engine cannot be loaded (raised by ``load_engine``).
    """
    cipher_module = load_engine(engine_name)
    encapsulated, secret = kem_encapsulate(public_key)
    key_hex = secret[:AES_KEY_SIZE].hex()

    with open(in_path, 'rb') as source:
        file_bytes = source.read()

    sealed = cipher_module.encrypt_file_bytes(file_bytes, key_hex)
    header_bytes = json.dumps({"alg": engine_name}).encode()
    container = b"".join((
        len(encapsulated).to_bytes(2, 'big'),
        encapsulated,
        header_bytes,
        b'\0',
        sealed,
    ))

    with open(out_path, 'wb') as sink:
        sink.write(container)
|
|
|
|
# === Decrypt File ===
|
|
def decrypt_file(in_path: str, out_path: str, private_key: bytes):
    """Decrypt a container file as produced by ``encrypt_file``.

    Expected layout of the input file::

        2-byte big-endian KEM ciphertext length | KEM ciphertext |
        JSON header {"alg": <engine name>} | NUL byte | engine ciphertext

    The structure is validated before any key material is touched, so a
    truncated or malformed file fails with a descriptive ``ValueError`` and
    *out_path* is left untouched.

    Args:
        in_path: Path of the container file; it is read fully into memory.
        out_path: Path the recovered plaintext is written to (binary mode).
        private_key: KEM private key matching the key used for encryption.

    Raises:
        ValueError: If the container is too short, the KEM ciphertext is
            truncated, the header terminator is missing, the header is not
            valid JSON naming an engine, or the engine cannot be loaded.
    """
    with open(in_path, 'rb') as f:
        raw = f.read()

    if len(raw) < 2:
        raise ValueError("Encrypted payload is too short to hold the KEM length prefix.")
    kem_len = int.from_bytes(raw[:2], 'big')
    kem_end = 2 + kem_len
    if len(raw) < kem_end:
        # Slicing would silently hand a shortened ciphertext to the KEM.
        raise ValueError("Encrypted payload is truncated: incomplete KEM ciphertext.")
    kem_ct = raw[2:kem_end]

    # The header is plain JSON text, so the first NUL after it is the delimiter.
    header_data, delimiter, encrypted_data = raw[kem_end:].partition(b'\0')
    if not delimiter:
        raise ValueError("Encrypted payload is malformed: header terminator not found.")

    header = json.loads(header_data.decode())
    if not isinstance(header, dict) or not isinstance(header.get("alg"), str):
        raise ValueError("Encrypted payload header does not name an encryption engine.")
    engine_name = header["alg"]

    shared_secret = kem_decapsulate(private_key, kem_ct)
    aes_key = shared_secret[:AES_KEY_SIZE]

    engine = load_engine(engine_name)
    plaintext = engine.decrypt_file_bytes(encrypted_data, aes_key.hex())

    with open(out_path, 'wb') as f:
        f.write(plaintext)
|
|
|
|
# === Engine Name ===
|
|
def get_name():
    """Return the human-readable display name of this engine."""
    display_name = "PQCrypto Hybrid"
    return display_name
|