Source code for mythos.simulators.oxdna.utils

"""Utilities for the oxDNA simulator."""

import datetime
import functools
import operator
from pathlib import Path

import jax
import jax.numpy as jnp
import pandas as pd
import sympy

import mythos.utils.types as jd_types
from mythos.input import oxdna_input, topology, trajectory
from mythos.input.trajectory import Trajectory
from mythos.utils.types import PathOrStr, oxDNAFormat, oxDNAModelHType

ERR_CANNOT_PROCESS_SRC_H = "Cannot process src/model.h file. Failed parsing: {}"
ERR_INVALID_HEADER_TYPE = "Invalid header value variable {} with value {}"

SYMPY_EVAL_N: int = 32

DEFAULT_OXDNA_VARIABLE_MAPPER = {
    # fene
    "eps_backbone": "FENE_EPS",
    "delta_backbone": "FENE_DELTA",
    "r0_backbone": "FENE_R0_OXDNA",
    # excluded_volume
    "eps_exc": "EXCL_EPS",
    "sigma_backbone": "EXCL_S1",
    "sigma_base": "EXCL_S2",
    "sigma_back_base": "EXCL_S3",
    "sigma_base_back": "EXCL_S4",
    "dr_star_backbone": "EXCL_R1",
    "dr_star_base": "EXCL_R2",
    "dr_star_back_base": "EXCL_R3",
    "dr_star_base_back": "EXCL_R4",
    "b_backbone": "EXCL_B1",
    "b_base": "EXCL_B2",
    "b_back_base": "EXCL_B3",
    "b_base_back": "EXCL_B4",
    "dr_c_backbone": "EXCL_RC1",
    "dr_c_base": "EXCL_RC2",
    "dr_c_back_base": "EXCL_RC3",
    "dr_c_base_back": "EXCL_RC4",
    # stacking
    # func f1(dr_stack)
    "eps_stack_base": "STCK_BASE_EPS_OXDNA",
    "eps_stack_kt_coeff": "STCK_FACT_EPS_OXDNA",
    "a_stack": "STCK_A",
    "dr0_stack": "STCK_R0",
    "dr_c_stack": "STCK_RC",
    "dr_low_stack": "STCK_RLOW",
    "dr_high_stack": "STCK_RHIGH",
    "b_low_stack": "STCK_BLOW",
    "b_high_stack": "STCK_BHIGH",
    "dr_c_low_stack": "STCK_RCLOW",
    "dr_c_high_stack": "STCK_RCHIGH",
    # func f4(theta_4)
    "a_stack_4": "STCK_THETA4_A",
    "theta0_stack_4": "STCK_THETA4_T0",
    "delta_theta_star_stack_4": "STCK_THETA4_TS",
    "b_stack_4": "STCK_THETA4_B",
    "delta_theta_stack_4_c": "STCK_THETA4_TC",
    # func f4(theta_5p)
    "a_stack_5": "STCK_THETA5_A",
    "theta0_stack_5": "STCK_THETA5_T0",
    "delta_theta_star_stack_5": "STCK_THETA5_TS",
    "b_stack_5": "STCK_THETA5_B",
    "delta_theta_stack_5_c": "STCK_THETA5_TC",
    # func f4(theta_6p)
    "a_stack_6": "STCK_THETA6_A",
    "theta0_stack_6": "STCK_THETA6_T0",
    "delta_theta_star_stack_6": "STCK_THETA6_TS",
    "b_stack_6": "STCK_THETA6_B",
    "delta_theta_stack_6_c": "STCK_THETA6_TC",
    # func f5(-cos(phi1))
    "a_stack_1": "STCK_PHI1_A",
    "neg_cos_phi1_star_stack": "STCK_PHI1_XS",
    "b_neg_cos_phi1_stack": "STCK_PHI1_B",
    "neg_cos_phi1_c_stack": "STCK_PHI1_XC",
    # func f5(-cos(phi2))
    "a_stack_2": "STCK_PHI2_A",
    "neg_cos_phi2_star_stack": "STCK_PHI2_XS",
    "b_neg_cos_phi2_stack": "STCK_PHI2_B",
    "neg_cos_phi2_c_stack": "STCK_PHI2_XC",
    # hydrogen_bonding
    # func f1(dr_hb)
    "eps_hb": "HYDR_EPS_OXDNA",
    "a_hb": "HYDR_A",
    "dr0_hb": "HYDR_R0",
    "dr_c_hb": "HYDR_RC",
    "dr_low_hb": "HYDR_RLOW",
    "dr_high_hb": "HYDR_RHIGH",
    "b_low_hb": "HYDR_BLOW",
    "dr_c_low_hb": "HYDR_RCLOW",
    "b_high_hb": "HYDR_BHIGH",
    "dr_c_high_hb": "HYDR_RCHIGH",
    # func f4(theta_1)
    "a_hb_1": "HYDR_THETA1_A",
    "theta0_hb_1": "HYDR_THETA1_T0",
    "delta_theta_star_hb_1": "HYDR_THETA1_TS",
    "b_hb_1": "HYDR_THETA1_B",
    "delta_theta_hb_1_c": "HYDR_THETA1_TC",
    # func f4(theta_2)
    "a_hb_2": "HYDR_THETA2_A",
    "theta0_hb_2": "HYDR_THETA2_T0",
    "delta_theta_star_hb_2": "HYDR_THETA2_TS",
    "b_hb_2": "HYDR_THETA2_B",
    "delta_theta_hb_2_c": "HYDR_THETA2_TC",
    # func f4(theta_3)
    "a_hb_3": "HYDR_THETA3_A",
    "theta0_hb_3": "HYDR_THETA3_T0",
    "delta_theta_star_hb_3": "HYDR_THETA3_TS",
    "b_hb_3": "HYDR_THETA3_B",
    "delta_theta_hb_3_c": "HYDR_THETA3_TC",
    # func f4(theta_4)
    "a_hb_4": "HYDR_THETA4_A",
    "theta0_hb_4": "HYDR_THETA4_T0",
    "delta_theta_star_hb_4": "HYDR_THETA4_TS",
    "b_hb_4": "HYDR_THETA4_B",
    "delta_theta_hb_4_c": "HYDR_THETA4_TC",
    # func f4(theta_7)
    "a_hb_7": "HYDR_THETA7_A",
    "theta0_hb_7": "HYDR_THETA7_T0",
    "delta_theta_star_hb_7": "HYDR_THETA7_TS",
    "b_hb_7": "HYDR_THETA7_B",
    "delta_theta_hb_7_c": "HYDR_THETA7_TC",
    # func f4(theta_8)
    "a_hb_8": "HYDR_THETA8_A",
    "theta0_hb_8": "HYDR_THETA8_T0",
    "delta_theta_star_hb_8": "HYDR_THETA8_TS",
    "b_hb_8": "HYDR_THETA8_B",
    "delta_theta_hb_8_c": "HYDR_THETA8_TC",
    # cross_stacking
    # func f2(dr_cross)
    "k_cross": "CRST_K",
    "r0_cross": "CRST_R0",
    "dr_c_cross": "CRST_RC",
    "dr_low_cross": "CRST_RLOW",
    "dr_high_cross": "CRST_RHIGH",
    "b_low_cross": "CRST_BLOW",
    "dr_c_low_cross": "CRST_RCLOW",
    "b_high_cross": "CRST_BHIGH",
    "dr_c_high_cross": "CRST_RCHIGH",
    # func f4(theta_1)
    "a_cross_1": "CRST_THETA1_A",
    "theta0_cross_1": "CRST_THETA1_T0",
    "delta_theta_star_cross_1": "CRST_THETA1_TS",
    "b_cross_1": "CRST_THETA1_B",
    "delta_theta_cross_1_c": "CRST_THETA1_TC",
    # func f4(theta_2)
    "a_cross_2": "CRST_THETA2_A",
    "theta0_cross_2": "CRST_THETA2_T0",
    "delta_theta_star_cross_2": "CRST_THETA2_TS",
    "b_cross_2": "CRST_THETA2_B",
    "delta_theta_cross_2_c": "CRST_THETA2_TC",
    # func f4(theta_3)
    "a_cross_3": "CRST_THETA3_A",
    "theta0_cross_3": "CRST_THETA3_T0",
    "delta_theta_star_cross_3": "CRST_THETA3_TS",
    "b_cross_3": "CRST_THETA3_B",
    "delta_theta_cross_3_c": "CRST_THETA3_TC",
    # func f4(theta_4) + f4(pi - theta_4)
    "a_cross_4": "CRST_THETA4_A",
    "theta0_cross_4": "CRST_THETA4_T0",
    "delta_theta_star_cross_4": "CRST_THETA4_TS",
    "b_cross_4": "CRST_THETA4_B",
    "delta_theta_cross_4_c": "CRST_THETA4_TC",
    # func f4(theta_7) + f4(pi - theta_7)
    "a_cross_7": "CRST_THETA7_A",
    "theta0_cross_7": "CRST_THETA7_T0",
    "delta_theta_star_cross_7": "CRST_THETA7_TS",
    "b_cross_7": "CRST_THETA7_B",
    "delta_theta_cross_7_c": "CRST_THETA7_TC",
    # func f4(theta_8) + f4(pi - theta_8)
    "a_cross_8": "CRST_THETA8_A",
    "theta0_cross_8": "CRST_THETA8_T0",
    "delta_theta_star_cross_8": "CRST_THETA8_TS",
    "b_cross_8": "CRST_THETA8_B",
    "delta_theta_cross_8_c": "CRST_THETA8_TC",
    # coaxial_stacking
    # func f2(dr_coax)
    "k_coax": "CXST_K_OXDNA",
    "dr0_coax": "CXST_R0",
    "dr_c_coax": "CXST_RC",
    "dr_low_coax": "CXST_RLOW",
    "dr_high_coax": "CXST_RHIGH",
    "b_low_coax": "CXST_BLOW",
    "dr_c_low_coax": "CXST_RCLOW",
    "b_high_coax": "CXST_BHIGH",
    "dr_c_high_coax": "CXST_RCHIGH",
    # func f4(theta_1) + f4(2*pi - theta_1)
    "a_coax_1": "CXST_THETA1_A",
    "theta0_coax_1": "CXST_THETA1_T0_OXDNA",
    "delta_theta_star_coax_1": "CXST_THETA1_TS",
    "b_coax_1": "CXST_THETA1_B",
    "delta_theta_coax_1_c": "CXST_THETA1_TC",
    # func f4(theta_4)
    "a_coax_4": "CXST_THETA4_A",
    "theta0_coax_4": "CXST_THETA4_T0",
    "delta_theta_star_coax_4": "CXST_THETA4_TS",
    "b_coax_4": "CXST_THETA4_B",
    "delta_theta_coax_4_c": "CXST_THETA4_TC",
    # func f4(theta_5) + f4(pi - theta_5)
    "a_coax_5": "CXST_THETA5_A",
    "theta0_coax_5": "CXST_THETA5_T0",
    "delta_theta_star_coax_5": "CXST_THETA5_TS",
    "b_coax_5": "CXST_THETA5_B",
    "delta_theta_coax_5_c": "CXST_THETA5_TC",
    # func f4(theta_6) + f4(pi - theta_6)
    "a_coax_6": "CXST_THETA6_A",
    "theta0_coax_6": "CXST_THETA6_T0",
    "delta_theta_star_coax_6": "CXST_THETA6_TS",
    "b_coax_6": "CXST_THETA6_B",
    "delta_theta_coax_6_c": "CXST_THETA6_TC",
    # func f5(cos(phi3))
    "a_coax_3p": "CXST_PHI3_A",
    "cos_phi3_star_coax": "CXST_PHI3_XS",
    "b_cos_phi3_coax": "CXST_PHI3_B",
    "cos_phi3_c_coax": "CXST_PHI3_XC",
    # func f5(cos(phi4))
    "a_coax_4p": "CXST_PHI4_A",
    "cos_phi4_star_coax": "CXST_PHI4_XS",
    "b_cos_phi4_coax": "CXST_PHI4_B",
    "cos_phi4_c_coax": "CXST_PHI4_XC",
    # oxdna2 specific coaxial stacking params
    "a_coax_1_f6": "CXST_THETA1_SA",
    "b_coax_1_f6": "CXST_THETA1_SB",
}

