Phase 1 in progress

This commit is contained in:
TySP-Dev
2025-08-06 19:15:05 -10:00
parent 099a5c8f18
commit e108d23945
15 changed files with 737 additions and 6 deletions
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+136
View File
@@ -0,0 +1,136 @@
import os
import base64
from typing import Optional
from cryptography.hazmat.primitives import padding, hashes, hmac
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.exceptions import InvalidSignature
# === Constants ===
SALT_LENGTH = 16
IV_LENGTH = 16
PBKDF2_ITERATIONS = 200_000
KEY_LENGTH = 32
HMAC_KEY_LENGTH = 32 # For HMAC-SHA256
HMAC_LENGTH = 32 # Output size of SHA256
# === Base64 Helpers ===
def b64encode(data: bytes) -> str:
return base64.b64encode(data).decode('utf-8')
def b64decode(data: str) -> bytes:
return base64.b64decode(data.encode('utf-8'))
# === Key Derivation ===
def derive_key(password: str, salt: bytes) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=KEY_LENGTH + HMAC_KEY_LENGTH,
salt=salt,
iterations=PBKDF2_ITERATIONS,
backend=default_backend()
)
full_key = kdf.derive(password.encode('utf-8'))
return full_key[:KEY_LENGTH], full_key[KEY_LENGTH:]
# === Encrypt Text ===
def encrypt_text(plaintext: str, password: str) -> str:
salt = os.urandom(SALT_LENGTH)
iv = os.urandom(IV_LENGTH)
aes_key, hmac_key = derive_key(password, salt)
padder = padding.PKCS7(128).padder()
padded = padder.update(plaintext.encode('utf-8')) + padder.finalize()
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded) + encryptor.finalize()
payload = salt + iv + ciphertext
h = hmac.HMAC(hmac_key, hashes.SHA256(), backend=default_backend())
h.update(payload)
mac = h.finalize()
return b64encode(payload + mac)
# === Decrypt Text ===
def decrypt_text(encrypted_b64: str, password: str) -> str:
raw = b64decode(encrypted_b64)
salt = raw[:SALT_LENGTH]
iv = raw[SALT_LENGTH:SALT_LENGTH + IV_LENGTH]
ciphertext = raw[SALT_LENGTH + IV_LENGTH:-HMAC_LENGTH]
mac = raw[-HMAC_LENGTH:]
aes_key, hmac_key = derive_key(password, salt)
h = hmac.HMAC(hmac_key, hashes.SHA256(), backend=default_backend())
h.update(raw[:-HMAC_LENGTH])
h.verify(mac)
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
padded = decryptor.update(ciphertext) + decryptor.finalize()
unpadder = padding.PKCS7(128).unpadder()
plaintext = unpadder.update(padded) + unpadder.finalize()
return plaintext.decode('utf-8')
# === Encrypt File ===
def encrypt_file(in_path, out_path, password: str, metadata: Optional[dict] = None):
with open(in_path, 'rb') as f:
plaintext = f.read()
salt = os.urandom(SALT_LENGTH)
iv = os.urandom(IV_LENGTH)
aes_key, hmac_key = derive_key(password, salt)
padder = padding.PKCS7(128).padder()
padded = padder.update(plaintext) + padder.finalize()
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded) + encryptor.finalize()
payload = salt + iv + ciphertext
h = hmac.HMAC(hmac_key, hashes.SHA256(), backend=default_backend())
h.update(payload)
mac = h.finalize()
with open(out_path, 'wb') as f:
f.write(payload + mac)
# === Decrypt File ===
def decrypt_file(in_path, out_path, password: str, metadata: Optional[dict] = None):
with open(in_path, 'rb') as f:
raw = f.read()
salt = raw[:SALT_LENGTH]
iv = raw[SALT_LENGTH:SALT_LENGTH + IV_LENGTH]
ciphertext = raw[SALT_LENGTH + IV_LENGTH:-HMAC_LENGTH]
mac = raw[-HMAC_LENGTH:]
aes_key, hmac_key = derive_key(password, salt)
h = hmac.HMAC(hmac_key, hashes.SHA256(), backend=default_backend())
h.update(raw[:-HMAC_LENGTH])
h.verify(mac)
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
padded = decryptor.update(ciphertext) + decryptor.finalize()
unpadder = padding.PKCS7(128).unpadder()
plaintext = unpadder.update(padded) + unpadder.finalize()
with open(out_path, 'wb') as f:
f.write(plaintext)
# === Algo Name ===
def get_name():
return "AES-CBC"
+68
View File
@@ -0,0 +1,68 @@
import os
import base64
import json
from typing import Optional
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
# === Constants ===
SALT_LENGTH = 16
IV_LENGTH = 12
PBKDF2_ITERATIONS = 200_000
KEY_LENGTH = 32 # 256 bits
# === Base64 Helpers ===
def b64encode(data: bytes) -> str:
return base64.b64encode(data).decode('utf-8')
def b64decode(data: str) -> bytes:
return base64.b64decode(data.encode('utf-8'))
# === Key Derivation ===
def derive_key(password: str, salt: bytes) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=KEY_LENGTH,
salt=salt,
iterations=PBKDF2_ITERATIONS,
backend=default_backend()
)
return kdf.derive(password.encode('utf-8'))
# === Encrypt Text ===
def encrypt_text(plaintext: str, password: str) -> str:
salt = os.urandom(SALT_LENGTH)
iv = os.urandom(IV_LENGTH)
key = derive_key(password, salt)
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(iv, plaintext.encode('utf-8'), None)
payload = salt + iv + ciphertext
return b64encode(payload)
# === Decrypt Text ===
def decrypt_text(encrypted_b64: str, password: str) -> str:
raw = b64decode(encrypted_b64)
salt = raw[:SALT_LENGTH]
iv = raw[SALT_LENGTH:SALT_LENGTH + IV_LENGTH]
ciphertext = raw[SALT_LENGTH + IV_LENGTH:]
key = derive_key(password, salt)
aesgcm = AESGCM(key)
plaintext = aesgcm.decrypt(iv, ciphertext, None)
return plaintext.decode('utf-8')
# === Metadata-less file interface (optional placeholders) ===
def encrypt_file(in_path, out_path, key, metadata: Optional[dict] = None):
raise NotImplementedError("File encryption not implemented yet.")
def decrypt_file(in_path, out_path, key, metadata: Optional[dict] = None):
raise NotImplementedError("File decryption not implemented yet.")
# === Engine Name ===
def get_name():
return "AES-GCM"
+99
View File
@@ -0,0 +1,99 @@
import os
import base64
import json
import importlib
import sys
from pathlib import Path
from typing import Optional
from pqcrypto.kem.ml_kem_768 import generate_keypair, encrypt as kem_encapsulate, decrypt as kem_decapsulate
# === Allow Hybrid Selector ===
PARENT_DIR = Path(__file__).resolve().parent.parent
if str(PARENT_DIR) not in sys.path:
sys.path.append(str(PARENT_DIR))
# === Constants ===
KEM_ALG = "ML-KEM-768"
AES_KEY_SIZE = 32 # 256-bit
SYMMETRIC_DEFAULT = "aes_gcm"
# === Base64 Helpers ===
def b64encode(data: bytes) -> str:
return base64.b64encode(data).decode("utf-8")
def b64decode(data: str) -> bytes:
return base64.b64decode(data.encode("utf-8"))
# === Dynamic Engine Loader ===
def load_engine(engine_name: str):
try:
return importlib.import_module(f'paccrypt_algos.{engine_name}')
except ModuleNotFoundError:
raise ValueError(f"Encryption engine '{engine_name}' not found.")
# === Encrypt Text ===
def encrypt_text(plaintext: str, public_key: bytes, engine_name: str = SYMMETRIC_DEFAULT) -> str:
engine = load_engine(engine_name)
kem_ciphertext, shared_secret = kem_encapsulate(public_key)
aes_key = shared_secret[:AES_KEY_SIZE]
encrypted_data = engine.encrypt_text(plaintext, aes_key.hex())
header = json.dumps({"alg": engine_name}).encode()
payload = len(kem_ciphertext).to_bytes(2, 'big') + kem_ciphertext + header + b'\0' + encrypted_data.encode()
return b64encode(payload)
# === Decrypt Text ===
def decrypt_text(encrypted_b64: str, private_key: bytes) -> str:
raw = b64decode(encrypted_b64)
kem_len = int.from_bytes(raw[:2], 'big')
kem_ct = raw[2:2 + kem_len]
rest = raw[2 + kem_len:]
header_data, encrypted_data = rest.split(b'\0', 1)
engine_name = json.loads(header_data.decode()).get("alg")
shared_secret = kem_decapsulate(private_key, kem_ct)
aes_key = shared_secret[:AES_KEY_SIZE]
engine = load_engine(engine_name)
return engine.decrypt_text(encrypted_data.decode(), aes_key.hex())
# === Encrypt File ===
def encrypt_file(in_path: str, out_path: str, public_key: bytes, engine_name: str = SYMMETRIC_DEFAULT):
engine = load_engine(engine_name)
kem_ciphertext, shared_secret = kem_encapsulate(public_key)
aes_key = shared_secret[:AES_KEY_SIZE]
with open(in_path, 'rb') as f:
plaintext = f.read()
encrypted = engine.encrypt_file_bytes(plaintext, aes_key.hex())
header = json.dumps({"alg": engine_name}).encode()
payload = len(kem_ciphertext).to_bytes(2, 'big') + kem_ciphertext + header + b'\0' + encrypted
with open(out_path, 'wb') as f:
f.write(payload)
# === Decrypt File ===
def decrypt_file(in_path: str, out_path: str, private_key: bytes):
with open(in_path, 'rb') as f:
raw = f.read()
kem_len = int.from_bytes(raw[:2], 'big')
kem_ct = raw[2:2 + kem_len]
rest = raw[2 + kem_len:]
header_data, encrypted_data = rest.split(b'\0', 1)
engine_name = json.loads(header_data.decode()).get("alg")
shared_secret = kem_decapsulate(private_key, kem_ct)
aes_key = shared_secret[:AES_KEY_SIZE]
engine = load_engine(engine_name)
plaintext = engine.decrypt_file_bytes(encrypted_data, aes_key.hex())
with open(out_path, 'wb') as f:
f.write(plaintext)
# === Engine Name ===
def get_name():
return "PQCrypto Hybrid"
+151
View File
@@ -0,0 +1,151 @@
import os
import base64
import json
import importlib
from typing import Optional, Tuple
import sys
from pathlib import Path
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
PARENT_DIR = Path(__file__).resolve().parent.parent
if str(PARENT_DIR) not in sys.path:
sys.path.append(str(PARENT_DIR))
# === Constants ===
RSA_KEY_SIZE = 4096
AES_KEY_SIZE = 32 # 256-bit
# === Base64 Helpers ===
def b64encode(data: bytes) -> str:
return base64.b64encode(data).decode("utf-8")
def b64decode(data: str) -> bytes:
return base64.b64decode(data.encode("utf-8"))
# === RSA Key Generation ===
def generate_key_pair() -> Tuple[bytes, bytes]:
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=RSA_KEY_SIZE,
backend=default_backend()
)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return private_pem, public_pem
# === Dynamic Engine Loader ===
def load_engine(engine_name: str):
try:
return importlib.import_module(f'paccrypt_algos.{engine_name}')
except ModuleNotFoundError:
raise ValueError(f"Encryption engine '{engine_name}' not found.")
# === Encrypt Text ===
def encrypt_text(plaintext: str, public_key_pem: str, engine_name: str = "aes_gcm") -> str:
engine = load_engine(engine_name)
aes_key = os.urandom(AES_KEY_SIZE)
public_key = serialization.load_pem_public_key(public_key_pem.encode(), backend=default_backend())
encrypted_key = public_key.encrypt(
aes_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
encrypted_data = engine.encrypt_text(plaintext, aes_key.hex())
header = json.dumps({"alg": engine_name}).encode()
payload = len(encrypted_key).to_bytes(2, 'big') + encrypted_key + header + b'\0' + encrypted_data.encode()
return b64encode(payload)
# === Decrypt Text ===
def decrypt_text(encrypted_b64: str, private_key_pem: str) -> str:
private_key = serialization.load_pem_private_key(private_key_pem.encode(), password=None, backend=default_backend())
raw = b64decode(encrypted_b64)
enc_key_len = int.from_bytes(raw[:2], 'big')
enc_key = raw[2:2 + enc_key_len]
rest = raw[2 + enc_key_len:]
header_data, encrypted_data = rest.split(b'\0', 1)
engine_name = json.loads(header_data.decode()).get("alg")
aes_key = private_key.decrypt(
enc_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
engine = load_engine(engine_name)
return engine.decrypt_text(encrypted_data.decode(), aes_key.hex())
# === Encrypt File ===
def encrypt_file(in_path: str, out_path: str, public_key_pem: str, engine_name: str = "aes_gcm"):
engine = load_engine(engine_name)
aes_key = os.urandom(AES_KEY_SIZE)
public_key = serialization.load_pem_public_key(public_key_pem.encode(), backend=default_backend())
encrypted_key = public_key.encrypt(
aes_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
with open(in_path, 'rb') as f:
plaintext = f.read()
encrypted_data = engine.encrypt_file_bytes(plaintext, aes_key.hex())
header = json.dumps({"alg": engine_name}).encode()
payload = len(encrypted_key).to_bytes(2, 'big') + encrypted_key + header + b'\0' + encrypted_data
with open(out_path, 'wb') as f:
f.write(payload)
# === Decrypt File ===
def decrypt_file(in_path: str, out_path: str, private_key_pem: str):
private_key = serialization.load_pem_private_key(private_key_pem.encode(), password=None, backend=default_backend())
with open(in_path, 'rb') as f:
raw = f.read()
enc_key_len = int.from_bytes(raw[:2], 'big')
enc_key = raw[2:2 + enc_key_len]
rest = raw[2 + enc_key_len:]
header_data, encrypted_data = rest.split(b'\0', 1)
engine_name = json.loads(header_data.decode()).get("alg")
aes_key = private_key.decrypt(
enc_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
engine = load_engine(engine_name)
plaintext = engine.decrypt_file_bytes(encrypted_data, aes_key.hex())
with open(out_path, 'wb') as f:
f.write(plaintext)
# === Engine Name ===
def get_name():
return "RSA Hybrid"
+178
View File
@@ -0,0 +1,178 @@
import os
import sys
from aes_gcm import encrypt_text as aesgcm_encrypt_text, decrypt_text as aesgcm_decrypt_text, \
encrypt_file as aesgcm_encrypt_file, decrypt_file as aesgcm_decrypt_file
from aes_cbc import encrypt_text as aescbc_encrypt_text, decrypt_text as aescbc_decrypt_text, \
encrypt_file as aescbc_encrypt_file, decrypt_file as aescbc_decrypt_file
from xchacha import encrypt_text as xchacha_encrypt_text, decrypt_text as xchacha_decrypt_text, \
encrypt_file as xchacha_encrypt_file, decrypt_file as xchacha_decrypt_file
import rsa_hybrid
import pqcrypto_hybrid
def load_text(path, binary=False):
with open(path, 'rb' if binary else 'r') as f:
return f.read()
def save_text(path, data, binary=False):
with open(path, 'wb' if binary else 'w') as f:
f.write(data)
def select_symmetric():
print("\n🔀 Select symmetric engine:")
choices = ["aes_gcm", "aes_cbc", "xchacha"]
for i, c in enumerate(choices):
print(f" [{i}] {c}")
while True:
try:
choice = int(input("Choice: "))
return choices[choice]
except (ValueError, IndexError):
print("❌ Invalid choice. Try again.")
def hybrid_cli(name, module, key_ext, symmetric_engine, is_pem=False):
while True:
print(f"\n=== PacCrypt {name} Debug Mode ({symmetric_engine.upper()}) ===")
print("Choose:")
print(" [g] Generate keypair")
print(" [e] Encrypt text")
print(" [d] Decrypt text")
print(" [ef] Encrypt file")
print(" [df] Decrypt file")
print(" [b] Back to engine menu")
print(" [q] Quit script")
mode = input("\nMode (g/e/d/ef/df/b/q): ").strip().lower()
if mode == 'q':
return 'quit'
elif mode == 'b':
return 'back'
try:
if mode == 'g':
priv, pub = module.generate_key_pair() if hasattr(module, 'generate_key_pair') else module.generate_keypair()
save_text(f"{name}_public.{key_ext}", pub, binary=True)
save_text(f"{name}_private.{key_ext}", priv, binary=True)
print(f"✅ Keypair saved to {name}_public.{key_ext} / {name}_private.{key_ext}")
elif mode == 'e':
plaintext = input("Text to encrypt: ")
pub_path = input("Public key path: ").strip()
pub = load_text(pub_path, binary=not is_pem)
result = module.encrypt_text(plaintext, pub, symmetric_engine)
print(f"\n🔐 Encrypted Base64:\n{result}")
elif mode == 'd':
encrypted = input("Encrypted Base64 input: ")
priv_path = input("Private key path: ").strip()
priv = load_text(priv_path, binary=not is_pem)
result = module.decrypt_text(encrypted, priv)
print(f"\n📝 Decrypted:\n{result}")
elif mode == 'ef':
in_path = input("Input file path: ").strip()
out_path = in_path + ".paccrypt"
pub_path = input("Public key path: ").strip()
pub = load_text(pub_path, binary=not is_pem)
module.encrypt_file(in_path, out_path, pub, symmetric_engine)
print(f"✅ File encrypted and saved to: {out_path}")
elif mode == 'df':
in_path = input("Encrypted file path: ").strip()
out_path = in_path.replace(".paccrypt", "")
priv_path = input("Private key path: ").strip()
priv = load_text(priv_path, binary=not is_pem)
module.decrypt_file(in_path, out_path, priv)
print(f"✅ File decrypted and saved to: {out_path}")
else:
print("❌ Invalid option.")
except Exception as e:
print(f"❌ Error: {e}")
def simple_cli(name, encrypt_text, decrypt_text, encrypt_file, decrypt_file):
while True:
print(f"\n=== PacCrypt {name} Debug Mode ===")
print("Choose:")
print(" [e] Encrypt text")
print(" [d] Decrypt text")
print(" [ef] Encrypt file")
print(" [df] Decrypt file")
print(" [b] Back to engine menu")
print(" [q] Quit script")
mode = input("\nMode (e/d/ef/df/b/q): ").strip().lower()
if mode == 'q':
return 'quit'
elif mode == 'b':
return 'back'
try:
if mode == 'e':
plaintext = input("Plaintext to encrypt: ")
password = input("Password: ")
result = encrypt_text(plaintext, password)
print(f"\n🔐 Encrypted Base64:\n{result}")
elif mode == 'd':
encrypted = input("Encrypted Base64 input: ")
password = input("Password: ")
result = decrypt_text(encrypted, password)
print(f"\n📝 Decrypted:\n{result}")
elif mode == 'ef':
in_path = input("Input file path: ").strip()
out_path = in_path + ".paccrypt"
password = input("Password: ")
encrypt_file(in_path, out_path, password)
print(f"✅ File encrypted and saved to: {out_path}")
elif mode == 'df':
in_path = input("Encrypted file path: ").strip()
out_path = in_path.replace(".paccrypt", "")
password = input("Password: ")
decrypt_file(in_path, out_path, password)
print(f"✅ File decrypted and saved to: {out_path}")
else:
print("❌ Invalid option.")
except Exception as e:
print(f"❌ Error: {e}")
# === PacCrypt CLI Entry ===
while True:
print("\n=== PacCrypt Hardcoded CLI ===")
print("Pick an engine:")
print(" [0] AES-GCM")
print(" [1] AES-CBC")
print(" [2] XChaCha20-Poly1305")
print(" [3] RSA Hybrid (with selectable symmetric)")
print(" [4] PQCrypto Hybrid (with selectable symmetric)")
print(" [q] Quit")
choice = input("Choice: ").strip().lower()
if choice == 'q':
print("👋 Bye.")
sys.exit(0)
symmetric_engine = None
if choice in ['3', '4']:
symmetric_engine = select_symmetric()
engines = {
'0': lambda: simple_cli("AES-GCM", aesgcm_encrypt_text, aesgcm_decrypt_text, aesgcm_encrypt_file, aesgcm_decrypt_file),
'1': lambda: simple_cli("AES-CBC", aescbc_encrypt_text, aescbc_decrypt_text, aescbc_encrypt_file, aescbc_decrypt_file),
'2': lambda: simple_cli("XChaCha20-Poly1305", xchacha_encrypt_text, xchacha_decrypt_text, xchacha_encrypt_file, xchacha_decrypt_file),
'3': lambda: hybrid_cli("RSA_Hybrid", rsa_hybrid, "pem", symmetric_engine, is_pem=True),
'4': lambda: hybrid_cli("PQCrypto_Hybrid", pqcrypto_hybrid, "bin", symmetric_engine),
}
if choice in engines:
result = engines[choice]()
if result == 'quit':
print("👋 Quitting.")
sys.exit(0)
# If 'back', just loops again to show engine menu
else:
print("❌ Invalid choice.")
+92
View File
@@ -0,0 +1,92 @@
import os
import base64
from typing import Optional
from Crypto.Cipher import ChaCha20_Poly1305
from Crypto.Random import get_random_bytes
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA256
# === Constants ===
SALT_LENGTH = 16
NONCE_LENGTH = 24
KEY_LENGTH = 32
PBKDF2_ITERATIONS = 200_000
TAG_LENGTH = 16
# === Base64 Helpers ===
def b64encode(data: bytes) -> str:
return base64.b64encode(data).decode('utf-8')
def b64decode(data: str) -> bytes:
return base64.b64decode(data.encode('utf-8'))
# === Key Derivation ===
def derive_key(password: str, salt: bytes) -> bytes:
return PBKDF2(password, salt, dkLen=KEY_LENGTH, count=PBKDF2_ITERATIONS, hmac_hash_module=SHA256)
# === Encrypt Text ===
def encrypt_text(plaintext: str, password: str) -> str:
salt = get_random_bytes(SALT_LENGTH)
nonce = get_random_bytes(NONCE_LENGTH)
key = derive_key(password, salt)
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode('utf-8'))
final = salt + nonce + ciphertext + tag
return b64encode(final)
# === Decrypt Text ===
def decrypt_text(encrypted_b64: str, password: str) -> str:
raw = b64decode(encrypted_b64)
salt = raw[:SALT_LENGTH]
nonce = raw[SALT_LENGTH:SALT_LENGTH + NONCE_LENGTH]
tag = raw[-TAG_LENGTH:]
ciphertext = raw[SALT_LENGTH + NONCE_LENGTH:-TAG_LENGTH]
key = derive_key(password, salt)
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
return plaintext.decode('utf-8')
# === Encrypt File ===
def encrypt_file(in_path, out_path, password: str, metadata: Optional[dict] = None):
with open(in_path, 'rb') as f:
plaintext = f.read()
salt = get_random_bytes(SALT_LENGTH)
nonce = get_random_bytes(NONCE_LENGTH)
key = derive_key(password, salt)
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
with open(out_path, 'wb') as f:
f.write(salt + nonce + ciphertext + tag)
# === Decrypt File ===
def decrypt_file(in_path, out_path, password: str, metadata: Optional[dict] = None):
with open(in_path, 'rb') as f:
raw = f.read()
salt = raw[:SALT_LENGTH]
nonce = raw[SALT_LENGTH:SALT_LENGTH + NONCE_LENGTH]
tag = raw[-TAG_LENGTH:]
ciphertext = raw[SALT_LENGTH + NONCE_LENGTH:-TAG_LENGTH]
key = derive_key(password, salt)
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
with open(out_path, 'wb') as f:
f.write(plaintext)
# === Engine Name ===
def get_name():
return "XChaCha20-Poly1305"
if __name__ == "__main__":
from Crypto.Cipher.ChaCha20_Poly1305 import ChaCha20Poly1305Cipher as _test # Force import to validate availability
from cryptography.exceptions import InvalidTag # Still catchable for consistency