Links: notebook, html, PDF, python, slides, GitHub

Quelques éléments de code pour le hackathon 2022.

from jyquickhelper import add_notebook_menu

Télécharger depuis YouTube


from pytube import YouTube

# Download the first stream listed for the video; `down` holds the value
# returned by download() and is passed to AudioFileClip further down.
# NOTE(review): the video URL is an empty string in this export — supply the
# real URL before running.
yt = YouTube('')
first_stream = yt.streams.first()
down = first_stream.download()

Si cela ne fonctionne pas, voir pytube.exceptions.RegexMatchError: get_throttling_function_name: could not find match for multiple.

import moviepy.editor as me

# Open the downloaded file as an audio clip, keep its samples in `wav`, and
# write the audio to "sound.wav" for the diarization steps below.
dat = me.AudioFileClip(down)
wav = dat.to_soundarray()
dat.write_audiofile(
    "sound.wav",
    fps=44100,
    nbytes=2,
    buffersize=2000,
    codec="pcm_s32le",
)


Il faut utiliser la version de github. pip install git+

# The module path of this import was lost in the export; Pipeline is
# provided by pyannote.audio.
from pyannote.audio import Pipeline

# fetch the pretrained speaker-diarization pipeline
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")

# apply pretrained pipeline
diarization = pipeline("sound.wav")

# print the result
for turn, _, speaker in diarization.itertracks(yield_label=True):
    print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")


Voir Speaker Diarization using GRU in PyTorch.

import onnxruntime as ort

# Open the exported model and walk over the inputs it declares.
sess = ort.InferenceSession("speaker_diarization.onnx")
for i in sess.get_inputs():
    # NOTE(review): the loop body was missing in this export (syntax error);
    # printing each input description is the assumed intent — confirm.
    print(i)
import torch
import librosa
import numpy as np
from itertools import groupby
from scipy.ndimage import gaussian_filter1d

def zcr_vad(y, shift=0.025, win_len=2048, hop_len=1024, threshold=0.005):
    """Return a per-sample activity mask derived from the zero-crossing rate.

    The signal is offset by ``shift``, its frame-wise zero-crossing rate is
    computed with librosa, smoothed with a Gaussian filter (sigma 1) and
    compared with ``threshold``. The frame-level decisions are then repeated
    and trimmed so that the mask has exactly one entry per input sample.

    Args:
        y: Audio samples, either a ``torch.Tensor`` or a NumPy array. A 2-D
            input is reduced to its first row.
        shift: Constant added to the signal before counting zero crossings.
        win_len: Frame length handed to librosa.
        hop_len: Hop length handed to librosa.
        threshold: Smoothed rate above which a frame is marked active.

    Returns:
        Boolean NumPy array of length ``len(y)`` (after the 2-D reduction).
    """
    if isinstance(y, torch.Tensor):
        # detach so tensors that track gradients can be converted as well
        y = y.detach().cpu().numpy()
    if y.ndim == 2:
        y = y[0]
    # frame_length / hop_length are keyword-only in recent librosa releases,
    # so they must not be passed positionally.
    zcr = librosa.feature.zero_crossing_rate(
        y + shift, frame_length=win_len, hop_length=hop_len
    )[0]
    activity = gaussian_filter1d(zcr, 1) > threshold
    # Stretch the frame decisions to sample resolution; the "+ 1" guarantees
    # at least len(y) entries, the slice removes the overshoot.
    repeats = len(y) // len(activity) + 1
    return np.repeat(activity, repeats)[:len(y)]

def get_timestamp(activity):
    """Return the ``[start, end)`` index pairs of every active run.

    Args:
        activity: 1-D boolean sequence (one flag per sample).

    Returns:
        Integer NumPy array of shape ``(n_runs, 2)``; each row holds the
        index of the first sample of a ``True`` run and the index one past
        its last sample. The array has zero rows when nothing is active.
    """
    activity = np.asarray(activity, dtype=bool)
    if activity.size == 0:
        return np.empty((0, 2), dtype=int)

    # activity[i] != activity[i + 1] flags the LAST sample of a run, so the
    # next run starts at i + 1 (the previous code used i and was off by one).
    change = np.flatnonzero(activity[:-1] != activity[1:]) + 1
    bounds = np.concatenate([[0], change, [len(activity)]])
    span = np.column_stack([bounds[:-1], bounds[1:]])
    # A run is active exactly when its first sample is active.
    mask = activity[bounds[:-1]]
    return span[mask]
