V .4.1
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
# ===== Standard Library Imports =====
|
||||
import os
|
||||
import io
|
||||
import json
|
||||
@@ -5,11 +6,13 @@ import html
|
||||
import base64
|
||||
import hashlib
|
||||
import secrets
|
||||
import shutil
|
||||
import datetime
|
||||
import subprocess
|
||||
import platform
|
||||
from datetime import UTC
|
||||
import sys
|
||||
|
||||
# ===== Third-Party Imports =====
|
||||
from flask import (
|
||||
Flask, render_template, request, jsonify, session,
|
||||
redirect, url_for, flash, send_file
|
||||
@@ -20,9 +23,11 @@ from cryptography.hazmat.primitives.hashes import SHA256
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
# ===== Application Configuration =====
|
||||
app = Flask(__name__)
|
||||
app.secret_key = os.getenv("FLASK_SECRET", os.urandom(24))
|
||||
|
||||
# ===== Constants =====
|
||||
ADMIN_CRED_FILE = 'admin_creds.json'
|
||||
ADMIN_KEY_FILE = 'admin_key.key'
|
||||
ADMIN_LOG_FILE = 'admin_logs.enc'
|
||||
@@ -35,8 +40,9 @@ DEFAULT_SETTINGS = {
|
||||
"max_file_size_bytes": 25 * 1024 * 1024 * 1024 # 25GB
|
||||
}
|
||||
|
||||
# === Settings ===
|
||||
# ===== Settings Management =====
|
||||
def load_settings():
|
||||
"""Load application settings from file or create with defaults."""
|
||||
if not os.path.exists(SETTINGS_FILE):
|
||||
with open(SETTINGS_FILE, 'w') as f:
|
||||
json.dump(DEFAULT_SETTINGS, f)
|
||||
@@ -51,20 +57,25 @@ MAX_FILE_SIZE_BYTES = settings["max_file_size_bytes"]
|
||||
if not os.path.exists(UPLOAD_FOLDER):
|
||||
os.makedirs(UPLOAD_FOLDER)
|
||||
|
||||
# === Crypto ===
|
||||
# ===== Cryptographic Functions =====
|
||||
def derive_key(password: str, salt: bytes) -> bytes:
|
||||
"""Derive a cryptographic key from password using PBKDF2."""
|
||||
return PBKDF2HMAC(algorithm=SHA256(), length=32, salt=salt, iterations=200_000).derive(password.encode())
|
||||
|
||||
def hash_password(password: str, salt: bytes) -> str:
|
||||
"""Hash a password with salt for secure storage."""
|
||||
return base64.urlsafe_b64encode(derive_key(password, salt)).decode()
|
||||
|
||||
def simple_encode(text: str) -> str:
|
||||
"""Basic Caesar cipher encryption."""
|
||||
return ''.join(ALPHABET[(ALPHABET.index(c) + 3) % 26] if c in ALPHABET else c for c in text.lower())
|
||||
|
||||
def simple_decode(text: str) -> str:
|
||||
"""Basic Caesar cipher decryption."""
|
||||
return ''.join(ALPHABET[(ALPHABET.index(c) - 3) % 26] if c in ALPHABET else c for c in text.lower())
|
||||
|
||||
def advanced_encrypt(plaintext: str, password: str) -> str:
|
||||
"""Encrypt text using AES-GCM with password-derived key."""
|
||||
salt = os.urandom(16)
|
||||
key = derive_key(password, salt)
|
||||
nonce = os.urandom(12)
|
||||
@@ -72,6 +83,7 @@ def advanced_encrypt(plaintext: str, password: str) -> str:
|
||||
return base64.urlsafe_b64encode(salt + nonce + ct).decode()
|
||||
|
||||
def advanced_decrypt(token_b64: str, password: str) -> str:
|
||||
"""Decrypt text using AES-GCM with password-derived key."""
|
||||
try:
|
||||
data = base64.urlsafe_b64decode(token_b64.encode())
|
||||
salt, nonce, ct = data[:16], data[16:28], data[28:]
|
||||
@@ -80,8 +92,9 @@ def advanced_decrypt(token_b64: str, password: str) -> str:
|
||||
except Exception:
|
||||
return "[Error] Invalid password or corrupted data!"
|
||||
|
||||
# === Admin Auth ===
|
||||
# ===== Admin Authentication =====
|
||||
def load_admin_key():
|
||||
"""Load or generate admin encryption key."""
|
||||
if not os.path.exists(ADMIN_KEY_FILE):
|
||||
with open(ADMIN_KEY_FILE, 'wb') as f:
|
||||
f.write(Fernet.generate_key())
|
||||
@@ -89,6 +102,7 @@ def load_admin_key():
|
||||
return f.read()
|
||||
|
||||
def encrypt_creds(username, password):
|
||||
"""Encrypt and store admin credentials."""
|
||||
key = load_admin_key()
|
||||
cipher = Fernet(key)
|
||||
salt = os.urandom(16)
|
||||
@@ -98,6 +112,7 @@ def encrypt_creds(username, password):
|
||||
f.write(cipher.encrypt(data))
|
||||
|
||||
def check_creds(username, password):
|
||||
"""Verify admin credentials."""
|
||||
try:
|
||||
key = load_admin_key()
|
||||
cipher = Fernet(key)
|
||||
@@ -111,82 +126,104 @@ def check_creds(username, password):
|
||||
return False
|
||||
|
||||
def log_admin_event(message: str):
|
||||
"""Log admin actions securely."""
|
||||
try:
|
||||
key = load_admin_key()
|
||||
cipher = Fernet(key)
|
||||
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
timestamp = datetime.datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")
|
||||
encrypted = cipher.encrypt(f"[{timestamp}] {message}".encode())
|
||||
with open(ADMIN_LOG_FILE, 'ab') as f:
|
||||
f.write(encrypted + b"\n")
|
||||
except Exception as e:
|
||||
print("[ERROR] Failed to write admin log:", e)
|
||||
|
||||
# === Text Encryption Route ===
|
||||
# ===== File Management =====
|
||||
def cleanup_expired_files():
|
||||
"""Remove files older than MAX_FILE_AGE_DAYS."""
|
||||
now = datetime.datetime.now(UTC)
|
||||
for fname in os.listdir(UPLOAD_FOLDER):
|
||||
if fname.endswith(".enc") or fname.endswith(".json"):
|
||||
path = os.path.join(UPLOAD_FOLDER, fname)
|
||||
try:
|
||||
file_time = datetime.datetime.fromtimestamp(os.path.getmtime(path), UTC)
|
||||
age = (now - file_time).days
|
||||
if age > MAX_FILE_AGE_DAYS:
|
||||
os.remove(path)
|
||||
print(f"[INFO] Deleted expired file: {fname}")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Could not check/delete file {fname}: {e}")
|
||||
|
||||
# ===== Route Handlers =====
|
||||
@app.route("/", methods=["GET", "POST"])
|
||||
def index():
|
||||
"""Main application route handling encryption/decryption and file uploads."""
|
||||
if request.method == 'POST':
|
||||
if 'file' in request.files: # <-- Handling file upload
|
||||
file = request.files['file']
|
||||
enc_password = request.form.get('enc_password')
|
||||
pickup_password = request.form.get('pickup_password')
|
||||
|
||||
if not file or not enc_password or not pickup_password:
|
||||
flash('Missing fields')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
if file.content_length and file.content_length > MAX_FILE_SIZE_BYTES:
|
||||
flash(f"File too large! Limit: {MAX_FILE_SIZE_BYTES / (1024**3):.2f} GB")
|
||||
return redirect(url_for('index'))
|
||||
|
||||
filename = secure_filename(file.filename)
|
||||
temp_path = os.path.join(UPLOAD_FOLDER, filename)
|
||||
file.save(temp_path)
|
||||
|
||||
with open(temp_path, 'rb') as f:
|
||||
data = f.read()
|
||||
|
||||
salt = os.urandom(16)
|
||||
key = derive_key(enc_password, salt)
|
||||
nonce = os.urandom(12)
|
||||
ct = AESGCM(key).encrypt(nonce, data, None)
|
||||
|
||||
random_id = secrets.token_urlsafe(24)
|
||||
|
||||
with open(os.path.join(UPLOAD_FOLDER, f"{random_id}.enc"), 'wb') as f:
|
||||
f.write(salt + nonce + ct)
|
||||
os.remove(temp_path)
|
||||
|
||||
meta = {
|
||||
'pickup_password': base64.urlsafe_b64encode(hashlib.sha256(pickup_password.encode()).digest()).decode(),
|
||||
'original_name': filename,
|
||||
'timestamp': datetime.datetime.utcnow().isoformat()
|
||||
}
|
||||
with open(os.path.join(UPLOAD_FOLDER, f"{random_id}.json"), 'w') as f:
|
||||
json.dump(meta, f)
|
||||
|
||||
pickup_url = request.host_url.rstrip('/') + url_for('pickup_file', file_id=random_id)
|
||||
flash(pickup_url)
|
||||
return redirect(url_for('index'))
|
||||
|
||||
else: # <-- Handling encryption/decryption
|
||||
data = request.get_json()
|
||||
encryption_type = data.get("encryption-type", "basic")
|
||||
operation = data.get("operation", "")
|
||||
message = data.get("message", "")
|
||||
password = data.get("password", "")
|
||||
|
||||
if encryption_type == "basic":
|
||||
result = simple_encode(message) if operation == "encrypt" else simple_decode(message)
|
||||
else:
|
||||
result = advanced_encrypt(message, password) if operation == "encrypt" else advanced_decrypt(message, password)
|
||||
|
||||
return jsonify(result=html.escape(result))
|
||||
|
||||
if 'file' in request.files:
|
||||
return handle_file_upload(request)
|
||||
else:
|
||||
return handle_text_operation(request)
|
||||
return render_template("index.html", result="", password="", encryption_type="advanced", settings=settings)
|
||||
|
||||
# === File Pickup Route ===
|
||||
def handle_file_upload(request):
|
||||
"""Process file upload and encryption."""
|
||||
file = request.files['file']
|
||||
enc_password = request.form.get('enc_password')
|
||||
pickup_password = request.form.get('pickup_password')
|
||||
|
||||
if not file or not enc_password or not pickup_password:
|
||||
return jsonify({"error": "Missing fields"}), 400
|
||||
|
||||
if file.content_length and file.content_length > MAX_FILE_SIZE_BYTES:
|
||||
return jsonify({"error": f"File too large! Limit: {MAX_FILE_SIZE_BYTES / (1024**3):.2f} GB"}), 400
|
||||
|
||||
filename = secure_filename(file.filename)
|
||||
temp_path = os.path.join(UPLOAD_FOLDER, filename)
|
||||
file.save(temp_path)
|
||||
|
||||
with open(temp_path, 'rb') as f:
|
||||
data = f.read()
|
||||
|
||||
salt = os.urandom(16)
|
||||
key = derive_key(enc_password, salt)
|
||||
nonce = os.urandom(12)
|
||||
ct = AESGCM(key).encrypt(nonce, data, None)
|
||||
|
||||
random_id = secrets.token_urlsafe(24)
|
||||
|
||||
with open(os.path.join(UPLOAD_FOLDER, f"{random_id}.enc"), 'wb') as f:
|
||||
f.write(salt + nonce + ct)
|
||||
os.remove(temp_path)
|
||||
|
||||
meta = {
|
||||
'pickup_password': base64.urlsafe_b64encode(hashlib.sha256(pickup_password.encode()).digest()).decode(),
|
||||
'original_name': filename,
|
||||
'timestamp': datetime.datetime.now(UTC).isoformat()
|
||||
}
|
||||
with open(os.path.join(UPLOAD_FOLDER, f"{random_id}.json"), 'w') as f:
|
||||
json.dump(meta, f)
|
||||
|
||||
pickup_url = request.host_url.rstrip('/') + url_for('pickup_file', file_id=random_id)
|
||||
return jsonify({"success": True, "pickup_url": pickup_url})
|
||||
|
||||
def handle_text_operation(request):
|
||||
"""Process text encryption/decryption operations."""
|
||||
data = request.get_json()
|
||||
encryption_type = data.get("encryption-type", "basic")
|
||||
operation = data.get("operation", "")
|
||||
message = data.get("message", "")
|
||||
password = data.get("password", "")
|
||||
|
||||
if encryption_type == "basic":
|
||||
result = simple_encode(message) if operation == "encrypt" else simple_decode(message)
|
||||
else:
|
||||
result = advanced_encrypt(message, password) if operation == "encrypt" else advanced_decrypt(message, password)
|
||||
|
||||
return jsonify(result=html.escape(result))
|
||||
|
||||
# ===== File Pickup Route =====
|
||||
@app.route("/pickup/<file_id>", methods=["GET", "POST"])
|
||||
def pickup_file(file_id):
|
||||
"""Handle file pickup and decryption."""
|
||||
meta_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.json")
|
||||
enc_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.enc")
|
||||
|
||||
@@ -195,59 +232,61 @@ def pickup_file(file_id):
|
||||
return redirect(url_for('index'))
|
||||
|
||||
if request.method == 'POST':
|
||||
pickup_password = request.form.get('pickup_password')
|
||||
enc_password = request.form.get('enc_password')
|
||||
|
||||
if not pickup_password or not enc_password:
|
||||
flash("Missing fields")
|
||||
return redirect(request.url)
|
||||
|
||||
with open(meta_path, 'r') as f:
|
||||
meta = json.load(f)
|
||||
|
||||
expected_hash = base64.urlsafe_b64encode(hashlib.sha256(pickup_password.encode()).digest()).decode()
|
||||
if expected_hash != meta['pickup_password']:
|
||||
flash("Incorrect pickup password")
|
||||
return redirect(request.url)
|
||||
|
||||
with open(enc_path, 'rb') as f:
|
||||
enc_data = f.read()
|
||||
salt, nonce, ct = enc_data[:16], enc_data[16:28], enc_data[28:]
|
||||
key = derive_key(enc_password, salt)
|
||||
|
||||
try:
|
||||
decrypted = AESGCM(key).decrypt(nonce, ct, None)
|
||||
except Exception:
|
||||
flash("Decryption failed")
|
||||
return redirect(request.url)
|
||||
|
||||
os.remove(meta_path)
|
||||
os.remove(enc_path)
|
||||
log_admin_event(f"File {file_id} downloaded and deleted.")
|
||||
|
||||
return send_file(io.BytesIO(decrypted), as_attachment=True, download_name=meta['original_name'])
|
||||
|
||||
return handle_file_pickup(request, meta_path, enc_path, file_id)
|
||||
return render_template("pickup.html", file_id=file_id)
|
||||
|
||||
def cleanup_expired_files():
|
||||
now = datetime.datetime.utcnow()
|
||||
def handle_file_pickup(request, meta_path, enc_path, file_id):
|
||||
"""Process file pickup and decryption."""
|
||||
pickup_password = request.form.get('pickup_password')
|
||||
enc_password = request.form.get('enc_password')
|
||||
|
||||
for fname in os.listdir(UPLOAD_FOLDER):
|
||||
if fname.endswith(".enc") or fname.endswith(".json"):
|
||||
path = os.path.join(UPLOAD_FOLDER, fname)
|
||||
try:
|
||||
file_time = datetime.datetime.utcfromtimestamp(os.path.getmtime(path))
|
||||
age = (now - file_time).days
|
||||
if age > MAX_FILE_AGE_DAYS:
|
||||
os.remove(path)
|
||||
print(f"[INFO] Deleted expired file: {fname}")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Could not check/delete file {fname}: {e}")
|
||||
if not pickup_password or not enc_password:
|
||||
flash("Missing fields")
|
||||
return redirect(request.url)
|
||||
|
||||
with open(meta_path, 'r') as f:
|
||||
meta = json.load(f)
|
||||
|
||||
# === Admin Log Viewer ===
|
||||
expected_hash = base64.urlsafe_b64encode(hashlib.sha256(pickup_password.encode()).digest()).decode()
|
||||
if expected_hash != meta['pickup_password']:
|
||||
flash("Incorrect pickup password")
|
||||
return redirect(request.url)
|
||||
|
||||
with open(enc_path, 'rb') as f:
|
||||
enc_data = f.read()
|
||||
salt, nonce, ct = enc_data[:16], enc_data[16:28], enc_data[28:]
|
||||
key = derive_key(enc_password, salt)
|
||||
|
||||
try:
|
||||
decrypted = AESGCM(key).decrypt(nonce, ct, None)
|
||||
except Exception:
|
||||
flash("Decryption failed")
|
||||
return redirect(request.url)
|
||||
|
||||
os.remove(meta_path)
|
||||
os.remove(enc_path)
|
||||
log_admin_event(f"File {file_id} downloaded and deleted.")
|
||||
|
||||
response = send_file(
|
||||
io.BytesIO(decrypted),
|
||||
as_attachment=True,
|
||||
download_name=meta['original_name'],
|
||||
mimetype='application/octet-stream'
|
||||
)
|
||||
|
||||
# Add headers for better mobile compatibility
|
||||
response.headers['Content-Disposition'] = f'attachment; filename="{meta["original_name"]}"'
|
||||
response.headers['Content-Type'] = 'application/octet-stream'
|
||||
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
|
||||
response.headers['Pragma'] = 'no-cache'
|
||||
response.headers['Expires'] = '0'
|
||||
|
||||
return response
|
||||
|
||||
# ===== Admin Routes =====
|
||||
@app.route("/admin-logs")
|
||||
def admin_logs():
|
||||
"""View admin activity logs."""
|
||||
if not session.get("admin_logged_in"):
|
||||
return redirect(url_for("admin_login"))
|
||||
|
||||
@@ -270,47 +309,50 @@ def admin_logs():
|
||||
|
||||
return jsonify(logs=logs)
|
||||
|
||||
# === Admin Settings Editor ===
|
||||
@app.route("/admin-settings", methods=["GET", "POST"])
|
||||
def admin_settings():
|
||||
"""Manage application settings."""
|
||||
if not session.get("admin_logged_in"):
|
||||
return redirect(url_for("admin_login"))
|
||||
|
||||
current_settings = load_settings()
|
||||
|
||||
if request.method == 'POST':
|
||||
upload_folder = request.form.get('upload_folder', current_settings.get('upload_folder', 'uploads'))
|
||||
max_file_age_days = int(request.form.get('max_file_age_days', current_settings.get('max_file_age_days', 14)))
|
||||
max_file_size_gb = float(request.form.get('max_file_size_gb', current_settings.get('max_file_size_bytes', 25 * 1024 * 1024 * 1024) / (1024 * 1024 * 1024)))
|
||||
max_file_size_bytes = int(max_file_size_gb * 1024 * 1024 * 1024)
|
||||
|
||||
updated_settings = {
|
||||
"upload_folder": upload_folder,
|
||||
"max_file_age_days": max_file_age_days,
|
||||
"max_file_size_bytes": max_file_size_bytes
|
||||
}
|
||||
|
||||
with open(SETTINGS_FILE, 'w') as f:
|
||||
json.dump(updated_settings, f)
|
||||
|
||||
flash("Settings updated successfully!")
|
||||
|
||||
global settings, UPLOAD_FOLDER, MAX_FILE_AGE_DAYS, MAX_FILE_SIZE_BYTES
|
||||
settings = load_settings()
|
||||
UPLOAD_FOLDER = settings.get('upload_folder', 'uploads')
|
||||
MAX_FILE_AGE_DAYS = settings.get('max_file_age_days', 14)
|
||||
MAX_FILE_SIZE_BYTES = settings.get('max_file_size_bytes', 25 * 1024 * 1024 * 1024)
|
||||
|
||||
if not os.path.exists(UPLOAD_FOLDER):
|
||||
os.makedirs(UPLOAD_FOLDER)
|
||||
|
||||
return redirect(url_for("admin_settings"))
|
||||
|
||||
return handle_settings_update(request, current_settings)
|
||||
return render_template("admin_settings.html", settings=current_settings)
|
||||
|
||||
# === Admin Setup ===
|
||||
def handle_settings_update(request, current_settings):
|
||||
"""Process settings update request."""
|
||||
upload_folder = request.form.get('upload_folder', current_settings.get('upload_folder', 'uploads'))
|
||||
max_file_age_days = int(request.form.get('max_file_age_days', current_settings.get('max_file_age_days', 14)))
|
||||
max_file_size_gb = float(request.form.get('max_file_size_gb', current_settings.get('max_file_size_bytes', 25 * 1024 * 1024 * 1024) / (1024 * 1024 * 1024)))
|
||||
max_file_size_bytes = int(max_file_size_gb * 1024 * 1024 * 1024)
|
||||
|
||||
updated_settings = {
|
||||
"upload_folder": upload_folder,
|
||||
"max_file_age_days": max_file_age_days,
|
||||
"max_file_size_bytes": max_file_size_bytes
|
||||
}
|
||||
|
||||
with open(SETTINGS_FILE, 'w') as f:
|
||||
json.dump(updated_settings, f)
|
||||
|
||||
flash("Settings updated successfully!")
|
||||
|
||||
global settings, UPLOAD_FOLDER, MAX_FILE_AGE_DAYS, MAX_FILE_SIZE_BYTES
|
||||
settings = load_settings()
|
||||
UPLOAD_FOLDER = settings.get('upload_folder', 'uploads')
|
||||
MAX_FILE_AGE_DAYS = settings.get('max_file_age_days', 14)
|
||||
MAX_FILE_SIZE_BYTES = settings.get('max_file_size_bytes', 25 * 1024 * 1024 * 1024)
|
||||
|
||||
if not os.path.exists(UPLOAD_FOLDER):
|
||||
os.makedirs(UPLOAD_FOLDER)
|
||||
|
||||
return redirect(url_for("admin_settings"))
|
||||
|
||||
@app.route("/admin-setup", methods=["GET", "POST"])
|
||||
def admin_setup():
|
||||
"""Initial admin account setup."""
|
||||
if os.path.exists(ADMIN_CRED_FILE):
|
||||
return redirect(url_for("admin_login"))
|
||||
if request.method == "POST":
|
||||
@@ -323,9 +365,9 @@ def admin_setup():
|
||||
flash("Both fields required")
|
||||
return render_template("admin_setup.html")
|
||||
|
||||
# === Admin Login ===
|
||||
@app.route("/admin-login", methods=["GET", "POST"])
|
||||
def admin_login():
|
||||
"""Admin login handler."""
|
||||
if request.method == "POST":
|
||||
u = request.form.get("username")
|
||||
p = request.form.get("password")
|
||||
@@ -338,15 +380,15 @@ def admin_login():
|
||||
flash("Incorrect credentials")
|
||||
return render_template("admin_login.html")
|
||||
|
||||
# === Admin Logout ===
|
||||
@app.route("/admin-logout")
|
||||
def admin_logout():
|
||||
"""Admin logout handler."""
|
||||
session.pop("admin_logged_in", None)
|
||||
return redirect(url_for("index"))
|
||||
|
||||
# === Admin Page ===
|
||||
@app.route("/adminpage")
|
||||
def admin_page():
|
||||
"""Admin dashboard."""
|
||||
if not session.get("admin_logged_in"):
|
||||
if not os.path.exists(ADMIN_CRED_FILE):
|
||||
return redirect(url_for("admin_setup"))
|
||||
@@ -355,32 +397,88 @@ def admin_page():
|
||||
cleanup_expired_files()
|
||||
routes = [rule.rule for rule in app.url_map.iter_rules() if rule.endpoint != 'static']
|
||||
|
||||
try:
|
||||
uptime = subprocess.check_output("uptime -p", shell=True).decode().strip()
|
||||
except Exception:
|
||||
uptime = "Unavailable"
|
||||
# Get uptime based on OS
|
||||
if platform.system() == "Windows":
|
||||
try:
|
||||
# Windows uptime using PowerShell
|
||||
ps_command = "(Get-CimInstance -ClassName Win32_OperatingSystem).LastBootUpTime"
|
||||
uptime_output = subprocess.check_output(["powershell", "-Command", ps_command], shell=True).decode()
|
||||
# Convert the PowerShell DateTime to Python datetime
|
||||
boot_time = datetime.datetime.strptime(uptime_output.strip(), "%A, %B %d, %Y %I:%M:%S %p")
|
||||
# Make boot_time timezone-aware (assuming local time)
|
||||
boot_time = boot_time.replace(tzinfo=datetime.timezone.utc)
|
||||
current_time = datetime.datetime.now(UTC)
|
||||
uptime = current_time - boot_time
|
||||
uptime_str = f"{uptime.days} days, {uptime.seconds // 3600} hours, {(uptime.seconds % 3600) // 60} minutes"
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Failed to get Windows uptime: {str(e)}")
|
||||
uptime_str = "Unavailable"
|
||||
else:
|
||||
try:
|
||||
# Linux uptime using uptime command
|
||||
uptime_str = subprocess.check_output("uptime -p", shell=True).decode().strip()
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Failed to get Linux uptime: {str(e)}")
|
||||
uptime_str = "Unavailable"
|
||||
|
||||
server_info = {
|
||||
"uptime": uptime,
|
||||
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"uptime": uptime_str,
|
||||
"time": datetime.datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"python": platform.python_version(),
|
||||
"debug": app.debug
|
||||
}
|
||||
|
||||
return render_template("admin.html", routes=routes, server_info=server_info)
|
||||
|
||||
# === Restart Server ===
|
||||
@app.route("/restart-server")
|
||||
@app.route("/restart-server", methods=["POST"])
|
||||
def restart_server():
|
||||
"""Restart the server."""
|
||||
if not session.get("admin_logged_in"):
|
||||
return redirect(url_for("admin_login"))
|
||||
subprocess.Popen(["sudo", "systemctl", "restart", "paccrypt.service"])
|
||||
flash("Restart triggered")
|
||||
return redirect(url_for("admin_page"))
|
||||
return jsonify({"error": "Unauthorized"}), 401
|
||||
|
||||
try:
|
||||
if platform.system() == "Windows":
|
||||
# Get the current process ID
|
||||
current_pid = os.getpid()
|
||||
# Create a batch file to restart the server
|
||||
restart_script = f"""
|
||||
@echo off
|
||||
timeout /t 2 /nobreak
|
||||
start "" "python" "app.py"
|
||||
"""
|
||||
with open("restart.bat", "w") as f:
|
||||
f.write(restart_script)
|
||||
|
||||
# Start the restart script and exit
|
||||
subprocess.Popen(["restart.bat"], shell=True)
|
||||
return jsonify({"message": "Server restart initiated"}), 200
|
||||
else:
|
||||
# For Linux/Unix systems, use a Python-based restart
|
||||
# Get the current Python interpreter and script path
|
||||
python_path = sys.executable
|
||||
script_path = os.path.abspath(__file__)
|
||||
|
||||
# Create a shell script to restart the server
|
||||
restart_script = f"""#!/bin/bash
|
||||
sleep 2
|
||||
{python_path} {script_path}
|
||||
"""
|
||||
|
||||
# Write and make the script executable
|
||||
with open("restart.sh", "w") as f:
|
||||
f.write(restart_script)
|
||||
os.chmod("restart.sh", 0o755)
|
||||
|
||||
# Start the restart script and exit
|
||||
subprocess.Popen(["./restart.sh"], shell=True)
|
||||
return jsonify({"message": "Server restart initiated"}), 200
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Failed to restart server: {str(e)}")
|
||||
return jsonify({"error": f"Failed to restart server: {str(e)}"}), 500
|
||||
|
||||
# === Reset Admin Credentials ===
|
||||
@app.route("/admin-reset", methods=["POST"])
|
||||
def admin_reset():
|
||||
"""Reset admin credentials."""
|
||||
if not session.get("admin_logged_in"):
|
||||
return redirect(url_for("admin_login"))
|
||||
try:
|
||||
@@ -395,9 +493,9 @@ def admin_reset():
|
||||
print("[ERROR] admin_reset failed:", e)
|
||||
return redirect(url_for("admin_setup"))
|
||||
|
||||
# === Change Admin Password ===
|
||||
@app.route("/admin-change-password", methods=["POST"])
|
||||
def admin_change_password():
|
||||
"""Change admin password."""
|
||||
if not session.get("admin_logged_in"):
|
||||
return redirect(url_for("admin_login"))
|
||||
|
||||
@@ -432,6 +530,7 @@ def admin_change_password():
|
||||
|
||||
@app.route("/admin-clear-uploads", methods=["POST"])
|
||||
def admin_clear_uploads():
|
||||
"""Clear all uploaded files."""
|
||||
if not session.get("admin_logged_in"):
|
||||
return redirect(url_for("admin_login"))
|
||||
|
||||
@@ -449,31 +548,34 @@ def admin_clear_uploads():
|
||||
|
||||
@app.route("/admin-update-server", methods=["POST"])
|
||||
def admin_update_server():
|
||||
"""Update server from GitHub repository."""
|
||||
if not session.get("admin_logged_in"):
|
||||
return redirect(url_for("admin_login"))
|
||||
return jsonify({"error": "Unauthorized"}), 401
|
||||
|
||||
try:
|
||||
repo_dir = os.path.abspath(os.path.dirname(__file__)) # Dynamically get current project directory
|
||||
repo_url = "https://github.com/TySP-Dev/PacCrypt.git"
|
||||
|
||||
# Ensure directory is a git repo
|
||||
repo_dir = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
# Check if we're in a git repository
|
||||
if not os.path.exists(os.path.join(repo_dir, ".git")):
|
||||
return redirect(url_for('500.html'))
|
||||
return jsonify({"error": "Not a git repository"}), 400
|
||||
|
||||
# Pull latest changes
|
||||
# Execute git commands
|
||||
subprocess.run(["git", "fetch"], cwd=repo_dir, check=True)
|
||||
subprocess.run(["git", "reset", "--hard", "origin/main"], cwd=repo_dir, check=True)
|
||||
subprocess.run(["git", "pull", repo_url, "main"], cwd=repo_dir, check=True)
|
||||
subprocess.run(["git", "pull"], cwd=repo_dir, check=True)
|
||||
|
||||
flash("Server updated from GitHub!", "clear-feedback")
|
||||
return jsonify({"message": "Server updated successfully from GitHub!"}), 200
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"[ERROR] Git operation failed: {str(e)}")
|
||||
return jsonify({"error": f"Git operation failed: {str(e)}"}), 500
|
||||
except Exception as e:
|
||||
flash(f"Update failed: {str(e)}", "clear-feedback")
|
||||
|
||||
return redirect(url_for("admin_page"))
|
||||
|
||||
print(f"[ERROR] Update failed: {str(e)}")
|
||||
return jsonify({"error": f"Update failed: {str(e)}"}), 500
|
||||
|
||||
# ===== Sitemap and Robots =====
|
||||
@app.route("/sitemap", methods=["GET"])
|
||||
def sitemap():
|
||||
"""Generate sitemap.xml."""
|
||||
sitemap_xml = '''<?xml version="1.0" encoding="UTF-8"?>
|
||||
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
|
||||
<url><loc>https://paccrypt.unnaturalll.dev/</loc></url>
|
||||
@@ -483,9 +585,9 @@ def sitemap():
|
||||
</urlset>'''
|
||||
return sitemap_xml, 200, {'Content-Type': 'application/xml'}
|
||||
|
||||
|
||||
@app.route("/robots.txt")
|
||||
def robots_txt():
|
||||
"""Generate robots.txt."""
|
||||
lines = [
|
||||
"User-agent: *",
|
||||
"Disallow: /adminpage",
|
||||
@@ -501,10 +603,7 @@ def robots_txt():
|
||||
]
|
||||
return "\n".join(lines), 200, {"Content-Type": "text/plain"}
|
||||
|
||||
|
||||
|
||||
|
||||
# === Error Handlers ===
|
||||
# ===== Error Handlers =====
|
||||
@app.errorhandler(404)
|
||||
def page_not_found(e):
|
||||
return render_template('404.html'), 404
|
||||
@@ -526,10 +625,9 @@ def handle_file_not_found(e):
|
||||
if os.getenv("PRODUCTION", "false").lower() == "true":
|
||||
return render_template('500.html'), 500
|
||||
else:
|
||||
raise e # re-raise for debugging in development
|
||||
raise e
|
||||
|
||||
|
||||
# === Server Mode Execution ===
|
||||
# ===== Application Entry Point =====
|
||||
if __name__ == "__main__":
|
||||
PRODUCTION = os.getenv("PRODUCTION", "false").lower() == "true"
|
||||
if PRODUCTION:
|
||||
|
||||
Reference in New Issue
Block a user