# Source code for nnmnkwii.preprocessing.alignment

import numpy as np
from fastdtw import fastdtw
from nnmnkwii.baseline.gmm import MLPG
from nnmnkwii.preprocessing import trim_zeros_frames
from numpy.linalg import norm
from sklearn.mixture import GaussianMixture


class DTWAligner(object):
    """Align feature matrices using fastdtw_.

    .. _fastdtw: https://github.com/slaypni/fastdtw

    Attributes:
        dist (function): Distance function taking two frames. Default is
            ``norm(x - y)`` with :func:`numpy.linalg.norm`.
        radius (int): Radius parameter in fastdtw_.
        verbose (int): Verbose flag. Default is 0.

    Examples:
        >>> from nnmnkwii.util import example_file_data_sources_for_duration_model
        >>> from nnmnkwii.datasets import FileSourceDataset
        >>> from nnmnkwii.preprocessing.alignment import DTWAligner
        >>> _, X = example_file_data_sources_for_duration_model()
        >>> X = FileSourceDataset(X).asarray()
        >>> X.shape
        (3, 40, 5)
        >>> Y = X.copy()
        >>> X_aligned, Y_aligned = DTWAligner().transform((X, Y))
        >>> X_aligned.shape
        (3, 40, 5)
        >>> Y_aligned.shape
        (3, 40, 5)
    """

    def __init__(self, dist=lambda x, y: norm(x - y), radius=1, verbose=0):
        self.verbose = verbose
        self.dist = dist
        self.radius = radius

    def transform(self, XY):
        """Align each pair of utterances in ``X`` and ``Y`` along a DTW path.

        Args:
            XY (tuple): Pair ``(X, Y)`` of 3-dimensional arrays. Utterances
                are paired by iterating both arrays along their first axis.

        Returns:
            tuple: ``(X_aligned, Y_aligned)``, two zero-padded arrays of the
            same shape. The second axis is at least as long as the longer
            of the two inputs and grows when a warping path is longer.

        Raises:
            AssertionError: If ``X`` or ``Y`` is not 3-dimensional.
        """
        X, Y = XY
        assert X.ndim == 3 and Y.ndim == 3

        # Output buffers start out with the shape (and dtype) of whichever
        # input has more frames; they are grown on demand below.
        longer_features = X if X.shape[1] > Y.shape[1] else Y
        X_aligned = np.zeros_like(longer_features)
        Y_aligned = np.zeros_like(longer_features)

        for idx, (x, y) in enumerate(zip(X, Y)):
            # NOTE(review): trim_zeros_frames presumably strips the zero
            # padding frames of each utterance -- confirm against its docs.
            x, y = trim_zeros_frames(x), trim_zeros_frames(y)
            dist, path = fastdtw(x, y, radius=self.radius, dist=self.dist)
            # Normalise the accumulated distance by the total frame count.
            dist /= len(x) + len(y)

            # Each path step is an (index into x, index into y) pair.
            pathx = [step[0] for step in path]
            pathy = [step[1] for step in path]
            x, y = x[pathx], y[pathy]

            max_len = max(len(x), len(y))
            if max_len > X_aligned.shape[1] or max_len > Y_aligned.shape[1]:
                # The warped sequence does not fit: extend the time axis of
                # both buffers by the larger of the two shortfalls.
                pad_size = max(
                    max_len - X_aligned.shape[1], max_len - Y_aligned.shape[1]
                )
                X_aligned = np.pad(
                    X_aligned,
                    [(0, 0), (0, pad_size), (0, 0)],
                    mode="constant",
                    constant_values=0,
                )
                Y_aligned = np.pad(
                    Y_aligned,
                    [(0, 0), (0, pad_size), (0, 0)],
                    mode="constant",
                    constant_values=0,
                )

            X_aligned[idx][: len(x)] = x
            Y_aligned[idx][: len(y)] = y
            if self.verbose > 0:
                print("{}, distance: {}".format(idx, dist))

        return X_aligned, Y_aligned