import torchaudio
from torchaudio.transforms import MFCC, Resample

sr = 16000
# Load the recording, average it over dimension 0 (presumably the channels —
# confirm against torchaudio.load) and resample it to `sr`.
waveform, ori_sr = torchaudio.load("sound.wav")
waveform = waveform.mean(0, keepdims=True)
_resample = Resample(ori_sr, sr)
audio = _resample(waveform)
# Activity detection on the resampled signal (the original referenced an
# undefined name `y` here), then the index spans of the active runs.
activity = zcr_vad(audio)
spans = get_timestamp(activity)
# NOTE(review): `self` does not exist at module level; this line needs a
# predictor instance that provides `_encode_segment` — confirm which one.
embed = [self._encode_segment(audio, span) for span in spans]
# Stack the per-span embeddings into one NumPy array for clustering (the
# right-hand side of this assignment was truncated in the export).
embed = torch.cat(embed).cpu().numpy()
# NOTE(review): OptimizedAgglomerativeClustering is not defined or imported
# in the visible part of this file.
speakers = OptimizedAgglomerativeClustering().fit_predict(embed)
import torch
import torchaudio
from torchaudio.transforms import MFCC, Resample
from torch.utils.data import Dataset, DataLoader

class BaseLoad:
    """Load audio files at a fixed sample rate, optionally as MFCC features.

    Args:
        sr: Target sample rate used for resampling and for the MFCC
            transform.
        n_mfcc: Number of MFCC coefficients computed by the transform.
    """

    def __init__(self, sr=16000, n_mfcc=40):
        self.sr = sr
        self.n_mfcc = n_mfcc
        # Use the requested coefficient count; it used to be hard-coded to 40
        # regardless of the argument.
        self._mfcc = MFCC(sr, n_mfcc=n_mfcc, log_mels=True)

    def _load(self, path, mfcc=True):
        """Read ``path``, average over dimension 0 and resample to ``self.sr``.

        Args:
            path: Location of the audio file handed to ``torchaudio.load``.
            mfcc: When true, return the MFCC transform of the resampled
                signal instead of the signal itself.

        Returns:
            The resampled signal, or its MFCC features when ``mfcc`` is true.

        Raises:
            Exception: If loading or averaging raises ``RuntimeError``; the
                original error is attached as the cause.
        """
        try:
            waveform, ori_sr = torchaudio.load(path)
            waveform = waveform.mean(0, keepdims=True)
        except RuntimeError as err:
            raise Exception(f"Error loading {path}") from err
        _resample = Resample(ori_sr, self.sr)
        audio = _resample(waveform)

        if mfcc:
            audio = self._mfcc(audio)
        return audio

