Skip to content

md

Time Series Analysis

This submodule offers techniques for the analysis of state trajectories—commonly known as Molecular Dynamics (MD)—without relying on Markov state models. It encompasses functions for determining timescales, recognizing significant events, correcting dynamical anomalies, and evaluating various state discretization methods. These functions provide a comprehensive solution for analyzing time-series data and understanding the underlying dynamics of complex systems.

The submodule is structured into the following submodules:

  • comparison: This submodule holds methods to quantify the similarity of different state discretizations.
  • corrections: This submodule provides an implementation of dynamical coring.
  • timescales: This submodule contains methods for estimating various timescales based on discrete time series.

compare_discretization(traj1, traj2, method='symmetric')

Compare similarity of two state discretizations.

This method compares the similarity of two state discretizations of the same dataset. There are two different methods, 'directed' gives a measure on how high is the probable to assign a frame correclty knowing the traj1. Hence splitting a state into many is not penalized, while merging multiple into a single state is. Selecting 'symmetric' it is check in both directions, so it checks for each state if it is possible to assigned it forward or backward. Hence, splitting and merging states is not penalized.

Parameters:

  • traj1 (StateTraj like) –

    First state discretization.

  • traj2 (StateTraj like) –

    Second state discretization.

  • method ([symmetric, directed], default: 'symmetric' ) –

    Selecting similarity norm. 'symmetric' compares if each frame is forward or backward assignable, while 'directed' checks only if it is forard assignable.

Returns:

  • similarity ( float ) –

    Similarity going from [0, 1], where 1 means identical and 0 no similarity at all.

Source code in src/msmhelper/md/comparison.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def compare_discretization(traj1, traj2, method='symmetric'):
    """Compare similarity of two state discretizations.

    This method compares the similarity of two state discretizations of the
    same dataset. There are two different methods, 'directed' gives a measure
    on how high is the probable to assign a frame correclty knowing the
    `traj1`. Hence splitting a state into many is not penalized, while merging
    multiple into a single state is. Selecting 'symmetric' it is check in both
    directions, so it checks for each state if it is possible to assigned it
    forward or backward. Hence, splitting and merging states is not penalized.

    Parameters
    ----------
    traj1 : StateTraj like
        First state discretization.
    traj2 : StateTraj like
        Second state discretization.
    method : ['symmetric', 'directed']
        Selecting similarity norm. 'symmetric' compares if each frame is
        forward or backward assignable, while 'directed' checks only if it is
        forard assignable.

    Returns
    -------
    similarity : float
        Similarity going from [0, 1], where 1 means identical and 0 no
        similarity at all.

    """
    # format input
    traj1, traj2 = StateTraj(traj1), StateTraj(traj2)
    if method not in {'symmetric', 'directed'}:
        raise ValueError(
            'Only methods "symmetric" and "directed" are supported',
        )

    # check if same length
    if traj1.nframes != traj2.nframes:
        raise ValueError(
            'Trajectories are of different length: ' +
            '{0} vs {1}'.format(traj1.nframes, traj2.nframes),
        )

    # check if only single state
    if traj1.nstates == 1 or traj2.nstates == 1:
        raise ValueError(
            'Trajectories needs to have at least two states: ' +
            '{0} and {1}'.format(traj1.nstates, traj2.nstates),
        )
    return _compare_discretization(traj1, traj2, method)

dynamical_coring(trajs, lagtime, iterative=True)

Fix spurious transitions with dynamical coring.

Projecting high dimensional data onto low dimensional collective variables can result in spurious state transitions which can be correct for applying dynamical coring, for more details see Nagel et al. 1.

Note

Applying dynamical coring on a msmhelper.LumpedStateTraj is not supported. The reason is that while applying dynamical coring on the microstate level leads to different coarse-graining, applying it on the macrostate level the HS-Projection is not well defined anymore.


  1. Nagel et al., Dynamical coring of Markov state models, J. Chem. Phys., 150, 094111 (2019), doi:10.1063/1.5081767 

Parameters:

  • trajs (StateTraj or list or ndarray or list of ndarray) –

    State trajectory/trajectories. The states should start from zero and need to be integers.

  • lagtime (int) –

    Lagtime [frames] is the minimum time a trajectory is required to spend in a new state to be accepted.

  • iterative (bool, default: True ) –

    If True dynamical coring is applied iteratively with increasing lagtimes, so lagtime=1, 2, ..., lagtimes.

Returns:

  • trajs ( StateTraj ) –

    Dynamically corrected state trajectory.

