| | import os |
| | import torch |
| | import warnings |
| | warnings.filterwarnings('ignore') |
| | import requests |
| | from io import BytesIO |
| | from transformers.pipelines.audio_utils import ffmpeg_read |
| | import mutagen |
| | from torchaudio import functional as taF |
| | import numpy as np |
| |
|
# Target sampling rate (Hz) expected by the downstream feature extractor;
# all loaded audio is resampled to this rate.
feature_extractor_sampling_rate = 16000
# Maximum clip length in samples: 30 seconds at the target rate.
clip_length = 30*feature_extractor_sampling_rate
# Trailing chunks with at most this many samples (0.5 s) are discarded.
clip_drop = feature_extractor_sampling_rate//2
# Audio file extensions handled by load_audio_single (besides .npy).
AUDIO_EXTENSIONS = ('.wav', '.mp3', '.flac', '.opus', '.ogg')
| |
|
| |
|
def load_audio_single(audio_file, seg=None):
    """Load one audio source and split it into clips of at most 30 s.

    Args:
        audio_file: Path or http(s) URL of an audio file with an extension in
            AUDIO_EXTENSIONS, or a path to a pre-computed ``.npy`` array.
        seg: Optional ``(start_sec, end_sec)`` pair; when given, only that
            segment of the audio is kept (sliced at the *source* sample rate,
            before resampling).

    Returns:
        A list of 1-D waveform arrays at ``feature_extractor_sampling_rate``.
        Audio longer than ``clip_length`` samples is split into consecutive
        chunks; a trailing chunk of at most ``clip_drop`` samples is dropped.

    Raises:
        ValueError: If the file extension is not recognized.
    """
    assert isinstance(audio_file, str), "audio_file should be a string"

    # Pre-computed feature/waveform arrays bypass decoding entirely.
    if audio_file.endswith('.npy'):
        return [np.load(audio_file)]

    if not audio_file.endswith(AUDIO_EXTENSIONS):
        # Previously this fell through and returned None implicitly, which
        # crashed callers; fail loudly instead (load_audios catches it).
        raise ValueError(f"Unsupported audio file: {audio_file}")

    if audio_file.startswith("http://") or audio_file.startswith("https://"):
        # Bug fix: the original probed mutagen.File(<url string>) before
        # downloading, which treats the URL as a local path and fails.
        # Download first, then probe the sample rate from the bytes.
        data = requests.get(audio_file).content
        in_sampling_rate = mutagen.File(BytesIO(data)).info.sample_rate
    else:
        in_sampling_rate = mutagen.File(audio_file).info.sample_rate
        with open(audio_file, "rb") as f:
            data = f.read()

    # Decode the raw bytes to a float waveform at the source sample rate.
    inputs = ffmpeg_read(data, in_sampling_rate)

    if seg is not None:
        # seg is in seconds; convert to sample indices at the source rate.
        inputs = inputs[int(seg[0] * in_sampling_rate):int(seg[1] * in_sampling_rate)]

    if in_sampling_rate != feature_extractor_sampling_rate:
        inputs = taF.resample(
            torch.from_numpy(inputs.copy()), in_sampling_rate, feature_extractor_sampling_rate
        ).numpy()

    if len(inputs) <= clip_length:
        return [inputs]

    audios = []
    for start in range(0, len(inputs), clip_length):
        chunk = inputs[start:start + clip_length]
        # Drop a tiny trailing remainder (<= 0.5 s) — too short to be useful.
        if len(chunk) > clip_drop:
            audios.append(chunk)
    return audios
| | |
| |
|
def load_audios(audio_preprocess, audio_files, segs=None, audio_folder=None):
    """Load and preprocess a batch of audio files.

    Args:
        audio_preprocess: Callable applied to each decoded clip.
        audio_files: A path/URL string or a list of them; ``None`` is a no-op.
        segs: Optional list of ``(start_sec, end_sec)`` pairs (one per file),
            or a single flat pair of floats applied as a one-element batch.
        audio_folder: Optional directory joined onto every entry of
            ``audio_files``.

    Returns:
        ``(audio_list, audio_size)`` where ``audio_list`` is the flat list of
        preprocessed clips across all files and ``audio_size[i]`` is the
        number of clips contributed by file ``i`` (0 when loading failed).
        Returns ``(None, None)`` when ``audio_files`` is ``None``.
    """
    if audio_files is None:
        return None, None
    if isinstance(audio_files, str):
        audio_files = [audio_files]

    if segs:
        # A flat [start, end] pair means a single segment, not a batch.
        if isinstance(segs[0], float):
            segs = [segs]
    else:
        segs = [None] * len(audio_files)

    if audio_folder:
        audio_files = [os.path.join(audio_folder, afile) for afile in audio_files]

    def _load_one(audio_file, seg):
        # Best-effort loader: any failure is reported and yields None so one
        # bad file cannot abort the whole batch.
        try:
            if seg:
                clips = load_audio_single(audio_file, seg)
            else:
                clips = load_audio_single(audio_file)
            return [audio_preprocess(aud) for aud in clips]
        except Exception as e:
            print(f"Error loading {audio_file} seg {seg}: {e}")
            return None

    audio_list = []
    audio_size = []
    for audio_file, seg in zip(audio_files, segs):
        clips = _load_one(audio_file, seg)
        if clips is None:
            # Bug fix: the original called len(None) here and raised
            # TypeError, defeating the try/except above. Record 0 clips so
            # audio_size stays aligned with audio_files.
            audio_size.append(0)
            continue
        audio_size.append(len(clips))
        audio_list.extend(clips)

    return audio_list, audio_size
| |
|
class AudioPreprocess:
    """Adapter that runs an audio feature extractor under the image-processor
    interface used elsewhere in the pipeline.

    Only activates when the configured ``image_aspect_ratio`` is ``"audio"``.
    """

    def __init__(self, image_processor, data_args=None):
        # Bug fix: the original used a mutable default argument
        # (``data_args={}``); besides being a shared-state hazard, ``getattr``
        # on a dict never finds its keys anyway. ``getattr(None, ...)`` with a
        # default behaves identically for omitted callers.
        self.image_aspect_ratio = getattr(data_args, 'image_aspect_ratio', None)
        self.image_processor = image_processor

    def __call__(self, image):
        """Extract input features from one waveform clip.

        Args:
            image: A 1-D waveform (named ``image`` to match the shared
                preprocessing interface).

        Returns:
            The ``input_features`` tensor produced by the wrapped processor at
            ``feature_extractor_sampling_rate``.
        """
        assert self.image_aspect_ratio == "audio", "image_aspect_ratio should be 'audio' for audio preprocessing"
        return self.image_processor(image, sampling_rate=feature_extractor_sampling_rate, return_tensors="pt").input_features