Merging dev into main
This commit is contained in:
@@ -0,0 +1,136 @@
|
||||
import os
|
||||
import base64
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives import padding, hashes, hmac
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
||||
# === Constants ===
|
||||
SALT_LENGTH = 16
|
||||
IV_LENGTH = 16
|
||||
PBKDF2_ITERATIONS = 200_000
|
||||
KEY_LENGTH = 32
|
||||
HMAC_KEY_LENGTH = 32 # For HMAC-SHA256
|
||||
HMAC_LENGTH = 32 # Output size of SHA256
|
||||
|
||||
# === Base64 Helpers ===
|
||||
def b64encode(data: bytes) -> str:
|
||||
return base64.b64encode(data).decode('utf-8')
|
||||
|
||||
def b64decode(data: str) -> bytes:
|
||||
return base64.b64decode(data.encode('utf-8'))
|
||||
|
||||
# === Key Derivation ===
|
||||
def derive_key(password: str, salt: bytes) -> bytes:
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=KEY_LENGTH + HMAC_KEY_LENGTH,
|
||||
salt=salt,
|
||||
iterations=PBKDF2_ITERATIONS,
|
||||
backend=default_backend()
|
||||
)
|
||||
full_key = kdf.derive(password.encode('utf-8'))
|
||||
return full_key[:KEY_LENGTH], full_key[KEY_LENGTH:]
|
||||
|
||||
# === Encrypt Text ===
|
||||
def encrypt_text(plaintext: str, password: str) -> str:
|
||||
salt = os.urandom(SALT_LENGTH)
|
||||
iv = os.urandom(IV_LENGTH)
|
||||
aes_key, hmac_key = derive_key(password, salt)
|
||||
|
||||
padder = padding.PKCS7(128).padder()
|
||||
padded = padder.update(plaintext.encode('utf-8')) + padder.finalize()
|
||||
|
||||
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
ciphertext = encryptor.update(padded) + encryptor.finalize()
|
||||
|
||||
payload = salt + iv + ciphertext
|
||||
|
||||
h = hmac.HMAC(hmac_key, hashes.SHA256(), backend=default_backend())
|
||||
h.update(payload)
|
||||
mac = h.finalize()
|
||||
|
||||
return b64encode(payload + mac)
|
||||
|
||||
# === Decrypt Text ===
|
||||
def decrypt_text(encrypted_b64: str, password: str) -> str:
|
||||
raw = b64decode(encrypted_b64)
|
||||
|
||||
salt = raw[:SALT_LENGTH]
|
||||
iv = raw[SALT_LENGTH:SALT_LENGTH + IV_LENGTH]
|
||||
ciphertext = raw[SALT_LENGTH + IV_LENGTH:-HMAC_LENGTH]
|
||||
mac = raw[-HMAC_LENGTH:]
|
||||
|
||||
aes_key, hmac_key = derive_key(password, salt)
|
||||
|
||||
h = hmac.HMAC(hmac_key, hashes.SHA256(), backend=default_backend())
|
||||
h.update(raw[:-HMAC_LENGTH])
|
||||
h.verify(mac)
|
||||
|
||||
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
padded = decryptor.update(ciphertext) + decryptor.finalize()
|
||||
|
||||
unpadder = padding.PKCS7(128).unpadder()
|
||||
plaintext = unpadder.update(padded) + unpadder.finalize()
|
||||
|
||||
return plaintext.decode('utf-8')
|
||||
|
||||
# === Encrypt File ===
|
||||
def encrypt_file(in_path, out_path, password: str, metadata: Optional[dict] = None):
|
||||
with open(in_path, 'rb') as f:
|
||||
plaintext = f.read()
|
||||
|
||||
salt = os.urandom(SALT_LENGTH)
|
||||
iv = os.urandom(IV_LENGTH)
|
||||
aes_key, hmac_key = derive_key(password, salt)
|
||||
|
||||
padder = padding.PKCS7(128).padder()
|
||||
padded = padder.update(plaintext) + padder.finalize()
|
||||
|
||||
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
ciphertext = encryptor.update(padded) + encryptor.finalize()
|
||||
|
||||
payload = salt + iv + ciphertext
|
||||
|
||||
h = hmac.HMAC(hmac_key, hashes.SHA256(), backend=default_backend())
|
||||
h.update(payload)
|
||||
mac = h.finalize()
|
||||
|
||||
with open(out_path, 'wb') as f:
|
||||
f.write(payload + mac)
|
||||
|
||||
# === Decrypt File ===
|
||||
def decrypt_file(in_path, out_path, password: str, metadata: Optional[dict] = None):
|
||||
with open(in_path, 'rb') as f:
|
||||
raw = f.read()
|
||||
|
||||
salt = raw[:SALT_LENGTH]
|
||||
iv = raw[SALT_LENGTH:SALT_LENGTH + IV_LENGTH]
|
||||
ciphertext = raw[SALT_LENGTH + IV_LENGTH:-HMAC_LENGTH]
|
||||
mac = raw[-HMAC_LENGTH:]
|
||||
|
||||
aes_key, hmac_key = derive_key(password, salt)
|
||||
|
||||
h = hmac.HMAC(hmac_key, hashes.SHA256(), backend=default_backend())
|
||||
h.update(raw[:-HMAC_LENGTH])
|
||||
h.verify(mac)
|
||||
|
||||
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
padded = decryptor.update(ciphertext) + decryptor.finalize()
|
||||
|
||||
unpadder = padding.PKCS7(128).unpadder()
|
||||
plaintext = unpadder.update(padded) + unpadder.finalize()
|
||||
|
||||
with open(out_path, 'wb') as f:
|
||||
f.write(plaintext)
|
||||
|
||||
# === Algo Name ===
|
||||
def get_name():
|
||||
return "AES-CBC"
|
||||
@@ -0,0 +1,68 @@
|
||||
import os
|
||||
import base64
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
# === Constants ===
|
||||
SALT_LENGTH = 16
|
||||
IV_LENGTH = 12
|
||||
PBKDF2_ITERATIONS = 200_000
|
||||
KEY_LENGTH = 32 # 256 bits
|
||||
|
||||
# === Base64 Helpers ===
|
||||
def b64encode(data: bytes) -> str:
|
||||
return base64.b64encode(data).decode('utf-8')
|
||||
|
||||
def b64decode(data: str) -> bytes:
|
||||
return base64.b64decode(data.encode('utf-8'))
|
||||
|
||||
# === Key Derivation ===
|
||||
def derive_key(password: str, salt: bytes) -> bytes:
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=KEY_LENGTH,
|
||||
salt=salt,
|
||||
iterations=PBKDF2_ITERATIONS,
|
||||
backend=default_backend()
|
||||
)
|
||||
return kdf.derive(password.encode('utf-8'))
|
||||
|
||||
# === Encrypt Text ===
|
||||
def encrypt_text(plaintext: str, password: str) -> str:
|
||||
salt = os.urandom(SALT_LENGTH)
|
||||
iv = os.urandom(IV_LENGTH)
|
||||
key = derive_key(password, salt)
|
||||
|
||||
aesgcm = AESGCM(key)
|
||||
ciphertext = aesgcm.encrypt(iv, plaintext.encode('utf-8'), None)
|
||||
|
||||
payload = salt + iv + ciphertext
|
||||
return b64encode(payload)
|
||||
|
||||
# === Decrypt Text ===
|
||||
def decrypt_text(encrypted_b64: str, password: str) -> str:
|
||||
raw = b64decode(encrypted_b64)
|
||||
salt = raw[:SALT_LENGTH]
|
||||
iv = raw[SALT_LENGTH:SALT_LENGTH + IV_LENGTH]
|
||||
ciphertext = raw[SALT_LENGTH + IV_LENGTH:]
|
||||
|
||||
key = derive_key(password, salt)
|
||||
aesgcm = AESGCM(key)
|
||||
plaintext = aesgcm.decrypt(iv, ciphertext, None)
|
||||
return plaintext.decode('utf-8')
|
||||
|
||||
# === Metadata-less file interface (optional placeholders) ===
|
||||
def encrypt_file(in_path, out_path, key, metadata: Optional[dict] = None):
|
||||
raise NotImplementedError("File encryption not implemented yet.")
|
||||
|
||||
def decrypt_file(in_path, out_path, key, metadata: Optional[dict] = None):
|
||||
raise NotImplementedError("File decryption not implemented yet.")
|
||||
|
||||
# === Engine Name ===
|
||||
def get_name():
|
||||
return "AES-GCM"
|
||||
@@ -0,0 +1,151 @@
|
||||
import os
|
||||
import base64
|
||||
import json
|
||||
import importlib
|
||||
from typing import Optional, Tuple
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
PARENT_DIR = Path(__file__).resolve().parent.parent
|
||||
if str(PARENT_DIR) not in sys.path:
|
||||
sys.path.append(str(PARENT_DIR))
|
||||
|
||||
# === Constants ===
|
||||
RSA_KEY_SIZE = 4096
|
||||
AES_KEY_SIZE = 32 # 256-bit
|
||||
|
||||
# === Base64 Helpers ===
|
||||
def b64encode(data: bytes) -> str:
|
||||
return base64.b64encode(data).decode("utf-8")
|
||||
|
||||
def b64decode(data: str) -> bytes:
|
||||
return base64.b64decode(data.encode("utf-8"))
|
||||
|
||||
# === RSA Key Generation ===
|
||||
def generate_key_pair() -> Tuple[bytes, bytes]:
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=RSA_KEY_SIZE,
|
||||
backend=default_backend()
|
||||
)
|
||||
private_pem = private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
)
|
||||
public_pem = private_key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
return private_pem, public_pem
|
||||
|
||||
# === Dynamic Engine Loader ===
|
||||
def load_engine(engine_name: str):
|
||||
try:
|
||||
return importlib.import_module(f'paccrypt_algos.{engine_name}')
|
||||
except ModuleNotFoundError:
|
||||
raise ValueError(f"Encryption engine '{engine_name}' not found.")
|
||||
|
||||
# === Encrypt Text ===
|
||||
def encrypt_text(plaintext: str, public_key_pem: str, engine_name: str = "aes_gcm") -> str:
|
||||
engine = load_engine(engine_name)
|
||||
aes_key = os.urandom(AES_KEY_SIZE)
|
||||
|
||||
public_key = serialization.load_pem_public_key(public_key_pem.encode(), backend=default_backend())
|
||||
encrypted_key = public_key.encrypt(
|
||||
aes_key,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None
|
||||
)
|
||||
)
|
||||
|
||||
encrypted_data = engine.encrypt_text(plaintext, aes_key.hex())
|
||||
header = json.dumps({"alg": engine_name}).encode()
|
||||
payload = len(encrypted_key).to_bytes(2, 'big') + encrypted_key + header + b'\0' + encrypted_data.encode()
|
||||
return b64encode(payload)
|
||||
|
||||
# === Decrypt Text ===
|
||||
def decrypt_text(encrypted_b64: str, private_key_pem: str) -> str:
|
||||
private_key = serialization.load_pem_private_key(private_key_pem.encode(), password=None, backend=default_backend())
|
||||
raw = b64decode(encrypted_b64)
|
||||
|
||||
enc_key_len = int.from_bytes(raw[:2], 'big')
|
||||
enc_key = raw[2:2 + enc_key_len]
|
||||
rest = raw[2 + enc_key_len:]
|
||||
header_data, encrypted_data = rest.split(b'\0', 1)
|
||||
engine_name = json.loads(header_data.decode()).get("alg")
|
||||
|
||||
aes_key = private_key.decrypt(
|
||||
enc_key,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None
|
||||
)
|
||||
)
|
||||
|
||||
engine = load_engine(engine_name)
|
||||
return engine.decrypt_text(encrypted_data.decode(), aes_key.hex())
|
||||
|
||||
# === Encrypt File ===
|
||||
def encrypt_file(in_path: str, out_path: str, public_key_pem: str, engine_name: str = "aes_gcm"):
|
||||
engine = load_engine(engine_name)
|
||||
aes_key = os.urandom(AES_KEY_SIZE)
|
||||
|
||||
public_key = serialization.load_pem_public_key(public_key_pem.encode(), backend=default_backend())
|
||||
encrypted_key = public_key.encrypt(
|
||||
aes_key,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None
|
||||
)
|
||||
)
|
||||
|
||||
with open(in_path, 'rb') as f:
|
||||
plaintext = f.read()
|
||||
|
||||
encrypted_data = engine.encrypt_file_bytes(plaintext, aes_key.hex())
|
||||
header = json.dumps({"alg": engine_name}).encode()
|
||||
payload = len(encrypted_key).to_bytes(2, 'big') + encrypted_key + header + b'\0' + encrypted_data
|
||||
|
||||
with open(out_path, 'wb') as f:
|
||||
f.write(payload)
|
||||
|
||||
# === Decrypt File ===
|
||||
def decrypt_file(in_path: str, out_path: str, private_key_pem: str):
|
||||
private_key = serialization.load_pem_private_key(private_key_pem.encode(), password=None, backend=default_backend())
|
||||
|
||||
with open(in_path, 'rb') as f:
|
||||
raw = f.read()
|
||||
|
||||
enc_key_len = int.from_bytes(raw[:2], 'big')
|
||||
enc_key = raw[2:2 + enc_key_len]
|
||||
rest = raw[2 + enc_key_len:]
|
||||
header_data, encrypted_data = rest.split(b'\0', 1)
|
||||
engine_name = json.loads(header_data.decode()).get("alg")
|
||||
|
||||
aes_key = private_key.decrypt(
|
||||
enc_key,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None
|
||||
)
|
||||
)
|
||||
|
||||
engine = load_engine(engine_name)
|
||||
plaintext = engine.decrypt_file_bytes(encrypted_data, aes_key.hex())
|
||||
|
||||
with open(out_path, 'wb') as f:
|
||||
f.write(plaintext)
|
||||
|
||||
# === Engine Name ===
|
||||
def get_name():
|
||||
return "RSA Hybrid"
|
||||
@@ -0,0 +1,92 @@
|
||||
import os
|
||||
import base64
|
||||
from typing import Optional
|
||||
from Crypto.Cipher import ChaCha20_Poly1305
|
||||
from Crypto.Random import get_random_bytes
|
||||
from Crypto.Protocol.KDF import PBKDF2
|
||||
from Crypto.Hash import SHA256
|
||||
|
||||
# === Constants ===
|
||||
SALT_LENGTH = 16
|
||||
NONCE_LENGTH = 24
|
||||
KEY_LENGTH = 32
|
||||
PBKDF2_ITERATIONS = 200_000
|
||||
TAG_LENGTH = 16
|
||||
|
||||
# === Base64 Helpers ===
|
||||
def b64encode(data: bytes) -> str:
|
||||
return base64.b64encode(data).decode('utf-8')
|
||||
|
||||
def b64decode(data: str) -> bytes:
|
||||
return base64.b64decode(data.encode('utf-8'))
|
||||
|
||||
# === Key Derivation ===
|
||||
def derive_key(password: str, salt: bytes) -> bytes:
|
||||
return PBKDF2(password, salt, dkLen=KEY_LENGTH, count=PBKDF2_ITERATIONS, hmac_hash_module=SHA256)
|
||||
|
||||
# === Encrypt Text ===
|
||||
def encrypt_text(plaintext: str, password: str) -> str:
|
||||
salt = get_random_bytes(SALT_LENGTH)
|
||||
nonce = get_random_bytes(NONCE_LENGTH)
|
||||
key = derive_key(password, salt)
|
||||
|
||||
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
|
||||
ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode('utf-8'))
|
||||
|
||||
final = salt + nonce + ciphertext + tag
|
||||
return b64encode(final)
|
||||
|
||||
# === Decrypt Text ===
|
||||
def decrypt_text(encrypted_b64: str, password: str) -> str:
|
||||
raw = b64decode(encrypted_b64)
|
||||
salt = raw[:SALT_LENGTH]
|
||||
nonce = raw[SALT_LENGTH:SALT_LENGTH + NONCE_LENGTH]
|
||||
tag = raw[-TAG_LENGTH:]
|
||||
ciphertext = raw[SALT_LENGTH + NONCE_LENGTH:-TAG_LENGTH]
|
||||
|
||||
key = derive_key(password, salt)
|
||||
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
|
||||
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
|
||||
|
||||
return plaintext.decode('utf-8')
|
||||
|
||||
# === Encrypt File ===
|
||||
def encrypt_file(in_path, out_path, password: str, metadata: Optional[dict] = None):
|
||||
with open(in_path, 'rb') as f:
|
||||
plaintext = f.read()
|
||||
|
||||
salt = get_random_bytes(SALT_LENGTH)
|
||||
nonce = get_random_bytes(NONCE_LENGTH)
|
||||
key = derive_key(password, salt)
|
||||
|
||||
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
|
||||
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
|
||||
|
||||
with open(out_path, 'wb') as f:
|
||||
f.write(salt + nonce + ciphertext + tag)
|
||||
|
||||
# === Decrypt File ===
|
||||
def decrypt_file(in_path, out_path, password: str, metadata: Optional[dict] = None):
|
||||
with open(in_path, 'rb') as f:
|
||||
raw = f.read()
|
||||
|
||||
salt = raw[:SALT_LENGTH]
|
||||
nonce = raw[SALT_LENGTH:SALT_LENGTH + NONCE_LENGTH]
|
||||
tag = raw[-TAG_LENGTH:]
|
||||
ciphertext = raw[SALT_LENGTH + NONCE_LENGTH:-TAG_LENGTH]
|
||||
|
||||
key = derive_key(password, salt)
|
||||
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
|
||||
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
|
||||
|
||||
with open(out_path, 'wb') as f:
|
||||
f.write(plaintext)
|
||||
|
||||
# === Engine Name ===
|
||||
def get_name():
|
||||
return "XChaCha20-Poly1305"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from Crypto.Cipher.ChaCha20_Poly1305 import ChaCha20Poly1305Cipher as _test # Force import to validate availability
|
||||
from cryptography.exceptions import InvalidTag # Still catchable for consistency
|
||||
Reference in New Issue
Block a user