# ===== Standard Library Imports ===== import os import io import json import html import base64 import hashlib import secrets import subprocess import platform from datetime import datetime import sys import psutil from flask_cors import CORS from io import BytesIO # ===== Third-Party Imports ===== from flask import ( Flask, render_template, request, jsonify, session, redirect, url_for, flash, send_file, make_response ) from werkzeug.utils import secure_filename from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.hashes import SHA256 from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.fernet import Fernet import pyotp import qrcode from io import BytesIO # ===== PacCrypt Algorithm Imports ===== from paccrypt_algos import aes_cbc, aes_gcm, xchacha, rsa_hybrid # Post-quantum crypto removed for simplicity # ===== Application Configuration ===== app = Flask(__name__) app.secret_key = os.getenv("FLASK_SECRET", os.urandom(24)) CORS(app, origins=["https://pdf.unnaturalll.dev"]) # ===== Constants ===== ADMIN_CRED_FILE = 'application_data/admin_creds.json' ADMIN_KEY_FILE = 'application_data/admin_key.key' ADMIN_LOG_FILE = 'application_data/admin_logs.enc' SETTINGS_FILE = 'application_data/settings.json' DEFAULT_SETTINGS = { "upload_folder": "pacshare", "max_file_age_days": 14, "max_file_size_bytes": 25 * 1024 * 1024 * 1024 # 25GB } # ===== Available Encryption Algorithms ===== AVAILABLE_ALGORITHMS = { "aes_cbc": { "name": "AES-CBC", "module": aes_cbc, "supports_text": True, "supports_file": True, "description": "AES-256 with CBC mode and HMAC authentication" }, "aes_gcm": { "name": "AES-GCM", "module": aes_gcm, "supports_text": True, "supports_file": False, "description": "AES-256 with GCM mode (authenticated encryption)" }, "xchacha": { "name": "XChaCha20-Poly1305", "module": xchacha, "supports_text": True, "supports_file": True, "description": "XChaCha20 stream cipher with Poly1305 authentication" }, "rsa_hybrid": { "name": "RSA Hybrid", "module": rsa_hybrid, "supports_text": True, "supports_file": True, "description": "RSA-4096 with AES hybrid encryption", "requires_keypair": True } } # Post-quantum algorithms removed # ===== Settings Management ===== def load_settings(): """Load application settings from file or create with defaults.""" if not os.path.exists(SETTINGS_FILE): with open(SETTINGS_FILE, 'w') as f: json.dump(DEFAULT_SETTINGS, f) with open(SETTINGS_FILE, 'r') as f: return json.load(f) settings = load_settings() UPLOAD_FOLDER = settings["upload_folder"] MAX_FILE_AGE_DAYS = settings["max_file_age_days"] MAX_FILE_SIZE_BYTES = settings["max_file_size_bytes"] # Ensure upload folder exists and has proper permissions if not os.path.exists(UPLOAD_FOLDER): try: os.makedirs(UPLOAD_FOLDER, exist_ok=True) # Set permissions to 755 (rwxr-xr-x) os.chmod(UPLOAD_FOLDER, 0o755) print(f"[INFO] Created upload directory: {UPLOAD_FOLDER}") except Exception as e: print(f"[ERROR] Failed to create upload directory: {str(e)}") raise # ===== Cryptographic Functions ===== def derive_key(password: str, salt: bytes) -> bytes: """Derive a cryptographic key from password using PBKDF2.""" return PBKDF2HMAC(algorithm=SHA256(), length=32, salt=salt, iterations=200_000).derive(password.encode()) def hash_password(password: str, salt: bytes) -> str: """Hash a password with salt for secure storage.""" return base64.urlsafe_b64encode(derive_key(password, salt)).decode() def advanced_encrypt(plaintext: str, password: str) -> str: """Encrypt plaintext with AES-GCM and return base64-encoded result.""" salt = os.urandom(16) nonce = os.urandom(12) key = derive_key(password, salt) ciphertext = AESGCM(key).encrypt(nonce, plaintext.encode(), None) return base64.b64encode(salt + nonce + ciphertext).decode() def advanced_decrypt(data_b64: str, password: str) -> str: """Decrypt base64-encoded AES-GCM encrypted data.""" try: data = base64.b64decode(data_b64) salt, nonce, ciphertext = data[:16], data[16:28], data[28:] key = derive_key(password, salt) plaintext = AESGCM(key).decrypt(nonce, ciphertext, None) return plaintext.decode() except Exception: return "[Error] Invalid password or corrupted data!" # ===== Admin Authentication ===== def load_admin_key(): """Load or generate admin encryption key.""" if not os.path.exists(ADMIN_KEY_FILE): with open(ADMIN_KEY_FILE, 'wb') as f: f.write(Fernet.generate_key()) with open(ADMIN_KEY_FILE, 'rb') as f: return f.read() def encrypt_creds(username, password, totp_secret=None): """Encrypt and store admin credentials.""" key = load_admin_key() cipher = Fernet(key) salt = os.urandom(16) hashed_pw = hash_password(password, salt) data = { "u": username, "p": hashed_pw, "s": base64.b64encode(salt).decode(), "totp_secret": totp_secret, "2fa_enabled": totp_secret is not None } with open(ADMIN_CRED_FILE, 'wb') as f: f.write(cipher.encrypt(json.dumps(data).encode())) def check_creds(username, password, totp_code=None): """Verify admin credentials.""" try: key = load_admin_key() cipher = Fernet(key) with open(ADMIN_CRED_FILE, 'rb') as f: decrypted = cipher.decrypt(f.read()) creds = json.loads(decrypted) salt = base64.b64decode(creds["s"]) # Check username and password first if not (creds["u"] == username and creds["p"] == hash_password(password, salt)): return False # Check 2FA if enabled if creds.get("2fa_enabled", False): if not totp_code: return False totp_secret = creds.get("totp_secret") if not totp_secret: return False totp = pyotp.TOTP(totp_secret) if not totp.verify(totp_code, valid_window=1): return False return True except Exception as e: print("[ERROR] check_creds failed:", e) return False def get_admin_2fa_status(): """Check if admin has 2FA enabled.""" try: key = load_admin_key() cipher = Fernet(key) with open(ADMIN_CRED_FILE, 'rb') as f: decrypted = cipher.decrypt(f.read()) creds = json.loads(decrypted) return creds.get("2fa_enabled", False) except Exception: return False def get_admin_totp_secret(): """Get admin TOTP secret for QR code generation.""" try: key = load_admin_key() cipher = Fernet(key) with open(ADMIN_CRED_FILE, 'rb') as f: decrypted = cipher.decrypt(f.read()) creds = json.loads(decrypted) return creds.get("totp_secret") except Exception: return None def log_admin_event(message: str): """Log admin actions securely.""" try: key = load_admin_key() cipher = Fernet(key) timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") encrypted = cipher.encrypt(f"[{timestamp}] {message}".encode()) with open(ADMIN_LOG_FILE, 'ab') as f: f.write(encrypted + b"\n") except Exception as e: print("[ERROR] Failed to write admin log:", e) # ===== File Management ===== def cleanup_expired_files(): """Remove files older than MAX_FILE_AGE_DAYS.""" try: now = datetime.now() for fname in os.listdir(UPLOAD_FOLDER): if fname.endswith(".enc") or fname.endswith(".json"): path = os.path.join(UPLOAD_FOLDER, fname) try: file_time = datetime.datetime.fromtimestamp(os.path.getmtime(path), ) age = (now - file_time).days if age > MAX_FILE_AGE_DAYS: os.remove(path) print(f"[INFO] Deleted expired file: {fname}") except Exception as e: print(f"[ERROR] Could not check/delete file {fname}: {e}") except Exception as e: print(f"[ERROR] Failed to cleanup expired files: {str(e)}") # ===== Route Handlers ===== @app.route("/", methods=["GET", "POST"]) def index(): """Main application route handling file uploads.""" if request.method == 'POST': if 'file' in request.files: return handle_file_upload(request) else: return jsonify(error="Use /api/encrypt or /api/decrypt endpoints for text operations"), 400 return render_template("index.html", settings=settings) def handle_file_upload(request): """Process file upload and encryption.""" file = request.files['file'] enc_password = request.form.get('enc_password') pickup_password = request.form.get('pickup_password') algorithm = request.form.get('algorithm', 'aes_cbc') # Default to AES-CBC enable_2fa = request.form.get('enable_2fa') == 'on' # Check if 2FA checkbox is checked if not file or not enc_password or not pickup_password: return jsonify({"error": "Missing fields"}), 400 if file.content_length and file.content_length > MAX_FILE_SIZE_BYTES: return jsonify({"error": f"File too large! Limit: {MAX_FILE_SIZE_BYTES / (1024**3):.2f} GB"}), 400 # Validate algorithm if algorithm not in AVAILABLE_ALGORITHMS: return jsonify({"error": "Invalid algorithm"}), 400 algo_config = AVAILABLE_ALGORITHMS[algorithm] if not algo_config["supports_file"]: return jsonify({"error": "Algorithm does not support file operations"}), 400 filename = secure_filename(file.filename) temp_path = os.path.join(UPLOAD_FOLDER, filename) file.save(temp_path) try: # Use the selected algorithm for encryption module = algo_config["module"] random_id = secrets.token_urlsafe(24) encrypted_filename = f"{random_id}.{algorithm}.encrypted" encrypted_path = os.path.join(UPLOAD_FOLDER, encrypted_filename) # Encrypt file using the correct API (in_path, out_path, password) module.encrypt_file(temp_path, encrypted_path, enc_password) os.remove(temp_path) meta = { 'pickup_password': base64.urlsafe_b64encode(hashlib.sha256(pickup_password.encode()).digest()).decode(), 'original_name': encrypt_filename(filename, enc_password), 'algorithm': algorithm, # Store algorithm used for decryption 'timestamp': datetime.now().isoformat(), 'require_2fa': enable_2fa } # Generate TOTP secret if 2FA is enabled if enable_2fa: totp_secret = pyotp.random_base32() meta['totp_secret'] = totp_secret meta['service_name'] = f"PacCrypt File: {filename[:20]}..." with open(os.path.join(UPLOAD_FOLDER, f"{random_id}.json"), 'w') as f: json.dump(meta, f) pickup_url = request.host_url.rstrip('/') + url_for('pickup_file', file_id=random_id) response_data = {"success": True, "pickup_url": pickup_url} # If 2FA is enabled, also return QR code URL for immediate setup if enable_2fa: qr_url = request.host_url.rstrip('/') + url_for('generate_qr_code', file_id=random_id) response_data["qr_code_url"] = qr_url response_data["totp_secret"] = totp_secret response_data["service_name"] = f"PacCrypt File: {filename[:20]}..." return jsonify(response_data) except Exception as e: # Clean up temp file if it still exists if os.path.exists(temp_path): os.remove(temp_path) return jsonify({"error": f"Encryption failed: {str(e)}"}), 500 def encrypt_filename(filename: str, password: str) -> str: salt = os.urandom(16) key = derive_key(password, salt) nonce = os.urandom(12) ct = AESGCM(key).encrypt(nonce, filename.encode(), None) return base64.urlsafe_b64encode(salt + nonce + ct).decode() def decrypt_filename(enc_filename_b64: str, password: str) -> str: raw = base64.urlsafe_b64decode(enc_filename_b64) salt, nonce, ct = raw[:16], raw[16:28], raw[28:] key = derive_key(password, salt) return AESGCM(key).decrypt(nonce, ct, None).decode() # ===== File Pickup Route ===== @app.route("/pickup/", methods=["GET", "POST"]) def pickup_file(file_id): """Handle file pickup and decryption.""" meta_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.json") # Find the encrypted file (could have different algorithm extensions) enc_path = None for filename in os.listdir(UPLOAD_FOLDER): if filename.startswith(f"{file_id}.") and filename.endswith(".encrypted"): enc_path = os.path.join(UPLOAD_FOLDER, filename) break # Fallback to old .enc format for backward compatibility if not enc_path: old_enc_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.enc") if os.path.exists(old_enc_path): enc_path = old_enc_path if not os.path.exists(meta_path) or not enc_path or not os.path.exists(enc_path): flash("File not found or expired") return redirect(url_for('index')) if request.method == 'POST': return handle_file_pickup(request, meta_path, enc_path, file_id) # Check if 2FA is required for this file require_2fa = False service_name = None if os.path.exists(meta_path): with open(meta_path, 'r') as f: meta = json.load(f) require_2fa = meta.get('require_2fa', False) if require_2fa: service_name = meta.get('service_name', 'PacCrypt File') return render_template("pickup.html", file_id=file_id, require_2fa=require_2fa, service_name=service_name) def handle_file_pickup(request, meta_path, enc_path, file_id): """Process file pickup and decryption.""" pickup_password = request.form.get('pickup_password') enc_password = request.form.get('enc_password') totp_code = request.form.get('totp_code') if not pickup_password or not enc_password: flash("Missing fields") return redirect(request.url) with open(meta_path, 'r') as f: meta = json.load(f) expected_hash = base64.urlsafe_b64encode(hashlib.sha256(pickup_password.encode()).digest()).decode() if expected_hash != meta['pickup_password']: flash("Incorrect pickup password") return redirect(request.url) # Check 2FA if required if meta.get('require_2fa', False): if not totp_code: flash("2FA code is required") return redirect(request.url) totp_secret = meta.get('totp_secret') if not totp_secret: flash("2FA configuration error") return redirect(request.url) totp = pyotp.TOTP(totp_secret) if not totp.verify(totp_code, valid_window=1): # Allow 1 window tolerance for clock drift flash("Invalid 2FA code") return redirect(request.url) # Check if this is an algorithm-based encryption or legacy AESGCM algorithm = meta.get('algorithm') try: if algorithm and algorithm in AVAILABLE_ALGORITHMS: # Use the new algorithm-based decryption algo_config = AVAILABLE_ALGORITHMS[algorithm] module = algo_config["module"] # Create temporary file for decryption temp_dec_path = os.path.join(UPLOAD_FOLDER, f"temp_decrypt_{secrets.token_urlsafe(8)}") try: # Decrypt file using the correct API (in_path, out_path, password) module.decrypt_file(enc_path, temp_dec_path, enc_password) # Read decrypted data with open(temp_dec_path, 'rb') as f: decrypted = f.read() finally: # Clean up temp file if os.path.exists(temp_dec_path): os.remove(temp_dec_path) else: # Legacy AESGCM decryption for backward compatibility with open(enc_path, 'rb') as f: enc_data = f.read() salt, nonce, ct = enc_data[:16], enc_data[16:28], enc_data[28:] key = derive_key(enc_password, salt) decrypted = AESGCM(key).decrypt(nonce, ct, None) except Exception as e: flash(f"Decryption failed: {str(e)}") return redirect(request.url) # Clean up files after successful decryption os.remove(meta_path) os.remove(enc_path) log_admin_event(f"File {file_id} downloaded and deleted.") try: original_name = decrypt_filename(meta['original_name'], enc_password) except Exception: original_name = "retrieved_file" response = send_file( io.BytesIO(decrypted), as_attachment=True, download_name=original_name, mimetype='application/octet-stream' ) # Add headers for better mobile compatibility response.headers['Content-Disposition'] = f'attachment; filename="{original_name}"' response.headers['Content-Type'] = 'application/octet-stream' response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response # ===== 2FA QR Code Routes ===== @app.route("/admin-qr") def admin_qr_code(): """Generate QR code for admin 2FA setup.""" if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) totp_secret = get_admin_totp_secret() if not totp_secret: return "2FA not enabled for admin", 400 # Generate TOTP URI for QR code totp_uri = pyotp.totp.TOTP(totp_secret).provisioning_uri( name="admin", issuer_name="PacCrypt Admin" ) # Generate QR code qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(totp_uri) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") # Save to BytesIO img_buffer = BytesIO() img.save(img_buffer, format='PNG') img_buffer.seek(0) response = make_response(img_buffer.getvalue()) response.headers['Content-Type'] = 'image/png' response.headers['Content-Disposition'] = 'inline; filename="admin_2fa_qr.png"' return response @app.route("/qr/") def generate_qr_code(file_id): """Generate QR code for 2FA setup.""" meta_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.json") if not os.path.exists(meta_path): return "File not found", 404 with open(meta_path, 'r') as f: meta = json.load(f) if not meta.get('require_2fa', False): return "2FA not enabled for this file", 400 totp_secret = meta.get('totp_secret') service_name = meta.get('service_name', 'PacCrypt File') if not totp_secret: return "TOTP secret not found", 400 # Generate TOTP URI for QR code totp_uri = pyotp.totp.TOTP(totp_secret).provisioning_uri( name=file_id, issuer_name=service_name ) # Generate QR code qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(totp_uri) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") # Save to BytesIO img_buffer = BytesIO() img.save(img_buffer, format='PNG') img_buffer.seek(0) response = make_response(img_buffer.getvalue()) response.headers['Content-Type'] = 'image/png' response.headers['Content-Disposition'] = f'inline; filename="{file_id}_qr.png"' return response # ===== Admin Routes ===== @app.route("/admin-logs") def admin_logs(): """View admin activity logs.""" if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) logs = [] try: key = load_admin_key() cipher = Fernet(key) if os.path.exists(ADMIN_LOG_FILE): with open(ADMIN_LOG_FILE, 'rb') as f: lines = f.readlines() for line in lines[-100:]: if line.strip(): try: decrypted = cipher.decrypt(line.strip()) logs.append(decrypted.decode()) except Exception: logs.append("[Error] Corrupted log entry.") except Exception as e: logs.append(f"[Error loading logs] {str(e)}") return jsonify(logs=logs) @app.route("/admin-settings", methods=["GET", "POST"]) def admin_settings(): """Manage application settings.""" if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) current_settings = load_settings() if request.method == 'POST': return handle_settings_update(request, current_settings) return render_template("admin_settings.html", settings=current_settings) def handle_settings_update(request, current_settings): """Process settings update request.""" upload_folder = request.form.get('upload_folder', current_settings.get('upload_folder', 'uploads')) max_file_age_days = int(request.form.get('max_file_age_days', current_settings.get('max_file_age_days', 14))) max_file_size_gb = float(request.form.get('max_file_size_gb', current_settings.get('max_file_size_bytes', 25 * 1024 * 1024 * 1024) / (1024 * 1024 * 1024))) max_file_size_bytes = int(max_file_size_gb * 1024 * 1024 * 1024) updated_settings = { "upload_folder": upload_folder, "max_file_age_days": max_file_age_days, "max_file_size_bytes": max_file_size_bytes } with open(SETTINGS_FILE, 'w') as f: json.dump(updated_settings, f) flash("Settings updated successfully!") global settings, UPLOAD_FOLDER, MAX_FILE_AGE_DAYS, MAX_FILE_SIZE_BYTES settings = load_settings() UPLOAD_FOLDER = settings.get('upload_folder', 'uploads') MAX_FILE_AGE_DAYS = settings.get('max_file_age_days', 14) MAX_FILE_SIZE_BYTES = settings.get('max_file_size_bytes', 25 * 1024 * 1024 * 1024) if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) return redirect(url_for("admin_settings")) @app.route("/admin-setup", methods=["GET", "POST"]) def admin_setup(): """Initial admin account setup.""" if os.path.exists(ADMIN_CRED_FILE): return redirect(url_for("admin_login")) if request.method == "POST": u = request.form.get("username") p = request.form.get("password") enable_2fa = request.form.get("enable_2fa") == "on" if u and p: totp_secret = pyotp.random_base32() if enable_2fa else None encrypt_creds(u, p, totp_secret) session["admin_logged_in"] = True session["admin_2fa_setup"] = enable_2fa return redirect(url_for("admin_page")) flash("Both fields required") return render_template("admin_setup.html") @app.route("/admin-login", methods=["GET", "POST"]) def admin_login(): """Admin login handler.""" if request.method == "POST": u = request.form.get("username") p = request.form.get("password") totp_code = request.form.get("totp_code") if check_creds(u, p, totp_code): session["admin_logged_in"] = True log_admin_event("Admin login successful.") return redirect(url_for("admin_page")) else: log_admin_event("Admin login failed.") flash("Incorrect credentials or 2FA code") # Check if 2FA is enabled for the UI requires_2fa = get_admin_2fa_status() return render_template("admin_login.html", requires_2fa=requires_2fa) @app.route("/admin-logout") def admin_logout(): """Admin logout handler.""" session.pop("admin_logged_in", None) return redirect(url_for("index")) @app.route("/adminpage") def admin_page(): """Admin dashboard.""" if not session.get("admin_logged_in"): if not os.path.exists(ADMIN_CRED_FILE): return redirect(url_for("admin_setup")) return redirect(url_for("admin_login")) cleanup_expired_files() routes = [rule.rule for rule in app.url_map.iter_rules() if rule.endpoint != 'static'] now = datetime.now() try: boot_time = datetime.fromtimestamp(psutil.boot_time()) uptime = now - boot_time days = uptime.days hours, remainder = divmod(uptime.seconds, 3600) minutes = remainder // 60 uptime_str = f"{days} days, {hours} hours, {minutes} minutes" except Exception as e: print(f"[ERROR] Uptime calculation failed: {e}") uptime_str = "Unavailable" server_info = { "uptime": uptime_str, "server_time": now.strftime("%Y-%m-%d %H:%M:%S"), "python_version": platform.python_version(), "debug_mode": app.debug } # Get 2FA status for UI tfa_enabled = get_admin_2fa_status() return render_template("admin.html", routes=routes, server_info=server_info, tfa_enabled=tfa_enabled) @app.route("/restart-server", methods=["POST"]) def restart_server(): """Restart the server.""" if not session.get("admin_logged_in"): return jsonify({"error": "Unauthorized"}), 401 try: if platform.system() == "Windows": current_pid = os.getpid() restart_script = f""" @echo off timeout /t 2 /nobreak taskkill /F /PID {current_pid} set PRODUCTION=true start "" "python" "app.py" """ with open("restart.bat", "w") as f: f.write(restart_script) subprocess.Popen(["restart.bat"], shell=True) return jsonify({"message": "Server restart initiated"}), 200 else: current_pid = os.getpid() python_path = sys.executable script_path = os.path.abspath(__file__) # Create a safer and cleaner restart script restart_script = """#!/bin/bash sleep 2 PID=$1 kill "$PID" while kill -0 "$PID" 2>/dev/null; do sleep 0.5; done export PRODUCTION=true exec "$2" "$3" """ with open("restart.sh", "w") as f: f.write(restart_script) os.chmod("restart.sh", 0o755) subprocess.Popen(["./restart.sh", str(current_pid), python_path, script_path]) return jsonify({"message": "Server restart initiated"}), 200 except Exception as e: print(f"[ERROR] Failed to restart server: {str(e)}") return jsonify({"error": f"Failed to restart server: {str(e)}"}), 500 @app.route("/admin-reset", methods=["POST"]) def admin_reset(): """Reset admin credentials.""" if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) try: if os.path.exists(ADMIN_CRED_FILE): os.remove(ADMIN_CRED_FILE) if os.path.exists(ADMIN_KEY_FILE): os.remove(ADMIN_KEY_FILE) session.pop("admin_logged_in", None) flash("Admin credentials reset. Please create new credentials.") except Exception as e: flash("Failed to reset admin credentials.") print("[ERROR] admin_reset failed:", e) return redirect(url_for("admin_setup")) @app.route("/admin-change-password", methods=["POST"]) def admin_change_password(): """Change admin password.""" if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) current = request.form.get("current_password") new = request.form.get("new_password") try: key = load_admin_key() cipher = Fernet(key) with open(ADMIN_CRED_FILE, 'rb') as file: decrypted = cipher.decrypt(file.read()) creds = json.loads(decrypted) salt = base64.b64decode(creds["s"]) if hash_password(current, salt) != creds["p"]: flash("Current password is incorrect") return redirect(url_for("admin_page")) creds["p"] = hash_password(new, salt) encrypted = cipher.encrypt(json.dumps(creds).encode()) with open(ADMIN_CRED_FILE, 'wb') as file: file.write(encrypted) log_admin_event("Admin password changed.") flash("Password updated successfully", "password-feedback") return redirect(url_for("admin_page")) except Exception as e: flash("Failed to update password") print("[ERROR] Password change failed:", e) return redirect(url_for("admin_page")) @app.route("/admin-enable-2fa", methods=["POST"]) def admin_enable_2fa(): """Enable 2FA for admin account.""" if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) try: key = load_admin_key() cipher = Fernet(key) with open(ADMIN_CRED_FILE, 'rb') as file: decrypted = cipher.decrypt(file.read()) creds = json.loads(decrypted) # Generate new TOTP secret totp_secret = pyotp.random_base32() creds["totp_secret"] = totp_secret creds["2fa_enabled"] = True encrypted = cipher.encrypt(json.dumps(creds).encode()) with open(ADMIN_CRED_FILE, 'wb') as file: file.write(encrypted) log_admin_event("Admin 2FA enabled.") flash("2FA enabled successfully. Scan the QR code with your authenticator app.", "2fa-feedback") return redirect(url_for("admin_page")) except Exception as e: flash("Failed to enable 2FA") print("[ERROR] 2FA enable failed:", e) return redirect(url_for("admin_page")) @app.route("/admin-disable-2fa", methods=["POST"]) def admin_disable_2fa(): """Disable 2FA for admin account.""" if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) totp_code = request.form.get("totp_code") if not totp_code: flash("2FA code required to disable 2FA") return redirect(url_for("admin_page")) try: key = load_admin_key() cipher = Fernet(key) with open(ADMIN_CRED_FILE, 'rb') as file: decrypted = cipher.decrypt(file.read()) creds = json.loads(decrypted) # Verify 2FA code before disabling if creds.get("2fa_enabled", False): totp_secret = creds.get("totp_secret") if totp_secret: totp = pyotp.TOTP(totp_secret) if not totp.verify(totp_code, valid_window=1): flash("Invalid 2FA code") return redirect(url_for("admin_page")) creds["totp_secret"] = None creds["2fa_enabled"] = False encrypted = cipher.encrypt(json.dumps(creds).encode()) with open(ADMIN_CRED_FILE, 'wb') as file: file.write(encrypted) log_admin_event("Admin 2FA disabled.") flash("2FA disabled successfully", "2fa-feedback") return redirect(url_for("admin_page")) except Exception as e: flash("Failed to disable 2FA") print("[ERROR] 2FA disable failed:", e) return redirect(url_for("admin_page")) @app.route("/admin-clear-uploads", methods=["POST"]) def admin_clear_uploads(): """Clear all uploaded files.""" if not session.get("admin_logged_in"): return redirect(url_for("admin_login")) deleted = 0 for filename in os.listdir(UPLOAD_FOLDER): if filename.endswith(".enc") or filename.endswith(".json"): try: os.remove(os.path.join(UPLOAD_FOLDER, filename)) deleted += 1 except Exception as e: print("[ERROR] Failed to delete:", filename, e) flash(f"Cleared {deleted} uploaded file(s).", "clear-feedback") return redirect(url_for("admin_page")) @app.route("/admin-update-server", methods=["POST"]) def admin_update_server(): """Update server from GitHub repository.""" if not session.get("admin_logged_in"): return jsonify({"error": "Unauthorized"}), 401 try: # Get the absolute path of the current directory current_dir = os.path.abspath(os.path.dirname(__file__)) # Try to find git executable git_paths = [ "/usr/bin/git", # Standard Debian path "/usr/local/bin/git", "/bin/git", "git" # Fallback to PATH ] git_cmd = None for path in git_paths: if os.path.exists(path) or path == "git": try: # Test if git is executable subprocess.run([path, "--version"], check=True, capture_output=True) git_cmd = path break except Exception: continue if not git_cmd: return jsonify({"error": "Git executable not found. Please ensure git is installed and accessible."}), 500 # Try to find the git repository by checking parent directories repo_dir = current_dir max_depth = 5 # Limit how far up we'll look found_git = False for _ in range(max_depth): git_dir = os.path.join(repo_dir, ".git") if os.path.exists(git_dir): found_git = True break parent_dir = os.path.dirname(repo_dir) if parent_dir == repo_dir: # We've reached the root directory break repo_dir = parent_dir if not found_git: return jsonify({ "error": "Git repository not found. Current directory: " + current_dir, "details": "Please ensure the application is running from within the git repository directory." }), 400 # Execute git commands with proper error handling try: # Fetch latest changes fetch_result = subprocess.run([git_cmd, "fetch"], cwd=repo_dir, check=True, capture_output=True, text=True) # Reset to origin/main reset_result = subprocess.run([git_cmd, "reset", "--hard", "origin/main"], cwd=repo_dir, check=True, capture_output=True, text=True) # Pull latest changes pull_result = subprocess.run([git_cmd, "pull"], cwd=repo_dir, check=True, capture_output=True, text=True) return jsonify({ "message": "Server updated successfully from GitHub!", "details": { "fetch": fetch_result.stdout, "reset": reset_result.stdout, "pull": pull_result.stdout } }), 200 except subprocess.CalledProcessError as e: error_msg = f"Git operation failed: {e.stderr if e.stderr else e.stdout}" print(f"[ERROR] {error_msg}") return jsonify({"error": error_msg}), 500 except Exception as e: error_msg = f"Update failed: {str(e)}" print(f"[ERROR] {error_msg}") return jsonify({"error": error_msg}), 500 @app.route("/admin-switch-dev-mode", methods=["POST"]) def admin_switch_dev_mode(): """Switch server to development mode.""" if not session.get("admin_logged_in"): return jsonify({"error": "Unauthorized"}), 401 try: script_path = os.path.join(os.path.dirname(__file__), "application_data", "control_scripts", "restart_dev.py") if not os.path.exists(script_path): return jsonify({"error": "Development restart script not found"}), 404 # Execute the restart script subprocess.Popen(["python", script_path]) return jsonify({"message": "Switching to development mode... Server will restart momentarily."}), 200 except Exception as e: error_msg = f"Failed to switch to dev mode: {str(e)}" print(f"[ERROR] {error_msg}") return jsonify({"error": error_msg}), 500 @app.route("/admin-switch-prod-mode", methods=["POST"]) def admin_switch_prod_mode(): """Switch server to production mode.""" if not session.get("admin_logged_in"): return jsonify({"error": "Unauthorized"}), 401 try: script_path = os.path.join(os.path.dirname(__file__), "application_data", "control_scripts", "restart_prod.py") if not os.path.exists(script_path): return jsonify({"error": "Production restart script not found"}), 404 # Execute the restart script subprocess.Popen(["python", script_path]) return jsonify({"message": "Switching to production mode... Server will restart momentarily."}), 200 except Exception as e: error_msg = f"Failed to switch to prod mode: {str(e)}" print(f"[ERROR] {error_msg}") return jsonify({"error": error_msg}), 500 # ===== Sitemap and Robots ===== @app.route("/sitemap", methods=["GET"]) def sitemap(): """Generate sitemap.xml.""" sitemap_xml = ''' https://paccrypt.unnaturalll.dev/ https://paccrypt.unnaturalll.dev/pickup https://paccrypt.unnaturalll.dev/adminpage https://paccrypt.unnaturalll.dev/sitemap ''' return sitemap_xml, 200, {'Content-Type': 'application/xml'} @app.route("/robots.txt") def robots_txt(): """Generate robots.txt.""" lines = [ "User-agent: *", "Disallow: /adminpage", "Disallow: /admin-login", "Disallow: /admin-setup", "Disallow: /admin-reset", "Disallow: /admin-settings", "Disallow: /restart-server", "Disallow: /pickup", "Disallow: /admin-change-password", "Allow: /", f"Sitemap: {url_for('sitemap', _external=True)}" ] return "\n".join(lines), 200, {"Content-Type": "text/plain"} # ===== API Endpoints ===== @app.route("/api/algorithms", methods=["GET"]) def api_algorithms(): """Get list of available encryption algorithms.""" algorithms = {} for key, config in AVAILABLE_ALGORITHMS.items(): algorithms[key] = { "name": config["name"], "description": config["description"], "supports_text": config["supports_text"], "supports_file": config["supports_file"], "requires_keypair": config.get("requires_keypair", False) } return jsonify(algorithms=algorithms) @app.route("/api/generate-keypair", methods=["POST"]) def api_generate_keypair(): """Generate RSA key pair for hybrid algorithms.""" try: data = request.get_json() algorithm = data.get("algorithm", "rsa_hybrid") if algorithm not in AVAILABLE_ALGORITHMS: return jsonify({"error": "Invalid algorithm"}), 400 if not AVAILABLE_ALGORITHMS[algorithm].get("requires_keypair"): return jsonify({"error": "Algorithm does not require key pairs"}), 400 module = AVAILABLE_ALGORITHMS[algorithm]["module"] private_key, public_key = module.generate_key_pair() return jsonify({ "private_key": private_key.decode() if isinstance(private_key, bytes) else private_key, "public_key": public_key.decode() if isinstance(public_key, bytes) else public_key }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/encrypt", methods=["POST"]) def api_encrypt(): try: # Text encryption if request.is_json: data = request.get_json() message = data.get("message", "") password = data.get("password", "") algorithm = data.get("algorithm", "aes_gcm") public_key = data.get("public_key", "") if not message: return jsonify({"error": "Missing message"}), 400 if algorithm not in AVAILABLE_ALGORITHMS: return jsonify({"error": "Invalid algorithm"}), 400 algo_config = AVAILABLE_ALGORITHMS[algorithm] if not algo_config["supports_text"]: return jsonify({"error": "Algorithm does not support text operations"}), 400 module = algo_config["module"] if algo_config.get("requires_keypair"): if not public_key: return jsonify({"error": "Public key required for this algorithm"}), 400 encrypted = module.encrypt_text(message, public_key, algorithm.replace("_hybrid", "")) else: if not password: return jsonify({"error": "Password required"}), 400 encrypted = module.encrypt_text(message, password) return jsonify({"result": encrypted, "algorithm": algorithm}) # File encryption if "file" in request.files and "enc_password" in request.form: uploaded_file = request.files["file"] password = request.form["enc_password"] algorithm = request.form.get("algorithm", "aes_cbc") if algorithm not in AVAILABLE_ALGORITHMS: return jsonify({"error": "Invalid algorithm"}), 400 algo_config = AVAILABLE_ALGORITHMS[algorithm] if not algo_config["supports_file"]: return jsonify({"error": "Algorithm does not support file operations"}), 400 file_data = uploaded_file.read() temp_in = f"temp_in_{secrets.token_urlsafe(8)}" temp_out = f"temp_out_{secrets.token_urlsafe(8)}" try: with open(temp_in, 'wb') as f: f.write(file_data) module = algo_config["module"] module.encrypt_file(temp_in, temp_out, password) with open(temp_out, 'rb') as f: encrypted_data = f.read() output_filename = f"{uploaded_file.filename}.{algorithm}.encrypted" return send_file( BytesIO(encrypted_data), as_attachment=True, download_name=output_filename, mimetype="application/octet-stream" ) finally: for temp_file in [temp_in, temp_out]: if os.path.exists(temp_file): os.remove(temp_file) return jsonify({"error": "Missing or invalid input"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/decrypt", methods=["POST"]) def api_decrypt(): try: # Text decryption if request.is_json: data = request.get_json() encrypted_b64 = data.get("message", "") password = data.get("password", "") algorithm = data.get("algorithm", "aes_gcm") private_key = data.get("private_key", "") if not encrypted_b64: return jsonify({"error": "Missing encrypted message"}), 400 if algorithm not in AVAILABLE_ALGORITHMS: return jsonify({"error": "Invalid algorithm"}), 400 algo_config = AVAILABLE_ALGORITHMS[algorithm] if not algo_config["supports_text"]: return jsonify({"error": "Algorithm does not support text operations"}), 400 module = algo_config["module"] if algo_config.get("requires_keypair"): if not private_key: return jsonify({"error": "Private key required for this algorithm"}), 400 plaintext = module.decrypt_text(encrypted_b64, private_key) else: if not password: return jsonify({"error": "Password required"}), 400 plaintext = module.decrypt_text(encrypted_b64, password) return jsonify({"result": plaintext}) # File decryption if "file" in request.files and "enc_password" in request.form: uploaded_file = request.files["file"] password = request.form["enc_password"] # Try to determine algorithm from filename filename = uploaded_file.filename algorithm = "aes_cbc" # default for algo_name in AVAILABLE_ALGORITHMS.keys(): if f".{algo_name}.encrypted" in filename: algorithm = algo_name break # Allow override algorithm = request.form.get("algorithm", algorithm) if algorithm not in AVAILABLE_ALGORITHMS: return jsonify({"error": "Invalid algorithm"}), 400 algo_config = AVAILABLE_ALGORITHMS[algorithm] if not algo_config["supports_file"]: return jsonify({"error": "Algorithm does not support file operations"}), 400 encrypted_data = uploaded_file.read() temp_in = f"temp_in_{secrets.token_urlsafe(8)}" temp_out = f"temp_out_{secrets.token_urlsafe(8)}" try: with open(temp_in, 'wb') as f: f.write(encrypted_data) module = algo_config["module"] module.decrypt_file(temp_in, temp_out, password) with open(temp_out, 'rb') as f: decrypted_data = f.read() # Clean up filename if f".{algorithm}.encrypted" in filename: filename = filename.replace(f".{algorithm}.encrypted", "") elif filename.endswith(".encrypted"): filename = filename[:-10] else: filename = f"decrypted_{filename}" return send_file( BytesIO(decrypted_data), as_attachment=True, download_name=filename, mimetype="application/octet-stream" ) finally: for temp_file in [temp_in, temp_out]: if os.path.exists(temp_file): os.remove(temp_file) return jsonify({"error": "Missing or invalid input"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/pacshare", methods=["POST"]) def api_pacshare(): try: enc_password = request.form.get("enc_password") pickup_password = request.form.get("pickup_password") algorithm = request.form.get("algorithm", "aes_cbc") # Default to AES-CBC enable_2fa = request.form.get("enable_2fa") == "on" # Check if 2FA checkbox is checked file = request.files.get("file") if not file or not enc_password or not pickup_password: return jsonify({"error": "Missing file or fields"}), 400 # Validate algorithm if algorithm not in AVAILABLE_ALGORITHMS: return jsonify({"error": "Invalid algorithm"}), 400 algo_config = AVAILABLE_ALGORITHMS[algorithm] if not algo_config["supports_file"]: return jsonify({"error": "Algorithm does not support file operations"}), 400 filename = secure_filename(file.filename) temp_path = os.path.join(UPLOAD_FOLDER, f"temp_{secrets.token_urlsafe(8)}_{filename}") file.save(temp_path) try: # Use the selected algorithm for encryption module = algo_config["module"] file_id = secrets.token_urlsafe(24) encrypted_filename = f"{file_id}.{algorithm}.encrypted" enc_path = os.path.join(UPLOAD_FOLDER, encrypted_filename) meta_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.json") # Encrypt file using the correct API (in_path, out_path, password) module.encrypt_file(temp_path, enc_path, enc_password) encrypted_filename = encrypt_filename(filename, enc_password) meta = { 'pickup_password': base64.urlsafe_b64encode( hashlib.sha256(pickup_password.encode()).digest() ).decode(), 'original_name': encrypted_filename, 'algorithm': algorithm, # Store algorithm used for decryption 'timestamp': datetime.now().isoformat(), 'require_2fa': enable_2fa } # Generate TOTP secret if 2FA is enabled if enable_2fa: totp_secret = pyotp.random_base32() meta['totp_secret'] = totp_secret meta['service_name'] = f"PacCrypt File: {filename[:20]}..." with open(meta_path, "w") as f: json.dump(meta, f) pickup_url = request.host_url.rstrip('/') + url_for('pickup_file', file_id=file_id) response_data = {"pickup_url": pickup_url} # If 2FA is enabled, also return QR code URL for immediate setup if enable_2fa: qr_url = request.host_url.rstrip('/') + url_for('generate_qr_code', file_id=file_id) response_data["qr_code_url"] = qr_url response_data["totp_secret"] = totp_secret response_data["service_name"] = f"PacCrypt File: {filename[:20]}..." return jsonify(response_data) finally: # Clean up temp file if os.path.exists(temp_path): os.remove(temp_path) except Exception as e: return jsonify({"error": str(e)}), 500 # ===== Error Handlers ===== @app.errorhandler(404) def page_not_found(e): return render_template('404.html'), 404 @app.errorhandler(500) def server_error(e): return render_template('500.html'), 500 @app.errorhandler(403) def forbidden(e): return render_template('403.html'), 403 @app.errorhandler(405) def method_not_allowed(e): return render_template('403.html'), 403 @app.errorhandler(FileNotFoundError) def handle_file_not_found(e): if os.getenv("PRODUCTION", "false").lower() == "true": return render_template('500.html'), 500 else: raise e # ===== Application Entry Point ===== if __name__ == "__main__": PRODUCTION = os.getenv("PRODUCTION", "false").lower() == "true" if PRODUCTION: from waitress import serve print("[INFO] Running in PRODUCTION mode with Waitress.") serve(app, host="0.0.0.0", port=5000) else: print("[INFO] Running in DEVELOPMENT mode with Flask server.") app.run(debug=True, host="0.0.0.0", port=5000)