""" make_video.py - 场景图 + ASR 时间戳 + 音频 → 视频 流程: 1. 读取 voice.mp3 获取总时长 2. 读取 scene_plan.json 获取场景列表 3. 如果有 result.json(ASR 对齐),按对齐结果分配时间;否则平均分配 4. 用 moviepy 将场景图在对应时段显示,配上音频生成视频 """ import os import json import math import re import numpy as np from PIL import Image, ImageDraw, ImageFont from moviepy import ( AudioFileClip, ImageClip, CompositeVideoClip, ColorClip, concatenate_videoclips ) def load_scene_plan(workspace: str) -> dict: """读取 scene_plan.json""" path = os.path.join(workspace, 'scene_plan.json') with open(path, 'r', encoding='utf-8') as f: return json.load(f) def load_asr_result(workspace: str) -> dict: """读取 result.json(ASR 对齐结果),如果不存在返回 None""" path = os.path.join(workspace, "result.json") if os.path.exists(path): with open(path, 'r', encoding='utf-8') as f: return json.load(f) return None def build_scene_timeline(scene_plan: dict, asr_result: dict, audio_duration: float, workspace: str = None) -> list: """ 构建场景时间线。优先使用 scene 中已有的 start_time/end_time(ASR 步骤写入), 没有则根据 ASR 文本匹配分配,再没有则平均分配。 """ scenes = scene_plan["scenes"] num_scenes = len(scenes) # --- 策略 1:直接使用 scene 已有的时间戳(ASR 步骤写入的) --- if all("start_time" in s and "end_time" in s for s in scenes): timeline = [] for scene in scenes: timeline.append({ "scene_id": scene["scene_id"], "start": scene["start_time"], "end": scene["end_time"], "has_match": True, }) # 确保最后一个场景延伸到音频结尾 if timeline: timeline[-1]["end"] = audio_duration return timeline # --- 策略 2:部分有时间戳,部分没有 --- has_timestamp = [s for s in scenes if "start_time" in s and "end_time" in s] if has_timestamp and len(has_timestamp) < num_scenes: timeline = [] matched_end = 0.0 for scene in scenes: if "start_time" in scene and "end_time" in scene: timeline.append({ "scene_id": scene["scene_id"], "start": scene["start_time"], "end": scene["end_time"], }) matched_end = max(matched_end, scene["end_time"]) else: timeline.append({ "scene_id": scene["scene_id"], "start": None, "end": None, }) # 未匹配的场景按剩余时间平均分配 unmatched_indices = [i for i, item in enumerate(timeline) if item["start"] is None] if unmatched_indices: remaining = audio_duration - matched_end if remaining > 0: seg_dur = remaining / len(unmatched_indices) t = matched_end for idx in unmatched_indices: timeline[idx]["start"] = t timeline[idx]["end"] = t + seg_dur t += seg_dur else: for idx in unmatched_indices: timeline[idx]["start"] = matched_end timeline[idx]["end"] = matched_end + 2.0 if timeline: timeline[-1]["end"] = audio_duration return timeline # --- 策略 3:没有任何时间戳,用 ASR 文本匹配或平均分配 --- if asr_result is not None: # 准备匹配用的文本 scenes_for_match = [] for scene in scenes: s = dict(scene) text_val = s.get("text", "") lines_val = s.get("lines", "") raw = text_val or lines_val or "" if isinstance(raw, str) and raw: s["lines"] = [x.strip() for x in raw.split("。") if x.strip()] elif isinstance(raw, list): s["lines"] = raw else: s["lines"] = [] scenes_for_match.append(s) asr_segments = asr_result.get("segments", []) timeline = assign_scenes_to_segments(scenes_for_match, asr_segments, audio_duration) else: # 平均分配 duration_per_scene = audio_duration / num_scenes timeline = [] current_time = 0.0 for scene in scenes: start = current_time end = min(current_time + duration_per_scene, audio_duration) timeline.append({ "scene_id": scene["scene_id"], "start": start, "end": end, }) current_time = end return timeline def _normalize_text(text: str) -> str: """清理文本:去除标点、空格、转小写,用于相似度比较""" if not text: return "" # 去除所有非字母数字和非中文字符 text = re.sub(r'[^\w\u4e00-\u9fff]', '', text) return text.lower() def _longest_common_substring_ratio(s1: str, s2: str) -> float: """ 计算两个字符串的最长公共子串比例 使用动态规划,但为了性能优化: - 如果字符串太长,使用简化的滑动窗口方法 """ if not s1 or not s2: return 0.0 # 对于短字符串,使用精确的 DP 算法 if len(s1) * len(s2) < 100000: # 乘积小于 10万 m, n = len(s1), len(s2) # 优化空间复杂度:只用两行 prev = [0] * (n + 1) curr = [0] * (n + 1) max_len = 0 for i in range(1, m + 1): for j in range(1, n + 1): if s1[i-1] == s2[j-1]: curr[j] = prev[j-1] + 1 max_len = max(max_len, curr[j]) else: curr[j] = 0 prev, curr = curr, [0] * (n + 1) return max_len / max(len(s1), len(s2)) # 对于长字符串,使用简化的滑动窗口 return _sliding_window_similarity(s1, s2) def _sliding_window_similarity(s1: str, s2: str, window_size: int = 20) -> float: """ 滑动窗口相似度(用于长字符串的快速估算) 将 s1 分成多个窗口,在 s2 中查找最佳匹配 """ if len(s1) <= window_size: # 如果 s1 本身很短,直接检查是否在 s2 中 if s1 in s2: return 1.0 # 否则检查每个字符的出现 common = sum(1 for c in s1 if c in s2) return common / len(s1) if s1 else 0.0 # 分窗口检查 total_score = 0.0 num_windows = 0 for i in range(0, len(s1) - window_size + 1, window_size // 2): window = s1[i:i+window_size] if window in s2: total_score += 1.0 else: # 部分匹配 common = sum(1 for c in window if c in s2) total_score += common / window_size num_windows += 1 return total_score / num_windows if num_windows > 0 else 0.0 def assign_scenes_to_segments(scenes: list, asr_segments: list, audio_duration: float) -> list: """ 基于文本相似度的精确匹配 策略: 1. 每个 scene 有 text 字段(原文片段) 2. 在 ASR segments 中搜索这段文本的出现位置 3. 使用最长公共子串相似度容忍 ASR 识别错误 4. 阈值 60%,避免误匹配 """ SIMILARITY_THRESHOLD = 0.5 # 相似度阈值 MAX_WINDOW_SIZE = 20 # 最多合并 20 个 ASR segments(长句需要更多) # 预处理:清理 ASR segments 的文本 asr_cleaned = [] for seg in asr_segments: text = seg.get('text', '').strip() if text: asr_cleaned.append({ 'text': text, 'text_normalized': _normalize_text(text), 'start': seg['start'], 'end': seg['end'] }) def find_best_match(scene_text: str, start_seg_idx: int): """ 找到 scene_text 在 ASR segments 中的最佳匹配位置 使用滑动窗口尝试不同长度的 segment 组合 """ if not scene_text or not scene_text.strip(): return None scene_normalized = _normalize_text(scene_text) best_score = 0.0 best_match = None # 滑动窗口:尝试不同数量的 segment 组合 for window_size in range(1, min(MAX_WINDOW_SIZE + 1, len(asr_cleaned) - start_seg_idx + 1)): for i in range(start_seg_idx, len(asr_cleaned) - window_size + 1): # 合并多个 segments 的文本 combined_text = ''.join( seg['text_normalized'] for seg in asr_cleaned[i:i+window_size] ) # 计算相似度 similarity = _longest_common_substring_ratio(scene_normalized, combined_text) if similarity > best_score: best_score = similarity best_match = { 'start': asr_cleaned[i]['start'], 'end': asr_cleaned[i + window_size - 1]['end'], 'start_idx': i, 'end_idx': i + window_size - 1, 'similarity': similarity } # 如果已经达到很高相似度,提前返回 if similarity >= 0.90: return best_match # 只有达到阈值才算匹配成功 if best_score >= SIMILARITY_THRESHOLD: return best_match return None # 主匹配循环 timeline_raw = [] last_end_idx = 0 # 记录上一个匹配结束的位置 for scene in scenes: # 优先使用 text 字段,兼容旧 lines 字段 scene_text = scene.get('text', '') or scene.get('lines', '') if isinstance(scene_text, list): scene_text = ''.join(scene_text) match = find_best_match(scene_text, last_end_idx) if match: timeline_raw.append({ 'scene_id': scene['scene_id'], 'start': match['start'], 'end': match['end'], 'has_match': True, 'similarity': match['similarity'] }) # 下一个场景从当前匹配结束后开始 last_end_idx = match['end_idx'] + 1 else: timeline_raw.append({ 'scene_id': scene['scene_id'], 'start': None, 'end': None, 'has_match': False, 'similarity': 0.0 }) # 处理未匹配的场景:按剩余音频时间平均分配 unmatched_indices = [i for i, item in enumerate(timeline_raw) if not item['has_match']] if unmatched_indices: # 找到已匹配场景的最大结束时间 matched_end = 0.0 for item in timeline_raw: if item['has_match'] and item['end']: matched_end = max(matched_end, item['end']) remaining_duration = audio_duration - matched_end if remaining_duration > 0 and len(unmatched_indices) > 0: seg_dur = remaining_duration / len(unmatched_indices) current_t = matched_end for idx in unmatched_indices: timeline_raw[idx]['start'] = current_t timeline_raw[idx]['end'] = current_t + seg_dur timeline_raw[idx]['has_match'] = True current_t += seg_dur else: # 如果没有剩余时间,给一个默认时长 for idx in unmatched_indices: timeline_raw[idx]['start'] = matched_end timeline_raw[idx]['end'] = matched_end + 2.0 timeline_raw[idx]['has_match'] = True # 确保每个场景至少有 0.5 秒 for item in timeline_raw: if item['end'] is None or item['end'] <= item['start']: item['end'] = item['start'] + 0.5 # 消除间隙:确保时间连续 for i in range(len(timeline_raw) - 1): curr = timeline_raw[i] nxt = timeline_raw[i + 1] gap = nxt['start'] - curr['end'] if gap > 0.01: # 允许 10ms 的微小间隙 curr['end'] = nxt['start'] # 最后一个场景延伸到音频结尾 if timeline_raw: timeline_raw[-1]['end'] = audio_duration # 合并极短的场景(< 1秒)到下一个场景 merged = [] skip_next = False for i in range(len(timeline_raw)): if skip_next: skip_next = False continue item = dict(timeline_raw[i]) duration = item['end'] - item['start'] # 如果当前场景太短且不是最后一个,合并到下一个 if duration < 1.0 and i < len(timeline_raw) - 1: item['end'] = timeline_raw[i + 1]['end'] merged.append(item) skip_next = True else: merged.append(item) return merged def find_scene_image(workspace: str, scene_id: int) -> str: """查找场景图""" candidates = [ os.path.join(workspace, "scene", f"scene_{scene_id:03d}.png"), os.path.join(workspace, "scene", f"scene_{scene_id}.png"), ] for path in candidates: if os.path.exists(path): return path return None def create_video(workspace: str, timeline: list, audio_path: str, output_path: str, fps: int = 24, img_size: tuple = (1280, 720), subtitle: bool = True, log_fn=None): """ 用 moviepy 创建视频 Args: workspace: 工作区路径 timeline: 场景时间线 audio_path: 音频文件路径 output_path: 输出视频路径 fps: 帧率 img_size: 视频尺寸 (w, h) subtitle: 是否添加字幕 log_fn: 日志回调函数 """ def log(msg): if log_fn: log_fn(msg) else: print(msg) clips = [] for item in timeline: scene_id = item["scene_id"] start = item["start"] end = item["end"] duration = end - start if duration <= 0: continue img_path = find_scene_image(workspace, scene_id) if img_path: img_clip = ImageClip(img_path).with_duration(duration) img_clip = img_clip.resized(new_size=img_size) else: log(f" [警告] scene_{scene_id} 无图片,使用纯色背景") img_clip = ColorClip(size=img_size, color=(50, 50, 50)).with_duration(duration) img_clip = img_clip.with_start(start) clips.append(img_clip) # 字幕 if subtitle: asr_path = os.path.join(workspace, "result.json") if os.path.exists(asr_path): with open(asr_path, encoding="utf-8") as f: asr_result = json.load(f) segments = asr_result.get("segments", []) log(f"[*] 原始 ASR 片段: {len(segments)} 条") # 将 ASR segments 按时间分组(每段 3-5 秒,避免字幕堆满屏幕) def group_segments_into_sentences(segments: list) -> list: """ 将短的 ASR segments 合并成合理的字幕组。 支持中文(字级)和英文(词级)ASR 输出。 限制条件: - 每组最多 4 秒时长 - 每组最多 40 个字符(中文)/ 60 个字符(英文,含空格) - 遇到句号等结束标点时提前分组 """ if not segments: return [] sentence_endings = set('。!?.!?…\n') MAX_CHARS = 60 # 字符上限(英文含空格时会更长) MAX_DURATION = 4.0 def flush(text, start, end): if text and start is not None: grouped.append({"text": text.strip(), "start": start, "end": end}) grouped = [] current_words = [] # 词/字列表 current_start = None current_end = None for seg in segments: text = seg.get("text", "").strip() if not text: continue seg_start = seg.get("start", 0) seg_end = seg.get("end", 0) if current_start is None: current_start = seg_start # 判断加入当前词后是否超限 # 用空格拼英文,中文直接拼(中文 seg 通常每个词自带空格或无需加) preview_words = current_words + [text] # 根据是否含非 ASCII 字符判断是中文还是英文 is_ascii_word = all(ord(c) < 128 for c in text) if is_ascii_word: preview_text = " ".join(preview_words) else: preview_text = "".join(preview_words) preview_duration = seg_end - current_start # 先检查:加入后是否超限 → 如果是,先 flush 旧组,再开新组 overflow = ( len(preview_text) > MAX_CHARS or preview_duration > MAX_DURATION ) if overflow and current_words: # flush 当前组(不含新词) if is_ascii_word: flush(" ".join(current_words), current_start, current_end) else: flush("".join(current_words), current_start, current_end) current_words = [text] current_start = seg_start current_end = seg_end else: current_words.append(text) current_end = seg_end # 如果当前词是句尾标点,立刻 flush if text and text[-1] in sentence_endings: if is_ascii_word: flush(" ".join(current_words), current_start, current_end) else: flush("".join(current_words), current_start, current_end) current_words = [] current_start = None current_end = None # flush 剩余 if current_words: if all(ord(c) < 128 for c in current_words[0]): flush(" ".join(current_words), current_start or 0, current_end or 0) else: flush("".join(current_words), current_start or 0, current_end or 0) return grouped # 按句子分组 sentence_groups = group_segments_into_sentences(segments) log(f"[*] 添加字幕,共 {len(sentence_groups)} 个句子(从 {len(segments)} 个片段合并)") w, h = img_size font_size = max(int(h * 0.045), 20) margin_bottom = int(h * 0.10) max_chars_per_line = max(int(w / (font_size * 0.85)), 15) # 字体:优先黑体,fallback arial font_path = "C:/Windows/Fonts/simhei.ttf" if not os.path.exists(font_path): font_path = "C:/Windows/Fonts/arial.ttf" try: pil_font = ImageFont.truetype(font_path, font_size) except Exception: pil_font = ImageFont.load_default() # 字体度量 tmp_img = Image.new("RGBA", (1, 1)) tmp_draw = ImageDraw.Draw(tmp_img) metrics = tmp_draw.textbbox((0, 0), "gypj8q", font=pil_font) font_full_height = metrics[3] - metrics[1] def wrap_text(text: str, max_chars: int) -> list: """按字符数拆分多行,优先在标点处断行""" if len(text) <= max_chars: return [text] lines = [] while len(text) > max_chars: cut = max_chars for i in range(max_chars, max(max_chars - 5, 0), -1): if text[i] in ",。!?、;:\u201c\u201d\u2018\u2019\u2026\u2014,.!?;: ": cut = i + 1 break lines.append(text[:cut]) text = text[cut:] if text: lines.append(text) return lines def render_subtitle_pil(text_lines: list) -> np.ndarray: """用 PIL 渲染多行字幕,返回 RGBA numpy 数组""" line_spacing = int(font_size * 0.35) line_height = font_full_height + line_spacing line_widths = [] for line in text_lines: bbox = tmp_draw.textbbox((0, 0), line, font=pil_font) line_widths.append(bbox[2] - bbox[0]) img_w = max(line_widths) + 6 img_h = len(text_lines) * line_height + int(font_size * 0.3) img = Image.new("RGBA", (img_w, img_h), (0, 0, 0, 0)) draw = ImageDraw.Draw(img) for idx, line in enumerate(text_lines): x = (img_w - line_widths[idx]) // 2 y = idx * line_height for dx in (-2, -1, 0, 1, 2): for dy in (-2, -1, 0, 1, 2): if dx == 0 and dy == 0: continue draw.text((x + dx, y + dy), line, fill=(0, 0, 0, 255), font=pil_font) draw.text((x, y), line, fill=(255, 255, 255, 255), font=pil_font) return np.array(img) # 按句子显示字幕(而不是逐个片段) for sentence in sentence_groups: text = sentence["text"].strip() if not text: continue seg_start = sentence["start"] seg_end = sentence["end"] seg_dur = seg_end - seg_start if seg_dur <= 0: continue lines = wrap_text(text, max_chars_per_line) rgba = render_subtitle_pil(lines) txt_clip = ImageClip(rgba) txt_clip = txt_clip.with_position(("center", h - margin_bottom - rgba.shape[0])) txt_clip = txt_clip.with_duration(seg_dur).with_start(seg_start) clips.append(txt_clip) else: log("[提示] 未找到 result.json,跳过字幕") if not clips: raise ValueError("没有可用的场景片段!") # 加载音频 audio = AudioFileClip(audio_path) # 合成视频 video = CompositeVideoClip(clips, size=img_size) video = video.with_audio(audio) video = video.with_duration(audio.duration) # 导出 video.write_videofile( output_path, fps=fps, codec='libx264', audio_codec='aac', bitrate='8000k', ) video.close() log(f"[OK] 视频已保存: {output_path}") def main(workspace: str = None, fps: int = 24, size: str = "1280x720", subtitle: bool = True, log_fn=None): """ 主入口 Args: workspace: 工作区路径 fps: 帧率 size: 视频尺寸 "WxH" subtitle: 是否添加字幕 log_fn: 日志回调 """ from config import DEFAULT_VIDEO_SIZE if workspace is None: workspace = os.path.join(os.path.dirname(os.path.abspath(__file__)), "workspace", "1") w, h = map(int, (size or DEFAULT_VIDEO_SIZE).split("x")) img_size = (w, h) def log(msg): if log_fn: log_fn(msg) else: print(msg) log(f"[视频合成] 工作区: {workspace}") log(f"[视频合成] 尺寸: {img_size}, FPS: {fps}") # 1. 加载 scene_plan scene_plan = load_scene_plan(workspace) num_scenes = len(scene_plan["scenes"]) log(f"[视频合成] 共 {num_scenes} 个场景") # 2. 加载音频 audio_path = os.path.join(workspace, "voice.mp3") if not os.path.exists(audio_path): raise FileNotFoundError(f"音频文件不存在: {audio_path}") # 使用 try-finally 确保音频资源正确释放 audio = AudioFileClip(audio_path) try: audio_duration = audio.duration log(f"[视频合成] 音频时长: {audio_duration:.2f} 秒") finally: audio.close() # 3. 读取 ASR 结果 asr_result = load_asr_result(workspace) if asr_result: seg_count = len(asr_result.get("segments", [])) log(f"[视频合成] ASR: {seg_count} 个片段") else: log("[视频合成] 无 ASR 结果,按平均分配") # 4. 构建时间线 timeline = build_scene_timeline(scene_plan, asr_result, audio_duration, workspace=workspace) log("[视频合成] 场景时间分配:") matched_count = 0 unmatched_count = 0 for item in timeline: dur = item['end'] - item['start'] # 显示匹配质量 similarity = item.get('similarity', 0.0) if similarity > 0: quality = f"[{similarity:.0%}]" matched_count += 1 elif item.get('has_match', False): quality = "[OK]" matched_count += 1 else: quality = "[兜底]" unmatched_count += 1 log(f" Scene {item['scene_id']:2d}: {item['start']:6.2f}s - {item['end']:6.2f}s ({dur:.2f}s) {quality}") if unmatched_count > 0: log(f"[警告] {unmatched_count}/{len(timeline)} 个场景未精确匹配,使用兜底分配") else: log(f"[OK] 所有场景均成功匹配") # 5. 生成视频 has_image = sum(1 for item in timeline if find_scene_image(workspace, item["scene_id"]) is not None) log(f"[视频合成] 有图片场景: {has_image}/{len(timeline)}") if has_image == 0: log("[警告] 所有场景都没有图片,视频将使用纯色背景!") output_path = os.path.join(workspace, "output_video.mp4") create_video(workspace, timeline, audio_path, output_path, fps=fps, img_size=img_size, subtitle=subtitle, log_fn=log_fn) log(f"[完成] 视频: {output_path}") return output_path if __name__ == "__main__": main()