|
import os |
|
from concurrent.futures import ThreadPoolExecutor |
|
import math |
|
import cv2 |
|
from moviepy.editor import VideoFileClip |
|
import tqdm |
|
|
|
def split_video_segment(input_video_path, output_dir, start_time, end_time,
                        output_filename, threads=None):
    """Extract one time range from a video with moviepy and save it as a file.

    The segment is re-encoded with libx264 using the ``ultrafast`` preset and
    written without an audio track.

    Args:
        input_video_path: Path of the source video.
        output_dir: Directory the segment is written into.
        start_time: Segment start, passed to ``subclip`` unchanged.
        end_time: Segment end, passed to ``subclip`` unchanged.
        output_filename: File name of the segment inside ``output_dir``.
        threads: Thread count handed to ``write_videofile``. ``None`` (the
            default) keeps the previous behaviour of ``os.cpu_count() or 4``.
            Callers that run many segments concurrently can pass a smaller
            value to avoid oversubscribing the CPU.

    Returns:
        ``True`` if the segment was written, ``False`` if any error occurred.
        Errors are reported on stdout and never propagate, so one bad segment
        does not abort a batch.
    """
    output_path = os.path.join(output_dir, output_filename)
    if threads is None:
        threads = os.cpu_count() or 4

    try:
        # The context manager closes the clip's reader even if encoding fails.
        with VideoFileClip(input_video_path) as video:
            subclip = video.subclip(start_time, end_time)
            subclip.write_videofile(
                output_path,
                codec='libx264',
                audio=False,
                preset='ultrafast',
                threads=threads,
                logger=None,
            )
    except Exception as e:
        # Deliberate best-effort boundary: report the failure and signal it
        # through the return value instead of raising.
        print(f"分割片段 {output_filename} 时出错: {e}")
        print("已设置为仅视频输出(audio=False)。请检查输入/输出格式兼容性或更换编码器。")
        return False
    else:
        print(f"成功创建: {output_filename}")
        return True
|
|
|
|
|
def main(
    input_video="/mnt/data/xiuying/Code/local_deploy/video/video.mp4",
    output_dir="/mnt/data/xiuying/Code/local_deploy/video/new/Clips_60s",
    split_seconds=60,
    max_threads=None,
):
    """Split a video into consecutive fixed-length clips, encoded in parallel.

    Clips are named ``<input stem>_part_NNN<input extension>`` (NNN starts at
    001) and written to ``output_dir``, which is created if missing. The last
    clip is shorter when the duration is not a multiple of ``split_seconds``.

    Args:
        input_video: Path of the video to split.
        output_dir: Directory that receives the clips.
        split_seconds: Length of each clip in seconds; must be positive.
        max_threads: Upper bound on concurrent segment workers. ``None`` (the
            default) uses ``os.cpu_count() or 4``.

    Returns:
        None. Problems with the input file are reported on stdout and the
        function returns early.

    Raises:
        ValueError: If ``split_seconds`` is not positive.
    """
    if split_seconds <= 0:
        raise ValueError(f"split_seconds must be positive, got {split_seconds!r}")
    if max_threads is None:
        max_threads = os.cpu_count() or 4

    if not os.path.exists(input_video):
        print(f"错误:输入视频文件不存在 -> {input_video}")
        return

    if not os.path.isdir(output_dir):
        # exist_ok avoids failing if the directory appears between the check
        # and the creation.
        os.makedirs(output_dir, exist_ok=True)
        print(f"创建输出目录: {output_dir}")

    try:
        with VideoFileClip(input_video) as video:
            total_duration = video.duration
    except Exception as e:
        print(f"使用 moviepy 读取视频文件时出错: {e}")
        print("请确保 ffmpeg 已正确安装并可以被 moviepy 调用。")
        return

    # A missing or non-positive duration would otherwise crash the format
    # below or yield zero clips followed by a misleading success message.
    if not total_duration or total_duration <= 0:
        print(f"错误:无法获取有效的视频时长 -> {input_video}")
        return

    print(f"视频总时长: {total_duration:.2f} 秒")

    base_filename, file_extension = os.path.splitext(os.path.basename(input_video))
    num_clips = math.ceil(total_duration / split_seconds)

    tasks = []
    for index in range(num_clips):
        start_time = index * split_seconds
        # Guard against float rounding pushing a start onto/past the end.
        if start_time >= total_duration:
            break
        end_time = min(start_time + split_seconds, total_duration)
        output_filename = f"{base_filename}_part_{index + 1:03d}{file_extension}"
        tasks.append((input_video, output_dir, start_time, end_time, output_filename))

    print(f"准备将视频分割成 {len(tasks)} 个片段...")

    # No point in starting more workers than there are segments.
    # NOTE(review): split_video_segment also requests one encoder thread per
    # CPU by default, so one worker per CPU may oversubscribe cores — consider
    # passing a smaller max_threads.
    worker_count = max(1, min(max_threads, len(tasks)))
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = [executor.submit(split_video_segment, *task) for task in tasks]
        # Collect every result so an exception escaping a worker is raised
        # here instead of being silently dropped.
        for future in futures:
            future.result()

    print("\n所有视频片段处理完成!")
|
|
|
|
|
# Script entry point: run the splitter only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|