MIN_VALID_HEADER_TOKEN_COUNT = 3


[docs] def _parse_value_in(value: str) -> int | float | str: for t in (int, float): try: if t is float: tmp_value = value.replace("f", "").lower() parsed = float(sympy.parse_expr(tmp_value).evalf(n=SYMPY_EVAL_N)) else: parsed = t(value) except (AttributeError, ValueError, SyntaxError, TypeError): continue else: return parsed return value
[docs] def _parse_value_out(value: int | float | str) -> str: # noqa: PYI041 -- this is documentation specific if isinstance(value, int) or (isinstance(value, jax.Array) and (jnp.issubdtype(value.dtype, jnp.integer))): parsed_value = str(value) elif isinstance(value, float) or (isinstance(value, jax.Array) and (jnp.issubdtype(value.dtype, jnp.floating))): parsed_value = f"{value}f" elif isinstance(value, str): parsed_value = value else: raise TypeError(ERR_INVALID_HEADER_TYPE.format(type(value), value)) return parsed_value
[docs] def read_src_h(src_h: Path) -> dict[str, int | float | str]: """Parse the src/model.h file to get the parameters.""" params = {} with src_h.open("r") as f: for line in f: # this is a variable if line.startswith("#define") and "MODEL_H_" not in line: # We need to parse lines of the following varieties: # #define POS_BACK -0.4f # #define HYDR_F1 0 # #define HYDR_THETA8_T0 (PI*0.5f) # #define HYDR_T3_MESH_POINTS HYDR_T2_MESH_POINTS # #define CXST_T5_MESH_POINTS 6 // perfetto parts = line.split() if ( len(parts) >= MIN_VALID_HEADER_TOKEN_COUNT and (var_value := _parse_value_in(" ".join(parts[2:]).split("//")[0].strip())) is not None ): params[parts[1]] = var_value else: raise ValueError(ERR_CANNOT_PROCESS_SRC_H.format(line)) return params
[docs] def write_src_h(src_h: Path, params: dict[str, tuple[oxDNAModelHType, int | float | str]]) -> None: """Write the src/model.h file with the new parameters.""" with src_h.open("w") as f: f.write( "\n".join( [ "/**", " * @file model.h", f" * @date {datetime.datetime.now(tz=datetime.UTC).strftime('%b %d, %Y')}", " * @author fromano -- modified by mythos", " */", "", "#ifndef MODEL_H_", "#define MODEL_H_\n", ] ) ) for key, value in params.items(): try: parsed_value = _parse_value_out(value) except ValueError as e: raise ValueError(ERR_INVALID_HEADER_TYPE.format(key, value)) from e f.write(f"#define {key} {parsed_value}\n") if key == "FENE_DELTA": f.write(f"#define FENE_DELTA2 {value**2}f\n") f.write("#endif /* MODEL_H_ */\n")
[docs] def update_params(src_h: Path, new_params: jd_types.Params | list[jd_types.Params]) -> None: """Update the src/model.h file with the new parameters.""" params = read_src_h(src_h) flattened_params = functools.reduce(operator.or_, new_params, {}) if isinstance(new_params, list) else new_params # it is probably unintentional that this is called with no valid parameters # or should it be that all updates are required? if set(flattened_params).isdisjoint(DEFAULT_OXDNA_VARIABLE_MAPPER): raise ValueError("No valid oxDNA parameters found to update in src/model.h") for np in filter(lambda k: k in DEFAULT_OXDNA_VARIABLE_MAPPER, flattened_params): mapped_name = DEFAULT_OXDNA_VARIABLE_MAPPER[np] if mapped_name in params: params[mapped_name] = flattened_params[np] # Convention is that oxdna2 parameters have corresponding oxdna # labeled parameter when they match the same energy function. oxdna2_name = mapped_name.replace("OXDNA", "OXDNA2") if "OXDNA" in mapped_name and oxdna2_name in params: params[oxdna2_name] = flattened_params[np] # special case for oxdna2 specific coaxial stacking param # CXST_THETA1_SA. oxdna standalone assumes it is pre-devided by 2, # whereas this package does not. if mapped_name == "CXST_THETA1_SA": params[mapped_name] = flattened_params[np] / 2 else: raise ValueError(f"Parameter {np} not found in src/model.h") write_src_h(src_h, params)
[docs] def _get_order_parameter_names(op_file: Path) -> list[str]: with op_file.open("r") as f: return [ line.strip().split("=")[1].strip() for line in f if line.strip().startswith("order_parameter") ]
[docs] def read_energy(simulation_dir: Path) -> pd.DataFrame: """Read the energy.dat file from an oxDNA simulation. Args: simulation_dir: Path to the simulation directory containing the energy.dat and other simulation files. this directory must also contain the input file, which is read to determine the shape of the energy file. Returns: A pandas DataFrame containing the energy data. When umbrella sampling is enabled, the order parameter columns and weight column are also included, where the order parameter name comes from the "order_parameter" specification in the "op_file". """ inputs = oxdna_input.read(simulation_dir / "input") energy_file = simulation_dir / inputs["energy_file"] energy_df_columns_base = [ "time", "potential_energy", "acc_ratio_trans", "acc_ratio_rot", "acc_ratio_vol" ] # This is a space separated file, no header and the first row corresponds to # the 0th step, which does not match the trajectory file, so we skip it. The # resulting energy file should then be the same length as the trajectory and # each row corresponsds to the same step. energy_df = pd.read_table(energy_file, sep=r"\s+", header=None, skiprows=1) if not inputs.get("umbrella_sampling"): energy_df.columns = energy_df_columns_base return energy_df order_param_types = _get_order_parameter_names(simulation_dir / inputs["op_file"]) energy_df_columns = energy_df_columns_base + order_param_types + ["weight"] energy_df.columns = energy_df_columns return energy_df
[docs] def read_output_trajectory(input_file: PathOrStr) -> Trajectory: """Read trajectory from an oxDNA input directory. This is a convenience function that reads the topology and trajectory files from an oxDNA input directory to determine the correct read format. Args: input_file (PathOrStr): path to the oxDNA input file. """ input_dict = oxdna_input.read(Path(input_file)) oxdna_dir = Path(input_file).parent top, fmt = topology.from_oxdna_file(oxdna_dir / input_dict["topology"], return_format=True) return trajectory.from_file( oxdna_dir / input_dict["trajectory_file"], top.strand_counts, is_5p_3p=(fmt == oxDNAFormat.NEW), )
[docs] def read_last_hist(simulation_dir: Path) -> pd.DataFrame: """Read the last histogram from an oxDNA simulation with umbrella sampling. Args: simulation_dir: Path to the simulation directory containing the histograms and other simulation files. this directory must also contain the input file, which is read to determine the shape of the histogram file. Returns: A pandas DataFrame containing the last histogram data. The columns correspond to the order parameters specified in the "op_file", as well as the counts and unbiased counts, and any extrapolation temperatures specified in the input file's extrapolate_hist parameter. """ inputs = oxdna_input.read(simulation_dir / "input") hist_file = simulation_dir / inputs["last_hist_file"] extrap_temps = inputs.get("extrapolate_hist", "").split(",") order_param_types = _get_order_parameter_names(simulation_dir / inputs["op_file"]) # set the histogram dataframe columns hist_df_columns = [*order_param_types, "count", "unbiased_count", *extrap_temps] return pd.read_table(hist_file, sep=r"\s+", header=None, skiprows=1, names=hist_df_columns)