Wangtwohappy's picture
Upload folder using huggingface_hub
f8ba0eb verified
import os
from concurrent.futures import ThreadPoolExecutor
import math
import cv2
from moviepy.editor import VideoFileClip
import tqdm
def split_video_segment(input_video_path, output_dir, start_time, end_time, output_filename):
"""使用 moviepy 提取并保存一个视频片段。"""
output_path = os.path.join(output_dir, output_filename)
try:
# 使用 with 语句可以确保资源被正确释放
with VideoFileClip(input_video_path) as video:
# 提取子片段
subclip = video.subclip(start_time, end_time)
# 仅写视频不写音频,并使用 ultrafast 预设以提升速度
subclip.write_videofile(
output_path,
codec='libx264',
audio=False,
preset='ultrafast',
threads=os.cpu_count() or 4,
logger=None,
)
print(f"成功创建: {output_filename}")
except Exception as e:
# 这里我们先打印错误信息
print(f"分割片段 {output_filename} 时出错: {e}")
print("已设置为仅视频输出(audio=False)。请检查输入/输出格式兼容性或更换编码器。")
def main():
# --- 请在这里配置 ---
input_video = "/mnt/data/xiuying/Code/local_deploy/video/video.mp4" # <--- 请修改为你的视频文件路径
output_dir = "/mnt/data/xiuying/Code/local_deploy/video/new/Clips_60s" # <--- 请修改为输出目录名
split_seconds = 60 # <--- 请修改为每个片段的秒数 (n)
# --------------------
# 使用CPU核心数作为最大线程数,可以根据需要调整
max_threads = os.cpu_count() or 4
# 检查输入视频是否存在
if not os.path.exists(input_video):
print(f"错误:输入视频文件不存在 -> {input_video}")
return
# 创建输出目录
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"创建输出目录: {output_dir}")
total_duration = 0
try:
# 获取视频总时长
with VideoFileClip(input_video) as video:
total_duration = video.duration
except Exception as e:
print(f"使用 moviepy 读取视频文件时出错: {e}")
print("请确保 ffmpeg 已正确安装并可以被 moviepy 调用。")
return
print(f"视频总时长: {total_duration:.2f} 秒")
# 创建分割任务列表
tasks = []
num_clips = math.ceil(total_duration / split_seconds)
base_filename, file_extension = os.path.splitext(os.path.basename(input_video))
for i in range(num_clips):
start_time = i * split_seconds
# 确保结束时间不会超过视频总长
end_time = min(start_time + split_seconds, total_duration)
# 如果计算出的开始时间已经等于或超过总时长,则停止
if start_time >= total_duration:
break
output_filename = f"{base_filename}_part_{i+1:03d}{file_extension}"
tasks.append((input_video, output_dir, start_time, end_time, output_filename))
print(f"准备将视频分割成 {len(tasks)} 个片段...")
# 使用线程池执行分割任务
with ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(lambda p: split_video_segment(*p), tasks)
print("\n所有视频片段处理完成!")
if __name__ == "__main__":
main()