Source code for sire.mol._dynamics

__all__ = ["Dynamics"]


class DynamicsData:
    """
    Internal class that is designed to only be used by the Dynamics
    class. This holds the shared state for dynamics on a set
    of molecule(s).
    """

    def __init__(self, mols=None, map=None, **kwargs):
        from ..base import create_map

        map = create_map(map, kwargs)

        # Save the original view, so that we can return an object
        # of the same type in .commit()
        self._orig_mols = mols

        if mols is not None:
            self._map = map  # this is already a PropertyMap

            # see if there are any fixed atoms
            if map.specified("fixed"):
                if map["fixed"].has_value():
                    fixed_atoms = map["fixed"].value()
                else:
                    fixed_atoms = map["fixed"].source()

                from . import selection_to_atoms

                # turn the fixed atoms into a list of atoms
                self._map.set(
                    "fixed",
                    mols.atoms().find(selection_to_atoms(mols, fixed_atoms)),
                )

            # see if there is a QM/MM engine
            if map.specified("qm_engine"):
                qm_engine = map["qm_engine"].value()

                from ..legacy.Convert import QMEngine
                from warnings import warn

                if qm_engine and not isinstance(qm_engine, QMEngine):
                    raise ValueError(
                        "'qm_engine' must be an instance of 'sire.legacy.Convert.QMEngine'"
                    )

                # If a QM/MM engine is specified, then we need to check that there is a
                # perturbable molecule.
                try:
                    pert_mol = mols["property is_perturbable"]
                except:
                    raise ValueError(
                        "You are trying to run QM/MM dynamics for a system without "
                        "a QM/MM enabled molecule!"
                    )

                # Check the constraints and raise a warning if the perturbable_constraint
                # is not "none".

                if map.specified("perturbable_constraint"):
                    perturbable_constraint = map["perturbable_constraint"].source()
                    if perturbable_constraint.lower() != "none":
                        warn(
                            "Running a QM/MM simulation with constraints on the QM "
                            "region is not recommended."
                        )
                else:
                    # The perturbable constraint is unset, so will follow the constraint.
                    # Make sure this is "none".
                    if map.specified("constraint"):
                        constraint = map["constraint"].source()
                        if constraint.lower() != "none":
                            warn(
                                "Running a QM/MM simulation with constraints on the QM "
                                "region is not recommended."
                            )
                    # Constraints will be automatically applied, so we can't guarantee that
                    # the constraint is "none".
                    else:
                        warn(
                            "Running a QM/MM simulation with constraints on the QM "
                            "region is not recommended."
                        )

            if map.specified("cutoff"):
                cutoff = map["cutoff"]

                if cutoff.has_source():
                    cutoff = cutoff.source()

                    if cutoff.lower() == "none" or cutoff.lower().startswith("infinit"):
                        self._map.set("cutoff_type", "NONE")
                        self._map.unset("cutoff")
                    elif cutoff.lower() == "auto":
                        self._map.unset("cutoff")
                    elif cutoff != "cutoff":
                        from .. import u

                        self._map.set("cutoff", u(cutoff))

            # get the forcefield info from the passed parameters
            # and from whatever we can glean from the molecules
            from ..system import ForceFieldInfo, System

            self._ffinfo = ForceFieldInfo(mols, map=self._map)

            # We want to store the molecules as a System so that
            # we can more easily track the space and time properties
            if type(mols) is System:
                # work on our own copy of the system
                self._sire_mols = mols.clone()
            else:
                # create a system to work on
                self._sire_mols = System()
                self._sire_mols._system.add(mols.molecules().to_molecule_group())
                self._sire_mols._system.set_property("space", self._ffinfo.space())

            # see if this is an interpolation simulation
            if map.specified("lambda_interpolate"):
                if map["lambda_interpolate"].has_value():
                    lambda_interpolate = map["lambda_interpolate"].value()
                else:
                    lambda_interpolate = map["lambda_interpolate"].source()

                # Single lambda value.
                try:
                    lambda_interpolate = float(lambda_interpolate)
                    map.set("lambda_value", lambda_interpolate)
                # Two lambda values.
                except:
                    try:
                        if not len(lambda_interpolate) == 2:
                            raise
                        lambda_interpolate = [float(x) for x in lambda_interpolate]
                        map.set("lambda_value", lambda_interpolate[0])
                    except:
                        raise ValueError(
                            "'lambda_interpolate' must be a float or a list of two floats"
                        )

                from ..units import kcal_per_mol

                self._is_interpolate = True
                self._lambda_interpolate = lambda_interpolate
                self._work = 0 * kcal_per_mol
                self._nrg_prev = 0 * kcal_per_mol

            else:
                self._is_interpolate = False

            # find the existing energy trajectory - we will build on this
            self._energy_trajectory = self._sire_mols.energy_trajectory(
                to_pandas=False, map=self._map
            )

            # make sure that the ensemble is recorded in the trajectory
            self._energy_trajectory.set_property("ensemble", self.ensemble())

            self._num_atoms = len(self._sire_mols.atoms())

            from ..units import nanosecond

            try:
                current_time = (
                    self._sire_mols.property(self._map["time"]).to(nanosecond)
                    * nanosecond
                )
            except Exception:
                current_time = 0 * nanosecond
                self._sire_mols.set_property("time", current_time)

            self._current_time = current_time
            self._current_step = 0
            self._elapsed_time = 0 * nanosecond
            self._walltime = 0 * nanosecond
            self._is_running = False
            self._schedule_changed = False

            # Check for a REST2 scaling factor.
            if map.specified("rest2_scale"):
                try:
                    rest2_scale = map["rest2_scale"].value().as_double()
                except:
                    raise ValueError("'rest2_scale' must be of type 'float'")
                if rest2_scale < 1.0:
                    raise ValueError("'rest2_scale' must be >= 1.0")
            else:
                rest2_scale = 1.0

            # Check for an additional REST2 selection.
            if map.specified("rest2_selection"):
                try:
                    rest2_selection = str(map["rest2_selection"])
                except:
                    raise ValueError("'rest2_selection' must be of type 'str'")

                try:
                    from . import selection_to_atoms

                    # Try to find the REST2 selection.
                    atoms = selection_to_atoms(mols, rest2_selection)
                except:
                    raise ValueError(
                        "Invalid 'rest2_selection' string. Please check the selection syntax."
                    )

                # Store all the perturbable molecules associated with the selection
                # and remove perturbable atoms from the selection. Remove alchemical ions
                # from the selection.
                pert_mols = {}
                non_pert_atoms = atoms.to_list()
                for atom in atoms:
                    mol = atom.molecule()
                    if mol.has_property("is_alchemical_ion"):
                        non_pert_atoms.remove(atom)
                    elif mol.has_property("is_perturbable"):
                        non_pert_atoms.remove(atom)
                        if mol.number() not in pert_mols:
                            pert_mols[mol.number()] = [atom]
                        else:
                            pert_mols[mol.number()].append(atom)

                # Now create a boolean is_rest2 mask for the atoms in the perturbable molecules.
                # Only do this if there are perturbable atoms in the selection.
                if len(non_pert_atoms) != len(atoms):
                    for num in pert_mols:
                        mol = self._sire_mols[num]
                        is_rest2 = [False] * mol.num_atoms()
                        for atom in pert_mols[num]:
                            is_rest2[atom.index().value()] = True

                        # Set the is_rest2 property for each perturbable molecule.
                        mol = (
                            mol.edit()
                            .set_property("is_rest2", is_rest2)
                            .molecule()
                            .commit()
                        )

                        # Update the system.
                        self._sire_mols.update(mol)

            # Search for alchemical ions and exclude them via a REST2 mask.
            try:
                for mol in self._sire_mols.molecules("property is_alchemical_ion"):
                    is_rest2 = [False] * mol.num_atoms()
                    mol = (
                        mol.edit()
                        .set_property("is_rest2", is_rest2)
                        .molecule()
                        .commit()
                    )
                    self._sire_mols.update(mol)
            except:
                pass

            from ..convert import to

            self._omm_mols = to(self._sire_mols, "openmm", map=self._map)
            self._clear_state()

            if self._ffinfo.space().is_periodic():
                self._enforce_periodic_box = True

                if self._map.specified("enforce_periodic_box"):
                    self._enforce_periodic_box = (
                        self._map["enforce_periodic_box"].value().as_boolean()
                    )
            else:
                self._enforce_periodic_box = False

            # Prepare the OpenMM REST2 data structures.
            if map.specified("rest2_selection"):
                if len(non_pert_atoms) > 0:
                    self._omm_mols._prepare_rest2(self._sire_mols, non_pert_atoms)
        else:
            self._sire_mols = None
            self._energy_trajectory = None

    def is_null(self):
        return self._sire_mols is None

    def _clear_state(self):
        self._omm_state = None
        self._omm_state_has_cv = (False, False)

    def _update_from(self, state, state_has_cv, nsteps_completed):
        if self.is_null():
            return

        self._current_step = nsteps_completed

        if not state_has_cv[0]:
            # there is no information to update
            return

        from ..legacy.Convert import (
            openmm_extract_coordinates_and_velocities,
            openmm_extract_coordinates,
            openmm_extract_space,
        )

        if self._sire_mols.num_atoms() == self._omm_mols.get_atom_index().count():
            # all of the atoms in all molecules are in the context,
            # and we can assume they are in atom index order
            mols_to_update = self._sire_mols.molecules()
        else:
            # some of the atoms aren't in the context, and they may be
            # in a different order
            mols_to_update = self._omm_mols.get_atom_index().atoms()
            mols_to_update.update(self._sire_mols.molecules())

        if state_has_cv[1]:
            # get velocities too
            mols_to_update = openmm_extract_coordinates_and_velocities(
                state,
                mols_to_update,
                # black auto-formats this to a long line
                perturbable_maps=self._omm_mols.get_lambda_lever().get_perturbable_molecule_maps(),  # noqa: E501
                map=self._map,
            )
        else:
            mols_to_update = openmm_extract_coordinates(
                state,
                mols_to_update,
                # black auto-formats this to a long line
                perturbable_maps=self._omm_mols.get_lambda_lever().get_perturbable_molecule_maps(),  # noqa: E501
                map=self._map,
            )

        self._sire_mols.update(mols_to_update.molecules())

        if self._ffinfo.space().is_periodic():
            # don't change the space if it is infinite - this
            # cannot change during the simulation and OpenMM
            # likes giving back fake spaces
            space = openmm_extract_space(state)
            self._sire_mols.set_property("space", space)
            self._ffinfo.set_space(space)

        self._sire_mols.set_property("time", self._current_time)

    def _enter_dynamics_block(self):
        if self._is_running:
            raise SystemError("Cannot start dynamics while it is already running!")

        self._is_running = True
        self._clear_state()

    def _exit_dynamics_block(
        self,
        save_frame: bool = False,
        save_energy: bool = False,
        lambda_windows=[],
        rest2_scale_factors=[],
        save_velocities: bool = False,
        delta_lambda: float = None,
        num_energy_neighbours: int = None,
        null_energy: float = None,
    ):
        if not self._is_running:
            raise SystemError("Cannot stop dynamics that is not running!")

        import openmm
        from ..units import nanosecond, kcal_per_mol

        if save_frame:
            self._omm_state = self._omm_mols.getState(
                getEnergy=True,
                getPositions=True,
                getVelocities=save_velocities,
                enforcePeriodicBox=self._enforce_periodic_box,
            )

            self._omm_state_has_cv = (True, save_velocities)
        else:
            self._omm_state = self._omm_mols.getState(getEnergy=True)
            self._omm_state_has_cv = (False, False)

        current_time = (
            self._omm_state.getTime().value_in_unit(openmm.unit.nanosecond) * nanosecond
        )

        delta = current_time - self._elapsed_time

        self._elapsed_time = current_time
        self._current_time += delta

        # store the number of lambda windows
        if lambda_windows is not None:
            num_lambda_windows = len(lambda_windows)

            # compute energies for all windows
            if num_energy_neighbours is None:
                num_energy_neighbours = num_lambda_windows

        if save_energy:
            # should save energy here
            nrgs = {}

            nrgs["kinetic"] = (
                self._omm_state.getKineticEnergy().value_in_unit(
                    openmm.unit.kilocalorie_per_mole
                )
                * kcal_per_mol
            )

            nrgs["potential"] = (
                self._omm_state.getPotentialEnergy().value_in_unit(
                    openmm.unit.kilocalorie_per_mole
                )
                * kcal_per_mol
            )

            sim_lambda_value = self._omm_mols.get_lambda()
            sim_rest2_scale = self._omm_mols.get_rest2_scale()

            # store the potential energy and accumulated non-equilibrium work
            if self._is_interpolate:
                nrg = nrgs["potential"]

                if (
                    len(self._lambda_interpolate) == 2
                    and sim_lambda_value != self._lambda_interpolate[0]
                ):
                    self._work += delta_lambda * (nrg - self._nrg_prev)
                self._nrg_prev = nrg
                nrgs["work"] = self._work
            else:
                nrgs[str(sim_lambda_value)] = nrgs["potential"]

                if lambda_windows is not None:
                    # get the index of the simulation lambda value in the
                    # lambda windows list
                    try:
                        lambda_index = lambda_windows.index(sim_lambda_value)
                    except:
                        num_energy_neighbours = num_lambda_windows
                        lambda_index = i

                    for i, (lambda_value, rest2_scale) in enumerate(
                        zip(lambda_windows, rest2_scale_factors)
                    ):
                        if lambda_value != sim_lambda_value:
                            if abs(lambda_index - i) <= num_energy_neighbours:
                                self._omm_mols.set_lambda(
                                    lambda_value,
                                    rest2_scale=rest2_scale,
                                    update_constraints=False,
                                )
                                nrgs[str(lambda_value)] = (
                                    self._omm_mols.get_potential_energy(
                                        to_sire_units=False
                                    ).value_in_unit(openmm.unit.kilocalorie_per_mole)
                                    * kcal_per_mol
                                )
                            else:
                                nrgs[str(lambda_value)] = null_energy * kcal_per_mol

                self._omm_mols.set_lambda(
                    sim_lambda_value,
                    rest2_scale=sim_rest2_scale,
                    update_constraints=False,
                )

            if self._is_interpolate:
                self._energy_trajectory.set(
                    self._current_time, nrgs, {"lambda": f"{sim_lambda_value:.8f}"}
                )
            else:
                self._energy_trajectory.set(
                    self._current_time, nrgs, {"lambda": str(sim_lambda_value)}
                )

            # update the interpolation lambda value
            if self._is_interpolate:
                if delta_lambda:
                    sim_lambda_value += delta_lambda
                    # clamp to [0, 1]
                    if sim_lambda_value < 0.0:
                        sim_lambda_value = 0.0
                    elif sim_lambda_value > 1.0:
                        sim_lambda_value = 1.0
                self._omm_mols.set_lambda(sim_lambda_value)

        self._is_running = False

        return (self._omm_state, self._omm_state_has_cv)

    def _get_current_state(
        self, include_coords: bool = False, include_velocities: bool = False
    ):
        if self._omm_state is not None:
            if self._omm_state_has_cv == (include_coords, include_velocities):
                return self._omm_state

        # we need to get this again as either it doesn't exist,
        # or we now want velocities as well
        if include_coords or include_velocities:
            self._omm_state = self._omm_mols.getState(
                getEnergy=True,
                getPositions=include_coords,
                getVelocities=include_velocities,
                enforcePeriodicBox=self._enforce_periodic_box,
            )
        else:
            self._omm_state = self._omm_mols.getState(getEnergy=True)

        self._omm_state_has_cv = (include_coords, include_velocities)

        return self._omm_state

    def timestep(self):
        if self.is_null():
            return 0
        else:
            from openmm.unit import picosecond as _omm_ps
            from ..units import picosecond as _sire_ps

            t = self._omm_mols.getIntegrator().getStepSize()

            return t.value_in_unit(_omm_ps) * _sire_ps

    def ensemble(self):
        if self.is_null():
            return None
        else:
            from ..move import Ensemble

            return Ensemble(map=self._map)

    def randomise_velocities(self, temperature=None, random_seed: int = None):
        """
        Set the velocities to random values, drawn from the Boltzmann
        distribution for the current temperature.

        Parameters
        ----------

        - temperature (temperature): The temperature to use. If None, then
          the current temperature will be used
        - random_seed (int): The random seed to use. If None, then
          a random seed will be generated
        """
        if self.is_null():
            return

        if temperature is not None:
            self.set_temperature(temperature, rescale_velocities=False)

        from ..units import kelvin

        if random_seed is None:
            self._omm_mols.setVelocitiesToTemperature(
                self.ensemble().temperature().to(kelvin)
            )
        else:
            self._omm_mols.setVelocitiesToTemperature(
                self.ensemble().temperature().to(kelvin), random_seed
            )

    def set_ensemble(self, ensemble, rescale_velocities: bool = True):
        """
        Set the ensemble for the dynamics. Note that this will
        only let you change the temperature and/or pressure of the
        ensemble. You can't change its fundemental nature.

        If rescale_velocities is True, then the velocities will
        be rescaled to the new temperature.
        """
        if self.is_null():
            return

        if ensemble.name() != self.ensemble().name():
            raise ValueError(
                "You cannot change the ensemble of the dynamics. "
                f"Currently the ensemble is {self.ensemble().name()}, "
                f"but you tried to set it to {ensemble.name()}."
            )

        if ensemble.temperature() != self.ensemble().temperature():
            self._map["temperature"] = ensemble.temperature()
            self._omm_mols.set_temperature(
                ensemble.temperature(), rescale_velocities=rescale_velocities
            )

        if ensemble.is_constant_pressure():
            if ensemble.pressure() != self.ensemble().pressure():
                self._map["pressure"] = ensemble.pressure()
                self._omm_mols.set_pressure(ensemble.pressure())

    def set_temperature(self, temperature, rescale_velocities: bool = True):
        """
        Set the temperature for the dynamics. Note that this will only
        let you change the temperature of the ensemble.
        You can't change its fundemental nature.

        If rescale_velocities is True, then the velocities will be
        rescaled to the new temperature.
        """
        if self.is_null():
            return

        ensemble = self.ensemble()

        ensemble.set_temperature(temperature)

        self.set_ensemble(ensemble=ensemble, rescale_velocities=rescale_velocities)

    def set_pressure(self, pressure):
        """
        Set the pressure for the dynamics. Note that this will only
        let you change the pressure of the ensemble.
        You can't change its fundemental nature.
        """
        if self.is_null():
            return

        ensemble = self.ensemble()

        ensemble.set_pressure(pressure)

        self.set_ensemble(ensemble=ensemble)

    def constraint(self):
        if self.is_null():
            return None
        else:
            return self._omm_mols.get_constraint()

    def perturbable_constraint(self):
        if self.is_null():
            return None
        else:
            return self._omm_mols.get_perturbable_constraint()

    def get_constraints(self):
        """
        Return the actual list of constraints that have been applied
        to this system. This is two lists of atoms, plus a list of
        distances. The constraint is atom0[i]::atom1[i] with distance[i]
        """
        if self.is_null():
            return None
        else:
            return self._omm_mols.get_constraints()

    def get_schedule(self):
        if self.is_null():
            return None
        else:
            return self._omm_mols.get_lambda_schedule()

    def set_schedule(self, schedule):
        if not self.is_null():
            self._omm_mols.set_lambda_schedule(schedule)
            self._schedule_changed = True
            self.set_lambda(
                self._omm_mols.get_lambda(),
                rest2_scale=self._omm_mols.get_rest2_scale(),
            )

    def get_lambda(self):
        if self.is_null():
            return None
        else:
            return self._omm_mols.get_lambda()

    def set_lambda(
        self,
        lambda_value: float,
        rest2_scale: float = 1.0,
        update_constraints: bool = True,
    ):
        if not self.is_null():
            s = self.get_schedule()

            if s is None:
                return

            lambda_value = s.clamp(lambda_value)

            if (
                (not self._schedule_changed)
                and (lambda_value == self._omm_mols.get_lambda())
                and (rest2_scale == self._omm_mols.get_rest2_scale())
            ):
                # nothing to do
                return

            self._omm_mols.set_lambda(
                lambda_value=lambda_value,
                rest2_scale=rest2_scale,
                update_constraints=update_constraints,
            )
            self._schedule_changed = False
            self._clear_state()

    def integrator(self):
        if self.is_null():
            return None
        else:
            return self._omm_mols.getIntegrator()

    def info(self):
        if self.is_null():
            return None
        else:
            return self._ffinfo

    def platform(self):
        if self.is_null():
            return None
        else:
            return self._omm_mols.getPlatform().getName()

    def current_step(self):
        if self.is_null():
            return 0
        else:
            return self._current_step

    def current_time(self):
        if self.is_null():
            return 0
        else:
            return self._current_time

    def current_space(self):
        if self.is_null():
            return None
        else:
            return self._ffinfo.space()

    def current_walltime(self):
        if self.is_null():
            return 0
        else:
            return self._walltime

    def elapsed_time(self):
        if self.is_null():
            return 0
        else:
            return self._elapsed_time

    def walltime(self):
        if self.is_null():
            return 0
        else:
            return self._walltime

    def current_energy(self):
        if self.is_null():
            return 0
        else:
            from openmm.unit import kilocalorie_per_mole as _omm_kcal_mol
            from ..units import kcal_per_mol as _sire_kcal_mol

            state = self._get_current_state()

            nrg = state.getKineticEnergy() + state.getPotentialEnergy()

            return nrg.value_in_unit(_omm_kcal_mol) * _sire_kcal_mol

    def current_potential_energy(self):
        if self.is_null():
            return 0
        else:
            from openmm.unit import kilocalorie_per_mole as _omm_kcal_mol
            from ..units import kcal_per_mol as _sire_kcal_mol

            state = self._get_current_state()

            nrg = state.getPotentialEnergy()

            return nrg.value_in_unit(_omm_kcal_mol) * _sire_kcal_mol

    def current_kinetic_energy(self):
        if self.is_null():
            return 0
        else:
            from openmm.unit import kilocalorie_per_mole as _omm_kcal_mol
            from ..units import kcal_per_mol as _sire_kcal_mol

            state = self._get_current_state()

            nrg = state.getKineticEnergy()

            return nrg.value_in_unit(_omm_kcal_mol) * _sire_kcal_mol

    def energy_trajectory(self):
        return self._energy_trajectory.clone()

    def step(self, num_steps: int = 1):
        """
        Just perform 'num_steps' steps of dynamics, without saving
        anything or running anything in a background thread. This is
        designed for times when we want a minimial overhead, e.g.
        when we want to run a small number of steps quickly.
        """
        if self._is_running:
            raise SystemError("Cannot step dynamics while it is already running!")

        self._clear_state()

        self._omm_mols.getIntegrator().step(num_steps)

    def get_minimisation_log(self):
        """
        Return the log from the last minimisation
        """
        if self.is_null():
            return None
        else:
            try:
                return self._minimisation_log
            except Exception:
                return None

    def run_minimisation(
        self,
        max_iterations: int = 10000,
        tolerance: float = 10.0,
        max_restarts: int = 10,
        max_ratchets: int = 20,
        ratchet_frequency: int = 500,
        starting_k: float = 400.0,
        ratchet_scale: float = 10.0,
        max_constraint_error: float = 0.001,
        timeout: str = "300s",
    ):
        """
        Internal method that runs minimisation on the molecules.

        If the system is constrained, then a ratcheting algorithm is used.
        The constraints are replaced by harmonic restraints with an
        force constant based on `tolerance` and `starting_k`. Minimisation
        is performed, with the actual constrained bond lengths checked
        whenever minimisation converges, or when ratchet_frequency steps
        have completed (whichever is sooner). The force constant of
        the restraints is ratcheted up by `ratchet_scale`, and minimisation
        continues until there is no large change in energy or the maximum
        number of ratchets has been reached. In addition, at each ratchet,
        the actual bond lengths of constrained bonds are compared against
        the constrained values. If these have drifted too far away from
        the constrained values, then the minimisation is restarted,
        going back to the starting conformation and starting minimisation
        at one higher ratchet level. This will repeat a maximum of
        `max_restarts` times.

        If a stable structure cannot be reached, then an exception
        will be raised.

        Parameters:

        - max_iterations (int): The maximum number of iterations to run
        - tolerance (float): The tolerance to use for the minimisation
        - max_restarts (int): The maximum number of restarts before giving up
        - max_ratchets (int): The maximum number of ratchets before giving up
        - ratchet_frequency (int): The maximum number of steps between ratchets
        - starting_k (float): The starting value of k for the minimisation
        - ratchet_scale (float): The amount to scale k at each ratchet
        - max_constraint_error (float): The maximum error in the constraint in nm
        - timeout (float): The maximum time to run the minimisation for in seconds.
                           A value of <=0 will disable the timeout.
        """
        from ..legacy.Convert import minimise_openmm_context

        if max_iterations <= 0:
            max_iterations = 0

        try:
            from ..units import second
            from .. import u

            timeout = u(timeout)
            if not timeout.has_same_units(second):
                raise ValueError("'timeout' must have units of time")
        except:
            raise ValueError("Unable to parse 'timeout' as a time")

        self._clear_state()

        self._minimisation_log = minimise_openmm_context(
            self._omm_mols,
            tolerance=tolerance,
            max_iterations=max_iterations,
            max_restarts=max_restarts,
            max_ratchets=max_ratchets,
            ratchet_frequency=ratchet_frequency,
            starting_k=starting_k,
            ratchet_scale=ratchet_scale,
            max_constraint_error=max_constraint_error,
            timeout=timeout.to(second),
        )

    def _rebuild_and_minimise(self):
        if self.is_null():
            return

        from ..utils import Console

        Console.warning(
            "Something went wrong when running dynamics. The system will be "
            "minimised, and then dynamics will be attempted again. If an "
            "error still occurs, then it is likely that the step size is too "
            "large, the molecules are over-constrained, or there is something "
            "more fundemental going wrong..."
        )

        # rebuild the molecules
        from ..convert import to

        self._omm_mols = to(self._sire_mols, "openmm", map=self._map)

        self.run_minimisation()

    def run(
        self,
        time,
        save_frequency=None,
        frame_frequency=None,
        energy_frequency=None,
        lambda_windows=None,
        rest2_scale_factors=None,
        save_velocities: bool = None,
        save_frame_on_exit: bool = False,
        save_energy_on_exit: bool = False,
        auto_fix_minimise: bool = True,
        num_energy_neighbours: int = None,
        null_energy: str = None,
    ):
        if self.is_null():
            return

        orig_args = {
            "time": time,
            "save_frequency": save_frequency,
            "frame_frequency": frame_frequency,
            "energy_frequency": energy_frequency,
            "lambda_windows": lambda_windows,
            "rest2_scale_factors": rest2_scale_factors,
            "save_velocities": save_velocities,
            "save_frame_on_exit": save_frame_on_exit,
            "save_energy_on_exit": save_energy_on_exit,
            "auto_fix_minimise": auto_fix_minimise,
            "num_energy_neighbours": num_energy_neighbours,
            "null_energy": null_energy,
        }

        from concurrent.futures import ThreadPoolExecutor
        import openmm

        from ..units import picosecond
        from .. import u

        time = u(time)

        if save_frequency is not None:
            save_frequency = u(save_frequency)

        if frame_frequency is not None:
            frame_frequency = u(frame_frequency)

        if energy_frequency is not None:
            energy_frequency = u(energy_frequency)

        if null_energy is not None:
            null_energy = u(null_energy)
        else:
            null_energy = u("10000 kcal/mol")

        if num_energy_neighbours is not None:
            try:
                num_energy_neighbours = int(num_energy_neighbours)
            except:
                num_energy_neighbours = len(lambda_windows)

        try:
            steps_to_run = int(time.to(picosecond) / self.timestep().to(picosecond))
        except Exception:
            # passed in the number of steps instead
            steps_to_run = int(time)

        if steps_to_run < 1:
            steps_to_run = 1

        if steps_to_run <= 0:
            return

        # get the energy and frame save frequencies
        if save_frequency != 0:
            if save_frequency is None:
                if self._map.specified("save_frequency"):
                    save_frequency = self._map["save_frequency"].value().to(picosecond)
                else:
                    save_frequency = 25
            else:
                save_frequency = save_frequency.to(picosecond)
            no_save = False
        else:
            no_save = True
            save_frequency = steps_to_run + 1

        if energy_frequency != 0:
            no_save_energy = False
            if energy_frequency is None:
                if self._map.specified("energy_frequency"):
                    energy_frequency = (
                        self._map["energy_frequency"].value().to(picosecond)
                    )
                else:
                    energy_frequency = save_frequency
                    no_save_energy = no_save
            else:
                energy_frequency = energy_frequency.to(picosecond)
        else:
            energy_frequency = steps_to_run + 1
            no_save_energy = True

        if frame_frequency != 0:
            no_save_frame = False
            if frame_frequency is None:
                if self._map.specified("frame_frequency"):
                    frame_frequency = (
                        self._map["frame_frequency"].value().to(picosecond)
                    )
                else:
                    frame_frequency = save_frequency
                    no_save_frame = no_save
            else:
                frame_frequency = frame_frequency.to(picosecond)
        else:
            frame_frequency = steps_to_run + 1
            no_save_frame = True

        completed = 0

        frame_frequency_steps = int(frame_frequency / self.timestep().to(picosecond))

        energy_frequency_steps = int(energy_frequency / self.timestep().to(picosecond))

        # If performing QM/MM lambda interpolation, then we just compute energies
        # for the pure MM (0.0) and QM (1.0) potentials.
        if self._is_interpolate:
            lambda_windows = [0.0, 1.0]
            # Work out the lambda increment.
            if isinstance(self._lambda_interpolate, list):
                divisor = (steps_to_run / energy_frequency_steps) - 1.0
                delta_lambda = (
                    self._lambda_interpolate[1] - self._lambda_interpolate[0]
                ) / divisor
            # Fixed lambda value.
            else:
                delta_lambda = None

            # Set the REST2 scaling factors.
            rest2_scale_factors = [1.0, 1.0]
        else:
            delta_lambda = None
            if lambda_windows is not None:
                if not isinstance(lambda_windows, list):
                    lambda_windows = [lambda_windows]
            else:
                if self._map.specified("lambda_windows"):
                    lambda_windows = self._map["lambda_windows"].value()

            if rest2_scale_factors is not None:
                if len(rest2_scale_factors) != len(lambda_windows):
                    raise ValueError(
                        "len(rest2_scale_factors) must be equal to len(lambda_windows)"
                    )
            else:
                if lambda_windows is not None:
                    rest2_scale_factors = [1.0] * len(lambda_windows)

        def runfunc(num_steps):
            try:
                integrator = self._omm_mols.getIntegrator()
                integrator.step(num_steps)
                return 0
            except Exception as e:
                return e

        def process_block(state, state_has_cv, nsteps_completed):
            self._update_from(state, state_has_cv, nsteps_completed)

            if state_has_cv[0] or state_has_cv[1]:
                self._sire_mols.save_frame(
                    map={
                        "space": self.current_space(),
                        "time": self.current_time(),
                    }
                )

        state = None
        state_has_cv = (False, False)
        saved_last_frame = False

        # whether the energy or frame were saved after the current block
        have_saved_frame = False
        have_saved_energy = False

        class NeedsMinimiseError(Exception):
            pass

        nsteps_before_run = self._current_step
        # if this is the first call, then set the save frequencies
        if nsteps_before_run == 0:
            self._next_save_frame = frame_frequency_steps
            self._next_save_energy = energy_frequency_steps
            self._prev_frame_frequency_steps = frame_frequency_steps
            self._prev_energy_frequency_steps = energy_frequency_steps
            self._prev_no_frame = no_save_frame
            self._prev_no_energy = no_save_energy
        # handle adjustments to the save frequencies
        else:
            if frame_frequency_steps != self._prev_frame_frequency_steps:
                if self._prev_no_frame:
                    self._next_save_frame = nsteps_before_run + frame_frequency_steps
                else:
                    self._next_save_frame = (
                        self._next_save_frame
                        + frame_frequency_steps
                        - self._prev_frame_frequency_steps
                    )
            if energy_frequency_steps != self._prev_energy_frequency_steps:
                if self._prev_no_energy:
                    self._next_save_energy = nsteps_before_run + energy_frequency_steps
                else:
                    self._next_save_energy = (
                        self._next_save_energy
                        + energy_frequency_steps
                        - self._prev_energy_frequency_steps
                    )
            self._prev_no_frame = no_save_frame
            self._prev_frame_frequency_steps = frame_frequency_steps
            self._prev_no_energy = no_save_energy
            self._prev_energy_frequency_steps = energy_frequency_steps

        from ..base import ProgressBar
        from ..units import second
        from datetime import datetime
        from math import isnan

        try:
            with ProgressBar(total=steps_to_run, text="dynamics") as progress:
                progress.set_speed_unit("steps / s")

                start_time = datetime.now()

                with ThreadPoolExecutor() as pool:
                    while completed < steps_to_run:
                        block_size = 50

                        steps_till_frame = self._next_save_frame - (
                            completed + nsteps_before_run
                        )
                        if steps_till_frame <= 0 or (
                            steps_till_frame <= block_size
                            and steps_till_frame <= steps_to_run - completed
                        ):
                            save_frame = True
                            self._next_save_frame += frame_frequency_steps
                            if frame_frequency_steps < block_size:
                                block_size = frame_frequency_steps
                        else:
                            save_frame = False

                        steps_till_energy = self._next_save_energy - (
                            completed + nsteps_before_run
                        )
                        if steps_till_energy <= 0 or (
                            steps_till_energy <= block_size
                            and steps_till_energy <= steps_to_run - completed
                        ):
                            save_energy = True
                            self._next_save_energy += energy_frequency_steps
                            if energy_frequency_steps < block_size:
                                block_size = energy_frequency_steps
                        else:
                            save_energy = False

                        # save the last frame if we're about to exit and the user
                        # has requested it
                        if (
                            save_frame_on_exit
                            and completed + block_size >= steps_to_run
                        ):
                            save_frame = True

                        # save the last energy if we're about to exit and the user
                        # has requested it
                        if (
                            save_energy_on_exit
                            and completed + block_size >= steps_to_run
                        ):
                            save_energy = True

                        self._enter_dynamics_block()

                        # process the last block in the foreground
                        if state is not None:
                            process_promise = pool.submit(
                                process_block,
                                state,
                                state_has_cv,
                                nsteps_before_run + completed,
                            )
                        else:
                            process_promise = None

                        nrun = block_size

                        # this block will exceed the run time so reduce the size
                        if nrun > steps_to_run - completed:
                            nrun = steps_to_run - completed

                        # run the current block in the background
                        run_promise = pool.submit(runfunc, nrun)

                        result = None

                        while not run_promise.done():
                            try:
                                result = run_promise.result(timeout=1.0)
                            except Exception as e:
                                if (
                                    "NaN" in str(e)
                                    and not have_saved_frame
                                    and not have_saved_energy
                                    and auto_fix_minimise
                                ):
                                    raise NeedsMinimiseError()

                        # catch rare edge case where the promise timed
                        # out, but then completed before the .done()
                        # test in the next loop iteration
                        if result is None:
                            result = run_promise.result()

                        if result == 0:
                            completed += nrun
                            progress.set_progress(completed)
                            run_promise = None
                        else:
                            # make sure we finish processing the last block
                            if process_promise is not None:
                                try:
                                    process_promise.result()
                                except Exception as e:
                                    if (
                                        "NaN" in str(e)
                                        and not have_saved_frame
                                        and not have_saved_energy
                                        and auto_fix_minimise
                                    ):
                                        raise NeedsMinimiseError()

                            if (
                                not have_saved_frame
                                and not have_saved_energy
                                and auto_fix_minimise
                            ):
                                raise NeedsMinimiseError()

                            # something went wrong - re-raise the exception
                            raise result

                        # make sure we've finished processing the last block
                        if process_promise is not None:
                            process_promise.result()
                            process_promise = None
                            saved_last_frame = True

                        # get the state, including coordinates and velocities
                        state, state_has_cv = self._exit_dynamics_block(
                            save_frame=save_frame,
                            save_energy=save_energy,
                            lambda_windows=lambda_windows,
                            rest2_scale_factors=rest2_scale_factors,
                            save_velocities=save_velocities,
                            delta_lambda=delta_lambda,
                            num_energy_neighbours=num_energy_neighbours,
                            null_energy=null_energy.value(),
                        )

                        saved_last_frame = False

                        have_saved_frame = save_frame
                        have_saved_energy = save_energy

                        kinetic_energy = state.getKineticEnergy().value_in_unit(
                            openmm.unit.kilocalorie_per_mole
                        )

                        ke_per_atom = kinetic_energy / self._num_atoms

                        if isnan(ke_per_atom) or ke_per_atom > 1000:
                            # The system has blown up!
                            state = None
                            saved_last_frame = True

                            if (
                                not have_saved_frame
                                and not have_saved_energy
                                and auto_fix_minimise
                            ):
                                raise NeedsMinimiseError()

                            raise RuntimeError(
                                "The kinetic energy has exceeded 1000 kcal "
                                f"mol-1 per atom (it is {ke_per_atom} kcal "
                                f"mol-1 atom-1, and {kinetic_energy} kcal "
                                "mol-1 total). This suggests that the "
                                "simulation has become unstable. Try reducing "
                                "the timestep and/or minimising the system "
                                "and run again."
                            )

                self._walltime += (datetime.now() - start_time).total_seconds() * second

            if state is not None and not saved_last_frame:
                # we can process the last block in the main thread
                process_block(
                    state=state,
                    state_has_cv=state_has_cv,
                    nsteps_completed=nsteps_before_run + completed,
                )

        except NeedsMinimiseError:
            # try to fix this problem by minimising,
            # then running again
            self._is_running = False
            self._clear_state()
            self._rebuild_and_minimise()
            orig_args["auto_fix_minimise"] = False
            self.run(**orig_args)
            return

    def to_xml(self, f=None):
        """
        Save the current state of the dynamics to XML.
        This is mostly used for debugging. This will return the
        XML string if 'f' is None. Otherwise it will write the
        XML to 'f' (either a filename, or a FILE object)
        """
        return self._omm_mols.to_xml(f=f)

    def commit(self, return_as_system: bool = False):
        if self.is_null():
            return

        self._update_from(
            state=self._get_current_state(include_coords=True, include_velocities=True),
            state_has_cv=(True, True),
            nsteps_completed=self._current_step,
        )

        self._sire_mols.set_energy_trajectory(self._energy_trajectory, map=self._map)

        self._sire_mols.set_ensemble(self.ensemble())

        if return_as_system:
            return self._sire_mols.clone()

        elif self._orig_mols is not None:
            from ..system import System

            if System.is_system(self._orig_mols):
                return self._sire_mols.clone()
            else:
                r = self._orig_mols.clone()
                r.update(self._sire_mols.molecules())
                return r
        else:
            return self._sire_mols.clone()