class BasePredictor(BaseLoad):
    """Common setup for diarization predictors.

    Args:
        config_path: File read with ``torch.load``; the loaded object is
            queried with ``.get`` for ``'sr'`` (default 16000), ``'n_mfcc'``
            (default 40) and ``'ndim'`` (default 3).
        max_frame: Stored as ``self.max_frame`` for use by subclasses.
        hop: Stored as ``self.hop`` for use by subclasses.
    """

    def __init__(self, config_path, max_frame, hop):
        # NOTE(security): torch.load unpickles the file — only load
        # configuration files from a trusted source.
        config = torch.load(config_path)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        super().__init__(config.get('sr', 16000), config.get('n_mfcc', 40))
        self.ndim = config.get('ndim', 3)
        self.max_frame = max_frame
        self.hop = hop

    @staticmethod
    def _plot_diarization(y, spans, speakers):
        """Plot the first row of ``y`` with one shaded band per speaker.

        Declared as a static method so that both
        ``self._plot_diarization(y, spans, speakers)`` and a call through the
        class work; the previous plain function had no ``self`` parameter and
        failed when called on an instance.

        Args:
            y: Tensor whose first row is the signal to draw.
            spans: Iterable of ``(start, end)`` sample indices.
            speakers: Speaker label of each span, in the same order.
        """
        # NOTE(review): `plt` (matplotlib.pyplot) is not imported in the
        # visible part of this file — it must be available at module level.
        wave = y[0].detach().cpu().numpy()
        # NaN marks samples outside every span so they can never compare
        # equal to a speaker label (a copy of the waveform was used before,
        # so e.g. samples equal to 0.0 were shaded as speaker 0).
        labels = np.full(len(wave), np.nan)
        for (start, end), speaker in zip(spans, speakers):
            labels[start:end] = speaker

        unique = sorted(set(speakers))
        positions = range(len(labels))

        plt.figure(figsize=(15, 2))
        plt.plot(wave, "k-")
        for speaker in unique:
            plt.fill_between(positions, -1, 1, where=(labels == speaker), alpha=0.5, label=f"speaker_{speaker}")
        # One legend column per speaker; max() keeps the call valid when
        # there are no speakers at all.
        plt.legend(loc="upper center", ncol=max(len(unique), 1), bbox_to_anchor=(0.5, -0.25))

class PyTorchPredictor(BasePredictor):
    """Speaker diarization using an ``Encoder`` model run with PyTorch.

    Args:
        config_path: Configuration file, forwarded to ``BasePredictor``.
        model_path: Checkpoint read with ``torch.load``.
        max_frame: Window size used when slicing MFCC frames.
        hop: Step between consecutive windows.
    """

    def __init__(self, config_path, model_path, max_frame=45, hop=3):
        super().__init__(config_path, max_frame, hop)

        weight = torch.load(model_path, map_location="cpu")
        self.model = Encoder(self.ndim).to(self.device)
        # The checkpoint used to be loaded and then discarded, leaving the
        # encoder with its initial parameters.
        # assumes model_path stores a state_dict — TODO confirm
        self.model.load_state_dict(weight)
        self.model.eval()

    def predict(self, path, plot=False):
        """Detect active spans in ``path`` and assign a speaker to each.

        Args:
            path: Audio file to analyse.
            plot: When true, draw the result with ``_plot_diarization``.

        Returns:
            ``(timestamp, speakers)``: the span boundaries divided by
            ``self.sr``, and the labels returned by the clustering step.
            Both are empty arrays when no active span is found.
        """
        y = self._load(path, mfcc=False)
        activity = zcr_vad(y)
        spans = get_timestamp(activity)

        if len(spans) == 0:
            # Nothing to embed or cluster; torch.cat would fail on an
            # empty list.
            return np.empty((0, 2)), np.empty(0, dtype=int)

        embed = [self._encode_segment(y, span) for span in spans]
        # Stack the per-span embeddings into one NumPy array for clustering
        # (the right-hand side was truncated in the export).
        embed = torch.cat(embed).cpu().numpy()
        speakers = OptimizedAgglomerativeClustering().fit_predict(embed)

        if plot:
            self._plot_diarization(y, spans, speakers)

        # Convert sample indices using the sample rate (divisor was
        # truncated in the export).
        timestamp = np.array(spans) / self.sr
        return timestamp, speakers

    def _encode_segment(self, y, span):
        """Return the mean model output over the windows of one span.

        Args:
            y: Signal tensor; it is sliced along its second dimension.
            span: ``(start, end)`` indices of the segment.

        Returns:
            Model output averaged over dimension 0, with that dimension kept.
        """
        start, end = span
        mfcc = self._mfcc(y[:, start:end]).to(self.device)
        # Cut dimension 2 into windows of `max_frame` entries taken every
        # `hop` entries, then move the window index to the front.
        mfcc = mfcc.unfold(2, self.max_frame, self.hop).permute(2, 0, 1, 3)
        with torch.no_grad():
            embed = self.model(mfcc).mean(0, keepdims=True)
        return embed

# Keyword arguments make the pairing explicit: the two file names were passed
# positionally in the opposite order of (config_path, model_path).
p = PyTorchPredictor(config_path="configs.pth", model_path="weights_best.pth")