From 4c38e240dc22c94ec4a6b92a9f5316f76775195c Mon Sep 17 00:00:00 2001 From: theliu Date: Sat, 25 Apr 2026 12:50:36 +0800 Subject: [PATCH] Initial commit: V1 --- .gitignore | 32 + README.md | 133 ++++ asr.py | 167 +++++ config.py | 81 +++ gui.py | 1486 +++++++++++++++++++++++++++++++++++++++++++++ image_gen.py | 215 +++++++ make_video.py | 745 +++++++++++++++++++++++ qwen_download.py | 20 + run.bat | 25 + scene_generate.py | 135 ++++ scene_plan.py | 594 ++++++++++++++++++ text_ai.py | 113 ++++ 12 files changed, 3746 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 asr.py create mode 100644 config.py create mode 100644 gui.py create mode 100644 image_gen.py create mode 100644 make_video.py create mode 100644 qwen_download.py create mode 100644 run.bat create mode 100644 scene_generate.py create mode 100644 scene_plan.py create mode 100644 text_ai.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..73b62c7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ +*.egg + +# Models (too large for git) +models/ + +# Workspace data (user-generated) +workspace/ + +# Backup +_backup/ + +# Environment +.env +venv/ +.venv/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +Thumbs.db +Desktop.ini +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..87cb70a --- /dev/null +++ b/README.md @@ -0,0 +1,133 @@ +# Videoer + +**AI-powered video generation pipeline** — 从文章到视频的一站式工具。 + +给定一篇英文文章(文本)和对应的朗读音频,自动完成: + +``` +文章文本 + 朗读音频 → AI 场景划分 → 逐场景生成配图 → ASR 时间对齐 → 合成视频(含字幕) +``` + +## Preview + +![Pipeline Overview](docs/pipeline.png) + +## Features + +- **AI Scene Planning** — 基于 LLM(Qwen / GLM)智能划分场景,提取角色、画面描述 +- **AI Image Generation** — 支持 Kolors / Qwen-Image 文生图模型,逐张生成场景配图 +- **Interactive Review** — 逐张审查、确认/重新生成场景图 +- **Forced Alignment** — 基于 Qwen3-ForcedAligner 的语音-文本时间对齐 +- **Video Synthesis** — MoviePy 合成最终视频,自动添加字幕 + +## Architecture + +``` +release1/ +├── gui.py # PyQt6 GUI (main entry) +├── scene_plan.py # LLM scene planning + prompt engineering +├── image_gen.py # Text-to-image API calls +├── asr.py # ASR forced alignment +├── make_video.py # Video synthesis + subtitle rendering +├── text_ai.py # Shared LLM API client +├── config.py # Model paths, API keys, defaults +├── run.bat # Windows launcher +└── qwen_download.py # One-time model download script +``` + +## Workflow + +``` +1. Select workspace (folder with article.txt + voice.mp3) +2. AI Scene Planning → scene_plan.json +3. Image Generation → scene_01.png, scene_02.png, ... +4. ASR Alignment → result.json + timestamps into scene_plan +5. Video Synthesis → output_video.mp4 +``` + +## Quick Start + +### Prerequisites + +- Python 3.12+ +- Conda (recommended) +- NVIDIA GPU (for local ASR model) + +### Setup + +```bash +# Create conda environment +conda create -n Videoer python=3.12 -y +conda activate Videoer + +# Install dependencies +pip install PyQt6 moviepy Pillow requests openai +pip install funasr modelscope torch torchaudio + +# Download ASR model +python qwen_download.py +``` + +### Configuration + +Edit `config.py` to set your API keys: + +```python +# LLM providers (scene planning) +LLM_PROVIDERS = { + "Qwen3.5-35B (ModelScope)": { + "api_key": "YOUR_KEY", + ... + }, + ... +} + +# Image generation +SILICONFLOW_API_KEY = "YOUR_KEY" +MODELSCOPE_API_KEY = "YOUR_KEY" +``` + +> **Tip**: ModelScope and SiliconFlow both offer free-tier API keys. + +### Run + +```bash +# GUI mode (recommended) +python gui.py + +# Or on Windows +run.bat +``` + +### Workspace Structure + +Each video project lives in a workspace folder: + +``` +workspace/my_project/ +├── article.txt # Source article text +├── voice.mp3 # Narration audio +├── scene_plan.json # Generated scene plan (auto) +├── result.json # ASR alignment result (auto) +├── scene_01.png # Generated images (auto) +├── scene_02.png +├── ... +└── output_video.mp4 # Final output (auto) +``` + +## Dependencies + +| Package | Purpose | +|---------|---------| +| PyQt6 | GUI framework | +| moviepy | Video composition | +| Pillow | Image processing / subtitle rendering | +| requests | HTTP API calls | +| openai | Compatible LLM client (OpenAI API format) | +| funasr | ASR forced alignment | +| modelscope | Model loading | +| torch / torchaudio | GPU inference backend | + +## License + +MIT diff --git a/asr.py b/asr.py new file mode 100644 index 0000000..ef9e3c1 --- /dev/null +++ b/asr.py @@ -0,0 +1,167 @@ +""" +asr.py - ASR 强制对齐模块(简化版) +使用 Qwen3-ForcedAligner 对齐音频和文本 +模型路径通过 config.py 中的绝对路径指向 +""" + +import os +import json +import re + + +def _detect_language(text: str) -> str: + """根据文本字符分布自动检测语言""" + if not text: + return "English" + # 统计非 ASCII 字符(中文等) + non_ascii = sum(1 for c in text if ord(c) >= 0x4e00) + ratio = non_ascii / len(text) + return "Chinese" if ratio > 0.1 else "English" + + +def run_asr(workspace: str, language: str = None) -> dict: + """ + 执行 ASR 强制对齐 + + Args: + workspace: 工作区路径(含 article.txt 和 voice.mp3) + + Returns: + dict: {"audio": str, "text": str, "segments": list} + """ + from config import ASR_MODEL_DIR + from qwen_asr import Qwen3ForcedAligner + + audio_path = os.path.join(workspace, "voice.mp3") + text_path = os.path.join(workspace, "article.txt") + output_path = os.path.join(workspace, "result.json") + + # 验证路径 + if not os.path.exists(ASR_MODEL_DIR): + raise FileNotFoundError(f"ASR 模型路径不存在: {ASR_MODEL_DIR}") + if not os.path.exists(audio_path): + raise FileNotFoundError(f"音频文件不存在: {audio_path}") + if not os.path.exists(text_path): + raise FileNotFoundError(f"文本文件不存在: {text_path}") + + # 读取文本 + with open(text_path, 'r', encoding='utf-8') as f: + text = f.read().strip() + + print(f"[ASR] 文本长度: {len(text)} 字符") + print(f"[ASR] 音频文件: {audio_path}") + print(f"[ASR] 模型路径: {ASR_MODEL_DIR}") + + # 加载模型 + print("[ASR] 正在加载模型...") + aligner = Qwen3ForcedAligner.from_pretrained( + ASR_MODEL_DIR, + local_files_only=True, + device_map="cpu" + ) + print("[ASR] 模型加载成功") + + # 自动检测语言(如果未指定) + if language is None: + language = _detect_language(text) + print(f"[ASR] 检测语言: {language}") + + # 运行对齐 + print("[ASR] 正在对齐...") + results = aligner.align( + audio=audio_path, + text=text, + language=language + ) + + # 整理结果 + segments = [] + for result in results: + for item in result.items: + segments.append({ + "text": item.text, + "start": round(item.start_time, 3), + "end": round(item.end_time, 3) + }) + + output_data = { + "audio": audio_path, + "text": text, + "segments": segments + } + + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(output_data, f, ensure_ascii=False, indent=2) + + print(f"[ASR] 完成,共 {len(segments)} 个片段,保存到 {output_path}") + return output_data + + +def match_scenes_to_audio(workspace: str) -> dict: + """ + 将 ASR segments 与 scene_plan 的 scenes 做文本匹配, + 给每个 scene 写入 start_time / end_time,并更新 scene_plan.json。 + + Returns: + dict: 更新后的 scene_plan(带时间信息) + """ + from make_video import load_scene_plan, load_asr_result, assign_scenes_to_segments + from scene_plan import _get_audio_duration + + plan_path = os.path.join(workspace, "scene_plan.json") + result_path = os.path.join(workspace, "result.json") + audio_path = os.path.join(workspace, "voice.mp3") + + if not os.path.exists(plan_path): + raise FileNotFoundError(f"scene_plan.json 不存在: {plan_path}") + if not os.path.exists(result_path): + raise FileNotFoundError(f"result.json 不存在: {result_path}") + if not os.path.exists(audio_path): + raise FileNotFoundError(f"voice.mp3 不存在: {audio_path}") + + scene_plan = load_scene_plan(workspace) + asr_result = load_asr_result(workspace) + + audio_duration = _get_audio_duration(audio_path) + if audio_duration is None: + raise RuntimeError(f"无法获取音频时长: {audio_path}") + + # 恢复每个场景的文本用于 ASR 匹配 + # 优先用 text 字段(原文片段),兼容旧 lines 字段 + for scene in scene_plan["scenes"]: + text_val = scene.get("text", "") + lines_val = scene.get("lines", "") + raw = text_val or lines_val or "" + if isinstance(raw, str) and raw: + # 按句号分句供匹配 + scene["lines"] = [s.strip() for s in raw.split("。") if s.strip()] + else: + scene["lines"] = [] + + # 复用 make_video 的匹配逻辑 + timeline = assign_scenes_to_segments( + scene_plan["scenes"], asr_result.get("segments", []), audio_duration + ) + + # 写回 scene_plan + for item in timeline: + scene_id = item["scene_id"] + for scene in scene_plan["scenes"]: + if scene["scene_id"] == scene_id: + scene["start_time"] = round(item["start"], 3) + scene["end_time"] = round(item["end"], 3) + break + + with open(plan_path, 'w', encoding='utf-8') as f: + json.dump(scene_plan, f, ensure_ascii=False, indent=2) + + print(f"[匹配] 完成,已将时间信息写入 {plan_path}") + for item in timeline: + dur = item["end"] - item["start"] + print(f" Scene {item['scene_id']:2d}: {item['start']:6.2f}s - {item['end']:6.2f}s ({dur:.2f}s)") + + return scene_plan + + +if __name__ == "__main__": + run_asr(os.path.join(os.path.dirname(__file__), "workspace", "1")) diff --git a/config.py b/config.py new file mode 100644 index 0000000..37f381e --- /dev/null +++ b/config.py @@ -0,0 +1,81 @@ +""" +release1 配置文件 +集中管理所有模型路径、API Key、默认参数 +""" + +import os + +# ========== 基础路径 ========== +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +VIDEO_PROJECT_DIR = os.path.dirname(BASE_DIR) # 上级 video/ 目录 + +# ========== ASR 模型(绝对路径指向 video/models/)========== +ASR_MODEL_DIR = os.path.join( + r'C:\pythonproject\video', 'models', 'qwen', 'Qwen3-ForcedAligner-0.6B' +).replace('\\', '/') + +# ========== LLM 提供商(划分场景/角色提取用)========== +LLM_PROVIDERS = { + "Qwen3.5-35B (ModelScope 免费)": { + "api_key": "ms-38de567b-cf88-4523-bac2-ff63d8f1e0f6", + "api_base": "https://api-inference.modelscope.cn/v1/", + "model": "Qwen/Qwen3.5-35B-A3B", + }, + "GLM-4-9B (硅基流动 免费)": { + "api_key": "sk-mjqgwknbttvqnrjjfnxemtjgdivogjaqsftbvoifwjvruwsq", + "api_base": "https://api.siliconflow.cn/v1/", + "model": "THUDM/glm-4-9b-chat", + }, + "Qwen3-32B (硅基流动 付费)": { + "api_key": "sk-mjqgwknbttvqnrjjfnxemtjgdivogjaqsftbvoifwjvruwsq", + "api_base": "https://api.siliconflow.cn/v1/", + "model": "Qwen/Qwen3-32B", + }, + "GLM-5 (ModelScope 免费)": { + "api_key": "ms-38de567b-cf88-4523-bac2-ff63d8f1e0f6", + "api_base": "https://api-inference.modelscope.cn/v1/", + "model": "ZhipuAI/GLM-5", + }, +} + +# 默认 LLM(兼容旧代码) +DEFAULT_LLM = "Qwen3.5-35B (ModelScope 免费)" +LLM_API_KEY = LLM_PROVIDERS[DEFAULT_LLM]["api_key"] +LLM_API_BASE = LLM_PROVIDERS[DEFAULT_LLM]["api_base"] +LLM_MODEL = LLM_PROVIDERS[DEFAULT_LLM]["model"] + +# ========== SiliconFlow API(Kolors 文生图)========== +SILICONFLOW_API_KEY = "sk-mjqgwknbttvqnrjjfnxemtjgdivogjaqsftbvoifwjvruwsq" +SILICONFLOW_API_BASE = "https://api.siliconflow.cn/v1/images/generations" + +# ========== ModelScope API(Qwen 文生图)========== +MODELSCOPE_API_KEY = "ms-38de567b-cf88-4523-bac2-ff63d8f1e0f6" +MODELSCOPE_API_BASE = "https://api-inference.modelscope.cn/v1/images/generations" +MODELSCOPE_POLL_INTERVAL = 3 # 轮询间隔(秒) +MODELSCOPE_MAX_WAIT = 180 # 最大等待时间(秒) + +# ========== 文生图模型 ========== +IMAGE_MODELS = { + "Kolors(便宜快速)": { + "provider": "siliconflow", + "model": "Kwai-Kolors/Kolors", + "default_size": "1280x720", + "guidance_scale": 7.5, + }, + "Qwen-Image(高质量)": { + "provider": "modelscope", + "model": "Qwen/Qwen-Image-2512", + "default_size": "1280x720", + "guidance_scale": 7.5, + }, +} + +# 默认文生图模型 +DEFAULT_IMAGE_MODEL = "Kolors(便宜快速)" + +# ========== 默认参数 ========== +DEFAULT_FPS = 24 +DEFAULT_VIDEO_SIZE = "1280x720" + +# ========== 通用 negative prompt ========== +NEGATIVE_PROMPT = "blurry, low quality, deformed, text, letters, words, subtitle, logo, watermark, caption, label, number" diff --git a/gui.py b/gui.py new file mode 100644 index 0000000..6aeec75 --- /dev/null +++ b/gui.py @@ -0,0 +1,1486 @@ +#!/usr/bin/env python3 +""" +gui.py - 视频制作流水线 GUI(release1) +唯一入口,PyQt6 暗色主题 + +流程:选工作区 → 划分场景 → 逐张生成+审查 → ASR → 合成视频 +""" + +import sys +import os +import json +import time +import traceback +from datetime import datetime + +# 强制确保工作目录和模块搜索路径都是 gui.py 所在目录 +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +os.chdir(_SCRIPT_DIR) +if _SCRIPT_DIR not in sys.path: + sys.path.insert(0, _SCRIPT_DIR) + +# 全局异常日志文件 +_LOG_PATH = os.path.join(_SCRIPT_DIR, "crash.log") + +def _write_crash(msg): + with open(_LOG_PATH, "a", encoding="utf-8") as f: + f.write(f"\n{'='*60}\n{datetime.now().isoformat()}\n{msg}\n{'='*60}\n") + +# 捕获所有未处理异常 +sys.excepthook = lambda exc_type, exc_val, exc_tb: _write_crash("".join(traceback.format_exception(exc_type, exc_val, exc_tb))) + +from PyQt6.QtWidgets import ( + QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, + QLabel, QPushButton, QComboBox, QCheckBox, QListWidget, + QListWidgetItem, QTextEdit, QFileDialog, QMessageBox, + QGroupBox, QSplitter, QProgressBar, QSizePolicy, QDialog, + QDialogButtonBox +) +from PyQt6.QtCore import Qt, QThread, pyqtSignal, QMutex, QWaitCondition, QTimer +from PyQt6.QtGui import QPixmap, QImage, QFont, QColor, QIcon + + +from config import IMAGE_MODELS, DEFAULT_IMAGE_MODEL, DEFAULT_FPS, DEFAULT_VIDEO_SIZE, LLM_PROVIDERS, DEFAULT_LLM + + +# ============================================================ +# 暗色主题样式表 +# ============================================================ +DARK_STYLE = """ +QMainWindow, QWidget { + background-color: #1e1e2e; + color: #cdd6f4; + font-family: "Segoe UI", "Microsoft YaHei", sans-serif; + font-size: 13px; +} + +QGroupBox { + border: 1px solid #45475a; + border-radius: 6px; + margin-top: 12px; + padding: 14px 10px 10px 10px; + font-weight: bold; + font-size: 14px; + color: #89b4fa; +} +QGroupBox::title { + subcontrol-origin: margin; + left: 12px; + padding: 0 6px; +} + +QPushButton { + background-color: #45475a; + color: #cdd6f4; + border: 1px solid #585b70; + border-radius: 5px; + padding: 7px 18px; + min-height: 22px; +} +QPushButton:hover { + background-color: #585b70; + border-color: #89b4fa; +} +QPushButton:pressed { + background-color: #89b4fa; + color: #1e1e2e; +} +QPushButton:disabled { + background-color: #313244; + color: #6c7086; + border-color: #313244; +} + +QPushButton#btnConfirm { + background-color: #a6e3a1; + color: #1e1e2e; + font-weight: bold; + padding: 10px 24px; +} +QPushButton#btnConfirm:hover { + background-color: #94e2d5; +} +QPushButton#btnConfirm:disabled { + background-color: #313244; + color: #6c7086; +} + +QPushButton#btnRegenerate { + background-color: #f9e2af; + color: #1e1e2e; + font-weight: bold; + padding: 10px 24px; +} +QPushButton#btnRegenerate:hover { + background-color: #f5c2e7; +} + +QPushButton#btnSkip { + background-color: #fab387; + color: #1e1e2e; + font-weight: bold; + padding: 10px 24px; +} +QPushButton#btnSkip:hover { + background-color: #f38ba8; +} + +QComboBox { + background-color: #313244; + color: #cdd6f4; + border: 1px solid #45475a; + border-radius: 4px; + padding: 5px 10px; + min-height: 22px; +} +QComboBox:hover { + border-color: #89b4fa; +} +QComboBox::drop-down { + border: none; + width: 24px; +} +QComboBox QAbstractItemView { + background-color: #313244; + color: #cdd6f4; + selection-background-color: #89b4fa; + selection-color: #1e1e2e; + border: 1px solid #45475a; +} + +QCheckBox { + spacing: 6px; + color: #cdd6f4; +} +QCheckBox::indicator { + width: 18px; + height: 18px; + border: 2px solid #585b70; + border-radius: 4px; + background-color: #313244; +} +QCheckBox::indicator:checked { + background-color: #89b4fa; + border-color: #89b4fa; +} + +QTextEdit, QListWidget { + background-color: #181825; + color: #cdd6f4; + border: 1px solid #45475a; + border-radius: 5px; + padding: 4px; +} +QTextEdit:focus, QListWidget:focus { + border-color: #89b4fa; +} + +QListWidget::item { + padding: 6px 8px; + border-bottom: 1px solid #313244; +} +QListWidget::item:selected { + background-color: #45475a; + color: #89b4fa; +} +QListWidget::item:hover { + background-color: #313244; +} + +QProgressBar { + border: 1px solid #45475a; + border-radius: 4px; + text-align: center; + background-color: #313244; + color: #cdd6f4; + height: 20px; +} +QProgressBar::chunk { + background-color: #89b4fa; + border-radius: 3px; +} + +QLabel#titleLabel { + font-size: 18px; + font-weight: bold; + color: #cba6f7; + padding: 4px; +} + +QLabel#sectionLabel { + font-size: 13px; + color: #a6adc8; +} + +QLabel#sceneInfo { + font-size: 12px; + color: #a6adc8; + padding: 4px 0px; +} + +QSplitter::handle { + background-color: #45475a; + width: 2px; +} + +QScrollBar:vertical { + background-color: #181825; + width: 10px; + border: none; +} +QScrollBar::handle:vertical { + background-color: #45475a; + border-radius: 5px; + min-height: 20px; +} +QScrollBar::handle:vertical:hover { + background-color: #585b70; +} +QScrollBar::add-line:vertical, QScrollBar::sub-line:vertical { + height: 0px; +} +""" + + +# ============================================================ +# 备注对话框 +# ============================================================ +class NoteDialog(QDialog): + """在发送消息给AI之前添加备注的对话框""" + + def __init__(self, parent=None, title="添加备注", message=""): + super().__init__(parent) + self.setWindowTitle(title) + self.setMinimumSize(500, 400) + self.setModal(True) + + layout = QVBoxLayout(self) + layout.setContentsMargins(20, 20, 20, 20) + layout.setSpacing(12) + + # 说明标签 + info_label = QLabel("您可以在下方添加额外的说明、上下文或特殊要求,这些信息将一并发送给AI:") + info_label.setWordWrap(True) + info_label.setStyleSheet("color: #6c7086; font-size: 13px;") + layout.addWidget(info_label) + + # 主要消息显示(只读) + if message: + msg_group = QGroupBox("主要消息") + msg_layout = QVBoxLayout(msg_group) + self.message_display = QTextEdit() + self.message_display.setPlainText(message) + self.message_display.setReadOnly(True) + self.message_display.setMaximumHeight(100) + self.message_display.setStyleSheet(""" + background-color: #313244; + border: 1px solid #45475a; + border-radius: 4px; + padding: 8px; + """) + msg_layout.addWidget(self.message_display) + layout.addWidget(msg_group) + + # 备注输入框 + note_group = QGroupBox("给AI的备注(可选)") + note_layout = QVBoxLayout(note_group) + + self.note_input = QTextEdit() + self.note_input.setPlaceholderText( + "例如:\n" + "- 请多划分一些场景,我想要更细致的节奏\n" + "- 这篇文章是抒情散文,请用更诗意的视觉语言\n" + "- 场景提示词要特别详细,包含更多光影和构图细节\n" + "- 这是科技类文章,多用现代、简洁的视觉元素" + ) + self.note_input.setMaximumHeight(150) + self.note_input.setStyleSheet(""" + background-color: #181825; + border: 1px solid #45475a; + border-radius: 4px; + padding: 8px; + font-family: "Segoe UI", "Microsoft YaHei", sans-serif; + """) + note_layout.addWidget(self.note_input) + + # 字符计数 + self.char_count_label = QLabel("0/500") + self.char_count_label.setAlignment(Qt.AlignmentFlag.AlignRight) + self.char_count_label.setStyleSheet("font-size: 11px; color: #6c7086;") + note_layout.addWidget(self.char_count_label) + + layout.addWidget(note_group) + + # 快速模板按钮 + template_group = QGroupBox("快速模板") + template_layout = QHBoxLayout(template_group) + template_layout.setSpacing(8) + + templates = [ + ("更多场景", "请将场景划分得更细致一些,每个场景控制在5-8秒,适合快节奏视频"), + ("更少场景", "请合并相关场景,每个场景可以稍长一些(10-15秒),减少场景总数"), + ("强调视觉", "请特别注重视觉描述的详细程度,提供丰富的画面细节和光影效果"), + ("中文提示词", "请确保所有视觉提示词都使用中文,包括场景描述、光影、构图等"), + ] + + for btn_text, template_text in templates: + btn = QPushButton(btn_text) + btn.setStyleSheet(""" + QPushButton { + background-color: #313244; + border: 1px solid #45475a; + border-radius: 4px; + padding: 6px 12px; + font-size: 12px; + } + QPushButton:hover { + background-color: #45475a; + border-color: #89b4fa; + } + """) + btn.clicked.connect(lambda checked, t=template_text: self._apply_template(t)) + template_layout.addWidget(btn) + + layout.addWidget(template_group) + + # 按钮 + button_box = QDialogButtonBox( + QDialogButtonBox.StandardButton.Cancel | + QDialogButtonBox.StandardButton.Ok + ) + button_box.button(QDialogButtonBox.StandardButton.Ok).setText("发送") + button_box.button(QDialogButtonBox.StandardButton.Cancel).setText("取消") + + ok_button = button_box.button(QDialogButtonBox.StandardButton.Ok) + ok_button.setStyleSheet(""" + QPushButton { + background-color: #a6e3a1; + color: #1e1e2e; + font-weight: bold; + padding: 8px 20px; + } + QPushButton:hover { + background-color: #94e2d5; + } + """) + + cancel_button = button_box.button(QDialogButtonBox.StandardButton.Cancel) + cancel_button.setStyleSheet(""" + QPushButton { + background-color: #45475a; + padding: 8px 20px; + } + QPushButton:hover { + background-color: #585b70; + } + """) + + button_box.accepted.connect(self.accept) + button_box.rejected.connect(self.reject) + layout.addWidget(button_box) + + # 连接字符计数 + self.note_input.textChanged.connect(self._update_char_count) + + # 聚焦到备注输入框 + self.note_input.setFocus() + + def _apply_template(self, template_text: str): + """应用快速模板""" + self.note_input.setPlainText(template_text) + self._update_char_count() + + def _update_char_count(self): + """更新字符计数""" + count = len(self.note_input.toPlainText()) + self.char_count_label.setText(f"{count}/500") + + if count > 450: + self.char_count_label.setStyleSheet("font-size: 11px; color: #f38ba8;") + elif count > 400: + self.char_count_label.setStyleSheet("font-size: 11px; color: #f9e2af;") + else: + self.char_count_label.setStyleSheet("font-size: 11px; color: #6c7086;") + + def get_note(self) -> str: + """获取用户输入的备注""" + return self.note_input.toPlainText().strip() + + +# ============================================================ +# Stream 重定向器(模块级,用于捕获子进程 stdout/stderr) +# ============================================================ +import io as _io + +class _StreamRedirector(_io.TextIOBase): + """将 tqdm 的 \\r 更新转为日志行发送到 GUI""" + def __init__(self, callback): + self._callback = callback + self._buf = "" + + def write(self, s): + if not s: + return 0 + self._buf += s + while "\r" in self._buf: + before, self._buf = self._buf.split("\r", 1) + if before.strip(): + self._callback(before.rstrip()) + while "\n" in self._buf: + before, self._buf = self._buf.split("\n", 1) + if before.strip(): + self._callback(before.rstrip()) + return len(s) + + def flush(self): + if self._buf.strip(): + self._callback(self._buf.rstrip()) + self._buf = "" + + +# ============================================================ +# Worker 线程(QThread,用于一次性任务) +# ============================================================ +class Worker(QThread): + """通用后台工作线程""" + log_signal = pyqtSignal(str) + finished_signal = pyqtSignal(object) + error_signal = pyqtSignal(str) + + def __init__(self, func, *args, **kwargs): + super().__init__() + self.func = func + self.args = args + self.kwargs = kwargs + + def run(self): + try: + result = self.func(*self.args, **self.kwargs) + self.finished_signal.emit(result) + except Exception as e: + _write_crash(f"Worker.run() error:\n{traceback.format_exc()}") + self.error_signal.emit(str(e)) + + +# ============================================================ +# 生成线程(QThread,用于逐张生成+审查流程) +# ============================================================ +class GenerationWorker(QThread): + """场景图生成工作线程 - 逐张生成,通过信号与 GUI 交互""" + # 信号定义 + log_signal = pyqtSignal(str) # 日志 + progress_signal = pyqtSignal(int, int) # (current, total) + show_scene_signal = pyqtSignal(int) # 切换到指定场景 + display_image_signal = pyqtSignal(str) # 显示图片路径 + update_list_item_signal = pyqtSignal(int, str, str) # (index, text, color) + review_ready_signal = pyqtSignal(int) # 图片已生成,等待审查 (scene_index) + generation_done_signal = pyqtSignal(int, int) # (done_count, total) + error_on_scene_signal = pyqtSignal(int, str) # (scene_index, error_msg) + + def __init__(self, scenes, workspace, model_name): + super().__init__() + self.scenes = scenes + self.workspace = workspace + self.model_name = model_name + + # 线程同步原语:替代忙等待 + self._mutex = QMutex() + self._wait_condition = QWaitCondition() + self._user_action = None # None / "confirm" / "regenerate" / "skip" + + def set_user_action(self, action: str): + """由 GUI 调用,设置用户操作并唤醒线程""" + self._mutex.lock() + try: + self._user_action = action + self._wait_condition.wakeAll() + finally: + self._mutex.unlock() + + def run(self): + from image_gen import image_generate + + try: + total = len(self.scenes) + done = 0 + i = 0 + + while i < total: + scene = self.scenes[i] + status = scene.get("status", "pending") + + # 跳过已完成的 + if status in ("generated", "skipped"): + if status == "generated": + done += 1 + i += 1 + continue + + scene_id = scene.get("scene_id", i + 1) + self.progress_signal.emit(i, total) + + # 切换 GUI 到当前场景 + self.show_scene_signal.emit(i) + self.log_signal.emit(f"[*] 生成场景 {scene_id}/{total}...") + + scene_dir = os.path.join(self.workspace, "scene") + os.makedirs(scene_dir, exist_ok=True) + img_path = os.path.join(scene_dir, f"scene_{scene_id:03d}.png") + + try: + result = image_generate( + prompt=scene["visual_prompt"], + save_dir=scene_dir, + model_name=self.model_name, + filename=f"scene_{scene_id:03d}.png", + ) + + img_path = result["filepath"] + scene["status"] = "generated" + scene["filepath"] = img_path + done += 1 + + self.log_signal.emit(f"[OK] 场景 {scene_id} 生成成功") + + # 显示图片 + self.display_image_signal.emit(img_path) + self.update_list_item_signal.emit( + i, + f"[OK] #{scene_id:2d} | {scene['visual_prompt'][:40]}...", + "#a6e3a1" + ) + + # 保存 + self._save_plan() + + # 等待用户审查(事件驱动,非忙等待) + self.review_ready_signal.emit(i) + action = self._wait_for_user_action() + + if action == "regenerate": + # 标记为 pending,重新生成 + scene["status"] = "pending" + if "filepath" in scene: + del scene["filepath"] + self._save_plan() + self.log_signal.emit(f"[>>] 场景 {scene_id} 重新生成") + self.update_list_item_signal.emit( + i, + f"[ ] #{scene_id:2d} | {scene['visual_prompt'][:40]}...", + "#cdd6f4" + ) + continue # 不 i++,重新循环 + elif action == "skip": + scene["status"] = "skipped" + done -= 1 # 之前 +1 了,跳过不算 + self._save_plan() + self.log_signal.emit(f"[>>] 场景 {scene_id} 已跳过") + self.update_list_item_signal.emit( + i, + f"[>>] #{scene_id:2d} | {scene['visual_prompt'][:40]}...", + "#f9e2af" + ) + else: + # confirm - 保持 generated 状态 + self.log_signal.emit(f"[OK] 场景 {scene_id} 已确认") + + i += 1 + + except Exception as e: + scene["status"] = "failed" + scene["error"] = str(e) + self._save_plan() + self.log_signal.emit(f"[X] 场景 {scene_id} 生成失败: {e}") + self.error_on_scene_signal.emit(i, str(e)) + self.update_list_item_signal.emit( + i, + f"[X] #{scene_id:2d} | {scene['visual_prompt'][:40]}...", + "#f38ba8" + ) + i += 1 # 跳过失败的继续 + + self.generation_done_signal.emit(done, total) + + except Exception as e: + _write_crash(f"GenerationWorker.run() error:\n{traceback.format_exc()}") + self.error_on_scene_signal.emit(-1, str(e)) + + def _wait_for_user_action(self) -> str: + """阻塞等待用户操作(使用 QWaitCondition,不占用 CPU)""" + self._mutex.lock() + try: + # 等待直到 _user_action 被设置(由 set_user_action 唤醒) + while self._user_action is None: + self._wait_condition.wait(self._mutex) + + action = self._user_action + self._user_action = None + return action + finally: + self._mutex.unlock() + + def _save_plan(self): + """保存 scene_plan(通过信号让主线程做,但这里直接写文件也行,因为不涉及 UI)""" + plan_path = os.path.join(self.workspace, "scene_plan.json") + try: + with open(plan_path, 'w', encoding='utf-8') as f: + json.dump({"scenes": self.scenes}, f, ensure_ascii=False, indent=2) + except Exception: + pass + + +# ============================================================ +# 主窗口 +# ============================================================ +class VideoPipelineGUI(QMainWindow): + def __init__(self): + super().__init__() + self.setWindowTitle("视频制作流水线 - Release 1") + self.setGeometry(80, 80, 1280, 820) + + # 状态 + self.workspace = None + self.scene_plan = None + self.scenes = [] + self.current_scene_idx = -1 + self.gen_worker = None # GenerationWorker + self._worker = None # 通用 Worker(防止 GC 回收) + + # 审查按钮的槽函数(避免 lambda 导致的信号连接问题) + self._confirm_slot = lambda: None + self._regen_slot = lambda: None + self._skip_slot = lambda: None + + # debounce:编辑字段时延迟 500ms 再写盘,避免每个按键都触发 I/O + self._save_timer = QTimer(self) + self._save_timer.setSingleShot(True) + self._save_timer.setInterval(500) + self._save_timer.timeout.connect(self._save_plan) + + self._build_ui() + + # ============================================================ + # UI 构建 + # ============================================================ + def _build_ui(self): + central = QWidget() + self.setCentralWidget(central) + main_layout = QVBoxLayout(central) + main_layout.setContentsMargins(10, 10, 10, 10) + main_layout.setSpacing(8) + + # --- 标题 --- + title = QLabel("视频制作流水线") + title.setObjectName("titleLabel") + title.setAlignment(Qt.AlignmentFlag.AlignCenter) + main_layout.addWidget(title) + + # --- 顶部控制栏 --- + top_bar = QHBoxLayout() + + # 选工作区 + top_bar.addWidget(QLabel("工作区:")) + self.workspace_label = QLabel("未选择") + self.workspace_label.setStyleSheet("color: #f9e2af; font-weight: bold;") + self.workspace_label.setMinimumWidth(200) + top_bar.addWidget(self.workspace_label) + self.btn_select_ws = QPushButton("选择文件夹") + self.btn_select_ws.clicked.connect(self.select_workspace) + top_bar.addWidget(self.btn_select_ws) + + top_bar.addSpacing(20) + + # LLM 模型选择(场景划分用) + top_bar.addWidget(QLabel("语言模型:")) + self.llm_combo = QComboBox() + self.llm_combo.addItems(LLM_PROVIDERS.keys()) + idx = list(LLM_PROVIDERS.keys()).index(DEFAULT_LLM) + self.llm_combo.setCurrentIndex(idx) + top_bar.addWidget(self.llm_combo) + + top_bar.addSpacing(20) + + # 文生图模型选择 + top_bar.addWidget(QLabel("文生图模型:")) + self.model_combo = QComboBox() + self.model_combo.addItems(IMAGE_MODELS.keys()) + idx = list(IMAGE_MODELS.keys()).index(DEFAULT_IMAGE_MODEL) + self.model_combo.setCurrentIndex(idx) + top_bar.addWidget(self.model_combo) + + top_bar.addSpacing(20) + + # 字幕开关 + self.subtitle_cb = QCheckBox("添加字幕") + self.subtitle_cb.setChecked(True) + top_bar.addWidget(self.subtitle_cb) + + top_bar.addStretch() + main_layout.addLayout(top_bar) + + # --- 步骤按钮栏 --- + steps_bar = QHBoxLayout() + steps_bar.setSpacing(12) + + self.btn_plan = QPushButton("1. 划分场景") + self.btn_plan.clicked.connect(self.run_scene_plan) + self.btn_plan.setEnabled(False) + steps_bar.addWidget(self.btn_plan) + + self.btn_generate = QPushButton("2. 生成场景图") + self.btn_generate.clicked.connect(self.start_generation) + self.btn_generate.setEnabled(False) + steps_bar.addWidget(self.btn_generate) + + self.btn_asr = QPushButton("3. ASR 对齐") + self.btn_asr.clicked.connect(self.run_asr) + self.btn_asr.setEnabled(False) + steps_bar.addWidget(self.btn_asr) + + self.btn_video = QPushButton("4. 合成视频") + self.btn_video.clicked.connect(self.make_video) + self.btn_video.setEnabled(False) + steps_bar.addWidget(self.btn_video) + + steps_bar.addStretch() + + # 进度条 + self.progress = QProgressBar() + self.progress.setValue(0) + self.progress.setMaximumWidth(250) + steps_bar.addWidget(self.progress) + + main_layout.addLayout(steps_bar) + + # --- 主体分割(左:列表+日志 / 右:预览) --- + splitter = QSplitter(Qt.Orientation.Horizontal) + + # ---- 左侧面板 ---- + left_panel = QWidget() + left_layout = QVBoxLayout(left_panel) + left_layout.setContentsMargins(0, 0, 0, 0) + left_layout.setSpacing(6) + + # 场景列表 + scene_group = QGroupBox("场景列表") + sg_layout = QVBoxLayout(scene_group) + + # 增删按钮 + scene_btn_bar = QHBoxLayout() + self.btn_add_scene = QPushButton("+ 添加场景") + self.btn_add_scene.setObjectName("btnSmall") + self.btn_add_scene.clicked.connect(self.add_scene) + self.btn_add_scene.setEnabled(False) + scene_btn_bar.addWidget(self.btn_add_scene) + + self.btn_del_scene = QPushButton("- 删除场景") + self.btn_del_scene.setObjectName("btnSmall") + self.btn_del_scene.clicked.connect(self.delete_scene) + self.btn_del_scene.setEnabled(False) + scene_btn_bar.addWidget(self.btn_del_scene) + + scene_btn_bar.addStretch() + sg_layout.addLayout(scene_btn_bar) + + self.scene_list = QListWidget() + self.scene_list.currentRowChanged.connect(self.on_scene_list_clicked) + sg_layout.addWidget(self.scene_list) + self.scene_count_label = QLabel("共 0 个场景") + self.scene_count_label.setObjectName("sectionLabel") + sg_layout.addWidget(self.scene_count_label) + left_layout.addWidget(scene_group) + + # 日志 + log_group = QGroupBox("日志") + lg_layout = QVBoxLayout(log_group) + self.log_text = QTextEdit() + self.log_text.setReadOnly(True) + self.log_text.setMaximumHeight(200) + lg_layout.addWidget(self.log_text) + left_layout.addWidget(log_group) + + splitter.addWidget(left_panel) + + # ---- 右侧面板(预览 + 审查) ---- + right_panel = QWidget() + right_layout = QVBoxLayout(right_panel) + right_layout.setContentsMargins(0, 0, 0, 0) + right_layout.setSpacing(6) + + # 场景信息 + self.scene_info_label = QLabel("场景: - / -") + self.scene_info_label.setObjectName("sceneInfo") + right_layout.addWidget(self.scene_info_label) + + # 图片预览 + self.image_label = QLabel("请先选择工作区并划分场景") + self.image_label.setAlignment(Qt.AlignmentFlag.AlignCenter) + self.image_label.setMinimumSize(640, 400) + self.image_label.setStyleSheet(""" + background-color: #181825; + border: 2px solid #45475a; + border-radius: 8px; + font-size: 14px; + color: #6c7086; + """) + self.image_label.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + right_layout.addWidget(self.image_label, stretch=1) + + # Prompt 显示(可编辑) + prompt_group = QGroupBox("Visual Prompt") + pg_layout = QVBoxLayout(prompt_group) + self.prompt_text = QTextEdit() + self.prompt_text.setMaximumHeight(80) + self.prompt_text.textChanged.connect(self.on_prompt_changed) + pg_layout.addWidget(self.prompt_text) + right_layout.addWidget(prompt_group) + + # 原文片段(可编辑) + text_group = QGroupBox("原文片段 (Text)") + tg_layout = QVBoxLayout(text_group) + self.text_edit = QTextEdit() + self.text_edit.setMaximumHeight(60) + self.text_edit.textChanged.connect(self.on_scene_field_changed) + tg_layout.addWidget(self.text_edit) + right_layout.addWidget(text_group) + + # 场景描述(可编辑) + bg_group = QGroupBox("场景描述 (Background)") + bg_layout = QVBoxLayout(bg_group) + self.bg_edit = QTextEdit() + self.bg_edit.setMaximumHeight(60) + self.bg_edit.textChanged.connect(self.on_scene_field_changed) + bg_layout.addWidget(self.bg_edit) + right_layout.addWidget(bg_group) + + # 审查控制按钮 + review_bar = QHBoxLayout() + review_bar.addStretch() + + self.btn_confirm = QPushButton("确认") + self.btn_confirm.setObjectName("btnConfirm") + self.btn_confirm.setEnabled(False) + review_bar.addWidget(self.btn_confirm) + + self.btn_regen = QPushButton("重新生成") + self.btn_regen.setObjectName("btnRegenerate") + self.btn_regen.setEnabled(False) + review_bar.addWidget(self.btn_regen) + + self.btn_skip = QPushButton("跳过") + self.btn_skip.setObjectName("btnSkip") + self.btn_skip.setEnabled(False) + review_bar.addWidget(self.btn_skip) + + review_bar.addStretch() + right_layout.addLayout(review_bar) + + splitter.addWidget(right_panel) + + # 设置分割比例 + splitter.setSizes([300, 900]) + splitter.setStretchFactor(0, 0) + splitter.setStretchFactor(1, 1) + + main_layout.addWidget(splitter, stretch=1) + + # 初始日志 + self.log("视频制作流水线 v1.0 已启动") + self.log("请先选择一个工作区文件夹(包含 article.txt)") + + # ============================================================ + # 工具方法 + # ============================================================ + def log(self, msg: str): + ts = datetime.now().strftime("%H:%M:%S") + self.log_text.append(f"[{ts}] {msg}") + # 自动滚动到底部 + sb = self.log_text.verticalScrollBar() + sb.setValue(sb.maximum()) + + def set_buttons_enabled(self, **kwargs): + """批量设置按钮启用状态""" + if 'plan' in kwargs: + self.btn_plan.setEnabled(kwargs['plan']) + if 'generate' in kwargs: + self.btn_generate.setEnabled(kwargs['generate']) + if 'asr' in kwargs: + self.btn_asr.setEnabled(kwargs['asr']) + if 'video' in kwargs: + self.btn_video.setEnabled(kwargs['video']) + + def set_review_buttons(self, enabled: bool): + self.btn_confirm.setEnabled(enabled) + self.btn_regen.setEnabled(enabled) + self.btn_skip.setEnabled(enabled) + + def display_image(self, image_path: str): + """在预览区域显示图片""" + if not os.path.exists(image_path): + self.image_label.setText("图片不存在") + return + pixmap = QPixmap(image_path) + # 缩放以适应区域,保持比例 + scaled = pixmap.scaled( + self.image_label.size(), + Qt.AspectRatioMode.KeepAspectRatio, + Qt.TransformationMode.SmoothTransformation + ) + self.image_label.setPixmap(scaled) + + def resizeEvent(self, event): + """窗口大小变化时重新缩放图片""" + super().resizeEvent(event) + if hasattr(self, '_current_img_path') and self._current_img_path: + self.display_image(self._current_img_path) + self._current_img_path = getattr(self, '_current_img_path', None) + + def closeEvent(self, event): + """关闭窗口时等待后台线程结束""" + if self.gen_worker is not None and self.gen_worker.isRunning(): + self.gen_worker.set_user_action("skip") # 唤醒线程使其退出 + self.gen_worker.quit() + self.gen_worker.wait(3000) + if self._worker is not None and self._worker.isRunning(): + self._worker.quit() + self._worker.wait(2000) + event.accept() + + def update_scene_list(self): + """刷新场景列表""" + self.scene_list.clear() + for i, scene in enumerate(self.scenes): + status = scene.get("status", "pending") + scene_id = scene.get("scene_id", i + 1) + prompt = scene.get("visual_prompt", "")[:40] + + # 状态图标 + icon_map = { + "pending": "[ ]", + "generated": "[OK]", + "failed": "[X]", + "skipped": "[>>]", + } + icon = icon_map.get(status, "[ ]") + + # 时间信息 + time_str = "" + if "start_time" in scene and "end_time" in scene: + dur = scene["end_time"] - scene["start_time"] + time_str = f" | {dur:.1f}s" + + text = f"{icon} #{int(scene_id):2d}{time_str} | {prompt}..." + item = QListWidgetItem(text) + self.scene_list.addItem(item) + + # 颜色标记 + if status == "generated": + item.setForeground(QColor("#a6e3a1")) + elif status == "failed": + item.setForeground(QColor("#f38ba8")) + elif status == "skipped": + item.setForeground(QColor("#f9e2af")) + + self.scene_count_label.setText(f"共 {len(self.scenes)} 个场景") + + def show_scene(self, idx: int): + """显示指定场景的图片和信息""" + if idx < 0 or idx >= len(self.scenes): + return + + self.current_scene_idx = idx + scene = self.scenes[idx] + scene_id = scene.get("scene_id", idx + 1) + status = scene.get("status", "pending") + + self.scene_info_label.setText( + f"场景: {scene_id} / {len(self.scenes)} | 状态: {status}" + + (f" | {scene['end_time'] - scene['start_time']:.1f}s" + if "start_time" in scene and "end_time" in scene else "") + ) + self.prompt_text.setPlainText(scene.get("visual_prompt", "")) + self.text_edit.setPlainText(scene.get("text", "")) + self.bg_edit.setPlainText(scene.get("background", "")) + + # 尝试显示图片:优先读 filepath(生成时写入的精确路径),fallback 到规则构造 + img_path = scene.get("filepath") + if not img_path or not os.path.exists(img_path): + scene_dir = os.path.join(self.workspace, "scene") + img_path = os.path.join(scene_dir, f"scene_{scene_id:03d}.png") + if img_path and os.path.exists(img_path): + self._current_img_path = img_path + self.display_image(img_path) + else: + self._current_img_path = None + self.image_label.setText(f"场景 {scene_id} 图片未生成") + + # 高亮列表 + self.scene_list.setCurrentRow(idx) + + # ============================================================ + # 工作区选择 + # ============================================================ + def select_workspace(self): + path = QFileDialog.getExistingDirectory(self, "选择工作区目录(包含 article.txt)") + if not path: + return + + self.workspace = path + self.workspace_label.setText(path) + self.log(f"工作区: {path}") + + # 禁用按钮,防止用户在加载期间操作 + self.btn_select_ws.setEnabled(False) + self.btn_plan.setEnabled(False) + + # 异步加载工作区文件,避免主线程 I/O 阻塞 + def load_workspace(): + result = {"path": path, "has_article": False, "has_audio": False, "scene_plan": None} + article_path = os.path.join(path, "article.txt") + if os.path.exists(article_path): + with open(article_path, 'r', encoding='utf-8') as f: + article = f.read().strip() + result["has_article"] = True + result["article_len"] = len(article) + audio_path = os.path.join(path, "voice.mp3") + result["has_audio"] = os.path.exists(audio_path) + plan_path = os.path.join(path, "scene_plan.json") + if os.path.exists(plan_path): + with open(plan_path, 'r', encoding='utf-8') as f: + result["scene_plan"] = json.load(f) + return result + + self._worker = Worker(load_workspace) + self._worker.finished_signal.connect(self._on_workspace_loaded) + self._worker.error_signal.connect(self._on_workspace_load_error) + self._worker.start() + + def _on_workspace_loaded(self, result): + """异步加载工作区完成""" + path = result["path"] + self.btn_select_ws.setEnabled(True) + + if not result["has_article"]: + QMessageBox.warning(self, "文件缺失", f"未找到 article.txt") + return + + self.log(f"找到 article.txt({result['article_len']} 字符)") + self.btn_plan.setEnabled(True) + + if result["has_audio"]: + self.log("找到 voice.mp3") + else: + self.log("未找到 voice.mp3,ASR 和视频合成将不可用") + + scene_plan = result.get("scene_plan") + if scene_plan: + self.scene_plan = scene_plan + self.scenes = self.scene_plan.get("scenes", []) + self.update_scene_list() + self.log(f"找到已有场景计划({len(self.scenes)} 个场景)") + self.btn_generate.setEnabled(True) + self.btn_add_scene.setEnabled(True) + self.btn_del_scene.setEnabled(True) + if result["has_audio"]: + self.btn_asr.setEnabled(True) + self.btn_video.setEnabled(True) + if self.scenes: + self.show_scene(0) + + def _on_workspace_load_error(self, error_msg): + self.btn_select_ws.setEnabled(True) + self.log(f"[X] 工作区加载失败: {error_msg}") + + # ============================================================ + # 场景列表点击 + # ============================================================ + def on_scene_list_clicked(self, row: int): + if 0 <= row < len(self.scenes): + self.show_scene(row) + + # ============================================================ + # 步骤 1:划分场景 + # ============================================================ + def run_scene_plan(self): + if not self.workspace: + QMessageBox.warning(self, "错误", "请先选择工作区") + return + + # 防止重复触发 + if self._worker is not None and self._worker.isRunning(): + self.log("[!] 上一个任务仍在运行,请稍候...") + return + + # 弹出备注对话框 + dialog = NoteDialog( + parent=self, + title="划分场景 - 添加备注", + message="即将开始分析 article.txt 并划分场景" + ) + + if dialog.exec() != QDialog.DialogCode.Accepted: + # 用户取消了 + return + + user_note = dialog.get_note() + + self.btn_plan.setEnabled(False) + self.log("=" * 40) + self.log("开始划分场景...") + if user_note: + self.log(f"[备注] {user_note}") + + # 捕获当前 provider 和 workspace,避免闭包延迟求值 + provider = self.llm_combo.currentText() + workspace = self.workspace + + def task(): + import scene_plan as _sp + return _sp.main(workspace=workspace, provider=provider, user_note=user_note) + + self._worker = Worker(task) + self._worker.log_signal.connect(self.log) + self._worker.finished_signal.connect(self.on_plan_done) + self._worker.error_signal.connect(self.on_plan_error) + self._worker.start() + + def on_plan_done(self, result): + # 读取 scene_plan.json + plan_path = os.path.join(self.workspace, "scene_plan.json") + if os.path.exists(plan_path): + with open(plan_path, 'r', encoding='utf-8') as f: + self.scene_plan = json.load(f) + self.scenes = self.scene_plan.get("scenes", []) + self.update_scene_list() + self.log(f"场景划分完成,共 {len(self.scenes)} 个场景") + + self.btn_generate.setEnabled(True) + self.btn_add_scene.setEnabled(True) + self.btn_del_scene.setEnabled(True) + # 检查音频 + if os.path.exists(os.path.join(self.workspace, "voice.mp3")): + self.btn_asr.setEnabled(True) + self.btn_video.setEnabled(True) + + if self.scenes: + self.show_scene(0) + self.btn_plan.setEnabled(True) + + def on_plan_error(self, error_msg): + self.log(f"[X] 场景划分失败: {error_msg}") + QMessageBox.critical(self, "错误", f"场景划分失败:\n{error_msg}") + self.btn_plan.setEnabled(True) + + # ============================================================ + # 步骤 2:生成场景图(逐张审查,QThread + 信号) + # ============================================================ + def start_generation(self): + if not self.scenes: + QMessageBox.error(self, "错误", "请先划分场景") + return + + # 检查是否有待生成的场景 + has_pending = any(s.get("status") in ("pending", "failed") for s in self.scenes) + if not has_pending: + QMessageBox.information(self, "完成", "所有场景已生成或跳过!") + return + + self.btn_generate.setEnabled(False) + self.btn_plan.setEnabled(False) + self.set_review_buttons(False) + self.log("=" * 40) + self.log("开始生成场景图(逐张审查模式)...") + + model_name = self.model_combo.currentText() + + # 创建并启动 GenerationWorker + self.gen_worker = GenerationWorker(self.scenes, self.workspace, model_name) + + # 连接信号 → 全部在主线程执行 + self.gen_worker.log_signal.connect(self.log) + self.gen_worker.progress_signal.connect( + lambda cur, total: self.progress.setValue(int((cur / total) * 100) if total > 0 else 0) + ) + self.gen_worker.show_scene_signal.connect(self.show_scene) + self.gen_worker.display_image_signal.connect(self._on_display_image) + self.gen_worker.update_list_item_signal.connect(self._on_update_list_item) + self.gen_worker.review_ready_signal.connect(self._on_review_ready) + self.gen_worker.generation_done_signal.connect(self._on_generation_done) + self.gen_worker.error_on_scene_signal.connect(self._on_scene_error) + + # 设置审查按钮的槽函数(使用实例变量避免信号连接泄漏) + worker_ref = self.gen_worker + self._confirm_slot = lambda: worker_ref.set_user_action("confirm") + self._regen_slot = lambda: worker_ref.set_user_action("regenerate") + self._skip_slot = lambda: worker_ref.set_user_action("skip") + + self.btn_confirm.clicked.connect(self._confirm_slot) + self.btn_regen.clicked.connect(self._regen_slot) + self.btn_skip.clicked.connect(self._skip_slot) + + self.gen_worker.start() + + def _on_display_image(self, img_path: str): + """主线程:显示图片""" + self._current_img_path = img_path + self.display_image(img_path) + + def _on_update_list_item(self, index: int, text: str, color: str): + """主线程:更新列表项""" + item = self.scene_list.item(index) + if item: + item.setText(text) + item.setForeground(QColor(color)) + + def _on_review_ready(self, scene_idx: int): + """主线程:图片已生成,启用审查按钮""" + self.set_review_buttons(True) + + def _on_scene_error(self, scene_idx: int, error_msg: str): + """主线程:某场景生成失败""" + # 失败的不需要审查,继续下一个 + pass + + def _on_generation_done(self, done_count: int, total: int): + """主线程:生成全部完成""" + self.progress.setValue(100) + self.set_review_buttons(False) + self.btn_generate.setEnabled(True) + self.btn_plan.setEnabled(True) + + # 刷新场景列表 + self.update_scene_list() + + # 同步 scenes 回 scene_plan + if self.scene_plan: + self.scene_plan["scenes"] = self.scenes + + self.log(f"[*] 生成流程结束,已完成 {done_count}/{total}") + QMessageBox.information(self, "完成", f"场景图生成完成\n已生成: {done_count}/{total}") + + # 断开生成模式的槽函数,恢复默认行为 + self.btn_confirm.clicked.disconnect(self._confirm_slot) + self.btn_regen.clicked.disconnect(self._regen_slot) + self.btn_skip.clicked.disconnect(self._skip_slot) + + # 重置槽函数引用 + self._confirm_slot = lambda: None + self._regen_slot = lambda: None + self._skip_slot = lambda: None + + self.gen_worker = None + + def _renumber_scene_ids(self): + """重新编号所有场景的 scene_id""" + for i, scene in enumerate(self.scenes): + scene["scene_id"] = i + 1 + + def on_prompt_changed(self): + """visual_prompt 编辑后延迟写盘(debounce 500ms)""" + if self.current_scene_idx < 0 or self.current_scene_idx >= len(self.scenes): + return + self.scenes[self.current_scene_idx]["visual_prompt"] = self.prompt_text.toPlainText() + # 刷新列表中对应的条目 + self._refresh_list_item(self.current_scene_idx) + self._save_timer.start() # 重置并重新计时 + + def on_scene_field_changed(self): + """text / background 编辑后延迟写盘(debounce 500ms)""" + if self.current_scene_idx < 0 or self.current_scene_idx >= len(self.scenes): + return + self.scenes[self.current_scene_idx]["text"] = self.text_edit.toPlainText() + self.scenes[self.current_scene_idx]["background"] = self.bg_edit.toPlainText() + self._save_timer.start() # 重置并重新计时 + + def add_scene(self): + """在当前选中场景后面插入一个新场景""" + if not self.scenes: + idx = 0 + else: + idx = self.current_scene_idx + 1 if self.current_scene_idx >= 0 else len(self.scenes) + + new_scene = { + "scene_id": 0, # 临时值,稍后 renumber + "text": "", + "background": "", + "visual_prompt": "", + "status": "pending" + } + self.scenes.insert(idx, new_scene) + self._renumber_scene_ids() + self.scene_plan["scenes"] = self.scenes + self._save_plan() + self.update_scene_list() + self.show_scene(idx) + self.log(f"[+] 已在位置 {idx + 1} 添加新场景") + + def delete_scene(self): + """删除当前选中的场景""" + if self.current_scene_idx < 0 or self.current_scene_idx >= len(self.scenes): + return + idx = self.current_scene_idx + del self.scenes[idx] + self._renumber_scene_ids() + self.scene_plan["scenes"] = self.scenes + self._save_plan() + self.update_scene_list() + # 显示相邻场景 + show_idx = min(idx, len(self.scenes) - 1) + if show_idx >= 0: + self.show_scene(show_idx) + self.log(f"[-] 已删除场景(剩余 {len(self.scenes)} 个)") + + def _refresh_list_item(self, idx: int): + """刷新场景列表中指定条目的文字""" + if idx < 0 or idx >= len(self.scenes): + return + scene = self.scenes[idx] + status = scene.get("status", "pending") + scene_id = scene.get("scene_id", idx + 1) + prompt = scene.get("visual_prompt", "")[:40] + icon_map = { + "pending": "[ ]", + "generated": "[OK]", + "failed": "[X]", + "skipped": "[>>]", + } + icon = icon_map.get(status, "[ ]") + time_str = "" + if "start_time" in scene and "end_time" in scene: + dur = scene["end_time"] - scene["start_time"] + time_str = f" | {dur:.1f}s" + text = f"{icon} #{int(scene_id):2d}{time_str} | {prompt}..." + item = self.scene_list.item(idx) + if item: + item.setText(text) + + def _save_plan(self): + """保存 scene_plan.json""" + if self.scene_plan and self.workspace: + plan_path = os.path.join(self.workspace, "scene_plan.json") + with open(plan_path, 'w', encoding='utf-8') as f: + json.dump(self.scene_plan, f, ensure_ascii=False, indent=2) + + # ============================================================ + # 步骤 3:ASR 对齐 + # ============================================================ + def run_asr(self): + if not self.workspace: + return + + if self._worker is not None and self._worker.isRunning(): + self.log("[!] 上一个任务仍在运行,请稍候...") + return + + audio_path = os.path.join(self.workspace, "voice.mp3") + if not os.path.exists(audio_path): + QMessageBox.warning(self, "文件缺失", "未找到 voice.mp3") + return + + self.btn_asr.setEnabled(False) + self.log("=" * 40) + self.log("开始 ASR 对齐(模型加载可能需要较长时间)...") + + def task(): + import asr + asr.run_asr(self.workspace) + return asr.match_scenes_to_audio(self.workspace) + + self._worker = Worker(task) + self._worker.log_signal.connect(self.log) + self._worker.finished_signal.connect(self.on_asr_done) + self._worker.error_signal.connect(self.on_asr_error) + self._worker.start() + + def on_asr_done(self, result): + # 刷新 scene_plan(已包含时间信息) + plan_path = os.path.join(self.workspace, "scene_plan.json") + if os.path.exists(plan_path): + with open(plan_path, 'r', encoding='utf-8') as f: + self.scene_plan = json.load(f) + self.scenes = self.scene_plan.get("scenes", []) + self.update_scene_list() + + scene_count = len(self.scenes) + matched = sum(1 for s in self.scenes if "start_time" in s) + self.log(f"[OK] ASR + 场景匹配完成,{matched}/{scene_count} 个场景已分配时间") + QMessageBox.information(self, "完成", + f"ASR 对齐完成,场景匹配成功\n{matched}/{scene_count} 个场景已分配时间") + self.btn_asr.setEnabled(True) + + def on_asr_error(self, error_msg): + self.log(f"[X] ASR 失败: {error_msg}") + QMessageBox.critical(self, "错误", f"ASR 对齐失败:\n{error_msg}") + self.btn_asr.setEnabled(True) + + # ============================================================ + # 步骤 4:合成视频 + # ============================================================ + def make_video(self): + if not self.workspace: + return + + if self._worker is not None and self._worker.isRunning(): + self.log("[!] 上一个任务仍在运行,请稍候...") + return + + self.btn_video.setEnabled(False) + self.log("=" * 40) + self.log("开始合成视频...") + + self._worker = Worker(self._make_video_task) + self._worker.log_signal.connect(self.log) + self._worker.finished_signal.connect(self.on_video_done) + self._worker.error_signal.connect(self.on_video_error) + self._worker.start() + + def _make_video_task(self): + """视频合成任务 - 通过 stdout 重定向捕获 tqdm 进度到日志""" + import sys + import io + + worker = self._worker + redirector = _StreamRedirector(lambda msg: worker.log_signal.emit(msg)) + + import make_video + original_stdout = sys.stdout + original_stderr = sys.stderr + sys.stdout = redirector + sys.stderr = redirector + + try: + video_path = make_video.main( + workspace=self.workspace, + fps=DEFAULT_FPS, + size=DEFAULT_VIDEO_SIZE, + subtitle=self.subtitle_cb.isChecked(), + ) + return video_path + finally: + sys.stdout = original_stdout + sys.stderr = original_stderr + + def on_video_done(self, result): + self.log(f"[OK] 视频合成完成: {result}") + QMessageBox.information(self, "完成", f"视频已生成:\n{result}") + self.btn_video.setEnabled(True) + + def on_video_error(self, error_msg): + self.log(f"[X] 视频合成失败: {error_msg}") + QMessageBox.critical(self, "错误", f"视频合成失败:\n{error_msg}") + self.btn_video.setEnabled(True) + + +# ============================================================ +# 入口 +# ============================================================ +def main(): + app = QApplication(sys.argv) + app.setStyleSheet(DARK_STYLE) + window = VideoPipelineGUI() + window.show() + sys.exit(app.exec()) + + +if __name__ == "__main__": + main() diff --git a/image_gen.py b/image_gen.py new file mode 100644 index 0000000..5acf5ba --- /dev/null +++ b/image_gen.py @@ -0,0 +1,215 @@ +""" +image_gen.py - 统一文生图接口 +支持两个模型: + - Kolors(便宜快速)→ SiliconFlow API(同步) + - Qwen-Image(高质量)→ ModelScope API(异步轮询) +""" + +import requests +import os +import time +from datetime import datetime +from config import ( + SILICONFLOW_API_KEY, + SILICONFLOW_API_BASE, + MODELSCOPE_API_KEY, + MODELSCOPE_API_BASE, + MODELSCOPE_POLL_INTERVAL, + MODELSCOPE_MAX_WAIT, + IMAGE_MODELS, + NEGATIVE_PROMPT, +) + + +def _generate_siliconflow(prompt, model_id, size, guidance, neg, save_dir, filename): + """SiliconFlow 同步 API(Kolors)""" + payload = { + "model": model_id, + "prompt": prompt, + "image_size": size, + "n": 1, + "num_inference_steps": 20, + "guidance_scale": guidance, + "negative_prompt": neg, + } + + headers = { + "Authorization": f"Bearer {SILICONFLOW_API_KEY}", + "Content-Type": "application/json", + } + + print(f" [SiliconFlow] 提交: {prompt[:60]}{'...' if len(prompt) > 60 else ''}") + + for attempt in range(6): # 最多重试 5 次 + resp = requests.post(SILICONFLOW_API_BASE, headers=headers, json=payload, timeout=120) + print(f" HTTP {resp.status_code}: {resp.text[:300]}") + + if resp.status_code == 429: + wait = 15 * (attempt + 1) # 15s, 30s, 45s, 60s, 75s + print(f" [!] 限频,等待 {wait}s 后重试 ({attempt+1}/5)...") + time.sleep(wait) + continue + + if resp.status_code != 200: + raise Exception(f"SiliconFlow 生成失败 ({resp.status_code}): {resp.text[:300]}") + break + else: + raise Exception("SiliconFlow 持续限频,已重试 5 次,请稍后再试或切换模型") + + result = resp.json() + images = result.get("images", []) + if not images: + raise Exception(f"SiliconFlow 返回无图片: {result}") + + img_url = images[0].get("url") + if not img_url: + raise Exception(f"返回图片 URL 为空: {result}") + + img_data = requests.get(img_url, timeout=60).content + + if filename is None: + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"kolors_{ts}.png" + filepath = os.path.join(save_dir, filename) + with open(filepath, "wb") as f: + f.write(img_data) + + print(f" [OK] {filename}") + return {"url": img_url, "filepath": filepath} + + +def _generate_modelscope(prompt, model_id, size, guidance, neg, save_dir, filename): + """ModelScope 异步轮询 API(Qwen-Image)""" + submit_headers = { + "Authorization": f"Bearer {MODELSCOPE_API_KEY}", + "Content-Type": "application/json", + "X-ModelScope-Async-Mode": "true" + } + payload = { + "model": model_id, + "prompt": prompt, + "n": 1, + "size": size, + "guidance_scale": guidance, + "negative_prompt": neg, + } + + print(f" [ModelScope] 提交: {prompt[:60]}{'...' if len(prompt) > 60 else ''}") + resp = requests.post(MODELSCOPE_API_BASE, headers=submit_headers, json=payload, timeout=60) + if resp.status_code != 200: + raise Exception(f"ModelScope 提交失败 ({resp.status_code}): {resp.text[:300]}") + + result = resp.json() + task_id = result.get("task_id") + if not task_id: + raise Exception(f"未找到 task_id: {result}") + print(f" task_id: {task_id}") + + # 轮询结果 + query_headers = { + "Authorization": f"Bearer {MODELSCOPE_API_KEY}", + "X-ModelScope-Task-Type": "image_generation" + } + status_url = f"https://api-inference.modelscope.cn/v1/tasks/{task_id}" + start = time.time() + + for attempt in range(100): + if attempt > 0: + time.sleep(MODELSCOPE_POLL_INTERVAL) + elapsed = int(time.time() - start) + if elapsed > MODELSCOPE_MAX_WAIT: + raise Exception(f"ModelScope 超时({MODELSCOPE_MAX_WAIT}s)") + + qresp = requests.get(status_url, headers=query_headers, timeout=30) + if qresp.status_code != 200: + continue + + qresult = qresp.json() + task_status = qresult.get("task_status", "") + if attempt % 5 == 0 or task_status in ("SUCCEED", "FAILED"): + print(f" [{elapsed}s] {task_status}") + + if task_status == "SUCCEED": + output_images = (qresult.get("output_images") + or qresult.get("outputs", {}).get("output_images") + or []) + if not output_images: + raise Exception(f"SUCCEED 但无图片: {qresult}") + url = output_images[0] + img_data = requests.get(url, timeout=180).content + + if filename is None: + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"qwen_{ts}.png" + filepath = os.path.join(save_dir, filename) + with open(filepath, "wb") as f: + f.write(img_data) + + print(f" [OK] {filename} ({elapsed}s)") + return {"url": url, "filepath": filepath} + + elif task_status == "FAILED": + raise Exception(f"ModelScope 任务失败: {qresult.get('errors', qresult)}") + + raise Exception(f"ModelScope 超时({MODELSCOPE_MAX_WAIT}s)") + + +def image_generate( + prompt: str, + save_dir: str = "./generated_images", + model_name: str = None, + n: int = 1, + seed: int = None, + num_inference_steps: int = 20, + guidance_scale: float = None, + negative_prompt: str = None, + filename: str = None, + image_size: str = None, +) -> dict: + """ + 统一文生图接口 + + Args: + prompt: 生成提示词 + save_dir: 保存目录 + model_name: 模型名称(IMAGE_MODELS 的 key),默认用 config 中的 DEFAULT_IMAGE_MODEL + image_size: 图片尺寸,默认 1280x720(16:9) + + Returns: + dict: {"url": str, "filepath": str} + """ + from config import DEFAULT_IMAGE_MODEL + + if model_name is None: + model_name = DEFAULT_IMAGE_MODEL + + model_config = IMAGE_MODELS.get(model_name) + if not model_config: + raise ValueError(f"未知模型: {model_name},可选: {list(IMAGE_MODELS.keys())}") + + model_id = model_config["model"] + size = image_size or model_config["default_size"] + guidance = guidance_scale if guidance_scale is not None else model_config["guidance_scale"] + neg = negative_prompt or NEGATIVE_PROMPT + + os.makedirs(save_dir, exist_ok=True) + + provider = model_config["provider"] + if provider == "siliconflow": + return _generate_siliconflow(prompt, model_id, size, guidance, neg, save_dir, filename) + elif provider == "modelscope": + return _generate_modelscope(prompt, model_id, size, guidance, neg, save_dir, filename) + else: + raise ValueError(f"未知 provider: {provider}") + + +def get_available_models() -> list[str]: + """返回可用的文生图模型名称列表""" + return list(IMAGE_MODELS.keys()) + + +if __name__ == "__main__": + for name in get_available_models(): + print(f"\n测试模型: {name}") + result = image_generate("A cute cat sitting on a desk, 16:9 aspect ratio", model_name=name) + print(f" 路径: {result['filepath']}") diff --git a/make_video.py b/make_video.py new file mode 100644 index 0000000..4e05057 --- /dev/null +++ b/make_video.py @@ -0,0 +1,745 @@ +""" +make_video.py - 场景图 + ASR 时间戳 + 音频 → 视频 + +流程: +1. 读取 voice.mp3 获取总时长 +2. 读取 scene_plan.json 获取场景列表 +3. 如果有 result.json(ASR 对齐),按对齐结果分配时间;否则平均分配 +4. 用 moviepy 将场景图在对应时段显示,配上音频生成视频 +""" + +import os +import json +import math +import re +import numpy as np +from PIL import Image, ImageDraw, ImageFont +from moviepy import ( + AudioFileClip, + ImageClip, + CompositeVideoClip, + ColorClip, + concatenate_videoclips +) + + +def load_scene_plan(workspace: str) -> dict: + """读取 scene_plan.json""" + path = os.path.join(workspace, 'scene_plan.json') + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + + +def load_asr_result(workspace: str) -> dict: + """读取 result.json(ASR 对齐结果),如果不存在返回 None""" + path = os.path.join(workspace, "result.json") + if os.path.exists(path): + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + return None + + +def build_scene_timeline(scene_plan: dict, asr_result: dict, audio_duration: float, + workspace: str = None) -> list: + """ + 构建场景时间线。优先使用 scene 中已有的 start_time/end_time(ASR 步骤写入), + 没有则根据 ASR 文本匹配分配,再没有则平均分配。 + """ + scenes = scene_plan["scenes"] + num_scenes = len(scenes) + + # --- 策略 1:直接使用 scene 已有的时间戳(ASR 步骤写入的) --- + if all("start_time" in s and "end_time" in s for s in scenes): + timeline = [] + for scene in scenes: + timeline.append({ + "scene_id": scene["scene_id"], + "start": scene["start_time"], + "end": scene["end_time"], + "has_match": True, + }) + # 确保最后一个场景延伸到音频结尾 + if timeline: + timeline[-1]["end"] = audio_duration + return timeline + + # --- 策略 2:部分有时间戳,部分没有 --- + has_timestamp = [s for s in scenes if "start_time" in s and "end_time" in s] + if has_timestamp and len(has_timestamp) < num_scenes: + timeline = [] + matched_end = 0.0 + for scene in scenes: + if "start_time" in scene and "end_time" in scene: + timeline.append({ + "scene_id": scene["scene_id"], + "start": scene["start_time"], + "end": scene["end_time"], + }) + matched_end = max(matched_end, scene["end_time"]) + else: + timeline.append({ + "scene_id": scene["scene_id"], + "start": None, + "end": None, + }) + + # 未匹配的场景按剩余时间平均分配 + unmatched_indices = [i for i, item in enumerate(timeline) if item["start"] is None] + if unmatched_indices: + remaining = audio_duration - matched_end + if remaining > 0: + seg_dur = remaining / len(unmatched_indices) + t = matched_end + for idx in unmatched_indices: + timeline[idx]["start"] = t + timeline[idx]["end"] = t + seg_dur + t += seg_dur + else: + for idx in unmatched_indices: + timeline[idx]["start"] = matched_end + timeline[idx]["end"] = matched_end + 2.0 + + if timeline: + timeline[-1]["end"] = audio_duration + return timeline + + # --- 策略 3:没有任何时间戳,用 ASR 文本匹配或平均分配 --- + if asr_result is not None: + # 准备匹配用的文本 + scenes_for_match = [] + for scene in scenes: + s = dict(scene) + text_val = s.get("text", "") + lines_val = s.get("lines", "") + raw = text_val or lines_val or "" + if isinstance(raw, str) and raw: + s["lines"] = [x.strip() for x in raw.split("。") if x.strip()] + elif isinstance(raw, list): + s["lines"] = raw + else: + s["lines"] = [] + scenes_for_match.append(s) + + asr_segments = asr_result.get("segments", []) + timeline = assign_scenes_to_segments(scenes_for_match, asr_segments, audio_duration) + else: + # 平均分配 + duration_per_scene = audio_duration / num_scenes + timeline = [] + current_time = 0.0 + for scene in scenes: + start = current_time + end = min(current_time + duration_per_scene, audio_duration) + timeline.append({ + "scene_id": scene["scene_id"], + "start": start, + "end": end, + }) + current_time = end + + return timeline + + +def _normalize_text(text: str) -> str: + """清理文本:去除标点、空格、转小写,用于相似度比较""" + if not text: + return "" + # 去除所有非字母数字和非中文字符 + text = re.sub(r'[^\w\u4e00-\u9fff]', '', text) + return text.lower() + + +def _longest_common_substring_ratio(s1: str, s2: str) -> float: + """ + 计算两个字符串的最长公共子串比例 + + 使用动态规划,但为了性能优化: + - 如果字符串太长,使用简化的滑动窗口方法 + """ + if not s1 or not s2: + return 0.0 + + # 对于短字符串,使用精确的 DP 算法 + if len(s1) * len(s2) < 100000: # 乘积小于 10万 + m, n = len(s1), len(s2) + # 优化空间复杂度:只用两行 + prev = [0] * (n + 1) + curr = [0] * (n + 1) + max_len = 0 + + for i in range(1, m + 1): + for j in range(1, n + 1): + if s1[i-1] == s2[j-1]: + curr[j] = prev[j-1] + 1 + max_len = max(max_len, curr[j]) + else: + curr[j] = 0 + prev, curr = curr, [0] * (n + 1) + + return max_len / max(len(s1), len(s2)) + + # 对于长字符串,使用简化的滑动窗口 + return _sliding_window_similarity(s1, s2) + + +def _sliding_window_similarity(s1: str, s2: str, window_size: int = 20) -> float: + """ + 滑动窗口相似度(用于长字符串的快速估算) + + 将 s1 分成多个窗口,在 s2 中查找最佳匹配 + """ + if len(s1) <= window_size: + # 如果 s1 本身很短,直接检查是否在 s2 中 + if s1 in s2: + return 1.0 + # 否则检查每个字符的出现 + common = sum(1 for c in s1 if c in s2) + return common / len(s1) if s1 else 0.0 + + # 分窗口检查 + total_score = 0.0 + num_windows = 0 + + for i in range(0, len(s1) - window_size + 1, window_size // 2): + window = s1[i:i+window_size] + if window in s2: + total_score += 1.0 + else: + # 部分匹配 + common = sum(1 for c in window if c in s2) + total_score += common / window_size + num_windows += 1 + + return total_score / num_windows if num_windows > 0 else 0.0 + + +def assign_scenes_to_segments(scenes: list, asr_segments: list, audio_duration: float) -> list: + """ + 基于文本相似度的精确匹配 + + 策略: + 1. 每个 scene 有 text 字段(原文片段) + 2. 在 ASR segments 中搜索这段文本的出现位置 + 3. 使用最长公共子串相似度容忍 ASR 识别错误 + 4. 阈值 60%,避免误匹配 + """ + SIMILARITY_THRESHOLD = 0.5 # 相似度阈值 + MAX_WINDOW_SIZE = 20 # 最多合并 20 个 ASR segments(长句需要更多) + + # 预处理:清理 ASR segments 的文本 + asr_cleaned = [] + for seg in asr_segments: + text = seg.get('text', '').strip() + if text: + asr_cleaned.append({ + 'text': text, + 'text_normalized': _normalize_text(text), + 'start': seg['start'], + 'end': seg['end'] + }) + + def find_best_match(scene_text: str, start_seg_idx: int): + """ + 找到 scene_text 在 ASR segments 中的最佳匹配位置 + + 使用滑动窗口尝试不同长度的 segment 组合 + """ + if not scene_text or not scene_text.strip(): + return None + + scene_normalized = _normalize_text(scene_text) + + best_score = 0.0 + best_match = None + + # 滑动窗口:尝试不同数量的 segment 组合 + for window_size in range(1, min(MAX_WINDOW_SIZE + 1, len(asr_cleaned) - start_seg_idx + 1)): + for i in range(start_seg_idx, len(asr_cleaned) - window_size + 1): + # 合并多个 segments 的文本 + combined_text = ''.join( + seg['text_normalized'] for seg in asr_cleaned[i:i+window_size] + ) + + # 计算相似度 + similarity = _longest_common_substring_ratio(scene_normalized, combined_text) + + if similarity > best_score: + best_score = similarity + best_match = { + 'start': asr_cleaned[i]['start'], + 'end': asr_cleaned[i + window_size - 1]['end'], + 'start_idx': i, + 'end_idx': i + window_size - 1, + 'similarity': similarity + } + + # 如果已经达到很高相似度,提前返回 + if similarity >= 0.90: + return best_match + + # 只有达到阈值才算匹配成功 + if best_score >= SIMILARITY_THRESHOLD: + return best_match + + return None + + # 主匹配循环 + timeline_raw = [] + last_end_idx = 0 # 记录上一个匹配结束的位置 + + for scene in scenes: + # 优先使用 text 字段,兼容旧 lines 字段 + scene_text = scene.get('text', '') or scene.get('lines', '') + if isinstance(scene_text, list): + scene_text = ''.join(scene_text) + + match = find_best_match(scene_text, last_end_idx) + + if match: + timeline_raw.append({ + 'scene_id': scene['scene_id'], + 'start': match['start'], + 'end': match['end'], + 'has_match': True, + 'similarity': match['similarity'] + }) + # 下一个场景从当前匹配结束后开始 + last_end_idx = match['end_idx'] + 1 + else: + timeline_raw.append({ + 'scene_id': scene['scene_id'], + 'start': None, + 'end': None, + 'has_match': False, + 'similarity': 0.0 + }) + + # 处理未匹配的场景:按剩余音频时间平均分配 + unmatched_indices = [i for i, item in enumerate(timeline_raw) if not item['has_match']] + if unmatched_indices: + # 找到已匹配场景的最大结束时间 + matched_end = 0.0 + for item in timeline_raw: + if item['has_match'] and item['end']: + matched_end = max(matched_end, item['end']) + + remaining_duration = audio_duration - matched_end + if remaining_duration > 0 and len(unmatched_indices) > 0: + seg_dur = remaining_duration / len(unmatched_indices) + current_t = matched_end + + for idx in unmatched_indices: + timeline_raw[idx]['start'] = current_t + timeline_raw[idx]['end'] = current_t + seg_dur + timeline_raw[idx]['has_match'] = True + current_t += seg_dur + else: + # 如果没有剩余时间,给一个默认时长 + for idx in unmatched_indices: + timeline_raw[idx]['start'] = matched_end + timeline_raw[idx]['end'] = matched_end + 2.0 + timeline_raw[idx]['has_match'] = True + + # 确保每个场景至少有 0.5 秒 + for item in timeline_raw: + if item['end'] is None or item['end'] <= item['start']: + item['end'] = item['start'] + 0.5 + + # 消除间隙:确保时间连续 + for i in range(len(timeline_raw) - 1): + curr = timeline_raw[i] + nxt = timeline_raw[i + 1] + gap = nxt['start'] - curr['end'] + if gap > 0.01: # 允许 10ms 的微小间隙 + curr['end'] = nxt['start'] + + # 最后一个场景延伸到音频结尾 + if timeline_raw: + timeline_raw[-1]['end'] = audio_duration + + # 合并极短的场景(< 1秒)到下一个场景 + merged = [] + skip_next = False + for i in range(len(timeline_raw)): + if skip_next: + skip_next = False + continue + + item = dict(timeline_raw[i]) + duration = item['end'] - item['start'] + + # 如果当前场景太短且不是最后一个,合并到下一个 + if duration < 1.0 and i < len(timeline_raw) - 1: + item['end'] = timeline_raw[i + 1]['end'] + merged.append(item) + skip_next = True + else: + merged.append(item) + + return merged + + +def find_scene_image(workspace: str, scene_id: int) -> str: + """查找场景图""" + candidates = [ + os.path.join(workspace, "scene", f"scene_{scene_id:03d}.png"), + os.path.join(workspace, "scene", f"scene_{scene_id}.png"), + ] + for path in candidates: + if os.path.exists(path): + return path + return None + + +def create_video(workspace: str, timeline: list, audio_path: str, output_path: str, + fps: int = 24, img_size: tuple = (1280, 720), + subtitle: bool = True, log_fn=None): + """ + 用 moviepy 创建视频 + + Args: + workspace: 工作区路径 + timeline: 场景时间线 + audio_path: 音频文件路径 + output_path: 输出视频路径 + fps: 帧率 + img_size: 视频尺寸 (w, h) + subtitle: 是否添加字幕 + log_fn: 日志回调函数 + """ + def log(msg): + if log_fn: + log_fn(msg) + else: + print(msg) + + clips = [] + for item in timeline: + scene_id = item["scene_id"] + start = item["start"] + end = item["end"] + duration = end - start + + if duration <= 0: + continue + + img_path = find_scene_image(workspace, scene_id) + if img_path: + img_clip = ImageClip(img_path).with_duration(duration) + img_clip = img_clip.resized(new_size=img_size) + else: + log(f" [警告] scene_{scene_id} 无图片,使用纯色背景") + img_clip = ColorClip(size=img_size, color=(50, 50, 50)).with_duration(duration) + + img_clip = img_clip.with_start(start) + clips.append(img_clip) + + # 字幕 + if subtitle: + asr_path = os.path.join(workspace, "result.json") + if os.path.exists(asr_path): + with open(asr_path, encoding="utf-8") as f: + asr_result = json.load(f) + segments = asr_result.get("segments", []) + log(f"[*] 原始 ASR 片段: {len(segments)} 条") + + # 将 ASR segments 按时间分组(每段 3-5 秒,避免字幕堆满屏幕) + def group_segments_into_sentences(segments: list) -> list: + """ + 将短的 ASR segments 合并成合理的字幕组。 + 支持中文(字级)和英文(词级)ASR 输出。 + 限制条件: + - 每组最多 4 秒时长 + - 每组最多 40 个字符(中文)/ 60 个字符(英文,含空格) + - 遇到句号等结束标点时提前分组 + """ + if not segments: + return [] + + sentence_endings = set('。!?.!?…\n') + MAX_CHARS = 60 # 字符上限(英文含空格时会更长) + MAX_DURATION = 4.0 + + def flush(text, start, end): + if text and start is not None: + grouped.append({"text": text.strip(), "start": start, "end": end}) + + grouped = [] + current_words = [] # 词/字列表 + current_start = None + current_end = None + + for seg in segments: + text = seg.get("text", "").strip() + if not text: + continue + + seg_start = seg.get("start", 0) + seg_end = seg.get("end", 0) + + if current_start is None: + current_start = seg_start + + # 判断加入当前词后是否超限 + # 用空格拼英文,中文直接拼(中文 seg 通常每个词自带空格或无需加) + preview_words = current_words + [text] + # 根据是否含非 ASCII 字符判断是中文还是英文 + is_ascii_word = all(ord(c) < 128 for c in text) + if is_ascii_word: + preview_text = " ".join(preview_words) + else: + preview_text = "".join(preview_words) + + preview_duration = seg_end - current_start + + # 先检查:加入后是否超限 → 如果是,先 flush 旧组,再开新组 + overflow = ( + len(preview_text) > MAX_CHARS or + preview_duration > MAX_DURATION + ) + + if overflow and current_words: + # flush 当前组(不含新词) + if is_ascii_word: + flush(" ".join(current_words), current_start, current_end) + else: + flush("".join(current_words), current_start, current_end) + current_words = [text] + current_start = seg_start + current_end = seg_end + else: + current_words.append(text) + current_end = seg_end + + # 如果当前词是句尾标点,立刻 flush + if text and text[-1] in sentence_endings: + if is_ascii_word: + flush(" ".join(current_words), current_start, current_end) + else: + flush("".join(current_words), current_start, current_end) + current_words = [] + current_start = None + current_end = None + + # flush 剩余 + if current_words: + if all(ord(c) < 128 for c in current_words[0]): + flush(" ".join(current_words), current_start or 0, current_end or 0) + else: + flush("".join(current_words), current_start or 0, current_end or 0) + + return grouped + + # 按句子分组 + sentence_groups = group_segments_into_sentences(segments) + log(f"[*] 添加字幕,共 {len(sentence_groups)} 个句子(从 {len(segments)} 个片段合并)") + + w, h = img_size + font_size = max(int(h * 0.045), 20) + margin_bottom = int(h * 0.10) + max_chars_per_line = max(int(w / (font_size * 0.85)), 15) + + # 字体:优先黑体,fallback arial + font_path = "C:/Windows/Fonts/simhei.ttf" + if not os.path.exists(font_path): + font_path = "C:/Windows/Fonts/arial.ttf" + + try: + pil_font = ImageFont.truetype(font_path, font_size) + except Exception: + pil_font = ImageFont.load_default() + + # 字体度量 + tmp_img = Image.new("RGBA", (1, 1)) + tmp_draw = ImageDraw.Draw(tmp_img) + metrics = tmp_draw.textbbox((0, 0), "gypj8q", font=pil_font) + font_full_height = metrics[3] - metrics[1] + + def wrap_text(text: str, max_chars: int) -> list: + """按字符数拆分多行,优先在标点处断行""" + if len(text) <= max_chars: + return [text] + lines = [] + while len(text) > max_chars: + cut = max_chars + for i in range(max_chars, max(max_chars - 5, 0), -1): + if text[i] in ",。!?、;:\u201c\u201d\u2018\u2019\u2026\u2014,.!?;: ": + cut = i + 1 + break + lines.append(text[:cut]) + text = text[cut:] + if text: + lines.append(text) + return lines + + def render_subtitle_pil(text_lines: list) -> np.ndarray: + """用 PIL 渲染多行字幕,返回 RGBA numpy 数组""" + line_spacing = int(font_size * 0.35) + line_height = font_full_height + line_spacing + + line_widths = [] + for line in text_lines: + bbox = tmp_draw.textbbox((0, 0), line, font=pil_font) + line_widths.append(bbox[2] - bbox[0]) + + img_w = max(line_widths) + 6 + img_h = len(text_lines) * line_height + int(font_size * 0.3) + + img = Image.new("RGBA", (img_w, img_h), (0, 0, 0, 0)) + draw = ImageDraw.Draw(img) + + for idx, line in enumerate(text_lines): + x = (img_w - line_widths[idx]) // 2 + y = idx * line_height + for dx in (-2, -1, 0, 1, 2): + for dy in (-2, -1, 0, 1, 2): + if dx == 0 and dy == 0: + continue + draw.text((x + dx, y + dy), line, fill=(0, 0, 0, 255), font=pil_font) + draw.text((x, y), line, fill=(255, 255, 255, 255), font=pil_font) + + return np.array(img) + + # 按句子显示字幕(而不是逐个片段) + for sentence in sentence_groups: + text = sentence["text"].strip() + if not text: + continue + seg_start = sentence["start"] + seg_end = sentence["end"] + seg_dur = seg_end - seg_start + if seg_dur <= 0: + continue + + lines = wrap_text(text, max_chars_per_line) + rgba = render_subtitle_pil(lines) + + txt_clip = ImageClip(rgba) + txt_clip = txt_clip.with_position(("center", h - margin_bottom - rgba.shape[0])) + txt_clip = txt_clip.with_duration(seg_dur).with_start(seg_start) + clips.append(txt_clip) + else: + log("[提示] 未找到 result.json,跳过字幕") + + if not clips: + raise ValueError("没有可用的场景片段!") + + # 加载音频 + audio = AudioFileClip(audio_path) + + # 合成视频 + video = CompositeVideoClip(clips, size=img_size) + video = video.with_audio(audio) + video = video.with_duration(audio.duration) + + # 导出 + video.write_videofile( + output_path, + fps=fps, + codec='libx264', + audio_codec='aac', + bitrate='8000k', + ) + video.close() + + log(f"[OK] 视频已保存: {output_path}") + + +def main(workspace: str = None, fps: int = 24, size: str = "1280x720", + subtitle: bool = True, log_fn=None): + """ + 主入口 + + Args: + workspace: 工作区路径 + fps: 帧率 + size: 视频尺寸 "WxH" + subtitle: 是否添加字幕 + log_fn: 日志回调 + """ + from config import DEFAULT_VIDEO_SIZE + + if workspace is None: + workspace = os.path.join(os.path.dirname(os.path.abspath(__file__)), "workspace", "1") + w, h = map(int, (size or DEFAULT_VIDEO_SIZE).split("x")) + img_size = (w, h) + + def log(msg): + if log_fn: + log_fn(msg) + else: + print(msg) + + log(f"[视频合成] 工作区: {workspace}") + log(f"[视频合成] 尺寸: {img_size}, FPS: {fps}") + + # 1. 加载 scene_plan + scene_plan = load_scene_plan(workspace) + num_scenes = len(scene_plan["scenes"]) + log(f"[视频合成] 共 {num_scenes} 个场景") + + # 2. 加载音频 + audio_path = os.path.join(workspace, "voice.mp3") + if not os.path.exists(audio_path): + raise FileNotFoundError(f"音频文件不存在: {audio_path}") + + # 使用 try-finally 确保音频资源正确释放 + audio = AudioFileClip(audio_path) + try: + audio_duration = audio.duration + log(f"[视频合成] 音频时长: {audio_duration:.2f} 秒") + finally: + audio.close() + + # 3. 读取 ASR 结果 + asr_result = load_asr_result(workspace) + if asr_result: + seg_count = len(asr_result.get("segments", [])) + log(f"[视频合成] ASR: {seg_count} 个片段") + else: + log("[视频合成] 无 ASR 结果,按平均分配") + + # 4. 构建时间线 + timeline = build_scene_timeline(scene_plan, asr_result, audio_duration, workspace=workspace) + + log("[视频合成] 场景时间分配:") + matched_count = 0 + unmatched_count = 0 + for item in timeline: + dur = item['end'] - item['start'] + + # 显示匹配质量 + similarity = item.get('similarity', 0.0) + if similarity > 0: + quality = f"[{similarity:.0%}]" + matched_count += 1 + elif item.get('has_match', False): + quality = "[OK]" + matched_count += 1 + else: + quality = "[兜底]" + unmatched_count += 1 + + log(f" Scene {item['scene_id']:2d}: {item['start']:6.2f}s - {item['end']:6.2f}s ({dur:.2f}s) {quality}") + + if unmatched_count > 0: + log(f"[警告] {unmatched_count}/{len(timeline)} 个场景未精确匹配,使用兜底分配") + else: + log(f"[OK] 所有场景均成功匹配") + + # 5. 生成视频 + has_image = sum(1 for item in timeline if find_scene_image(workspace, item["scene_id"]) is not None) + log(f"[视频合成] 有图片场景: {has_image}/{len(timeline)}") + + if has_image == 0: + log("[警告] 所有场景都没有图片,视频将使用纯色背景!") + + output_path = os.path.join(workspace, "output_video.mp4") + create_video(workspace, timeline, audio_path, output_path, fps=fps, img_size=img_size, + subtitle=subtitle, log_fn=log_fn) + log(f"[完成] 视频: {output_path}") + return output_path + + +if __name__ == "__main__": + main() diff --git a/qwen_download.py b/qwen_download.py new file mode 100644 index 0000000..75773cb --- /dev/null +++ b/qwen_download.py @@ -0,0 +1,20 @@ + +from modelscope import snapshot_download +import os + +# 1. 定义模型 ID +model_id = 'qwen/Qwen3-ForcedAligner-0.6B' + +# 2. 在当前目录下创建 models 文件夹 +save_dir = os.path.join(os.path.dirname(__file__), 'models') + +# 创建目录(如果不存在) +os.makedirs(save_dir, exist_ok=True) + +print(f"下载中... 保存至: {save_dir}") + +try: + model_dir = snapshot_download(model_id, cache_dir=save_dir) + print(f"下载完成: {model_dir}") +except Exception as e: + print(f"失败: {e}") diff --git a/run.bat b/run.bat new file mode 100644 index 0000000..446fe93 --- /dev/null +++ b/run.bat @@ -0,0 +1,25 @@ +@echo off +chcp 65001 >nul + +cd /d "%~dp0" + +:: 清除缓存 +del /s /q __pycache__\*.pyc 2>nul +for /d %%d in (__pycache__) do rd /s /q "%%d" 2>nul + +echo 正在激活 Videoer 环境并启动 GUI... + +call C:\ProgramData\anaconda3\Scripts\activate.bat Videoer +if errorlevel 1 ( + echo [错误] 无法激活 Videoer 环境 + pause + exit /b 1 +) + +cd /d "%~dp0" +python "%~dp0gui.py" +if errorlevel 1 ( + echo. + echo [错误] 启动失败 + pause +) diff --git a/scene_generate.py b/scene_generate.py new file mode 100644 index 0000000..993b6f8 --- /dev/null +++ b/scene_generate.py @@ -0,0 +1,135 @@ +""" +scene_generate.py - 场景图生成模块(简化版) +读取 scene_plan.json,对每个 pending 场景用文生图直接生成 +不再需要角色元素图、Panel 拼图、img2img + +支持断点续传:已生成的自动跳过,失败的可重试 +""" + +import json +import os +from datetime import datetime + +# ========== 路径配置 ========== +WORKSPACE = "workspace/1" # 运行时会被 gui.py 覆盖 +PLAN_PATH = os.path.join(WORKSPACE, "scene_plan.json") +SCENE_IMG_DIR = os.path.join(WORKSPACE, "scene") + + +def load_plan() -> dict: + with open(PLAN_PATH, encoding="utf-8") as f: + return json.load(f) + + +def save_plan(plan: dict): + with open(PLAN_PATH, "w", encoding="utf-8") as f: + json.dump(plan, f, ensure_ascii=False, indent=2) + + +def generate_single_scene( + scene: dict, + idx: int, + total: int, + model_name: str, + on_image_generated=None, +) -> dict: + """ + 生成单个场景的图片 + + Args: + scene: 场景信息 dict + idx: 当前索引 + total: 总场景数 + model_name: 文生图模型名称 + on_image_generated: 回调函数 (scene, filepath),用于 GUI 审查 + + Returns: + 更新后的 scene dict + """ + scene_id = scene["scene_id"] + visual_prompt = scene["visual_prompt"] + + print(f"\n[场景 {scene_id}] [{idx+1}/{total}]") + print(f" Prompt: {visual_prompt[:80]}...") + + os.makedirs(SCENE_IMG_DIR, exist_ok=True) + scene_filename = f"scene_{scene_id:03d}.png" + filepath = os.path.join(SCENE_IMG_DIR, scene_filename) + + try: + from image_gen import image_generate + + result = image_generate( + prompt=visual_prompt, + save_dir=SCENE_IMG_DIR, + model_name=model_name, + filename=scene_filename, + ) + + filepath = result["filepath"] + scene["status"] = "generated" + scene["filepath"] = filepath + print(f" [完成] {os.path.basename(filepath)}") + + # 如果有回调(GUI 审查),调用它 + if on_image_generated: + on_image_generated(scene, filepath) + + return scene + + except Exception as e: + scene["status"] = "failed" + scene["error"] = str(e) + print(f" [失败] {e}") + return scene + + +def main(workspace: str = None, model_name: str = "Kolors(便宜快速)"): + """主流程:生成所有 pending 场景""" + global WORKSPACE, PLAN_PATH, SCENE_IMG_DIR + + if workspace: + WORKSPACE = workspace + PLAN_PATH = os.path.join(WORKSPACE, "scene_plan.json") + SCENE_IMG_DIR = os.path.join(WORKSPACE, "scene") + + if not os.path.exists(PLAN_PATH): + raise FileNotFoundError(f"未找到 {PLAN_PATH},请先运行场景划分") + + plan = load_plan() + scenes = plan["scenes"] + total = len(scenes) + + done = [s for s in scenes if s.get("status") == "generated"] + failed = [s for s in scenes if s.get("status") == "failed"] + pending = [s for s in scenes if s.get("status") == "pending"] + print(f"总场景: {total} | 已完成: {len(done)} | 失败: {len(failed)} | 待生成: {len(pending)}") + + for idx, scene in enumerate(scenes): + status = scene.get("status") + if status == "generated": + print(f"[{idx+1}/{total}] 场景 {scene['scene_id']} 已完成,跳过") + continue + + updated = generate_single_scene(scene, idx, total, model_name) + scenes[idx] = updated + save_plan(plan) + + if updated["status"] == "failed": + print(f"[停止] 场景 {scene['scene_id']} 生成失败,再次运行可续传") + break + + print(f"进度: {sum(1 for s in scenes if s.get('status') == 'generated')}/{total}") + + done_count = sum(1 for s in scenes if s.get("status") == "generated") + print(f"\n========== 完成 ==========") + print(f"已完成: {done_count}/{total}") + if done_count == total: + print("[全部完成] 所有场景图已生成!") + else: + remaining = [s["scene_id"] for s in scenes if s.get("status") != "generated"] + print(f"剩余: {remaining}") + + +if __name__ == "__main__": + main() diff --git a/scene_plan.py b/scene_plan.py new file mode 100644 index 0000000..dbe03be --- /dev/null +++ b/scene_plan.py @@ -0,0 +1,594 @@ +""" +scene_plan.py - 场景划分模块 +AI 读取 article.txt → 划分场景(用于 ASR 对齐) +输出 scene_plan.json +""" + +import json +import os +import re +import subprocess +import textwrap +from text_ai import text_ai + +# 默认工作区(仅用于独立运行时的测试) +_DEFAULT_WORKSPACE = "workspace/1" + + +# ========== 场景划分 ========== +PLANNER_SYSTEM = """You are an expert cinematic storyboard director and visual prompt engineer. Your task is to transform written articles into detailed, production-ready visual scenes for AI image generation. + +## YOUR MISSION +Create richly detailed, cinematic visual prompts that bring the article to life through compelling imagery. Each scene should be a complete visual blueprint that an AI image generator can use to create stunning, professional-quality images. + +--- + +## STEP 1 — ANALYZE THE ARTICLE TYPE + +Classify the article into ONE of these categories: + +**NARRATIVE** (stories, novels, scripts, personal anecdotes) +- Has characters with dialogue, emotional arcs, plot progression +- Requires character consistency across scenes + +**KNOWLEDGE** (essays, science explainers, book reviews, historical analysis, philosophy) +- Abstract concepts, theories, ideas without specific characters +- Needs metaphorical and symbolic visualization + +**TUTORIAL** (how-to guides, technical instructions, step-by-step processes) +- Procedural content with clear steps +- Focus on demonstrations and UI/screen elements + +--- + +## STEP 2 — SCENE GRANULARITY STRATEGY + +### FOR NARRATIVE ARTICLES: +Create **fine-grained scenes** (5-15 seconds each): +- One scene per meaningful action beat or emotional shift +- Dialogue exchanges: new scene when speaker changes or mood shifts +- Camera changes: close-up → wide shot = separate scenes +- Time/location jumps = new scene + +### FOR KNOWLEDGE ARTICLES: +Create **moderate scenes** (8-20 seconds each): +- One scene per key concept or paragraph +- Don't over-merge: each distinct visual idea gets its own scene +- Use varied visualization techniques (see below) + +### FOR TUTORIAL ARTICLES: +Create **step-based scenes** (5-12 seconds each): +- One scene per instruction step or sub-step +- Include before/after states if applicable + +--- + +## STEP 3 — VISUAL PROMPT ENGINEERING (CRITICAL) + +Your visual_prompt must be **DETAILED, SPECIFIC, and PRODUCTION-READY**. Follow this structure: + +### MANDATORY ELEMENTS (in order): + +**1. SHOT TYPE & CAMERA ANGLE** (choose specifically): + - Extreme Close-Up (ECU): eyes, hands, small objects + - Close-Up (CU): face, upper body + - Medium Shot (MS): waist up, two people conversing + - Full Shot (FS): entire body, room context + - Wide Shot (WS): landscape, establishing shot + - Bird's Eye View: overhead perspective + - Dutch Angle: tilted camera for tension + +**2. SUBJECT DESCRIPTION** (be extremely specific): + + FOR HUMANS (NEVER use names, always descriptive labels): + - Age range: "a woman in her late 20s" + - Ethnicity/Skin tone: "with warm olive skin" / "fair-skinned" / "deep brown skin" + - Hair: "shoulder-length wavy black hair with subtle highlights" + - Build: "slender build" / "athletic frame" + - Clothing (specific): "wearing a cream-colored cashmere turtleneck sweater" + - Expression: "with a contemplative, slightly melancholic expression" + - Pose/Action: "gazing thoughtfully out a rain-streaked window" + + FIRST APPEARANCE: Define ALL physical traits completely + SUBSEQUENT APPEARANCES: Reuse EXACT same description for consistency + + FOR OBJECTS/SETTINGS: + - Material: "polished mahogany desk" / "weathered stone walls" + - Condition: "vintage leather-bound books with gold embossing" + - Arrangement: "arranged in neat stacks" / "scattered haphazardly" + +**3. ENVIRONMENT/BACKGROUND** (layer the details): + - Immediate setting: "in a cozy study room" + - Background elements: "floor-to-ceiling bookshelves filled with ancient tomes" + - Depth cues: "soft-focus background showing a fireplace with flickering flames" + - Weather/Time: "on a misty autumn morning" / "during golden hour sunset" + +**4. LIGHTING & ATMOSPHERE** (create mood): + - Light source: "warm lamplight casting long shadows" + - Quality: "soft diffused natural light from a large window" + - Color temperature: "cool blue moonlight" / "warm amber candlelight" + - Atmospheric effects: "dust motes dancing in sunbeams" / "gentle fog rolling across the scene" + - Mood keywords: "serene and contemplative" / "tense and dramatic" / "nostalgic and dreamy" + +**5. COMPOSITION & STYLE** (guide the aesthetic): + - Rule of thirds: "subject positioned off-center following rule of thirds" + - Leading lines: "perspective lines converging toward the subject" + - Color palette: "muted earth tones with pops of burgundy" / "monochromatic blues" + - Artistic style reference: "cinematic photography style reminiscent of Roger Deakins" / "painterly quality inspired by Edward Hopper" / "clean minimalist aesthetic" + - Depth of field: "shallow depth of field with creamy bokeh" / "deep focus throughout" + +**6. TECHNICAL QUALITIES** (ensure image quality): + - Resolution hint: "ultra-high detail, 8k quality" + - Texture emphasis: "rich textures visible in fabric and wood grain" + - Sharpness: "razor-sharp focus on subject's eyes" + +--- + +## VISUALIZATION TECHNIQUES FOR ABSTRACT CONCEPTS (Knowledge Articles) + +When the text discusses abstract ideas, use these strategies: + +**METAPHORICAL IMAGERY:** +- "Cultural bridge" → "an elegant stone bridge spanning a misty valley, connecting two distinct architectural styles, symbolizing East meets West" +- "Knowledge expansion" → "an open ancient book radiating golden light, with luminous particles forming constellations above it" + +**HISTORICAL RECONSTRUCTION:** +- "ancient Chinese philosophy" → "a wise scholar in flowing Han dynasty robes sitting cross-legged on a bamboo mat, surrounded by scrolls, soft morning light filtering through paper windows" + +**SYMBOLIC COMPOSITIONS:** +- "technological progress" → "a vintage pocket watch gradually transforming into a sleek smartwatch, gears and circuits merging, dramatic side lighting" + +**PERSONIFICATION:** +- "artificial intelligence" → "a humanoid figure made of translucent glass and glowing neural networks, standing in a futuristic laboratory, blue and purple ambient lighting" + +**DATA VISUALIZATION AS ART:** +- "statistical trends" → "elegant 3D bar charts rising like crystal structures from a reflective surface, bathed in gradient lighting from blue to orange" + +--- + +## STEP 4 — CRITICAL RULES + +1. **COMPLETE COVERAGE**: Every sentence in the article must appear in exactly one scene. No skipping. + +2. **EXACT TEXT EXCERPTS**: The "text" field must contain VERBATIM quotes from the article. Do NOT paraphrase or summarize. + +3. **LANGUAGE CONSISTENCY**: + - If article is in Chinese → write background AND visual_prompt in Chinese + - If article is in English → write both in English + - Maintain the article's language throughout + +4. **PROMPT LENGTH & DETAIL**: + - Minimum: 60 words + - Ideal: 80-150 words + - Maximum: 200 words + - More detail = better image generation results + +5. **AVOID THESE MISTAKES**: + ❌ Vague: "a person thinking" + ✅ Specific: "a young Asian woman with shoulder-length dark hair, wearing glasses and a white blouse, resting her chin on her hand while gazing thoughtfully at a laptop screen, soft afternoon light from a nearby window" + + ❌ Generic: "a beautiful landscape" + ✅ Specific: "a sweeping mountain vista at sunrise, snow-capped peaks glowing pink and orange in the first light, wispy clouds drifting through valleys below, crisp alpine air suggested by sharp clarity" + +6. **UNIQUENESS**: Each scene's text excerpt must be unique. No overlaps or duplicates. + +7. **NEGATIVE PROMPT AVOIDANCE**: Do NOT include in visual_prompt: + - Text, letters, words, captions, labels, watermarks, logos + - Blurry, low quality, deformed elements + - Multiple conflicting perspectives in one scene + +--- + +## OUTPUT FORMAT + +Return ONLY valid JSON. No markdown formatting. No explanations. No code blocks. + +{ + "scenes": [ + { + "scene_id": 1, + "text": "Exact quote from the article covering this scene...", + "background": "Detailed description of the setting, environment, and context in the article's language...", + "visual_prompt": "[SHOT TYPE] + [SUBJECT] + [ENVIRONMENT] + [LIGHTING] + [COMPOSITION] + [STYLE]. Rich, detailed, production-ready prompt in the article's language, 80-150 words...", + "status": "pending" + }, + { + "scene_id": 2, + ... + } + ] +} + +--- + +## EXAMPLES OF HIGH-QUALITY VISUAL PROMPTS + +### Example 1 (Narrative - Chinese): +"visual_prompt": "中景镜头。一位三十岁左右的亚洲女性,皮肤白皙,留着齐肩的黑色直发,穿着米色羊绒毛衣和深灰色长裤,坐在咖啡馆靠窗的位置。她双手捧着一杯冒着热气的拿铁咖啡,眼神略带忧郁地凝视着窗外淅沥的雨滴。背景是模糊的咖啡馆内部,暖黄色的吊灯和木质桌椅营造出温馨的氛围。柔和的自然光透过布满雨珠的玻璃窗洒入,在她的侧脸投下温柔的阴影。电影感摄影风格,浅景深,温暖的色调,充满沉思和怀旧的情绪。" + +### Example 2 (Knowledge - English): +"visual_prompt": "Wide establishing shot. An ancient library interior with towering floor-to-ceiling oak bookshelves filled with leather-bound volumes dating back centuries. A grand wooden reading table sits in the center, illuminated by a single ornate brass lamp casting warm golden light. Dust motes dance in the atmospheric light beams streaming through tall arched windows. The perspective uses leading lines from the bookshelf aisles converging toward a distant figure of a scholar in Renaissance-era robes. Cinematic photography with deep shadows and rich amber tones, reminiscent of classical paintings by Rembrandt. Ultra-detailed, showcasing intricate wood carvings and the texture of aged parchment." + +### Example 3 (Tutorial - Chinese): +"visual_prompt": "特写镜头。一双修长的手正在操作一台银色的笔记本电脑,屏幕上显示着Python代码编辑器界面,代码清晰可见。手指悬停在键盘上方,准备敲击回车键。桌面整洁,旁边放着一杯绿茶和一个打开的笔记本,上面有手写的流程图。明亮的白色台灯光线从左侧照射,营造专注的工作氛围。现代简约风格,高清晰度,强调屏幕上的代码细节和手指的动作瞬间。冷色调为主,点缀温暖的木质桌面纹理。" + +--- + +NOW, analyze the provided article and create your detailed scene breakdown. Remember: MORE DETAIL = BETTER IMAGES. Be specific, be cinematic, be creative. +""" + + +def _fix_json_text(text: str) -> str: + """尝试修复 LLM 返回的常见 JSON 格式问题""" + # 找第一个 { 到最后一个 } 之间的内容 + start = text.find("{") + end = text.rfind("}") + if start == -1 or end == -1: + return text + text = text[start:end+1] + + # 修复:中文引号 "" '' 替换为转义 + text = text.replace("\u201c", '\\"').replace("\u201d", '\\"') + text = text.replace("\u2018", "'").replace("\u2019", "'") + + # 修复:把 \n 换行符字面文本变成真正的换行(LLM 有时输出 \\n) + text = text.replace("\\n", "\n") + + # 修复:控制字符(除了换行和制表) + text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text) + + return text + + +def _extract_scenes_fallback(text: str) -> list: + """ + 主力解析方案:逐个提取 scene 块。 + 先尝试 json.loads,失败就用正则暴力提取。 + """ + scenes = [] + # 匹配 "scene_id" 字段所在的 { } 块,兼容多余引号如 "scene_id": 23" + pattern = r'\{\s*"scene_id"\s*:\s*"?(\d+)"?\s*[,\s]' + pos = 0 + while True: + m = re.search(pattern, text[pos:]) + if not m: + break + block_start = pos + m.start() + # 从这个 { 开始找匹配的 } + depth = 0 + i = block_start + while i < len(text): + if text[i] == '{': + depth += 1 + elif text[i] == '}': + depth -= 1 + if depth == 0: + block = text[block_start:i+1] + # 清理控制字符 + block = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', block) + # 先尝试 json.loads + try: + scene = json.loads(block) + if isinstance(scene, dict) and "scene_id" in scene: + scenes.append(scene) + except json.JSONDecodeError: + # json.loads 失败,暴力提取 + scene = _brute_extract_scene(block) + if scene: + scenes.append(scene) + break + i += 1 + pos = block_start + 1 + return scenes + + +def _brute_extract_scene(block: str) -> dict | None: + """暴力从一段文本中提取 scene 字段,不依赖 json.loads""" + scene = {} + + # scene_id:兼容多余引号 + m_id = re.search(r'"scene_id"\s*:\s*"?(\d+)"?', block) + if m_id: + scene["scene_id"] = int(m_id.group(1)) + else: + return None + + # text:可能有中文内容,匹配到下一个字段前 + m_text = re.search(r'"text"\s*:\s*"(.+?)"\s*(?:,|\})', block, re.DOTALL) + if m_text: + scene["text"] = m_text.group(1).strip() + + # 兼容旧字段 lines + if "text" not in scene: + m_lines = re.search(r'"lines"\s*:\s*"(.+?)"\s*(?:,|\})', block, re.DOTALL) + if m_lines: + scene["text"] = m_lines.group(1).strip() + + # background:可能有中文引号 "" 嵌套 + m_bg = re.search(r'"background"\s*:\s*"(.+?)"\s*,\s*"visual_prompt"', block, re.DOTALL) + if m_bg: + scene["background"] = m_bg.group(1).strip() + else: + # 兜底:匹配到最后一个 " 之前 + m_bg2 = re.search(r'"background"\s*:\s*"(.+?)"\s*(?:,|\})', block, re.DOTALL) + if m_bg2: + scene["background"] = m_bg2.group(1).strip() + + # visual_prompt + m_prompt = re.search(r'"visual_prompt"\s*:\s*"(.+?)"\s*,\s*"status"', block, re.DOTALL) + if m_prompt: + scene["visual_prompt"] = m_prompt.group(1).strip() + else: + m_prompt = re.search(r'"visual_prompt"\s*:\s*"(.+)"\s*\}', block, re.DOTALL) + if m_prompt: + scene["visual_prompt"] = m_prompt.group(1).strip() + else: + scene["visual_prompt"] = "No visual prompt" + + if "text" not in scene: + scene["text"] = "" + scene["status"] = "pending" + return scene + + +def _get_audio_duration(audio_path: str) -> float | None: + """Get audio duration in seconds. Tries ffprobe first, falls back to mutagen.""" + if not audio_path or not os.path.exists(audio_path): + print(f"[DEBUG] audio not found: {audio_path}") + return None + + # Method 1: ffprobe + try: + result = subprocess.run( + ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", + "-of", "csv=p=0", audio_path], + capture_output=True, text=True, timeout=10, + creationflags=subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0, + ) + raw = result.stdout.strip() + if raw: + return float(raw) + except FileNotFoundError: + print("[DEBUG] ffprobe not found, trying mutagen...") + except Exception as e: + print(f"[DEBUG] ffprobe error: {e}") + + # Method 2: mutagen (pure Python, no external binary) + try: + from mutagen.mp3 import MP3 + audio = MP3(audio_path) + return audio.info.length + except ImportError: + print("[DEBUG] mutagen not installed. Run: pip install mutagen") + except Exception as e: + print(f"[DEBUG] mutagen error: {e}") + + return None + + +def plan_scenes(article_text: str, workspace: str = None, provider: str = None, user_note: str = None) -> dict: + """ + 调用 AI 划分场景 + + Args: + article_text: 文章文本 + workspace: 工作区路径(包含 article.txt 和 voice.mp3) + provider: LLM 提供商名称 + user_note: 用户添加的备注(可选) + """ + if workspace is None: + workspace = _DEFAULT_WORKSPACE + + # 获取音频时长 + audio_path = os.path.join(workspace, "voice.mp3") + duration = _get_audio_duration(audio_path) + duration_hint = "" + if duration and duration > 0: + minutes = int(duration // 60) + seconds = int(duration % 60) + min_scenes = max(5, int(duration / 60 * 5)) # ~5 scenes/min + max_scenes = int(duration / 60 * 16) # ~16 scenes/min + duration_hint = f""" +IMPORTANT — AUDIO DURATION CONSTRAINT: +The voiceover audio is exactly {minutes}m {seconds}s ({int(duration)} seconds). +Recommended scene count: {min_scenes} to {max_scenes} (each scene ~5-15 seconds). +This is a recommendation for pacing. You may adjust based on the article's content.""" + + # 构建用户备注部分 + user_note_section = "" + if user_note and user_note.strip(): + user_note_section = f""" +--- + +## USER'S SPECIAL INSTRUCTIONS + +Please pay attention to the following special requirements from the user: + +{user_note.strip()} + +Incorporate these instructions into your scene planning and visual prompt generation. + +--- +""" + + prompt = f"""## ARTICLE TO CONVERT + +Here is the complete article text that you need to divide into visual scenes: + +{textwrap.dedent(article_text)} + +--- + +## YOUR TASK + +Analyze this article thoroughly and create a detailed scene-by-scene visual breakdown following the guidelines in the system prompt. + +**Key Requirements:** +1. **Cover every sentence** - Do not skip any content +2. **Be extremely detailed** in visual prompts (80-150 words each) +3. **Use cinematic language** - shot types, lighting, composition, mood +4. **Maintain consistency** - characters look the same across scenes +5. **Match the article's language** - if Chinese, write prompts in Chinese + +{duration_hint} +{user_note_section} +**Remember:** Your visual prompts will be used directly by AI image generators. The more specific and detailed you are, the better the final images will be. Think like a film director planning each shot. + +Now, create your scene breakdown with rich, production-ready visual prompts. +""" + + print("=" * 60) + print(f"[PROMPT] system prompt: {len(PLANNER_SYSTEM)} chars") + print(f"[PROMPT] user prompt: {len(prompt)} chars") + print("=" * 60) + + response = text_ai(prompt, PLANNER_SYSTEM, provider=provider) + + if response is None: + raise ValueError("text_ai 返回了 None,请检查 LLM 配置和网络连接") + + # 保存原始 LLM 返回供调试 + plan_path = os.path.join(workspace, "scene_plan.json") + debug_path = os.path.join(workspace, "_debug_response.txt") + try: + with open(debug_path, "w", encoding="utf-8") as f: + f.write(response) + print(f"[DEBUG] 原始返回已保存到 {debug_path}") + except Exception: + pass + + print(f"[DEBUG] LLM response length: {len(response)} chars") + + # 清理可能的 markdown 包裹 + text = response.strip() + if text.startswith("```"): + lines = text.split("```") + text = lines[1] if len(lines) > 1 else lines[0] + if text.startswith("json"): + text = text[4:] + text = text.strip() + + # 解析 JSON:先尝试 json.loads(快速通道),失败就暴力提取(主力方案) + data = None + scenes = None + + for attempt, raw in enumerate([text, _fix_json_text(text)]): + try: + data = json.loads(raw) + break + except json.JSONDecodeError as e: + print(f"[DEBUG] JSON parse attempt {attempt+1} failed: {e.msg} at line {e.lineno} col {e.colno}") + + if data is None: + # 暴力逐块提取 + scenes = _extract_scenes_fallback(text) + if scenes: + print(f"[RECOVERY] JSON 整体解析失败,暴力提取到 {len(scenes)} 个场景") + + # 从解析结果中提取 scenes 列表 + if scenes is not None: + pass # 已经有 scenes 了 + elif data is not None: + if isinstance(data, list): + scenes = data + elif "scenes" in data: + scenes = data["scenes"] + else: + for v in data.values(): + if isinstance(v, list) and v and "scene_id" in v[0]: + scenes = v + break + + if not scenes: + raise ValueError("AI 返回内容无法解析为场景数据。请查看工作区下的 _debug_response.txt") + + return {"scenes": scenes} + + +def main(workspace: str = None, provider: str = None, user_note: str = None): + """ + 执行场景划分 + + Args: + workspace: 工作区路径(包含 article.txt) + provider: LLM 提供商名称 + user_note: 用户添加的备注(可选) + """ + if workspace is None: + workspace = _DEFAULT_WORKSPACE + + plan_path = os.path.join(workspace, "scene_plan.json") + article_path = os.path.join(workspace, "article.txt") + + print("[场景划分] 开始...") + print(f" 工作区: {workspace}") + + # 1. 读取 article(原文,不编号) + with open(article_path, encoding="utf-8") as f: + article_raw = f.read().strip() + # 过滤无意义行 + article_lines = [l.strip() for l in article_raw.split("\n") if l.strip() and l.strip() != "Identifying the speaker"] + article_text = "\n".join(article_lines) + print(f" Article 长度: {len(article_text)} 字符") + + # 2. AI 划分场景 + plan = plan_scenes(article_text, workspace=workspace, provider=provider, user_note=user_note) + scenes = plan["scenes"] + + # 3. 确保每个 scene 有必要字段 + for i, scene in enumerate(scenes): + if "status" not in scene: + scene["status"] = "pending" + if "scene_id" not in scene: + scene["scene_id"] = i + 1 + else: + scene["scene_id"] = int(scene["scene_id"]) + if "visual_prompt" not in scene: + scene["visual_prompt"] = scene.get("background") or scene.get("description") or "No visual prompt" + if "text" not in scene: + # 兼容旧字段 lines + scene["text"] = scene.get("lines", "") + if "background" not in scene: + scene["background"] = "" + # 清理旧字段 + scene.pop("lines", None) + + # 4. 保存计划 + output = { + "total_scenes": len(scenes), + "scenes": scenes + } + with open(plan_path, "w", encoding="utf-8") as f: + json.dump(output, f, ensure_ascii=False, indent=2) + + print(f"\n[场景划分] 完成,共 {len(scenes)} 个场景,已保存到 {plan_path}") + print("\n" + "="*80) + for s in scenes: + scene_id = s['scene_id'] + text_preview = (s.get("text", "") or "").replace('\n', ' ')[:50] + prompt_preview = s.get('visual_prompt', '')[:80] + bg_preview = s.get('background', '')[:50] + + print(f"[SCENE {scene_id:2d}]") + print(f" TEXT: {text_preview}...") + print(f" PROMPT: {prompt_preview}...") + print(f" BG: {bg_preview}...") + print("\n" + "="*80) + + # 统计信息 + total_prompt_words = sum(len(s.get('visual_prompt', '').split()) for s in scenes) + avg_prompt_len = total_prompt_words / len(scenes) if scenes else 0 + print(f"\n[STATS]") + print(f" AVG PROMPT LENGTH: {avg_prompt_len:.0f} words/scene") + print(f" TOTAL SCENES: {len(scenes)}") + + if avg_prompt_len < 50: + print(f" [WARN] Prompts are short, consider more detail for better image quality") + elif avg_prompt_len >= 80: + print(f" [OK] Prompt detail level is good") + + return plan + + +if __name__ == "__main__": + main() diff --git a/text_ai.py b/text_ai.py new file mode 100644 index 0000000..4232450 --- /dev/null +++ b/text_ai.py @@ -0,0 +1,113 @@ +""" +text_ai.py - LLM 文本生成 +用于场景划分等 AI 推理任务 +支持多 LLM 提供商切换 +""" + +from openai import OpenAI +from config import LLM_PROVIDERS, DEFAULT_LLM, LLM_API_KEY, LLM_API_BASE, LLM_MODEL + + +def text_ai(in_put: str, system_prompt: str = "You are a helpful assistant.", + provider: str = None) -> str: + """ + 调用 LLM 生成文本 + + Args: + in_put: 用户输入内容 + system_prompt: 系统提示词 + provider: LLM 提供商名称(对应 LLM_PROVIDERS 的 key),None 则用默认 + + Returns: + AI 生成的文本 + """ + if provider and provider in LLM_PROVIDERS: + cfg = LLM_PROVIDERS[provider] + api_key = cfg["api_key"] + api_base = cfg["api_base"] + model = cfg["model"] + else: + api_key = LLM_API_KEY + api_base = LLM_API_BASE + model = LLM_MODEL + + client = OpenAI( + api_key=api_key, + base_url=api_base, + ) + + # ModelScope 的 Qwen3 系列和 GLM 系列默认开启 thinking,需要关掉 + # 注意:MiniMax 系列不是 Qwen/GLM,不需要也不能传 enable_thinking + extra_body = {} + is_modelscope = "modelscope" in api_base.lower() + is_qwen = "qwen" in model.lower() + is_glm = "glm" in model.lower() or "zhipuai" in model.lower() + if is_modelscope and (is_qwen or is_glm): + extra_body["enable_thinking"] = False + + response = client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": in_put} + ], + max_tokens=16384, + stream=False, + extra_body=extra_body if extra_body else None, + ) + + # 防御:choices 为空或 None + if not response.choices: + # 尝试从 response 对象提取有用信息 + resp_dict = response.model_dump() if hasattr(response, "model_dump") else {} + error_msg = resp_dict.get("error", {}) + if isinstance(error_msg, dict): + err_text = error_msg.get("message", str(error_msg)) + else: + err_text = str(resp_dict) + raise ValueError( + f"模型 '{model}' 返回了空的 choices。\n" + f"响应内容: {err_text}\n" + f"可能是模型暂时不可用或请求被拒绝。" + ) + + msg = response.choices[0].message + content = msg.content + + # 检测输出是否被截断 + finish = response.choices[0].finish_reason + if finish == "length": + print(f"[WARN] LLM output truncated (finish_reason=length), max_tokens may be too small") + + if content is None: + # fallback:尝试多种字段名(不同 API 叫法不同) + for attr in ("thinking_content", "reasoning_content", "text", "output"): + fallback = getattr(msg, attr, None) + if fallback: + content = fallback + break + + if content is None: + # 最后一搏:尝试把 message 对象当 dict 看 + try: + msg_dict = msg.model_dump() if hasattr(msg, "model_dump") else vars(msg) + for v in msg_dict.values(): + if isinstance(v, str) and v.strip(): + content = v + break + except Exception: + pass + + if content is None: + finish = response.choices[0].finish_reason + raise ValueError( + f"模型 '{model}' 返回内容为空(content=None)," + f"finish_reason={finish}。\n" + f"如果使用 MiniMax 系列,请改用 Qwen3.5-35B (ModelScope 免费) 或其他 Qwen 模型。" + ) + return content + + +if __name__ == "__main__": + result = text_ai("Hello, say hi in one sentence.") + print(result)