101 lines
3.4 KiB
Python
101 lines
3.4 KiB
Python
import json
|
||
import os
|
||
import hashlib
|
||
from base64 import urlsafe_b64encode
|
||
from datetime import datetime
|
||
from cryptography.fernet import Fernet
|
||
from config import Config
|
||
|
||
class UserManager:
|
||
def __init__(self):
|
||
# 使用 Config.SECRET_KEY 派生一个可复现的 Fernet key,避免每次运行生成不同 key
|
||
digest = hashlib.sha256(Config.SECRET_KEY.encode()).digest()
|
||
self.key = urlsafe_b64encode(digest)
|
||
self.cipher = Fernet(self.key)
|
||
self.users_file = Config.USERS_FILE
|
||
self.load_users()
|
||
# 如果不存在内测账号,则创建之(账号: admin 密码: 123)
|
||
if 'admin' not in self.users:
|
||
try:
|
||
self.add_user('admin', '123', '', {'name': '内测管理员'})
|
||
except Exception:
|
||
pass
|
||
|
||
def encrypt(self, data):
|
||
return self.cipher.encrypt(data.encode()).decode()
|
||
|
||
def decrypt(self, encrypted_data):
|
||
return self.cipher.decrypt(encrypted_data.encode()).decode()
|
||
|
||
def load_users(self):
|
||
if os.path.exists(self.users_file):
|
||
with open(self.users_file, 'r', encoding='utf-8') as f:
|
||
self.users = json.load(f)
|
||
else:
|
||
self.users = {}
|
||
self.save_users()
|
||
|
||
def save_users(self):
|
||
with open(self.users_file, 'w', encoding='utf-8') as f:
|
||
json.dump(self.users, f, ensure_ascii=False, indent=2)
|
||
|
||
def add_user(self, account, password, token, user_info):
|
||
# 确保 user_info 是 dict 且包含必要的字段以防模版渲染错误
|
||
if not isinstance(user_info, dict):
|
||
user_info = {}
|
||
user_info.setdefault('name', account)
|
||
user_info.setdefault('orgFullList', [])
|
||
|
||
self.users[account] = {
|
||
'password': self.encrypt(password),
|
||
'token': token,
|
||
'user_info': user_info,
|
||
'last_login': datetime.now().isoformat(),
|
||
'online': False,
|
||
'socket_id': None
|
||
}
|
||
self.save_users()
|
||
|
||
def get_user(self, account):
|
||
return self.users.get(account)
|
||
|
||
def update_user_status(self, account, online=False, socket_id=None):
|
||
if account in self.users:
|
||
self.users[account]['online'] = online
|
||
if socket_id:
|
||
self.users[account]['socket_id'] = socket_id
|
||
self.save_users()
|
||
|
||
def verify_user(self, account, password):
|
||
user = self.get_user(account)
|
||
if user:
|
||
try:
|
||
decrypted = self.decrypt(user['password'])
|
||
return decrypted == password
|
||
except:
|
||
return False
|
||
return False
|
||
|
||
class ChatManager:
|
||
def __init__(self):
|
||
self.active_files = []
|
||
self.file_lock = False
|
||
|
||
def can_upload_file(self):
|
||
"""检查是否可以上传文件(最多5个)"""
|
||
# 清理过期的文件(超过1小时)
|
||
current_time = datetime.now()
|
||
self.active_files = [
|
||
file for file in self.active_files
|
||
if (current_time - file['upload_time']).seconds < 3600
|
||
]
|
||
return len(self.active_files) < Config.MAX_SIMULTANEOUS_FILES
|
||
|
||
def add_file(self, file_info):
|
||
self.active_files.append({
|
||
'filename': file_info['filename'],
|
||
'size': file_info['size'],
|
||
'uploader': file_info['uploader'],
|
||
'upload_time': datetime.now(),
|
||
'url': file_info['url']
|
||
}) |