class IterativeDTWAligner(object):
    """Align feature matrices iteratively using GMM-based feature conversion.

    .. _fastdtw: https://github.com/slaypni/fastdtw

    Attributes:
        n_iter (int): Number of iterations.
        dist (function): Distance function taking two frames. Default is
            ``norm(x - y)`` with :func:`numpy.linalg.norm`.
        radius (int): Radius parameter in fastdtw_.
        verbose (int): Verbose flag. Default is 0.
        max_iter_gmm (int): Maximum iteration to train GMM.
        n_components_gmm (int): Number of mixture components in GMM.

    Examples:
        >>> from nnmnkwii.util import example_file_data_sources_for_duration_model
        >>> from nnmnkwii.datasets import FileSourceDataset
        >>> from nnmnkwii.preprocessing.alignment import IterativeDTWAligner
        >>> _, X = example_file_data_sources_for_duration_model()
        >>> X = FileSourceDataset(X).asarray()
        >>> X.shape
        (3, 40, 5)
        >>> Y = X.copy()
        >>> X_aligned, Y_aligned = IterativeDTWAligner(n_iter=1).transform((X, Y))
        >>> X_aligned.shape
        (3, 40, 5)
        >>> Y_aligned.shape
        (3, 40, 5)
    """

    def __init__(
        self,
        n_iter=3,
        dist=lambda x, y: norm(x - y),
        radius=1,
        max_iter_gmm=100,
        n_components_gmm=16,
        verbose=0,
    ):
        self.n_iter = n_iter
        self.dist = dist
        self.radius = radius
        self.max_iter_gmm = max_iter_gmm
        self.n_components_gmm = n_components_gmm
        self.verbose = verbose

    def transform(self, XY):
        """Iteratively refine the DTW alignment between ``X`` and ``Y``.

        Every pass aligns the current (converted) copy of ``X`` to ``Y``,
        fits a GMM on the joint aligned frames and converts each utterance
        of the working copy with it. After the last pass the original ``X``
        is warped along the most recent paths.

        Args:
            XY (tuple): Pair ``(X, Y)`` of 3-dimensional arrays. Utterances
                are paired by iterating both arrays along their first axis.

        Returns:
            tuple: ``(X_aligned, Y_aligned)``, two zero-padded arrays of the
            same shape.

        Raises:
            AssertionError: If ``X`` or ``Y`` is not 3-dimensional.
        """
        X, Y = XY
        assert X.ndim == 3 and Y.ndim == 3

        longer_features = X if X.shape[1] > Y.shape[1] else Y

        Xc = X.copy()  # this will be updated iteratively
        X_aligned = np.zeros_like(longer_features)
        Y_aligned = np.zeros_like(longer_features)
        # The ``np.object`` alias was removed from NumPy; the builtin
        # ``object`` is the equivalent dtype.
        refined_paths = np.empty(len(X), dtype=object)

        for _ in range(self.n_iter):
            for idx, (x, y) in enumerate(zip(Xc, Y)):
                # NOTE(review): trim_zeros_frames presumably strips the zero
                # padding frames of each utterance -- confirm against its docs.
                x, y = trim_zeros_frames(x), trim_zeros_frames(y)
                dist, path = fastdtw(x, y, radius=self.radius, dist=self.dist)
                # Normalise the accumulated distance by the total frame count.
                dist /= len(x) + len(y)

                # Each path step is an (index into x, index into y) pair.
                pathx = [step[0] for step in path]
                pathy = [step[1] for step in path]
                refined_paths[idx] = pathx
                x, y = x[pathx], y[pathy]

                max_len = max(len(x), len(y))
                if max_len > X_aligned.shape[1] or max_len > Y_aligned.shape[1]:
                    # Extend the time axis of both buffers by the larger of
                    # the two shortfalls.
                    pad_size = max(
                        max_len - X_aligned.shape[1],
                        max_len - Y_aligned.shape[1],
                    )
                    X_aligned = np.pad(
                        X_aligned,
                        [(0, 0), (0, pad_size), (0, 0)],
                        mode="constant",
                        constant_values=0,
                    )
                    Y_aligned = np.pad(
                        Y_aligned,
                        [(0, 0), (0, pad_size), (0, 0)],
                        mode="constant",
                        constant_values=0,
                    )

                # Clear the rows first: a path from an earlier pass may have
                # been longer, and its frames must not survive in the tail.
                X_aligned[idx] = 0
                Y_aligned[idx] = 0
                X_aligned[idx][: len(x)] = x
                Y_aligned[idx][: len(y)] = y
                if self.verbose > 0:
                    print("{}, distance: {}".format(idx, dist))

            # Fit a GMM on the joint (source, target) frames.
            gmm = GaussianMixture(
                n_components=self.n_components_gmm,
                covariance_type="full",
                max_iter=self.max_iter_gmm,
            )
            joint = np.concatenate((X_aligned, Y_aligned), axis=-1).reshape(
                -1, X.shape[-1] * 2
            )
            gmm.fit(joint)

            windows = [(0, 0, np.array([1.0]))]  # no delta
            paramgen = MLPG(gmm, windows=windows)
            # Convert every utterance of the working copy in place, leaving
            # the frames beyond its trimmed length untouched.
            for idx in range(len(Xc)):
                x = trim_zeros_frames(Xc[idx])
                Xc[idx][: len(x)] = paramgen.transform(x)

        # Finally we can get aligned X: warp the *original* features along
        # the paths found in the last pass.
        for idx in range(len(X_aligned)):
            x = X[idx][refined_paths[idx]]
            X_aligned[idx][: len(x)] = x

        return X_aligned, Y_aligned