import json import os import hashlib from base64 import urlsafe_b64encode from datetime import datetime from cryptography.fernet import Fernet from config import Config class UserManager: def __init__(self): # 使用 Config.SECRET_KEY 派生一个可复现的 Fernet key,避免每次运行生成不同 key digest = hashlib.sha256(Config.SECRET_KEY.encode()).digest() self.key = urlsafe_b64encode(digest) self.cipher = Fernet(self.key) self.users_file = Config.USERS_FILE self.load_users() # 如果不存在内测账号,则创建之(账号: admin 密码: 123) if 'admin' not in self.users: try: self.add_user('admin', '123', '', {'name': '内测管理员'}) except Exception: pass def encrypt(self, data): return self.cipher.encrypt(data.encode()).decode() def decrypt(self, encrypted_data): return self.cipher.decrypt(encrypted_data.encode()).decode() def load_users(self): if os.path.exists(self.users_file): with open(self.users_file, 'r', encoding='utf-8') as f: self.users = json.load(f) else: self.users = {} self.save_users() def save_users(self): with open(self.users_file, 'w', encoding='utf-8') as f: json.dump(self.users, f, ensure_ascii=False, indent=2) def add_user(self, account, password, token, user_info): # 确保 user_info 是 dict 且包含必要的字段以防模版渲染错误 if not isinstance(user_info, dict): user_info = {} user_info.setdefault('name', account) user_info.setdefault('orgFullList', []) self.users[account] = { 'password': self.encrypt(password), 'token': token, 'user_info': user_info, 'last_login': datetime.now().isoformat(), 'online': False, 'socket_id': None } self.save_users() def get_user(self, account): return self.users.get(account) def update_user_status(self, account, online=False, socket_id=None): if account in self.users: self.users[account]['online'] = online if socket_id: self.users[account]['socket_id'] = socket_id self.save_users() def verify_user(self, account, password): user = self.get_user(account) if user: try: decrypted = self.decrypt(user['password']) return decrypted == password except: return False return False class ChatManager: def __init__(self): self.active_files = [] self.file_lock = False def can_upload_file(self): """检查是否可以上传文件(最多5个)""" # 清理过期的文件(超过1小时) current_time = datetime.now() self.active_files = [ file for file in self.active_files if (current_time - file['upload_time']).seconds < 3600 ] return len(self.active_files) < Config.MAX_SIMULTANEOUS_FILES def add_file(self, file_info): self.active_files.append({ 'filename': file_info['filename'], 'size': file_info['size'], 'uploader': file_info['uploader'], 'upload_time': datetime.now(), 'url': file_info['url'] })