def _add_extra(extras, key, value):
    if value is not None:
        extras[key] = value


[docs] class Dynamics: """ Class that runs dynamics on the contained molecule(s). Note that this class is not designed to be constructed directly. You should only use this class by calling `.dynamics()` on the molecules(s) you want to simulate """ def __init__( self, mols=None, map=None, cutoff=None, cutoff_type=None, timestep=None, constraint=None, perturbable_constraint=None, schedule=None, lambda_value=None, swap_end_states=None, ignore_perturbations=None, shift_delta=None, shift_coulomb=None, coulomb_power=None, restraints=None, fixed=None, qm_engine=None, lambda_interpolate=None, ): from ..base import create_map from .. import u extras = {} _add_extra(extras, "cutoff", cutoff) _add_extra(extras, "cutoff_type", cutoff_type) if timestep is not None: _add_extra(extras, "timestep", u(timestep)) _add_extra(extras, "constraint", constraint) _add_extra(extras, "perturbable_constraint", perturbable_constraint) _add_extra(extras, "schedule", schedule) _add_extra(extras, "lambda", lambda_value) _add_extra(extras, "swap_end_states", swap_end_states) _add_extra(extras, "ignore_perturbations", ignore_perturbations) if shift_delta is not None: _add_extra(extras, "shift_delta", u(shift_delta)) if shift_coulomb is not None: _add_extra(extras, "shift_coulomb", u(shift_coulomb)) _add_extra(extras, "coulomb_power", coulomb_power) _add_extra(extras, "restraints", restraints) _add_extra(extras, "fixed", fixed) _add_extra(extras, "qm_engine", qm_engine) _add_extra(extras, "lambda_interpolate", lambda_interpolate) map = create_map(map, extras) self._d = DynamicsData(mols=mols, map=map) # Save the original view too, as this is the view that # will be returned to the user self._view = mols def __str__(self): speed = self.time_speed() if speed == 0: if self.current_step() > 0: return ( f"Dynamics(completed={self.current_time()}, " f"energy={self.current_energy()}, speed=FAST ns day-1)" ) else: return "Dynamics(completed=0)" else: return ( f"Dynamics(completed={self.current_time()}, " f"energy={self.current_energy()}, speed={speed:.1f} ns day-1)" ) def __repr__(self): return self.__str__()
[docs] def minimise( self, max_iterations: int = 10000, tolerance: float = 10.0, max_restarts: int = 10, max_ratchets: int = 20, ratchet_frequency: int = 500, starting_k: float = 400.0, ratchet_scale: float = 10.0, max_constraint_error: float = 0.001, timeout: str = "300s", ): """ Internal method that runs minimisation on the molecules. If the system is constrained, then a ratcheting algorithm is used. The constraints are replaced by harmonic restraints with an force constant based on `tolerance` and `starting_k`. Minimisation is performed, with the actual constrained bond lengths checked whenever minimisation converges, or when ratchet_frequency steps have completed (whichever is sooner). The force constant of the restraints is ratcheted up by `ratchet_scale`, and minimisation continues until there is no large change in energy or the maximum number of ratchets has been reached. In addition, at each ratchet, the actual bond lengths of constrained bonds are compared against the constrained values. If these have drifted too far away from the constrained values, then the minimisation is restarted, going back to the starting conformation and starting minimisation at one higher ratchet level. This will repeat a maximum of `max_restarts` times. If a stable structure cannot be reached, then an exception will be raised. Parameters: - max_iterations (int): The maximum number of iterations to run - tolerance (float): The tolerance to use for the minimisation - max_restarts (int): The maximum number of restarts before giving up - max_ratchets (int): The maximum number of ratchets before giving up - ratchet_frequency (int): The maximum number of steps between ratchets - starting_k (float): The starting value of k for the minimisation - ratchet_scale (float): The amount to scale k at each ratchet - max_constraint_error (float): The maximum error in the constraints in nm - timeout (float): The maximum time to run the minimisation for in seconds. A value of <=0 will disable the timeout. """ if not self._d.is_null(): self._d.run_minimisation( max_iterations=max_iterations, tolerance=tolerance, max_restarts=max_restarts, max_ratchets=max_ratchets, ratchet_frequency=ratchet_frequency, starting_k=starting_k, ratchet_scale=ratchet_scale, max_constraint_error=max_constraint_error, timeout=timeout, ) return self
[docs] def step(self, num_steps: int = 1): """ Simple function that performs `num_steps` steps of dynamics. This does not save any frames or energies - it is designed for times when you want to run a small number of steps quickly with minimal overhead. """ if not self._d.is_null(): self._d.step(num_steps=num_steps) return self
[docs] def run( self, time, save_frequency=None, frame_frequency=None, energy_frequency=None, lambda_windows=None, rest2_scale_factors=None, save_velocities: bool = None, save_frame_on_exit: bool = False, save_energy_on_exit: bool = False, auto_fix_minimise: bool = True, num_energy_neighbours: int = None, null_energy: str = None, ): """ Perform dynamics on the molecules. Parameters time: Time The amount of simulation time to run, e.g. dynamics.run(sr.u("5 ps")) would perform 5 picoseconds of dynamics. The number of steps is determined automatically based on the current timestep (e.g. if the timestep was 1 femtosecond, then 5 picoseconds would involve running 5000 steps) save_frequency: Time The amount of simulation time between saving frames (coordinates, velocities) and energies from the trajectory. The number of timesteps between saves will depend on the timestep. For example, if save_frequency was 0.1 picoseconds and the timestep was 2 femtoseconds, then the coordinates would be saved every 50 steps of dynamics. Note that `frame_frequency` or `energy_frequency` can be used to override the frequency of saving frames or energies, if you want them to be saved with different frequencies. Specifying both will mean that the value of `save_frequency` will be ignored. frame_frequency: Time The amount of simulation time between saving frames (coordinates, velocities) from the trajectory. The number of timesteps between saves will depend on the timestep. For example, if save_frequency was 0.1 picoseconds and the timestep was 2 femtoseconds, then the coordinates would be saved every 50 steps of dynamics. The energies will be saved into this object and are accessible via the `energy_trajectory` function. energy_frequency: Time The amount of simulation time between saving energies (kinetic and potential) from the trajectory. The number of timesteps between saves will depend on the timestep. For example, if save_frequency was 0.1 picoseconds and the timestep was 2 femtoseconds, then the coordinates would be saved every 50 steps of dynamics. The energies will be saved into this object and are accessible via the `energy_trajectory` function. lambda_windows: list[float] The values of lambda for which the potential energy will be evaluated at every save. If this is None (the default) then only the current energy will be saved every `energy_frequency` time. If this is not None, then the potential energy for each of the lambda values in this list will be saved. Note that we always save the potential energy of the simulated lambda value, even if it is not in the list of lambda windows. rest2_scale_factors: list[float] The scaling factors for the REST2 region for each lambda window. save_velocities: bool Whether or not to save the velocities when running dynamics. By default this is False. Set this to True if you are interested in saving the velocities. save_frame_on_exit: bool Whether to save a trajectory frame on exit, regardless of whether the frame frequency has been reached. save_energy_on_exit: bool Whether to save the energy on exit, regardless of whether the energy frequency has been reached. auto_fix_minimise: bool Whether or not to automatically run minimisation if the trajectory exits with an error in the first few steps. Such failures often indicate that the system needs minimsing. This automatically runs the minimisation in these cases, and then runs the requested dynamics. num_energy_neighbours: int The number of neighbouring windows to use when computing the energy trajectory for the simulation lambda value. This can be used to compute energies over a subset of the values in 'lambda_windows', hence reducing the cost of computing the energy trajectory. Note that the simulation lambda value must be contained in 'lambda_windows', so it is recommended that the values are rounded. A value of 'null_energy' will be added to the energy trajectory for the lambda windows that are omitted. Note that a similar result can be achieved by simply removing any lambda values from 'lambda_windows' that you don't want, but this will result in an energy trajectory that only contains results for the specified lambda values. If None, then all lambda windows will be used. null_energy: str The energy value to use for lambda windows that are not being computed as part of the energy trajectory, i.e. when 'num_energy_neighbours' is less than len(lambda_windows). By default, a value of '10000 kcal mol-1' is used. """ if not self._d.is_null(): if save_velocities is None: if self._d._map.specified("save_velocities"): save_velocities = self._d._map["save_velocities"].value().as_bool() else: save_velocities = False self._d.run( time=time, save_frequency=save_frequency, frame_frequency=frame_frequency, energy_frequency=energy_frequency, lambda_windows=lambda_windows, rest2_scale_factors=rest2_scale_factors, save_velocities=save_velocities, save_frame_on_exit=save_frame_on_exit, save_energy_on_exit=save_energy_on_exit, auto_fix_minimise=auto_fix_minimise, num_energy_neighbours=num_energy_neighbours, null_energy=null_energy, ) return self
[docs] def get_schedule(self): """ Return the LambdaSchedule that shows how lambda changes the underlying forcefield parameters in the system. Returns None if this isn't a perturbable system. """ return self._d.get_schedule()
[docs] def set_schedule(self, schedule): """ Set the LambdaSchedule that will be used to control how lambda changes the underlying forcefield parameters in the system. This does nothing if this isn't a perturbable system """ self._d.set_schedule(schedule)
[docs] def get_lambda(self): """ Return the current value of lambda for this system. This does nothing if this isn't a perturbable system """ return self._d.get_lambda()
[docs] def set_lambda( self, lambda_value: float, rest2_scale: float = 1.0, update_constraints: bool = True, ): """ Set the current value of lambda for this system. This will update the forcefield parameters in the context according to the data in the LambdaSchedule. This does nothing if this isn't a perturbable system. The `rest2_scale` parameter specifies the temperature of the REST2 region relative to the rest of the system at the specified lambda value. If `update_constraints` is True, then this will also update the constraint length of any constrained perturbable bonds. These will be set to the r0 value for that bond at this value of lambda. If `update_constraints` is False, then the constraint will not be changed. """ self._d.set_lambda( lambda_value=lambda_value, rest2_scale=rest2_scale, update_constraints=update_constraints, )
[docs] def get_rest2_scale(self): """ Return the current REST2 scaling factor. """ if self.is_null(): return None return self._d.get_rest2_scale()
[docs] def set_rest2_scale(self, rest2_scale: float): """ Set the current REST2 scaling factor. """ self._d.set_rest2_scale(rest2_scale=rest2_scale)
[docs] def ensemble(self): """ Return the ensemble in which the simulation is being performed """ return self._d.ensemble()
[docs] def set_ensemble(self, ensemble): """ Set the ensemble that should be used for this simulation. Note that you can only use this function to change temperature and/or pressure values. You can't change the fundemental ensemble of the simulation. """ self._d.set_ensemble(ensemble)
[docs] def set_temperature(self, temperature, rescale_velocities: bool = True): """ Set the temperature for the dynamics. Note that this will only let you change the temperature of the ensemble. You can't change its fundemental nature. If rescale_velocities is True, then the velocities will be rescaled to the new temperature. """ self._d.set_temperature( temperature=temperature, rescale_velocities=rescale_velocities )
[docs] def set_pressure(self, pressure): """ Set the pressure for the dynamics. Note that this will only let you change the pressure of the ensemble. You can't change its fundemental nature. """ self._d.set_pressure(pressure=pressure)
[docs] def randomise_velocities(self, temperature=None, random_seed: int = None): """ Set the velocities to random values, drawn from the Boltzmann distribution for the current temperature. Parameters ---------- - temperature (temperature): The temperature to use. If None, then the current temperature will be used - random_seed (int): The random seed to use. If None, then a random seed will be generated """ self._d.randomise_velocities(temperature=temperature, random_seed=random_seed)
[docs] def constraint(self): """ Return the constraint used for the dynamics (e.g. constraining bonds involving hydrogens etc.) """ return self._d.constraint()
[docs] def perturbable_constraint(self): """ Return the perturbable constraint used for the dynamics (e.g. constraining bonds involving hydrogens etc.) """ return self._d.perturbable_constraint()
[docs] def get_constraints(self): """ Return the actual list of constraints that have been applied to this system. This is two lists of atoms, plus a list of distances. The constraint is atom0[i]::atom1[i] with distance[i] """ return self._d.get_constraints()
[docs] def integrator(self): """ Return the integrator that is used to run dynamics """ return self._d.integrator()
[docs] def context(self): """ Return the underlying OpenMM context that is being driven by this dynamics object. """ return self._d._omm_mols
[docs] def info(self): """ Return the information that describes the forcefield that will be used for dynamics """ return self._d.info()
[docs] def platform(self): """ Return the name of the OpenMM platform being used for dynamics """ return self._d.platform()
[docs] def timestep(self): """ Return the timestep used for this simulation """ return self._d.timestep()
[docs] def current_step(self): """ Return the current number of completed steps of dynamics """ return self._d.current_step()
[docs] def current_time(self): """ Return the current amount of completed time of dynamics """ return self._d.current_time()
[docs] def elapsed_time(self): """ Return the total amount of elapsed time of dynamics. This will be the same as the current_time if this run started from time=0. Otherwise this will be the difference between the start time and the current time. """ return self._d.elapsed_time()
[docs] def walltime(self): """ Return the walltime (real actual runtime) of dynamics, i.e. how much real time has been consumed by the simulation """ return self._d.walltime()
[docs] def step_speed(self, time_unit=None): """ Return the speed of this simulation in number of steps per second """ if time_unit is None: from ..units import second time_unit = second nsteps = self.current_step() time = self.walltime().to(time_unit) if time < 0.000001: return 0 else: return nsteps / time
[docs] def time_speed(self, elapsed_unit=None, time_unit=None): """ Return the speed of this simulation in simulation time per real time (e.g. nanoseconds simulated per day) """ if elapsed_unit is None: from ..units import nanosecond elapsed_unit = nanosecond if time_unit is None: from ..units import day time_unit = day elapsed = self.elapsed_time().to(elapsed_unit) time = self.walltime().to(time_unit) if time < 0.000001: return 0 else: return elapsed / time
[docs] def current_space(self): """ Return the current space in which the simulation is being performed """ return self._d.current_space()
[docs] def current_energy(self): """ Return the current total energy """ return self._d.current_energy()
[docs] def current_potential_energy(self, lambda_values=None, rest2_scale_factors=None): """ Return the current potential energy. If `lambda_values` is passed (which should be a list of lambda values) then this will return the energies (as a list) at the requested lambda values If `rest2_scale_factors` is passed, then these will be used to scale the temperature of the REST2 region at each lambda value. """ if lambda_values is None: return self._d.current_potential_energy() else: if not isinstance(lambda_values, list): lambda_values = [lambda_values] if rest2_scale_factors is None: rest2_scale_factors = [1.0] * len(lambda_values) else: if not isinstance(rest2_scale_factors, list): rest2_scale_factors = [rest2_scale_factors] else: if len(rest2_scale_factors) != len(lambda_values): raise ValueError( "len(rest2_scale_factors) must be equal to len(lambda_values)" ) # save the current value of lambda so we # can restore it old_lambda = self.get_lambda() old_rest2_scale = self.get_rest2_scale() nrgs = [] try: for lambda_value, rest2_scale in zip( lambda_values, rest2_scale_factors ): self.set_lambda( lambda_value, rest2_scale=rest2_scale, update_constraints=False ) nrgs.append(self._d.current_potential_energy()) except Exception: self.set_lambda(old_lambda, old_rest2_scale, update_constraints=False) raise self.set_lambda( old_lambda, rest2_scale=old_rest2_scale, update_constraints=False ) return nrgs
[docs] def current_kinetic_energy(self): """ Return the current kinetic energy """ return self._d.current_kinetic_energy()
[docs] def energy_trajectory(self, to_pandas: bool = False, to_alchemlyb: bool = False): """ Return the energy trajectory. This is the trajectory of energy values that have been captured during dynamics. If 'to_pandas' is True, (the default) then this will be returned as a pandas dataframe, with times and energies in the defined default units """ t = self._d.energy_trajectory() if to_pandas or to_alchemlyb: return t.to_pandas( to_alchemlyb=to_alchemlyb, ) else: return t
[docs] def to_xml(self, f=None): """ Save the current state of the dynamics to XML. This is mostly used for debugging. This will return the XML string if 'f' is None. Otherwise it will write the XML to 'f' (either a filename, or a FILE object) """ return self._d.to_xml(f=f)
[docs] def commit(self, return_as_system: bool = False): """ Commit the dynamics and return the molecules after the simulation. Normally this will return the same view of as was used for construction. If `return_as_system` is True, then this will return a System object instead. """ if not self._d.is_null(): return self._d.commit(return_as_system=return_as_system) else: return None
def __call__( self, time, save_frequency=None, frame_frequency=None, energy_frequency=None, lambda_windows=None, save_velocities: bool = True, auto_fix_minimise: bool = True, ): """ Perform dynamics on the molecules. Parameters time: Time The amount of simulation time to run, e.g. dynamics.run(sr.u("5 ps")) would perform 5 picoseconds of dynamics. The number of steps is determined automatically based on the current timestep (e.g. if the timestep was 1 femtosecond, then 5 picoseconds would involve running 5000 steps) save_frequency: Time The amount of simulation time between saving frames (coordinates, velocities) and energies from the trajectory. The number of timesteps between saves will depend on the timestep. For example, if save_frequency was 0.1 picoseconds and the timestep was 2 femtoseconds, then the coordinates would be saved every 50 steps of dynamics. Note that `frame_frequency` or `energy_frequency` can be used to override the frequency of saving frames or energies, if you want them to be saved with different frequencies. Specifying both will mean that the value of `save_frequency` will be ignored. frame_frequency: Time The amount of simulation time between saving frames (coordinates, velocities) from the trajectory. The number of timesteps between saves will depend on the timestep. For example, if save_frequency was 0.1 picoseconds and the timestep was 2 femtoseconds, then the coordinates would be saved every 50 steps of dynamics. The energies will be saved into this object and are accessible via the `energy_trajectory` function. energy_frequency: Time The amount of simulation time between saving energies (kinetic and potential) from the trajectory. The number of timesteps between saves will depend on the timestep. For example, if save_frequency was 0.1 picoseconds and the timestep was 2 femtoseconds, then the coordinates would be saved every 50 steps of dynamics. The energies will be saved into this object and are accessible via the `energy_trajectory` function. lambda_windows: list[float] The values of lambda for which the potential energy will be evaluated at every save. If this is None (the default) then only the current energy will be saved every `energy_frequency` time. If this is not None, then the potential energy for each of the lambda values in this list will be saved. Note that we always save the potential energy of the simulated lambda value, even if it is not in the list of lambda windows. save_velocities: bool Whether or not to save the velocities when running dynamics. By default this is True. Set this to False if you aren't interested in saving the velocities. auto_fix_minimise: bool Whether or not to automatically run minimisation if the trajectory exits with an error in the first few steps. Such failures often indicate that the system needs minimsing. This automatically runs the minimisation in these cases, and then runs the requested dynamics. """ return self.run( time=time, save_frequency=save_frequency, frame_frequency=frame_frequency, energy_frequency=energy_frequency, lambda_windows=lambda_windows, save_velocities=save_velocities, auto_fix_minimise=auto_fix_minimise, ).commit()