Source code for sire.mol._trajectory

__all__ = ["TrajectoryIterator"]

# make sure that GeneralUnit has been modernised
from ..units import GeneralUnit as _GeneralUnit  # noqa: F401


def _get_align_atoms_and_reference(
    view, align, frame=None, mapping=None, map=None, find_all=True
):
    if align is None:
        # this is an all-to-all alignment
        atoms = view.atoms()

        if frame is not None:
            atoms.load_frame(frame)

        return (atoms, atoms)

    elif hasattr(align, "atoms"):
        align = align.atoms()

        if mapping is None:
            from . import AtomMapping

            mapping = AtomMapping(align, align)

        # use the mapping to go from the atoms to align to
        # the atoms in the view
        atoms = mapping.find(align.atoms(), view.atoms(), find_all=find_all)

        if frame is not None:
            align.load_frame(frame)

        if len(atoms) != len(align) and not find_all:
            align = mapping.swap().find(atoms, align, find_all=True)

        return (atoms, align)

    else:
        # This is a search string in 'view'
        atoms = view[align].atoms()

        if frame is not None:
            atoms.load_frame(frame)

        return (atoms, atoms)


[docs] class TrajectoryIterator: """ An iterator that can be used to control which frames of a trajectory are accessed or processed. """ def __init__( self, view=None, align=None, smooth=None, wrap=None, mapping=None, frame=None, map=None, ): if view is not None: from ..base import create_map self._map = create_map(map) self._view = view self._values = list(range(0, max(1, self._view.num_frames(self._map)))) self._times = None self._iter = None self._frame = None self._mapping = mapping self._frame = frame if align is True: # passing in 'True' means align against everything align = "*" elif align is False: # passing in 'False" means don't align anything align = None self._align = align # Get the values of smooth and wrap from either the # constructor args or the property map if (smooth is None) and self._map.specified("smooth"): smooth = self._map["smooth"].value().as_integer() if wrap is None and self._map.specified("wrap"): wrap = self._map["wrap"].value().as_boolean() if smooth not in [None, False, 0]: if smooth is True: smooth = 5 else: smooth = int(smooth) if smooth < 1: smooth = 1 else: smooth = 1 if wrap in [None, False, 0]: wrap = False else: wrap = True # now place them back into the property map self._map.set("smooth", smooth) self._map.set("wrap", wrap) from ..legacy.Mol import TrajectoryAligner if align is not None: (align, reference) = _get_align_atoms_and_reference( view=self._view, align=align, mapping=mapping, frame=frame, map=self._map, find_all=False, ) self._aligner = TrajectoryAligner(align, reference, map=self._map) elif wrap or (smooth != 1): self._aligner = TrajectoryAligner( self._view.evaluate().center(), map=self._map, ) else: # We don't need to do anything to process the frames self._aligner = None else: self._view = None self._values = [] self._times = None self._iter = None self._map = None self._frame = None self._align = None self._aligner = None self._mapping = None self._frame = None def __iter__(self): return self def __next__(self): if self._view is None or self._values is None: raise StopIteration() if self._iter is None: self._iter = self._values.__iter__() self._frame = self._iter.__next__() return self.current() def __len__(self): return len(self._values) def __getitem__(self, val): from copy import copy it = copy(self) if type(val) is int: it._values = [self._values[val]] elif type(val) is slice: it._values = self._values[val] else: it._values = [self._values[v] for v in val] return it def __repr__(self): return self.__str__() def __str__(self): if self._view is None: return "TrajectoryIterator::null" elif len(self._values) <= 1: return f"Trajectory({self._view}, num_frames=1)" else: return f"Trajectory({self._view}, num_frames={len(self._values)})" def _is_trajectory_iterator(self): """ Return that this is a TrajectoryIterator """ return True def _to_legacy_system(self): """ Internal function used to convert the contained view into a legacy.Sire.System """ from .._load import _to_legacy_system return _to_legacy_system(self.current())
[docs] def frame_indexes(self): """ Return the indexes of the frames in this trajectory that will be viewed """ from copy import copy return copy(self._values)
[docs] def align(self, align, frame=None): """ Return a copy of this trajectory where each frame will be aligned against the atoms that match the search string 'align' """ if frame is None: frame = self._frame t = TrajectoryIterator( view=self._view, align=align, frame=frame, mapping=self._mapping, map=self._map, ) t._values = self._values return t
[docs] def smooth(self, smooth): """ Return a copy of this trajectory where each frame will be smoothed over the specified number of frames (or the recommended number if 'smooth' is set to 'True') """ t = TrajectoryIterator( view=self._view, align=self._align, frame=self._frame, smooth=smooth, mapping=self._mapping, map=self._map, ) t._values = self._values return t
[docs] def wrap(self, autowrap=True): """ Return a copy of this trajectory where each frame will be auto-wrapped into the current space """ t = TrajectoryIterator( view=self._view, align=self._align, frame=self._frame, wrap=autowrap, mapping=self._mapping, map=self._map, ) t._values = self._values return t
def num_frames(self): return len(self._values)
[docs] def current(self): """Return the current frame in the trajectory""" if self._view is None or self._values is None: raise StopIteration() if self._frame is None: self._frame = self._values[0] ret = self._view.clone() if self._aligner is None: map = self._map else: map = self._map.clone() map.set("transform", self._aligner[self._frame]) ret.load_frame(self._frame, map=map) try: mol = ret.molecule() except Exception: mol = ret[0].molecule() time_property = self._map["time"] if mol.has_property(time_property): time = mol.property(time_property) ret.frame_time = lambda: time else: from ..units import picosecond ret.frame_time = lambda: 0 * picosecond ret.frame_index = lambda: self._frame return ret
def _populate_map(self, map): """ Internal function called by sire.save to populate the property map that will be used by MoleculeParser to extract each frame from this trajectory. Note that this populates a copy of the map, which is returned """ from ..base import create_map # do it this way around, so that 'map' overwrites # anything in this trajectory's map map = create_map(self._map, map) # now add in the frames to save frames_to_write = self.frame_indexes() map.set("frames_to_write", frames_to_write) # and the aligner if self._aligner is not None: map.set("frame_aligner", self._aligner) return map
[docs] def first(self): """Return the first frame in the trajectory""" if self._view is None or self._values is None: raise StopIteration() old_frame = self._frame self._frame = self._values[0] ret = self.current() self._frame = old_frame return ret
def times(self): if self._times is not None: return self._times if self._view is None: return {} # load the times from the actual underlying trajectory data try: mol = self._view.molecule() except Exception: mol = self._view[0].molecule() traj = mol.property(self._map["trajectory"]) self._times = [] for idx in self._values: self._times.append(traj[idx].time()) return self._times def energies(self, obj1=None, to_pandas=True, map=None): if self._view is None: return {} import numpy as np from ..system import create_forcefield from ..legacy.System import calculate_trajectory_energies from .._colname import colname from . import _to_molecules from ..base import create_map map = self._map.merge(create_map(map)) colnames = [] forcefields = [] if obj1 is None: for v in self.first(): colnames.append(colname(v)) forcefields.append(create_forcefield(v, map=map)) else: if type(obj1) is TrajectoryIterator: if obj1.num_frames() != self.num_frames(): raise ValueError( "The two trajectories have a different " "number of frames! " f"{self.num_frames()} versus f{obj1.num_frames()}." ) obj1_mols = _to_molecules(obj1.first()) for v in self.first(): colnames.append(colname(v)) forcefields.append(create_forcefield(v, obj1_mols, map=map)) else: for v in self.first(): colnames.append(colname(v)) forcefields.append( create_forcefield(v, _to_molecules(obj1), map=map) ) nframes = len(self) times = np.zeros(nframes, dtype=float) indexes = np.zeros(nframes, dtype=int) t = self.times() for i, idx in enumerate(self._values): times[i] = t[i].to_default() indexes[i] = idx time_unit = t[0].get_default().unit_string() energy_unit = None components = {} ff_nrgs = calculate_trajectory_energies(forcefields, self._values, map=map) for ff_idx in range(0, len(forcefields)): nrg = ff_nrgs[ff_idx][0] if ff_idx == 0: energy_unit = nrg.get_default().unit_string() components[colname(colnames[ff_idx], "total")] = np.zeros( nframes, dtype=float ) for key in nrg.components().keys(): components[colname(colnames[ff_idx], key)] = np.zeros( nframes, dtype=float ) for i in range(0, nframes): for ff_idx in range(0, len(forcefields)): nrg = ff_nrgs[ff_idx][i] components[colname(colnames[ff_idx], "total")][i] = nrg.to_default() for key in nrg.components().keys(): try: components[colname(colnames[ff_idx], key)][i] = nrg[ key ].to_default() except KeyError: k = colname(colnames[ff_idx], key) components[k] = np.zeros(nframes, dtype=float) components[k][i] = nrg[key].to_default() data = {} data["frame"] = indexes data["time"] = times colnames = list(components.keys()) colnames.sort() for name in colnames: data[name] = components[name] if to_pandas: import pandas as pd df = pd.DataFrame(data) df.set_index("frame") df.time_unit = lambda: time_unit df.energy_unit = lambda: energy_unit def pretty_plot(x="time", y=None): if y is None: y = colnames if x == "time": xlabel = f"Time / {df.time_unit()}" elif x == "frame": xlabel = "Frame" else: xlabel = x ax = df.plot(x=x, y=y) ax.set_xlabel(xlabel) ax.set_ylabel(f"Energy / {df.energy_unit()}") ax.legend(bbox_to_anchor=(1.05, 1.0)) df.pretty_plot = pretty_plot return df return data def energy(self, obj1=None, to_pandas=True, map=None): if self._view is None: return {} import numpy as np from ..system import create_forcefield from ..legacy.System import calculate_trajectory_energy from ..base import create_map map = self._map.merge(create_map(map)) if obj1 is None: ff = create_forcefield(self.first(), map=map) else: if type(obj1) is TrajectoryIterator: if obj1.num_frames() != self.num_frames(): raise ValueError( "The two trajectories have a different " "number of frames! " f"{self.num_frames()} versus f{obj1.num_frames()}." ) ff = create_forcefield(self.first(), obj1.first(), map=map) else: ff = create_forcefield(self.first(), obj1, map=map) nframes = len(self) times = np.zeros(nframes, dtype=float) indexes = np.zeros(nframes, dtype=int) t = self.times() for i, idx in enumerate(self._values): times[i] = t[i].to_default() indexes[i] = idx time_unit = t[0].get_default().unit_string() energy_unit = None # calculate all the energies nrgs = calculate_trajectory_energy(ff, self._values, map) # convert the result into a pandas dataframe components = {} nrg = nrgs[0] energy_unit = nrg.get_default().unit_string() components["total"] = np.zeros(nframes, dtype=float) for key in nrg.components().keys(): components[key] = np.zeros(nframes, dtype=float) for i in range(0, nframes): nrg = nrgs[i] components["total"][i] = nrg.to_default() for key in nrg.components().keys(): try: components[key][i] = nrg[key].to_default() except KeyError: components[key] = np.zeros(nframes, dtype=float) components[key][i] = nrg[key].to_default() data = {} data["frame"] = indexes data["time"] = times colnames = list(components.keys()) colnames.sort() for colname in colnames: data[colname] = components[colname] if to_pandas: import pandas as pd df = pd.DataFrame(data) df.set_index("frame") df.time_unit = lambda: time_unit df.energy_unit = lambda: energy_unit def pretty_plot(x="time", y=None): if y is None: y = colnames if x == "time": xlabel = f"Time / {df.time_unit()}" elif x == "frame": xlabel = "Frame" else: xlabel = x ax = df.plot(x=x, y=y) ax.set_xlabel(xlabel) ax.set_ylabel(f"Energy / {df.energy_unit()}") ax.legend(bbox_to_anchor=(1.05, 1.0)) df.pretty_plot = pretty_plot return df return data def _simple_measures(self, to_pandas): from .._colname import colname if self._view is None: return {} uses_measures = None if hasattr(self._view, "measures"): uses_measures = True elif hasattr(self._view, "measure"): uses_measures = False else: raise AttributeError( f"This view ({self._view}) does not have a `.measure()` " "or `.measures()` function, so cannot be measured." ) import numpy as np colnames = [] columns = [] nframes = len(self) times = np.zeros(nframes, dtype=float) indexes = np.zeros(nframes, dtype=int) time_unit = None measure_unit = None from ..base import ProgressBar if uses_measures: for view in self._view: colnames.append(colname(view)) columns.append(np.zeros(nframes, dtype=float)) with ProgressBar(total=nframes, text="Looping through frames") as progress: for idx, frame in enumerate(self.__iter__()): for i, measure in enumerate(frame.measures(map=self._map)): columns[i][idx] = measure.to_default() times[idx] = frame.frame_time().to_default() if measure_unit is None: if not measure.is_zero(): measure_unit = measure.get_default().unit_string() if time_unit is None: time = frame.frame_time() if not time.is_zero(): time_unit = time.get_default().unit_string() indexes[idx] = frame.frame_index() progress.set_progress(idx) else: colnames.append(colname(self._view)) column = np.zeros(nframes, dtype=float) with ProgressBar(total=nframes, text="Looping through frames") as progress: for idx, frame in enumerate(self.__iter__()): measure = frame.measure(map=self._map) column[idx] = measure.to_default() times[idx] = frame.frame_time().to_default() if measure_unit is None: if not measure.is_zero(): measure_unit = measure.get_default().unit_string() if time_unit is None: time = frame.frame_time() if not time.is_zero(): time_unit = time.get_default().unit_string() indexes[idx] = frame.frame_index() progress.set_progress(idx) columns = [column] data = {} data["frame"] = indexes data["time"] = times for i in range(0, len(colnames)): data[colnames[i]] = columns[i] if to_pandas: import pandas as pd df = pd.DataFrame(data) df.set_index("frame") df.time_unit = lambda: time_unit df.measure_unit = lambda: measure_unit def pretty_plot(x="time", y=None): if y is None: y = colnames if x == "time": xlabel = f"Time / {df.time_unit()}" elif x == "frame": xlabel = "Frame" else: xlabel = x ax = df.plot(x=x, y=y) ax.set_xlabel(xlabel) ax.set_ylabel(f"Size / {df.measure_unit()}") ax.legend(bbox_to_anchor=(1.05, 1.0)) df.pretty_plot = pretty_plot return df return data def _custom_measures(self, func, to_pandas): if self._view is None: return {} if not type(func) is dict: func = {"custom": func} import numpy as np colnames = [] columns = [] nframes = len(self) times = np.zeros(nframes, dtype=float) indexes = np.zeros(nframes, dtype=int) time_unit = None measure_unit = None for key in func.keys(): colnames.append(key) columns.append(np.zeros(nframes, dtype=float)) from ..base import ProgressBar with ProgressBar(total=nframes, text="Looping through frames") as progress: for idx, frame in enumerate(self.__iter__()): for i, f in enumerate(func.values()): measure = f(frame) columns[i][idx] = measure.to_default() times[idx] = frame.frame_time().to_default() if measure_unit is None: if not measure.is_zero(): measure_unit = measure.get_default().unit_string() if time_unit is None: time = frame.frame_time() if not time.is_zero(): time_unit = time.get_default().unit_string() indexes[idx] = frame.frame_index() progress.set_progress(idx) data = {} data["frame"] = indexes data["time"] = times for i in range(0, len(colnames)): data[colnames[i]] = columns[i] if to_pandas: import pandas as pd df = pd.DataFrame(data) df.set_index("frame") df.time_unit = lambda: time_unit df.measure_unit = lambda: measure_unit def pretty_plot(x="time", y=None): if y is None: y = colnames if x == "time": xlabel = f"Time / {df.time_unit()}" elif x == "frame": xlabel = "Frame" else: xlabel = x ax = df.plot(x=x, y=y) ax.set_xlabel(xlabel) ax.set_ylabel(f"Size / {df.measure_unit()}") ax.legend(bbox_to_anchor=(1.05, 1.0)) df.pretty_plot = pretty_plot return df return data def measures(self, func=None, to_pandas=True): if func is None: return self._simple_measures(to_pandas=to_pandas) else: return self._custom_measures(func=func, to_pandas=to_pandas)
[docs] def apply(self, func, *args, **kwargs): """ Call the passed function on all frames of the trajectory, appending the result to a list of results, which is returned. The function can be either; 1. a string containing the name of the function to call, or 2. an actual function (either a normal function or a lambda expression) You can optionally pass in positional and keyword arguments here that will be passed to the function. Args: func (str or function): The function to be called, or the name of the function to be called. Returns: list: A list of the results, with one result per frame in the trajectory """ result = [] from ..base import ProgressBar nframes = len(self) if str(func) == func: # we calling a named function with ProgressBar(total=nframes, text="Looping through frames") as progress: for i in range(0, nframes): obj = self.__getitem__(i).current() result.append(getattr(obj, func)(*args, **kwargs)) progress.set_progress(i + 1) else: # we have been passed the function to call with ProgressBar(total=nframes, text="Looping through frames") as progress: for i in range(0, nframes): obj = self.__getitem__(i).current() result.append(func(obj, *args, **kwargs)) progress.set_progress(i + 1) return result
def view(self, *args, **kwargs): from ._view import view return view(self, *args, **kwargs)
[docs] def rmsd( self, reference=None, align: bool = None, frame: int = None, mapping=None, match_all: bool = True, map=None, to_pandas: bool = False, ): """ Calculate the RMSD across the frames of this trajectory, against either all atoms or, optionally, the passed view. You can specify whether to align the trajectory against the view (controlling which frame should be the source of alignment via `frame`) reference: molecule container or search string or index or range Anything that is valid to specify a sub-view of the trajectory, or that can be a molecular container itself. If this is a view, then the RMSD only to the atoms in the view is calculated. If the atoms aren't in this trajectory, then you need to provide a mapping that shows how you can map from the atoms in the view to the atoms in the trajectory. align: bool Whether or not to align the frames of the trajectory against the reference. This defaults to True frame: int The frame number of the reference against which to align the trajectory mapping: AtomMapping The mapping from atoms in the reference to atoms in the trajectory. This is only needed if the reference atoms are not contained in the atoms in this trajectory. match_all: bool Whether or not to find and match all of the atoms in 'reference' to the atoms in this trajectory. Normally you do want to do this, so it defaults to True. If this is False, then the RMSD of only atoms in this trajectory that in 'reference' will be calculated, even if it just a single atom. map: dict Any parameters that will overwrite any of the map parameters that are already in this trajectory to_pandas: bool Whether or not to write the output to a Pandas DataFrame (default False) """ from ..legacy.Mol import TrajectoryAligner from ..legacy.Mol import get_rmsd if reference is None: # nothing to align against align = False elif align is None: # auto-align only for smaller systems if hasattr(reference, "molecules") and len(reference.molecules()) > 100: align = False else: align = True (atoms, reference) = _get_align_atoms_and_reference( view=self.current(), align=reference, frame=frame, mapping=mapping, map=self._map, find_all=match_all, ) if map is None: map = self._map else: from ..base import create_map map = self._map.merge(create_map(map)) if align: aligner = TrajectoryAligner(atoms, reference, map=map) else: aligner = TrajectoryAligner() rmsd = get_rmsd( atoms=atoms, reference=reference, aligner=aligner, frames=self._values, map=map, ) if to_pandas: import pandas as pd if len(rmsd) == 0: return pd.DataFrame() # convert the RMSD to default length units from ..units import angstrom, picosecond length_unit = angstrom.get_default() time_unit = picosecond.get_default() rmsd = [x.to(length_unit) for x in rmsd] # get the times for each frame import copy t = copy.copy(self) t._view = self._view.atoms()[0] times = [] for frame in t: times.append(frame.frame_time()) times = [x.to(time_unit) for x in times] df = pd.DataFrame( {"frame": copy.copy(self._values), "time": times, "rmsd": rmsd} ) df.time_unit = lambda: time_unit.unit_string() df.measure_unit = lambda: length_unit.unit_string() def pretty_plot(x="time", y=None): if y is None: y = ["rmsd"] if x == "time": xlabel = f"Time / {df.time_unit()}" elif x == "frame": xlabel = "Frame" else: xlabel = x ax = df.plot(x=x, y=y) ax.set_xlabel(xlabel) ax.set_ylabel(f"RMSD / {df.measure_unit()}") ax.get_legend().remove() df.pretty_plot = pretty_plot return df else: return rmsd