More towards the roadmap

This commit is contained in:
Tyler Sammons
2025-09-15 12:55:01 -10:00
parent 5d568f7f89
commit 38d3b7e6c1
9 changed files with 3653 additions and 150 deletions
+156 -11
View File
@@ -13,6 +13,12 @@ import sys
import psutil
from flask_cors import CORS
from io import BytesIO
import ipaddress
from functools import wraps
import time
from collections import defaultdict
from datetime import datetime, timedelta
import clamd
# ===== Third-Party Imports =====
from flask import (
@@ -27,6 +33,8 @@ from cryptography.fernet import Fernet
import pyotp
import qrcode
from io import BytesIO
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# ===== PacCrypt Algorithm Imports =====
from paccrypt_algos import aes_cbc, aes_gcm, xchacha, rsa_hybrid
@@ -37,6 +45,16 @@ app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET", os.urandom(24))
CORS(app, origins=["https://pdf.unnaturalll.dev"])
# Initialize rate limiter
limiter = Limiter(
key_func=get_remote_address,
default_limits=["1000 per hour"]
)
limiter.init_app(app)
# Session timeout configuration
app.permanent_session_lifetime = timedelta(minutes=30) # 30 minute timeout
# ===== Constants =====
ADMIN_CRED_FILE = 'application_data/admin_creds.json'
ADMIN_KEY_FILE = 'application_data/admin_key.key'
@@ -46,7 +64,12 @@ SETTINGS_FILE = 'application_data/settings.json'
DEFAULT_SETTINGS = {
"upload_folder": "pacshare",
"max_file_age_days": 14,
"max_file_size_bytes": 25 * 1024 * 1024 * 1024 # 25GB
"max_file_size_bytes": 25 * 1024 * 1024 * 1024, # 25GB
"admin_ip_whitelist": [], # Empty list means all IPs allowed
"virus_scanning_enabled": True,
"session_timeout_minutes": 30,
"rate_limit_per_minute": 60,
"rate_limit_per_hour": 1000
}
# ===== Available Encryption Algorithms =====
@@ -228,6 +251,89 @@ def log_admin_event(message: str):
except Exception as e:
print("[ERROR] Failed to write admin log:", e)
# ===== Security Functions =====
def check_ip_whitelist(ip_address):
"""Check if IP address is in admin whitelist."""
whitelist = settings.get("admin_ip_whitelist", [])
if not whitelist: # Empty list means all IPs allowed
return True
try:
client_ip = ipaddress.ip_address(ip_address)
for allowed_ip in whitelist:
if '/' in allowed_ip: # CIDR notation
if client_ip in ipaddress.ip_network(allowed_ip, strict=False):
return True
else: # Single IP
if client_ip == ipaddress.ip_address(allowed_ip):
return True
return False
except ValueError:
return False
def admin_required(f):
"""Decorator to require admin authentication and IP whitelist check."""
@wraps(f)
def decorated_function(*args, **kwargs):
# Check session timeout
if 'admin_logged_in' not in session:
return redirect(url_for('admin_login'))
# Check if session has expired
if 'login_time' in session:
login_time = datetime.fromisoformat(session['login_time'])
if datetime.now() - login_time > timedelta(minutes=settings.get("session_timeout_minutes", 30)):
session.clear()
log_admin_event(f"Session expired for IP {request.remote_addr}")
return redirect(url_for('admin_login'))
# Check IP whitelist
if not check_ip_whitelist(request.remote_addr):
log_admin_event(f"Unauthorized IP access attempt: {request.remote_addr}")
return jsonify({"error": "Access denied: IP not whitelisted"}), 403
return f(*args, **kwargs)
return decorated_function
def scan_file_for_viruses(file_path):
"""Scan file for viruses using ClamAV."""
if not settings.get("virus_scanning_enabled", True):
return True, "Virus scanning disabled"
try:
cd = clamd.ClamdUnixSocket()
# Test connection
cd.ping()
# Scan file
result = cd.scan(file_path)
if result is None:
return True, "File is clean"
# If infected
for file, status in result.items():
if status[0] == 'FOUND':
return False, f"Virus detected: {status[1]}"
return True, "File is clean"
except clamd.ConnectionError:
# ClamAV daemon not running
log_admin_event("ClamAV daemon not available - virus scanning skipped")
return True, "ClamAV not available - scan skipped"
except Exception as e:
log_admin_event(f"Virus scan error: {str(e)}")
return True, f"Scan error: {str(e)}"
def check_session_timeout():
"""Check if admin session has timed out."""
if 'admin_logged_in' in session and 'login_time' in session:
login_time = datetime.fromisoformat(session['login_time'])
if datetime.now() - login_time > timedelta(minutes=settings.get("session_timeout_minutes", 30)):
session.clear()
return True
return False
# ===== File Management =====
def cleanup_expired_files():
"""Remove files older than MAX_FILE_AGE_DAYS."""
@@ -284,6 +390,15 @@ def handle_file_upload(request):
temp_path = os.path.join(UPLOAD_FOLDER, filename)
file.save(temp_path)
# Virus scan the uploaded file
is_clean, scan_message = scan_file_for_viruses(temp_path)
if not is_clean:
os.remove(temp_path) # Remove infected file
log_admin_event(f"Virus detected in upload: {filename} - {scan_message}")
return jsonify({"error": f"File rejected: {scan_message}"}), 400
log_admin_event(f"File uploaded and scanned: {filename} - {scan_message}")
try:
# Use the selected algorithm for encryption
module = algo_config["module"]
@@ -477,6 +592,7 @@ def handle_file_pickup(request, meta_path, enc_path, file_id):
# ===== 2FA QR Code Routes =====
@app.route("/admin-qr")
@admin_required
def admin_qr_code():
"""Generate QR code for admin 2FA setup."""
if not session.get("admin_logged_in"):
@@ -554,6 +670,7 @@ def generate_qr_code(file_id):
# ===== Admin Routes =====
@app.route("/admin-logs")
@admin_required
def admin_logs():
"""View admin activity logs."""
if not session.get("admin_logged_in"):
@@ -579,11 +696,9 @@ def admin_logs():
return jsonify(logs=logs)
@app.route("/admin-settings", methods=["GET", "POST"])
@admin_required
def admin_settings():
"""Manage application settings."""
if not session.get("admin_logged_in"):
return redirect(url_for("admin_login"))
current_settings = load_settings()
if request.method == 'POST':
@@ -597,10 +712,23 @@ def handle_settings_update(request, current_settings):
max_file_size_gb = float(request.form.get('max_file_size_gb', current_settings.get('max_file_size_bytes', 25 * 1024 * 1024 * 1024) / (1024 * 1024 * 1024)))
max_file_size_bytes = int(max_file_size_gb * 1024 * 1024 * 1024)
# Security settings
session_timeout_minutes = int(request.form.get('session_timeout_minutes', current_settings.get('session_timeout_minutes', 30)))
virus_scanning_enabled = request.form.get('virus_scanning_enabled') == 'on'
# IP whitelist (one per line)
ip_whitelist_text = request.form.get('admin_ip_whitelist', '')
admin_ip_whitelist = [ip.strip() for ip in ip_whitelist_text.split('\n') if ip.strip()]
updated_settings = {
"upload_folder": upload_folder,
"max_file_age_days": max_file_age_days,
"max_file_size_bytes": max_file_size_bytes
"max_file_size_bytes": max_file_size_bytes,
"admin_ip_whitelist": admin_ip_whitelist,
"virus_scanning_enabled": virus_scanning_enabled,
"session_timeout_minutes": session_timeout_minutes,
"rate_limit_per_minute": current_settings.get("rate_limit_per_minute", 60),
"rate_limit_per_hour": current_settings.get("rate_limit_per_hour", 1000)
}
with open(SETTINGS_FILE, 'w') as f:
@@ -645,9 +773,17 @@ def admin_login():
p = request.form.get("password")
totp_code = request.form.get("totp_code")
# Check IP whitelist first
if not check_ip_whitelist(request.remote_addr):
log_admin_event(f"Login attempt from unauthorized IP: {request.remote_addr}")
flash("Access denied: IP not authorized")
return render_template("admin_login.html", requires_2fa=get_admin_2fa_status())
if check_creds(u, p, totp_code):
session["admin_logged_in"] = True
log_admin_event("Admin login successful.")
session["login_time"] = datetime.now().isoformat()
session.permanent = True # Enable session timeout
log_admin_event(f"Admin login successful from IP {request.remote_addr}")
return redirect(url_for("admin_page"))
else:
log_admin_event("Admin login failed.")
@@ -664,13 +800,9 @@ def admin_logout():
return redirect(url_for("index"))
@app.route("/adminpage")
@admin_required
def admin_page():
"""Admin dashboard."""
if not session.get("admin_logged_in"):
if not os.path.exists(ADMIN_CRED_FILE):
return redirect(url_for("admin_setup"))
return redirect(url_for("admin_login"))
cleanup_expired_files()
routes = [rule.rule for rule in app.url_map.iter_rules() if rule.endpoint != 'static']
@@ -748,6 +880,7 @@ exec "$2" "$3"
return jsonify({"error": f"Failed to restart server: {str(e)}"}), 500
@app.route("/admin-reset", methods=["POST"])
@admin_required
def admin_reset():
"""Reset admin credentials."""
if not session.get("admin_logged_in"):
@@ -765,6 +898,7 @@ def admin_reset():
return redirect(url_for("admin_setup"))
@app.route("/admin-change-password", methods=["POST"])
@admin_required
def admin_change_password():
"""Change admin password."""
if not session.get("admin_logged_in"):
@@ -800,6 +934,7 @@ def admin_change_password():
return redirect(url_for("admin_page"))
@app.route("/admin-enable-2fa", methods=["POST"])
@admin_required
def admin_enable_2fa():
"""Enable 2FA for admin account."""
if not session.get("admin_logged_in"):
@@ -831,6 +966,7 @@ def admin_enable_2fa():
return redirect(url_for("admin_page"))
@app.route("/admin-disable-2fa", methods=["POST"])
@admin_required
def admin_disable_2fa():
"""Disable 2FA for admin account."""
if not session.get("admin_logged_in"):
@@ -874,6 +1010,7 @@ def admin_disable_2fa():
return redirect(url_for("admin_page"))
@app.route("/admin-clear-uploads", methods=["POST"])
@admin_required
def admin_clear_uploads():
"""Clear all uploaded files."""
if not session.get("admin_logged_in"):
@@ -892,6 +1029,7 @@ def admin_clear_uploads():
return redirect(url_for("admin_page"))
@app.route("/admin-update-server", methods=["POST"])
@admin_required
def admin_update_server():
"""Update server from GitHub repository."""
if not session.get("admin_logged_in"):
@@ -974,6 +1112,7 @@ def admin_update_server():
return jsonify({"error": error_msg}), 500
@app.route("/admin-switch-dev-mode", methods=["POST"])
@admin_required
def admin_switch_dev_mode():
"""Switch server to development mode."""
if not session.get("admin_logged_in"):
@@ -995,6 +1134,7 @@ def admin_switch_dev_mode():
return jsonify({"error": error_msg}), 500
@app.route("/admin-switch-prod-mode", methods=["POST"])
@admin_required
def admin_switch_prod_mode():
"""Switch server to production mode."""
if not session.get("admin_logged_in"):
@@ -1048,6 +1188,7 @@ def robots_txt():
# ===== API Endpoints =====
@app.route("/api/algorithms", methods=["GET"])
@limiter.limit("100 per minute")
def api_algorithms():
"""Get list of available encryption algorithms."""
algorithms = {}
@@ -1062,6 +1203,7 @@ def api_algorithms():
return jsonify(algorithms=algorithms)
@app.route("/api/generate-keypair", methods=["POST"])
@limiter.limit("10 per minute")
def api_generate_keypair():
"""Generate RSA key pair for hybrid algorithms."""
try:
@@ -1085,6 +1227,7 @@ def api_generate_keypair():
return jsonify({"error": str(e)}), 500
@app.route("/api/encrypt", methods=["POST"])
@limiter.limit("30 per minute")
def api_encrypt():
try:
# Text encryption
@@ -1164,6 +1307,7 @@ def api_encrypt():
return jsonify({"error": str(e)}), 500
@app.route("/api/decrypt", methods=["POST"])
@limiter.limit("30 per minute")
def api_decrypt():
try:
# Text decryption
@@ -1260,6 +1404,7 @@ def api_decrypt():
return jsonify({"error": str(e)}), 500
@app.route("/api/pacshare", methods=["POST"])
@limiter.limit("10 per minute")
def api_pacshare():
try:
enc_password = request.form.get("enc_password")