# STUMPY# Copyright 2019 TD Ameritrade. Released under the terms of the 3-Clause BSD license.# STUMPY is a trademark of TD Ameritrade IP Company, Inc. All rights reserved.importmathimportnumpyasnpfrom.importcorefrom.aampdist_snippetsimportaampdist_snippetsfrom.coreimport_get_mask_slices,check_window_sizefrom.mpdistimport_mpdist_vectdef_get_all_profiles(T,m,percentage=1.0,s=None,mpdist_percentage=0.05,mpdist_k=None,mpdist_custom_func=None,mpdist_T_subseq_isconstant=None,):""" For each non-overlapping subsequence, `S[i]`, in `T`, compute the matrix profile distance measure vector between the `i`th non-overlapping subsequence and each sliding window subsequence, `T[j : j + m]`, within `T` where `j < len(T) - m + 1`. Parameters ---------- T : numpy.ndarray The time series or sequence for which to find the snippets m : int The window size for each non-overlapping subsequence, `S[i]`. percentage : float, default 1.0 With the length of each non-overlapping subsequence, `S[i]`, set to `m`, this is the percentage of `S[i]` (i.e., `percentage * m`) to set the `s` to. When `percentage == 1.0`, then the full length of `S[i]` is used to compute the `mpdist_vect`. When `percentage < 1.0`, then shorter subsequences from `S[i]` is used to compute `mpdist_vect`. s : int, default None With the length of each non-overlapping subsequence, `S[i]`, set to `m`, this is essentially the sub-subsequence length (i.e., a shorter part of `S[i]`). When `s == m`, then the full length of `S[i]` is used to compute the `mpdist_vect`. When `s < m`, then shorter subsequences with length `s` from each `S[i]` is used to compute `mpdist_vect`. When `s` is not `None`, then the `percentage` parameter is ignored. mpdist_percentage : float, default 0.05 The percentage of distances that will be used to report `mpdist`. The value is between 0.0 and 1.0. mpdist_k : int Specify the `k`th value in the concatenated matrix profiles to return. When `mpdist_k` is not `None`, then the `mpdist_percentage` parameter is ignored. mpdist_custom_func : func, default None A custom user defined function for selecting the desired value from the sorted `P_ABBA` array. This function may need to leverage `functools.partial` and should take `P_ABBA` as its only input parameter and return a single `MPdist` value. The `percentage` and `k` parameters are ignored when `mpdist_custom_func` is not None. mpdist_T_subseq_isconstant : numpy.ndarray or function, default None A boolean array that indicates whether a subsequence (of length `s`) in `T` is constant (True). Alternatively, a custom, user-defined function that returns a boolean array that indicates whether a subsequence in `T` is constant (True). The function must only take two arguments, `a`, a 1-D array, and `w`, the window size, while additional arguments may be specified by currying the user-defined function using `functools.partial`. Any subsequence with at least one np.nan/np.inf will automatically have its corresponding value set to False in this boolean array. Returns ------- D : numpy.ndarray MPdist profiles Notes ----- `DOI: 10.1109/ICBK.2018.00058 \ <https://www.cs.ucr.edu/~eamonn/Time_Series_Snippets_10pages.pdf>`__ See Table II """ifm>T.shape[0]//2:# pragma: no coverraiseValueError(f"The window size{m} for each non-overlapping subsequence is too large "f"for a time series with length{T.shape[0]}. "f"Please try `m <= len(T) // 2`.")ifsisnotNone:s=min(int(s),m)else:percentage=np.clip(percentage,0.0,1.0)s=min(math.ceil(percentage*m),m)right_pad=0T_subseq_isconstant=core.process_isconstant(T,s,mpdist_T_subseq_isconstant)n_contiguous_windows=int(T.shape[0]//m)ifT.shape[0]%m!=0:right_pad=int(m*np.ceil(T.shape[0]/m)-T.shape[0])pad_width=(0,right_pad)T=np.pad(T,pad_width,mode="constant",constant_values=np.nan)T_subseq_isconstant=np.pad(T_subseq_isconstant,pad_width,mode="constant",constant_values=False)n_padded=T.shape[0]D=np.empty((n_contiguous_windows,n_padded-m+1),dtype=np.float64)M_T,Σ_T=core.compute_mean_std(T,s)# Iterate over non-overlapping subsequences, see Definition 3foriinrange(n_contiguous_windows):start=i*mstop=(i+1)*mS_i=T[start:stop]D[i,:]=_mpdist_vect(S_i,T,s,M_T[start:stop-s+1],Σ_T[start:stop-s+1],M_T,Σ_T,T_subseq_isconstant[start:stop-s+1],T_subseq_isconstant,percentage=mpdist_percentage,k=mpdist_k,custom_func=mpdist_custom_func,query_idx=start,)stop_idx=n_padded-m+1-right_padD=D[:,:stop_idx]returnD[docs]@core.non_normalized(aampdist_snippets,exclude=["normalize","mpdist_T_subseq_isconstant",],)defsnippets(T,m,k,percentage=1.0,s=None,mpdist_percentage=0.05,mpdist_k=None,normalize=True,p=2.0,mpdist_T_subseq_isconstant=None,):""" Identify the top ``k`` snippets that best represent the time series, ``T`` Parameters ---------- T : numpy.ndarray The time series or sequence for which to find the snippets. m : int The snippet window size. k : int The desired number of snippets. percentage : float, default 1.0 With the length of each non-overlapping subsequence, ``S[i]``, set to ``m``, this is the percentage of ``S[i]`` (i.e., ``percentage * m``) to set ``s`` (the sub-subsequence length) to. When ``percentage == 1.0``, then the full length of ``S[i]`` is used to compute the ``mpdist_vect``. When ``percentage < 1.0``, then a shorter sub-subsequence length of ``s = min(math.ceil(percentage * m), m)`` from each ``S[i]`` is used to compute ``mpdist_vect``. When ``s`` is not ``None``, then the ``percentage`` parameter is ignored. s : int, default None With the length of each non-overlapping subsequence, ``S[i]``, set to ``m``, this is essentially the sub-subsequence length (i.e., a shorter part of ``S[i]``). When ``s == m``, then the full length of ``S[i]`` is used to compute the ``mpdist_vect``. When ``s < m``, then shorter subsequences with length ``s`` from each ``S[i]`` is used to compute ``mpdist_vect``. When ``s`` is not ``None``, then the ``percentage`` parameter is ignored. mpdist_percentage : float, default 0.05 The percentage of distances that will be used to report ``mpdist``. The value is between ``0.0`` and ``1.0``. mpdist_k : int Specify the ``k``-th value in the concatenated matrix profiles to return. When ``mpdist_k`` is not ``None``, then the ``mpdist_percentage`` parameter is ignored. normalize : bool, default True When set to ``True``, this z-normalizes subsequences prior to computing distances. Otherwise, this function gets re-routed to its complementary non-normalized equivalent set in the ``@core.non_normalized`` function decorator. p : float, default 2.0 The p-norm to apply for computing the Minkowski distance. Minkowski distance is typically used with ``p`` being ``1`` or ``2``, which correspond to the Manhattan distance and the Euclidean distance, respectively. This parameter is ignored when ``normalize == True``. mpdist_T_subseq_isconstant : numpy.ndarray or function, default None A boolean array that indicates whether a subsequence (of length equal to ``len(s)``) in ``T`` is constant (``True``). Alternatively, a custom, user-defined function that returns a boolean array that indicates whether a subsequence in ``T`` is constant (``True``). The function must only take two arguments, ``a``, a 1-D array, and ``w``, the window size, while additional arguments may be specified by currying the user-defined function using ``functools.partial``. Any subsequence with at least one ``np.nan``/``np.inf`` will automatically have its corresponding value set to ``False`` in this boolean array. Returns ------- snippets : numpy.ndarray The top ``k`` snippets. snippets_indices : numpy.ndarray The index locations for each of top ``k`` snippets. snippets_profiles : numpy.ndarray The MPdist profiles for each of the top ``k`` snippets. snippets_fractions : numpy.ndarray The fraction of data that each of the top ``k`` snippets represents. snippets_areas : numpy.ndarray The area under the curve corresponding to each profile for each of the top ``k`` snippets. snippets_regimes: numpy.ndarray The index slices corresponding to the set of regimes for each of the top ``k`` snippets. The first column is the (zero-based) snippet index while the second and third columns correspond to the (inclusive) regime start indices and the (exclusive) regime stop indices, respectively. Notes ----- `DOI: 10.1109/ICBK.2018.00058 \ <https://www.cs.ucr.edu/~eamonn/Time_Series_Snippets_10pages.pdf>`__ See Table I Examples -------- >>> import stumpy >>> import numpy as np >>> stumpy.snippets(np.array([584., -11., 23., 79., 1001., 0., -19.]), m=3, k=2) (array([[ 584., -11., 23.], [ 79., 1001., 0.]]), array([0, 3]), array([[0. , 3.2452632 , 3.00009263, 2.982409 , 0.11633857], [2.982409 , 2.69407392, 3.01719586, 0. , 2.92154586]]), array([0.6, 0.4]), array([9.3441034 , 5.81050512]), array([[0, 0, 1], [0, 2, 3], [0, 4, 5], [1, 1, 2], [1, 3, 4]])) """T=core._preprocess(T)ifm>T.shape[0]//2:# pragma: no coverraiseValueError(f"The snippet window size of{m} is too large for a time series with "f"length{T.shape[0]}. Please try `m <= len(T) // 2`.")check_window_size(m,max_size=T.shape[0]//2)D=_get_all_profiles(T,m,percentage=percentage,s=s,mpdist_percentage=mpdist_percentage,mpdist_k=mpdist_k,mpdist_T_subseq_isconstant=mpdist_T_subseq_isconstant,)snippets=np.empty((k,m),dtype=np.float64)snippets_indices=np.empty(k,dtype=np.int64)snippets_profiles=np.empty((k,D.shape[-1]),dtype=np.float64)snippets_fractions=np.empty(k,dtype=np.float64)snippets_areas=np.empty(k,dtype=np.float64)Q=np.full(D.shape[-1],np.inf,dtype=np.float64)indices=np.arange(D.shape[0],dtype=np.int64)*msnippets_regimes_list=[]foriinrange(k):profile_areas=np.sum(np.minimum(D,Q),axis=1)idx=np.argmin(profile_areas)snippets[i]=T[indices[idx]:indices[idx]+m]snippets_indices[i]=indices[idx]snippets_profiles[i]=D[idx]snippets_areas[i]=np.sum(np.minimum(D[idx],Q))Q[:]=np.minimum(D[idx],Q)total_min=np.min(snippets_profiles,axis=0)foriinrange(k):mask=snippets_profiles[i]<=total_minsnippets_fractions[i]=np.sum(mask)/total_min.shape[0]total_min=total_min-mask.astype(np.float64)slices=_get_mask_slices(mask)snippets_regimes_list.append(slices)n_slices=[regime.shape[0]forregimeinsnippets_regimes_list]snippets_regimes=np.empty((sum(n_slices),3),dtype=np.int64)snippets_regimes[:,0]=np.repeat(np.arange(len(snippets_regimes_list)),n_slices)snippets_regimes[:,1:]=np.vstack(snippets_regimes_list)return(snippets,snippets_indices,snippets_profiles,snippets_fractions,snippets_areas,snippets_regimes,)