import os from concurrent.futures import ThreadPoolExecutor import math import cv2 from moviepy.editor import VideoFileClip import tqdm def split_video_segment(input_video_path, output_dir, start_time, end_time, output_filename): """使用 moviepy 提取并保存一个视频片段。""" output_path = os.path.join(output_dir, output_filename) try: # 使用 with 语句可以确保资源被正确释放 with VideoFileClip(input_video_path) as video: # 提取子片段 subclip = video.subclip(start_time, end_time) # 仅写视频不写音频,并使用 ultrafast 预设以提升速度 subclip.write_videofile( output_path, codec='libx264', audio=False, preset='ultrafast', threads=os.cpu_count() or 4, logger=None, ) print(f"成功创建: {output_filename}") except Exception as e: # 这里我们先打印错误信息 print(f"分割片段 {output_filename} 时出错: {e}") print("已设置为仅视频输出(audio=False)。请检查输入/输出格式兼容性或更换编码器。") def main(): # --- 请在这里配置 --- input_video = "/mnt/data/xiuying/Code/local_deploy/video/video.mp4" # <--- 请修改为你的视频文件路径 output_dir = "/mnt/data/xiuying/Code/local_deploy/video/new/Clips_60s" # <--- 请修改为输出目录名 split_seconds = 60 # <--- 请修改为每个片段的秒数 (n) # -------------------- # 使用CPU核心数作为最大线程数,可以根据需要调整 max_threads = os.cpu_count() or 4 # 检查输入视频是否存在 if not os.path.exists(input_video): print(f"错误:输入视频文件不存在 -> {input_video}") return # 创建输出目录 if not os.path.exists(output_dir): os.makedirs(output_dir) print(f"创建输出目录: {output_dir}") total_duration = 0 try: # 获取视频总时长 with VideoFileClip(input_video) as video: total_duration = video.duration except Exception as e: print(f"使用 moviepy 读取视频文件时出错: {e}") print("请确保 ffmpeg 已正确安装并可以被 moviepy 调用。") return print(f"视频总时长: {total_duration:.2f} 秒") # 创建分割任务列表 tasks = [] num_clips = math.ceil(total_duration / split_seconds) base_filename, file_extension = os.path.splitext(os.path.basename(input_video)) for i in range(num_clips): start_time = i * split_seconds # 确保结束时间不会超过视频总长 end_time = min(start_time + split_seconds, total_duration) # 如果计算出的开始时间已经等于或超过总时长,则停止 if start_time >= total_duration: break output_filename = f"{base_filename}_part_{i+1:03d}{file_extension}" tasks.append((input_video, output_dir, start_time, end_time, output_filename)) print(f"准备将视频分割成 {len(tasks)} 个片段...") # 使用线程池执行分割任务 with ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map(lambda p: split_video_segment(*p), tasks) print("\n所有视频片段处理完成!") if __name__ == "__main__": main()