Source code in src/msmhelper/md/corrections.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def dynamical_coring(trajs, lagtime, iterative=True):
    """Fix spurious transitions with dynamical coring.

    Projecting high dimensional data onto low dimensional collective variables
    can result in spurious state transitions which can be correct for applying
    dynamical coring, for more details see Nagel et al. [^1].

    !!! note
        Applying dynamical coring on a [msmhelper.LumpedStateTraj][] is not
        supported. The reason is that while applying dynamical coring on the
        microstate level leads to different coarse-graining, applying it on the
        macrostate level the HS-Projection is not well defined anymore.

    [^1]: Nagel et al., **Dynamical coring of Markov state models**,
        *J. Chem. Phys.*, 150, 094111 (2019),
        doi:[10.1063/1.5081767](https://doi.org/10.1063/1.5081767)

    Parameters
    ----------
    trajs : StateTraj or list or ndarray or list of ndarray
        State trajectory/trajectories. The states should start from zero and
        need to be integers.
    lagtime : int
        Lagtime [frames] is the minimum time a trajectory is required to spend
        in a new state to be accepted.
    iterative : bool, optional
        If `True` dynamical coring is applied iteratively with increasing
        lagtimes, so lagtime=1, 2, ..., lagtimes.

    Returns
    -------
    trajs : StateTraj
        Dynamically corrected state trajectory.

    """
    trajs = StateTraj(trajs)

    if isinstance(trajs, LumpedStateTraj):
        raise NotImplementedError(
            'Applying dynamical coring on a LumpedStateTraj is not supported. '
            'The reason is that while applying dynamical coring on the '
            'microstate level leads to different coarse-graining, applying it '
            'on the macrostate level the HS-Projection is not well defined '
            'anymore.'
        )

    # convert trajs to numba list # noqa: SC100
    if numba.config.DISABLE_JIT:
        cored_trajs = trajs.trajs
    else:  # pragma: no cover
        cored_trajs = numba.typed.List(trajs.trajs)

    if lagtime <= 0:
        raise ValueError('The lagtime should be greater 0.')

    # if lagtime == 1 nothing changes
    if lagtime == 1:
        return trajs

    # catch if lagtime <=1
    return StateTraj(
        _dynamical_coring(cored_trajs, lagtime, iterative),
    )

estimate_waiting_times(trajs, start, final)

Estimates waiting times between stated states.

The stated states (from/to) will be treated as a basin. The function calculates all transitions from first entering the start-basin until first reaching the final-basin.

Parameters:

  • trajs (statetraj or list or ndarray or list of ndarray) –

    State trajectory/trajectories. The states should start from zero and need to be integers.

  • start (int or list of) –

    States to start counting.

  • final (int or list of) –

    States to start counting.

Returns:

  • wt ( ndarray ) –

    List of waiting times, given in frames.

Source code in src/msmhelper/md/timescales.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@decorit.alias('estimate_wt')
def estimate_waiting_times(trajs, start, final):
    """Estimates waiting times between stated states.

    The stated states (from/to) will be treated as a basin. The function
    calculates all transitions from first entering the start-basin until first
    reaching the final-basin.

    Parameters
    ----------
    trajs : statetraj or list or ndarray or list of ndarray
        State trajectory/trajectories. The states should start from zero and
        need to be integers.
    start : int or list of
        States to start counting.
    final : int or list of
        States to start counting.

    Returns
    -------
    wt : ndarray
        List of waiting times, given in frames.

    """
    # check correct input format
    trajs = StateTraj(trajs)

    states_start, states_final = np.unique(start), np.unique(final)

    if intersect(states_start, states_final):
        raise ValueError('States `start` and `final` do overlap.')

    # check that all states exist in trajectory
    for states in (states_start, states_final):
        if intersect(states, trajs.states) != len(states):
            raise ValueError(
                'Selected states does not exist in state trajectory.',
            )

    # do not convert for pytest coverage
    if numba.config.DISABLE_JIT:
        return _estimate_waiting_times(trajs, states_start, states_final)
    return _estimate_waiting_times(  # pragma: no cover
        numba.typed.List(trajs),
        numba.typed.List(states_start),
        numba.typed.List(states_final),
    )

estimate_paths(trajs, start, final)

Estimates paths and waiting times between stated states.

The stated states (from/to) will be treated as a basin. The function calculates all transitions from first entering the start-basin until first reaching the final-basin. The results will be listed by the corresponding pathways, where loops are removed occuring first.

Parameters:

  • trajs (statetraj or list or ndarray or list of ndarray) –

    State trajectory/trajectories. The states should start from zero and need to be integers.

  • start (int or list of) –

    States to start counting.

  • final (int or list of) –

    States to start counting.

Returns:

  • paths ( dict ) –

    Dictionary containing the the paths as keys and and an array holding the times of all paths as value.

Source code in src/msmhelper/md/timescales.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def estimate_paths(trajs, start, final):
    """Estimates paths and waiting times between stated states.

    The stated states (from/to) will be treated as a basin. The function
    calculates all transitions from first entering the start-basin until first
    reaching the final-basin. The results will be listed by the corresponding
    pathways, where loops are removed occuring first.

    Parameters
    ----------
    trajs : statetraj or list or ndarray or list of ndarray
        State trajectory/trajectories. The states should start from zero and
        need to be integers.
    start : int or list of
        States to start counting.
    final : int or list of
        States to start counting.

    Returns
    -------
    paths : dict
        Dictionary containing the the paths as keys and and an array holding
        the times of all paths as value.

    """
    # check correct input format
    trajs = StateTraj(trajs)

    states_start, states_final = np.unique(start), np.unique(final)

    if intersect(states_start, states_final):
        raise ValueError('States `start` and `final` do overlap.')

    # check that all states exist in trajectory
    for states in (states_start, states_final):
        if intersect(states, trajs.states) != len(states):
            raise ValueError(
                'Selected states does not exist in state trajectory.',
            )

    # do not convert for pytest coverage
    if numba.config.DISABLE_JIT:  # pragma: no cover
        path_tuples = _estimate_paths(trajs, states_start, states_final)
    else:
        path_tuples = _estimate_paths(  # pragma: no cover
            numba.typed.List(trajs),
            numba.typed.List(states_start),
            numba.typed.List(states_final),
        )
    paths = defaultdict(list)
    for path, time in path_tuples:
        paths[tuple(path)].append(time)

    return paths