Source code for lime_timeseries

import numpy as np
import sklearn
from fastdtw import fastdtw
from lime import explanation
from lime import lime_base
from dianna import utils
from dianna.utils.maskers import generate_time_series_masks
from dianna.utils.maskers import mask_data
from dianna.utils.predict import make_predictions


class LIMETimeseries:
    """LIME implementation for timeseries.

    This implementation is inspired by the paper:
    Validation of XAI explanations for multivariate time series classification
    in the maritime domain. (https://doi.org/10.1016/j.jocs.2021.101539)
    """

    def __init__(
        self,
        kernel_width=25,
        verbose=False,
        preprocess_function=None,
        feature_selection='auto',
        random_state=None,
    ):
        """Initializes Lime explainer for timeseries.

        Args:
            kernel_width (int): Width of the kernel used in LIME explainer.
            verbose (bool): Whether to print progress messages during
                explanation.
            preprocess_function (callable, optional): Function to preprocess
                the time series data before passing it to the explainer.
                Defaults to None.
            feature_selection (str): Feature selection method to be used by
                explainer. It is forwarded to the LIME base explainer on every
                call to `explain`.
            random_state (int or np.RandomState, optional): seed or random
                state. Unused variable for current ts method.
        """

        def kernel(d):
            """Kernel function used in LIME explainer."""
            return np.sqrt(np.exp(-(d**2) / kernel_width**2))

        self.explainer = lime_base.LimeBase(kernel, verbose)
        self.feature_selection = feature_selection
        self.domain_mapper = explanation.DomainMapper()
        self.preprocess_function = preprocess_function
        self._is_multivariate = False

    def explain(
        self,
        model_or_function,
        input_timeseries,
        labels=(0, ),
        class_names=None,
        num_features=1,
        num_samples=1,
        num_slices=1,
        batch_size=1,
        mask_type='mean',
        distance_method='cosine',
    ):  # pylint: disable=too-many-arguments,too-many-locals
        """Run the LIME explainer for timeseries.

        Args:
            model_or_function (callable or str): The function that runs the
                model to be explained _or_ the path to a ONNX model on disk.
            input_timeseries (np.ndarray): The input time series data to be
                explained, with shape [batch_size, sequence_length,
                num_features].
                NOTE(review): the masked data is unpacked below as a 3-D array
                (samples, sequence, variables), so the input is presumably a
                single series of shape [sequence_length, num_variables] --
                confirm against the maskers.
            labels (list): The list of labels for different classes.
            class_names (list): The list of class names.
            num_features (int): The number of features to include in the
                explanation. Time points that LIME does not select get a
                saliency of 0.
            num_samples (int): The number of samples to generate for the LIME
                explainer.
            num_slices (int): The number of slices to divide the time series
                data into. Currently not used by this method.
            batch_size (int): The batch size to use for running the model.
            mask_type (str): The type of mask to apply to the time series
                data. Can be "mean" or "noise".
            distance_method (str): The distance metric to use for LIME. Can be
                "cosine", "euclidean" or "dtw".

        Returns:
            np.ndarray: An array of shape [len(labels), sequence, n_var]
            containing the LIME explanations for each class.

        Raises:
            ValueError: If `distance_method` is not supported.
        """
        # TODO: p_keep does not exist in LIME. LIME will mask every point,
        # which means the number of steps masked is 1. We should update it
        # after adapting the maskers function to LIME.

        # wrap up the input model or function using the runner
        runner = utils.get_function(
            model_or_function, preprocess_function=self.preprocess_function)
        masks = generate_time_series_masks(input_timeseries.shape,
                                           num_samples,
                                           p_keep=0.1)
        # NOTE: Required by `lime_base` explainer since the first instance
        # must be the original data. For more details, check this link
        # https://github.com/marcotcr/lime/blob/fd7eb2e6f760619c29fca0187c07b82157601b32/lime/lime_base.py#L148
        masks[0, :, :] = 1.0
        masked = mask_data(input_timeseries, masks, mask_type=mask_type)
        # generate predictions using the masked data.
        predictions = make_predictions(masked, runner, batch_size)
        # need to reshape for the calculation of distance
        _, sequence, n_var = masked.shape
        n_points = sequence * n_var
        masked = masked.reshape((-1, n_points))
        distance = self._calculate_distance(masked,
                                            distance_method=distance_method)
        exp = explanation.Explanation(domain_mapper=self.domain_mapper,
                                      class_names=class_names)
        # Expected shape of input:
        # masked[num_samples, channels * num_slices],
        # predictions[num_samples, labels],
        # distances[num_samples]
        for label in labels:
            (
                exp.intercept[int(label)],
                exp.local_exp[int(label)],
                exp.score,
                exp.local_pred,
            ) = self.explainer.explain_instance_with_data(
                masked,
                predictions,
                distance,
                label=label,
                num_features=num_features,
                # honour the strategy chosen at construction time
                feature_selection=self.feature_selection,
                model_regressor=None,
            )
        # Extract scores from the lime explainer. The entries were stored
        # under `int(label)`, so they are looked up with the same key.
        saliency = [
            self._weights_to_vector(exp.local_exp[int(label)], n_points)
            for label in labels
        ]
        return np.stack(saliency).reshape(-1, sequence, n_var)

    @staticmethod
    def _weights_to_vector(local_exp, size):
        """Scatter LIME's (feature index, weight) pairs into a dense vector.

        Args:
            local_exp (iterable): Pairs of (feature index, weight).
            size (int): Length of the returned vector.

        Returns:
            np.ndarray: Vector of length `size` holding each weight at its
            feature index and 0 at every index that is absent from
            `local_exp`.
        """
        vector = np.zeros(size)
        for index, weight in local_exp:
            vector[index] = weight
        return vector

    def _calculate_distance(self, masked_data, distance_method='cosine'):
        """Calculate distance between perturbed data and the original samples.

        Args:
            masked_data (np.ndarray): The perturbed time series data.
                *Note: The first instance is the original timeseries
            distance_method (str): The distance metric to use. Defaults to
                "cosine". Supported options are:
                - 'cosine': Computes the cosine distance between the two
                  vectors, multiplied by 100.
                - 'euclidean': Computes the Euclidean distance between the two
                  vectors.
                - 'dtw': Uses Dynamic Time Warping to calculate the distance
                  between the two time series.

        Returns:
            np.ndarray: A vector containing the distance between each
            perturbed timeseries and the original one.

        Raises:
            ValueError: If the given `distance_method` is not supported.

        Notes:
            - The cosine distance is one minus the cosine of the angle between
              two non-zero vectors of an inner product space.
            - The Euclidean distance is the straight-line distance between two
              points in Euclidean space.
            - Dynamic Time Warping is an algorithm for measuring similarity
              between two time series sequences that may vary in speed or
              timing.
        """
        support_methods = ['cosine', 'euclidean']
        if distance_method == 'dtw':
            distance = self._dtw_distance(masked_data)
        elif distance_method in support_methods:
            # Import the function explicitly: a bare `import sklearn` is not
            # guaranteed to have loaded the `sklearn.metrics` submodule.
            from sklearn.metrics import pairwise_distances  # pylint: disable=import-outside-toplevel

            original = masked_data[0].reshape([1, -1])
            distance = pairwise_distances(masked_data,
                                          original,
                                          metric=distance_method).ravel()
            if distance_method == 'cosine':
                distance *= 100  # make sure it has same scale as other methods
        else:
            raise ValueError(
                f'Given method {distance_method} is not supported. Please '
                "choose from 'dtw', 'cosine' and 'euclidean'.")

        return distance

    def _dtw_distance(self, masked_data):
        """Calculate distance based on dynamic time warping.

        Args:
            masked_data (np.ndarray): An array of time series with some
                segments masked out.
                *Note: The first instance is the original timeseries

        Returns:
            np.ndarray: DTW distances between the first instance and every
            instance in `masked_data` (including itself).
        """
        original = masked_data[0]
        # fastdtw returns a (distance, path) pair; only the distance is kept.
        distance = np.asarray([
            fastdtw(original, one_masked_data)[0]
            for one_masked_data in masked_data
        ])
        return distance