|
from typing import cast, Union |
|
|
|
import torch |
|
|
|
from diffusers import AutoencoderKLHunyuanVideo |
|
from diffusers.video_processor import VideoProcessor |
|
from diffusers.utils import export_to_video |
|
|
|
class EndpointHandler:
    """Endpoint handler that decodes latents with a HunyuanVideo VAE.

    The handler is called with a request dictionary, runs the latents found
    under ``"inputs"`` through ``AutoencoderKLHunyuanVideo.decode`` and
    returns the result in the representation selected by the request
    parameters (see ``__call__``).
    """

    def __init__(self, path=""):
        """Load the VAE and build the video post-processor.

        Args:
            path: Forwarded to ``AutoencoderKLHunyuanVideo.from_pretrained``.
        """
        self.device = "cuda"
        self.dtype = torch.float16
        vae = AutoencoderKLHunyuanVideo.from_pretrained(path, torch_dtype=self.dtype)
        self.vae = cast(AutoencoderKLHunyuanVideo, vae.to(self.device, self.dtype).eval())

        # Both attribute names are kept; they hold the same value.
        self.vae_scale_factor = self.vae_scale_factor_spatial = self.vae.spatial_compression_ratio
        self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)

    @torch.no_grad()
    def __call__(self, data) -> Union[torch.Tensor, bytes]:
        """Decode the latents in ``data`` and post-process the result.

        Args:
            data: Request mapping with the keys

                * ``"inputs"``: the latent tensor to decode (required).
                * ``"parameters"``: optional mapping (``None`` is treated as
                  empty) with

                  - ``"do_scaling"`` (default ``True``): divide the latents by
                    ``vae.config.scaling_factor`` before decoding.
                  - ``"output_type"`` (default ``"pil"``): one of ``"pil"``,
                    ``"mp4"`` or ``"pt"``.
                  - ``"partial_postprocess"`` (default ``False``): when true,
                    ``output_type`` is ignored and a ``uint8`` tensor of
                    frames is returned.

        Returns:
            ``bytes`` holding the encoded video for ``"mp4"``; a tensor in
            every other case. Any other ``output_type`` value yields the raw
            decoder output, exactly like ``"pt"``.

        Raises:
            KeyError: If ``data`` has no ``"inputs"`` entry.
        """
        latents = cast(torch.Tensor, data["inputs"])
        # ``or {}`` also covers an explicit ``"parameters": None`` payload.
        parameters = cast(dict, data.get("parameters") or {})
        do_scaling = cast(bool, parameters.get("do_scaling", True))
        output_type = cast(str, parameters.get("output_type", "pil"))
        partial_postprocess = cast(bool, parameters.get("partial_postprocess", False))

        latents = latents.to(self.device, self.dtype)
        if do_scaling:
            latents = latents / self.vae.config.scaling_factor

        # Gradient tracking is already disabled by the ``torch.no_grad``
        # decorator, so no nested context manager is needed here.
        frames = cast(torch.Tensor, self.vae.decode(latents, return_dict=False)[0])

        # ``partial_postprocess`` takes precedence over ``output_type``.
        if partial_postprocess:
            return self._to_uint8_frames(frames)

        if output_type == "pil":
            # NOTE(review): the "pil" request is post-processed with
            # output_type="pt", so a tensor is returned - presumably the
            # caller builds the PIL images itself; confirm against the client.
            return cast(torch.Tensor, self.video_processor.postprocess_video(frames, output_type="pt")[0])

        if output_type == "mp4":
            pil_frames = self.video_processor.postprocess_video(frames, output_type="pil")[0]
            return self._encode_mp4(pil_frames)

        # "pt" - and, as before, any unrecognised value - returns the raw
        # decoder output untouched.
        return frames

    @staticmethod
    def _to_uint8_frames(frames: torch.Tensor) -> torch.Tensor:
        """Map the first decoded video from [-1, 1] to channel-last uint8 frames."""
        # Assumes ``frames`` is (batch, channels, frames, height, width);
        # only the rank (5) is fixed by the indexing below - TODO confirm.
        video = frames[0].permute(1, 0, 2, 3)
        # Element-wise, so equivalent to rescaling frame by frame and stacking.
        video = (video * 0.5 + 0.5).clamp(0, 1)
        video = video.permute(0, 2, 3, 1).contiguous().float()
        return (video * 255).round().to(torch.uint8)

    @staticmethod
    def _encode_mp4(pil_frames, fps: int = 15) -> bytes:
        """Encode ``pil_frames`` with ``export_to_video`` and return the file bytes."""
        import contextlib
        import os

        video_path = export_to_video(pil_frames, fps=fps)
        try:
            with open(video_path, "rb") as video_file:
                return video_file.read()
        finally:
            # The file written by ``export_to_video`` is only an intermediate;
            # remove it so repeated requests do not accumulate files on disk.
            # Clean-up is best effort and must not fail the request.
            with contextlib.suppress(OSError):
                os.remove(video_path)
|
|