Files
video/make_video.py
2026-04-25 12:50:36 +08:00

746 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
make_video.py - 场景图 + ASR 时间戳 + 音频 → 视频
流程:
1. 读取 voice.mp3 获取总时长
2. 读取 scene_plan.json 获取场景列表
3. 如果有 result.json(ASR 对齐),按对齐结果分配时间;否则平均分配
4. 用 moviepy 将场景图在对应时段显示,配上音频生成视频
"""
import os
import json
import math
import re
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from moviepy import (
AudioFileClip,
ImageClip,
CompositeVideoClip,
ColorClip,
concatenate_videoclips
)
def load_scene_plan(workspace: str) -> dict:
    """Load and parse scene_plan.json from the workspace directory."""
    plan_path = os.path.join(workspace, 'scene_plan.json')
    with open(plan_path, 'r', encoding='utf-8') as fh:
        return json.load(fh)
def load_asr_result(workspace: str) -> dict:
    """Return the parsed result.json (ASR alignment), or None when absent."""
    result_path = os.path.join(workspace, "result.json")
    if not os.path.exists(result_path):
        return None
    with open(result_path, 'r', encoding='utf-8') as fh:
        return json.load(fh)
def build_scene_timeline(scene_plan: dict, asr_result: dict, audio_duration: float,
                         workspace: str = None) -> list:
    """Build a display timeline for the scenes.

    Three strategies, in priority order:
      1. Every scene already carries start_time/end_time (written by the
         ASR step): use them directly.
      2. Only some scenes carry timestamps: keep those, and spread the
         remaining audio time evenly over the unmatched scenes.
      3. No timestamps at all: match scene text against the ASR segments,
         or fall back to an even split when there is no ASR result.

    Args:
        scene_plan: dict with a "scenes" list; each scene has "scene_id"
            and optionally "start_time"/"end_time"/"text"/"lines".
        asr_result: parsed result.json with a "segments" list, or None.
        audio_duration: total audio length in seconds.
        workspace: unused; kept for backward compatibility with callers.

    Returns:
        List of dicts with "scene_id", "start", "end" (and sometimes
        "has_match"/"similarity"), in the same order as the input scenes.
    """
    scenes = scene_plan["scenes"]
    num_scenes = len(scenes)

    # --- Strategy 1: every scene has timestamps from the ASR step ---
    if all("start_time" in s and "end_time" in s for s in scenes):
        timeline = [{
            "scene_id": s["scene_id"],
            "start": s["start_time"],
            "end": s["end_time"],
            "has_match": True,
        } for s in scenes]
        # Make the last scene run to the end of the audio.
        if timeline:
            timeline[-1]["end"] = audio_duration
        return timeline

    # --- Strategy 2: some scenes have timestamps, some do not ---
    has_timestamp = [s for s in scenes if "start_time" in s and "end_time" in s]
    if has_timestamp and len(has_timestamp) < num_scenes:
        timeline = []
        matched_end = 0.0
        for scene in scenes:
            if "start_time" in scene and "end_time" in scene:
                timeline.append({
                    "scene_id": scene["scene_id"],
                    "start": scene["start_time"],
                    "end": scene["end_time"],
                })
                matched_end = max(matched_end, scene["end_time"])
            else:
                timeline.append({
                    "scene_id": scene["scene_id"],
                    "start": None,
                    "end": None,
                })
        # Spread the leftover audio evenly over the unmatched scenes.
        unmatched_indices = [i for i, item in enumerate(timeline) if item["start"] is None]
        if unmatched_indices:
            remaining = audio_duration - matched_end
            if remaining > 0:
                seg_dur = remaining / len(unmatched_indices)
                t = matched_end
                for idx in unmatched_indices:
                    timeline[idx]["start"] = t
                    timeline[idx]["end"] = t + seg_dur
                    t += seg_dur
            else:
                # No audio left: give each unmatched scene a 2 s placeholder.
                for idx in unmatched_indices:
                    timeline[idx]["start"] = matched_end
                    timeline[idx]["end"] = matched_end + 2.0
        if timeline:
            timeline[-1]["end"] = audio_duration
        return timeline

    # --- Strategy 3: no timestamps; ASR text matching or an even split ---
    if asr_result is not None:
        # Normalize each scene's text into a list of lines for matching.
        scenes_for_match = []
        for scene in scenes:
            s = dict(scene)
            raw = s.get("text", "") or s.get("lines", "") or ""
            if isinstance(raw, str) and raw:
                # BUGFIX: was raw.split("") — an empty separator raises
                # ValueError. Split on line breaks instead (the original
                # separator character was apparently lost in the source).
                s["lines"] = [x.strip() for x in raw.splitlines() if x.strip()]
            elif isinstance(raw, list):
                s["lines"] = raw
            else:
                s["lines"] = []
            scenes_for_match.append(s)
        asr_segments = asr_result.get("segments", [])
        timeline = assign_scenes_to_segments(scenes_for_match, asr_segments, audio_duration)
    else:
        # Even split of the whole audio across the scenes.
        duration_per_scene = audio_duration / num_scenes
        timeline = []
        current_time = 0.0
        for scene in scenes:
            start = current_time
            end = min(current_time + duration_per_scene, audio_duration)
            timeline.append({
                "scene_id": scene["scene_id"],
                "start": start,
                "end": end,
            })
            current_time = end
    return timeline
def _normalize_text(text: str) -> str:
"""清理文本:去除标点、空格、转小写,用于相似度比较"""
if not text:
return ""
# 去除所有非字母数字和非中文字符
text = re.sub(r'[^\w\u4e00-\u9fff]', '', text)
return text.lower()
def _longest_common_substring_ratio(s1: str, s2: str) -> float:
"""
计算两个字符串的最长公共子串比例
使用动态规划,但为了性能优化:
- 如果字符串太长,使用简化的滑动窗口方法
"""
if not s1 or not s2:
return 0.0
# 对于短字符串,使用精确的 DP 算法
if len(s1) * len(s2) < 100000: # 乘积小于 10万
m, n = len(s1), len(s2)
# 优化空间复杂度:只用两行
prev = [0] * (n + 1)
curr = [0] * (n + 1)
max_len = 0
for i in range(1, m + 1):
for j in range(1, n + 1):
if s1[i-1] == s2[j-1]:
curr[j] = prev[j-1] + 1
max_len = max(max_len, curr[j])
else:
curr[j] = 0
prev, curr = curr, [0] * (n + 1)
return max_len / max(len(s1), len(s2))
# 对于长字符串,使用简化的滑动窗口
return _sliding_window_similarity(s1, s2)
def _sliding_window_similarity(s1: str, s2: str, window_size: int = 20) -> float:
"""
滑动窗口相似度(用于长字符串的快速估算)
将 s1 分成多个窗口,在 s2 中查找最佳匹配
"""
if len(s1) <= window_size:
# 如果 s1 本身很短,直接检查是否在 s2 中
if s1 in s2:
return 1.0
# 否则检查每个字符的出现
common = sum(1 for c in s1 if c in s2)
return common / len(s1) if s1 else 0.0
# 分窗口检查
total_score = 0.0
num_windows = 0
for i in range(0, len(s1) - window_size + 1, window_size // 2):
window = s1[i:i+window_size]
if window in s2:
total_score += 1.0
else:
# 部分匹配
common = sum(1 for c in window if c in s2)
total_score += common / window_size
num_windows += 1
return total_score / num_windows if num_windows > 0 else 0.0
def assign_scenes_to_segments(scenes: list, asr_segments: list, audio_duration: float) -> list:
    """Assign each scene a [start, end] span by text-matching ASR segments.

    Strategy:
    1. Each scene carries a "text" field (its source-script excerpt).
    2. Search the ASR segments for where that text occurs.
    3. Longest-common-substring similarity tolerates ASR recognition errors.
    4. A similarity threshold guards against false matches. NOTE(review):
       the constant below is 0.5 (50%); an earlier comment claimed 60%.

    Returns a list of dicts: scene_id/start/end/has_match/similarity.
    Unmatched scenes are back-filled from the remaining audio time; very
    short (< 1 s) scenes are merged into their successor.
    """
    SIMILARITY_THRESHOLD = 0.5  # minimum similarity to accept a match
    MAX_WINDOW_SIZE = 20  # merge at most 20 ASR segments (long sentences need more)
    # Pre-clean the ASR segment text once, for matching.
    asr_cleaned = []
    for seg in asr_segments:
        text = seg.get('text', '').strip()
        if text:
            asr_cleaned.append({
                'text': text,
                'text_normalized': _normalize_text(text),
                'start': seg['start'],
                'end': seg['end']
            })
    def find_best_match(scene_text: str, start_seg_idx: int):
        """Find the best-matching span for scene_text among the ASR segments.

        Tries windows of 1..MAX_WINDOW_SIZE consecutive segments starting at
        or after start_seg_idx, scoring the concatenated normalized text.
        Returns a match dict, or None when below the threshold.
        """
        if not scene_text or not scene_text.strip():
            return None
        scene_normalized = _normalize_text(scene_text)
        best_score = 0.0
        best_match = None
        # Sliding window: try every window size and start position.
        for window_size in range(1, min(MAX_WINDOW_SIZE + 1, len(asr_cleaned) - start_seg_idx + 1)):
            for i in range(start_seg_idx, len(asr_cleaned) - window_size + 1):
                # Concatenate the window's normalized text.
                combined_text = ''.join(
                    seg['text_normalized'] for seg in asr_cleaned[i:i+window_size]
                )
                # Score the window against the scene text.
                similarity = _longest_common_substring_ratio(scene_normalized, combined_text)
                if similarity > best_score:
                    best_score = similarity
                    best_match = {
                        'start': asr_cleaned[i]['start'],
                        'end': asr_cleaned[i + window_size - 1]['end'],
                        'start_idx': i,
                        'end_idx': i + window_size - 1,
                        'similarity': similarity
                    }
                # Early exit on a near-perfect match.
                if similarity >= 0.90:
                    return best_match
        # Accept only scores at or above the threshold.
        if best_score >= SIMILARITY_THRESHOLD:
            return best_match
        return None
    # Main matching loop: scenes are matched in order; each search starts
    # just after the previous scene's matched span.
    timeline_raw = []
    last_end_idx = 0  # index just past the previous match
    for scene in scenes:
        # Prefer the "text" field; fall back to the legacy "lines" field.
        scene_text = scene.get('text', '') or scene.get('lines', '')
        if isinstance(scene_text, list):
            scene_text = ''.join(scene_text)
        match = find_best_match(scene_text, last_end_idx)
        if match:
            timeline_raw.append({
                'scene_id': scene['scene_id'],
                'start': match['start'],
                'end': match['end'],
                'has_match': True,
                'similarity': match['similarity']
            })
            # The next scene's search starts after this match.
            last_end_idx = match['end_idx'] + 1
        else:
            timeline_raw.append({
                'scene_id': scene['scene_id'],
                'start': None,
                'end': None,
                'has_match': False,
                'similarity': 0.0
            })
    # Unmatched scenes: split the remaining audio time evenly among them.
    unmatched_indices = [i for i, item in enumerate(timeline_raw) if not item['has_match']]
    if unmatched_indices:
        # Latest end time among the matched scenes.
        matched_end = 0.0
        for item in timeline_raw:
            if item['has_match'] and item['end']:
                matched_end = max(matched_end, item['end'])
        remaining_duration = audio_duration - matched_end
        if remaining_duration > 0 and len(unmatched_indices) > 0:
            seg_dur = remaining_duration / len(unmatched_indices)
            current_t = matched_end
            for idx in unmatched_indices:
                timeline_raw[idx]['start'] = current_t
                timeline_raw[idx]['end'] = current_t + seg_dur
                timeline_raw[idx]['has_match'] = True
                current_t += seg_dur
        else:
            # No audio left: give each a default 2 s span.
            for idx in unmatched_indices:
                timeline_raw[idx]['start'] = matched_end
                timeline_raw[idx]['end'] = matched_end + 2.0
                timeline_raw[idx]['has_match'] = True
    # Guarantee every scene lasts at least 0.5 s.
    for item in timeline_raw:
        if item['end'] is None or item['end'] <= item['start']:
            item['end'] = item['start'] + 0.5
    # Close gaps so the timeline is continuous.
    for i in range(len(timeline_raw) - 1):
        curr = timeline_raw[i]
        nxt = timeline_raw[i + 1]
        gap = nxt['start'] - curr['end']
        if gap > 0.01:  # tolerate a tiny (10 ms) gap
            curr['end'] = nxt['start']
    # Extend the final scene to the end of the audio.
    if timeline_raw:
        timeline_raw[-1]['end'] = audio_duration
    # Merge very short scenes (< 1 s) into the following scene.
    merged = []
    skip_next = False
    for i in range(len(timeline_raw)):
        if skip_next:
            skip_next = False
            continue
        item = dict(timeline_raw[i])
        duration = item['end'] - item['start']
        # A too-short, non-final scene absorbs its successor's span.
        if duration < 1.0 and i < len(timeline_raw) - 1:
            item['end'] = timeline_raw[i + 1]['end']
            merged.append(item)
            skip_next = True
        else:
            merged.append(item)
    return merged
def find_scene_image(workspace: str, scene_id: int) -> str:
    """Return the path of a scene's image, or None when no file exists.

    Tries the zero-padded name (scene_001.png) first, then the plain one.
    """
    scene_dir = os.path.join(workspace, "scene")
    for name in (f"scene_{scene_id:03d}.png", f"scene_{scene_id}.png"):
        candidate = os.path.join(scene_dir, name)
        if os.path.exists(candidate):
            return candidate
    return None
def create_video(workspace: str, timeline: list, audio_path: str, output_path: str,
                 fps: int = 24, img_size: tuple = (1280, 720),
                 subtitle: bool = True, log_fn=None):
    """Compose scene images, audio and optional subtitles with moviepy.

    Args:
        workspace: workspace directory (scene images, result.json).
        timeline: scene timeline entries with scene_id/start/end.
        audio_path: path of the narration audio file.
        output_path: path of the output video file.
        fps: frame rate.
        img_size: video size as (width, height).
        subtitle: whether to burn in subtitles from result.json.
        log_fn: optional logging callback; print() is used when absent.

    Raises:
        ValueError: when the timeline yields no usable clips.
    """
    def log(msg):
        # Route progress messages to the callback when provided.
        if log_fn:
            log_fn(msg)
        else:
            print(msg)
    clips = []
    # One image clip (or solid-color fallback) per timeline entry.
    for item in timeline:
        scene_id = item["scene_id"]
        start = item["start"]
        end = item["end"]
        duration = end - start
        if duration <= 0:
            continue
        img_path = find_scene_image(workspace, scene_id)
        if img_path:
            img_clip = ImageClip(img_path).with_duration(duration)
            img_clip = img_clip.resized(new_size=img_size)
        else:
            log(f" [警告] scene_{scene_id} 无图片,使用纯色背景")
            img_clip = ColorClip(size=img_size, color=(50, 50, 50)).with_duration(duration)
        img_clip = img_clip.with_start(start)
        clips.append(img_clip)
    # Subtitles (burned in from the ASR alignment).
    if subtitle:
        asr_path = os.path.join(workspace, "result.json")
        if os.path.exists(asr_path):
            with open(asr_path, encoding="utf-8") as f:
                asr_result = json.load(f)
            segments = asr_result.get("segments", [])
            log(f"[*] 原始 ASR 片段: {len(segments)}")
            # Group ASR segments by time (roughly 3-5 s each) so a single
            # subtitle never fills the screen.
            def group_segments_into_sentences(segments: list) -> list:
                """Merge short ASR segments into readable subtitle groups.

                Supports Chinese (character-level) and English (word-level)
                ASR output. Constraints:
                - at most MAX_DURATION seconds per group
                - at most MAX_CHARS characters per group
                - sentence-ending punctuation forces an early break
                """
                if not segments:
                    return []
                sentence_endings = set('。!?.!?…\n')
                MAX_CHARS = 60  # character cap (English with spaces runs longer)
                MAX_DURATION = 4.0
                def flush(text, start, end):
                    # Append the pending group when it holds real content.
                    if text and start is not None:
                        grouped.append({"text": text.strip(), "start": start, "end": end})
                grouped = []
                current_words = []  # pending words/characters of the open group
                current_start = None
                current_end = None
                for seg in segments:
                    text = seg.get("text", "").strip()
                    if not text:
                        continue
                    seg_start = seg.get("start", 0)
                    seg_end = seg.get("end", 0)
                    if current_start is None:
                        current_start = seg_start
                    # Would adding this word overflow the group?
                    # English joins with spaces; Chinese concatenates directly.
                    preview_words = current_words + [text]
                    # ASCII-only text is treated as English.
                    is_ascii_word = all(ord(c) < 128 for c in text)
                    if is_ascii_word:
                        preview_text = " ".join(preview_words)
                    else:
                        preview_text = "".join(preview_words)
                    preview_duration = seg_end - current_start
                    # If the addition overflows, flush the old group and
                    # start a new one with this word.
                    overflow = (
                        len(preview_text) > MAX_CHARS or
                        preview_duration > MAX_DURATION
                    )
                    if overflow and current_words:
                        # Flush the current group (without the new word).
                        if is_ascii_word:
                            flush(" ".join(current_words), current_start, current_end)
                        else:
                            flush("".join(current_words), current_start, current_end)
                        current_words = [text]
                        current_start = seg_start
                        current_end = seg_end
                    else:
                        current_words.append(text)
                        current_end = seg_end
                    # Sentence-ending punctuation flushes immediately.
                    if text and text[-1] in sentence_endings:
                        if is_ascii_word:
                            flush(" ".join(current_words), current_start, current_end)
                        else:
                            flush("".join(current_words), current_start, current_end)
                        current_words = []
                        current_start = None
                        current_end = None
                # Flush whatever is left over.
                if current_words:
                    if all(ord(c) < 128 for c in current_words[0]):
                        flush(" ".join(current_words), current_start or 0, current_end or 0)
                    else:
                        flush("".join(current_words), current_start or 0, current_end or 0)
                return grouped
            # Group the raw segments into sentence-sized subtitles.
            sentence_groups = group_segments_into_sentences(segments)
            log(f"[*] 添加字幕,共 {len(sentence_groups)} 个句子(从 {len(segments)} 个片段合并)")
            w, h = img_size
            font_size = max(int(h * 0.045), 20)
            margin_bottom = int(h * 0.10)
            max_chars_per_line = max(int(w / (font_size * 0.85)), 15)
            # Font: prefer SimHei, fall back to Arial (Windows font paths).
            font_path = "C:/Windows/Fonts/simhei.ttf"
            if not os.path.exists(font_path):
                font_path = "C:/Windows/Fonts/arial.ttf"
            try:
                pil_font = ImageFont.truetype(font_path, font_size)
            except Exception:
                pil_font = ImageFont.load_default()
            # Font metrics, measured on a sample with ascenders/descenders.
            tmp_img = Image.new("RGBA", (1, 1))
            tmp_draw = ImageDraw.Draw(tmp_img)
            metrics = tmp_draw.textbbox((0, 0), "gypj8q", font=pil_font)
            font_full_height = metrics[3] - metrics[1]
            def wrap_text(text: str, max_chars: int) -> list:
                """Split text into lines by length, preferring punctuation breaks."""
                if len(text) <= max_chars:
                    return [text]
                lines = []
                while len(text) > max_chars:
                    cut = max_chars
                    # Look back a few characters for a punctuation break point.
                    for i in range(max_chars, max(max_chars - 5, 0), -1):
                        if text[i] in ",。!?、;:\u201c\u201d\u2018\u2019\u2026\u2014,.!?;: ":
                            cut = i + 1
                            break
                    lines.append(text[:cut])
                    text = text[cut:]
                if text:
                    lines.append(text)
                return lines
            def render_subtitle_pil(text_lines: list) -> np.ndarray:
                """Render multi-line subtitle text with PIL; returns an RGBA array."""
                line_spacing = int(font_size * 0.35)
                line_height = font_full_height + line_spacing
                line_widths = []
                for line in text_lines:
                    bbox = tmp_draw.textbbox((0, 0), line, font=pil_font)
                    line_widths.append(bbox[2] - bbox[0])
                img_w = max(line_widths) + 6
                img_h = len(text_lines) * line_height + int(font_size * 0.3)
                img = Image.new("RGBA", (img_w, img_h), (0, 0, 0, 0))
                draw = ImageDraw.Draw(img)
                for idx, line in enumerate(text_lines):
                    x = (img_w - line_widths[idx]) // 2
                    y = idx * line_height
                    # Black outline: draw the text at every offset of a 5x5
                    # box around the target position, then the white fill.
                    for dx in (-2, -1, 0, 1, 2):
                        for dy in (-2, -1, 0, 1, 2):
                            if dx == 0 and dy == 0:
                                continue
                            draw.text((x + dx, y + dy), line, fill=(0, 0, 0, 255), font=pil_font)
                    draw.text((x, y), line, fill=(255, 255, 255, 255), font=pil_font)
                return np.array(img)
            # One subtitle clip per sentence group (not per raw segment).
            for sentence in sentence_groups:
                text = sentence["text"].strip()
                if not text:
                    continue
                seg_start = sentence["start"]
                seg_end = sentence["end"]
                seg_dur = seg_end - seg_start
                if seg_dur <= 0:
                    continue
                lines = wrap_text(text, max_chars_per_line)
                rgba = render_subtitle_pil(lines)
                txt_clip = ImageClip(rgba)
                txt_clip = txt_clip.with_position(("center", h - margin_bottom - rgba.shape[0]))
                txt_clip = txt_clip.with_duration(seg_dur).with_start(seg_start)
                clips.append(txt_clip)
        else:
            log("[提示] 未找到 result.json,跳过字幕")
    if not clips:
        raise ValueError("没有可用的场景片段!")
    # Load the audio track.
    audio = AudioFileClip(audio_path)
    # Composite all clips and attach the audio.
    video = CompositeVideoClip(clips, size=img_size)
    video = video.with_audio(audio)
    video = video.with_duration(audio.duration)
    # Export.
    video.write_videofile(
        output_path,
        fps=fps,
        codec='libx264',
        audio_codec='aac',
        bitrate='8000k',
    )
    video.close()
    log(f"[OK] 视频已保存: {output_path}")
def main(workspace: str = None, fps: int = 24, size: str = "1280x720",
         subtitle: bool = True, log_fn=None):
    """Entry point: build the scene timeline and render the video.

    Args:
        workspace: workspace directory; defaults to <module dir>/workspace/1.
        fps: frame rate.
        size: video size as "WxH".
        subtitle: whether to add subtitles.
        log_fn: logging callback; print() is used when absent.

    Returns:
        Path of the rendered video file.

    Raises:
        FileNotFoundError: when voice.mp3 is missing from the workspace.
    """
    from config import DEFAULT_VIDEO_SIZE
    if workspace is None:
        workspace = os.path.join(os.path.dirname(os.path.abspath(__file__)), "workspace", "1")
    w, h = map(int, (size or DEFAULT_VIDEO_SIZE).split("x"))
    img_size = (w, h)
    def log(msg):
        # Route progress messages to the callback when provided.
        if log_fn:
            log_fn(msg)
        else:
            print(msg)
    log(f"[视频合成] 工作区: {workspace}")
    log(f"[视频合成] 尺寸: {img_size}, FPS: {fps}")
    # 1. Load scene_plan.json.
    scene_plan = load_scene_plan(workspace)
    num_scenes = len(scene_plan["scenes"])
    log(f"[视频合成] 共 {num_scenes} 个场景")
    # 2. Open the audio only to measure its duration.
    audio_path = os.path.join(workspace, "voice.mp3")
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"音频文件不存在: {audio_path}")
    # try/finally guarantees the audio handle is released.
    audio = AudioFileClip(audio_path)
    try:
        audio_duration = audio.duration
        log(f"[视频合成] 音频时长: {audio_duration:.2f}秒")
    finally:
        audio.close()
    # 3. Read the optional ASR alignment result.
    asr_result = load_asr_result(workspace)
    if asr_result:
        seg_count = len(asr_result.get("segments", []))
        log(f"[视频合成] ASR: {seg_count} 个片段")
    else:
        log("[视频合成] 无 ASR 结果,按平均分配")
    # 4. Build the timeline and report per-scene match quality.
    timeline = build_scene_timeline(scene_plan, asr_result, audio_duration, workspace=workspace)
    log("[视频合成] 场景时间分配:")
    matched_count = 0
    unmatched_count = 0
    for item in timeline:
        dur = item['end'] - item['start']
        # Show the match quality tag for each scene.
        similarity = item.get('similarity', 0.0)
        if similarity > 0:
            quality = f"[{similarity:.0%}]"
            matched_count += 1
        elif item.get('has_match', False):
            quality = "[OK]"
            matched_count += 1
        else:
            quality = "[兜底]"
            unmatched_count += 1
        log(f" Scene {item['scene_id']:2d}: {item['start']:6.2f}s - {item['end']:6.2f}s ({dur:.2f}s) {quality}")
    if unmatched_count > 0:
        log(f"[警告] {unmatched_count}/{len(timeline)} 个场景未精确匹配,使用兜底分配")
    else:
        log(f"[OK] 所有场景均成功匹配")
    # 5. Render the video.
    has_image = sum(1 for item in timeline if find_scene_image(workspace, item["scene_id"]) is not None)
    log(f"[视频合成] 有图片场景: {has_image}/{len(timeline)}")
    if has_image == 0:
        log("[警告] 所有场景都没有图片,视频将使用纯色背景!")
    output_path = os.path.join(workspace, "output_video.mp4")
    create_video(workspace, timeline, audio_path, output_path, fps=fps, img_size=img_size,
                 subtitle=subtitle, log_fn=log_fn)
    log(f"[完成] 视频: {output_path}")
    return output_path
if __name__ == "__main__":
main()