"""
"""
from __future__ import annotations
import sys
import os
import time
import tempfile
import json
import numpy as np
from pathlib import Path
from typing import Any
#from monty.string import marquee, list_strings # is_string,
from monty.json import MontyEncoder
from monty.collections import dict2namedtuple
from ase.atoms import Atoms
from ase.calculators.abinit import Abinit, AbinitProfile
from ase.constraints import ExpCellFilter
from ase.stress import full_3x3_to_voigt_6_stress, voigt_6_to_full_3x3_stress
from abipy.core.abinit_units import eV_Ha, Ang_Bohr
from abipy.core.structure import Structure, StructDiff
from abipy.tools.iotools import workdir_with_prefix
from abipy.tools.context_managers import Timer
from abipy.dynamics.hist import HistFile
from abipy.flowtk import PseudoTable
from abipy.ml.aseml import print_atoms, get_atoms, CalcBuilder, ase_optimizer_cls, abisanitize_atoms, RX_MODE
[docs]
class RelaxationProfiler:
def __init__(self, atoms: Any, pseudos, corr_algo,algorithm, xc_name, kppa, relax_mode: str, fmax: float, mpi_nprocs,
steps=500, verbose: int = 0, optimizer="BFGS", nn_name="chgnet", mpi_runner="mpirun"):
"""
Args:
atoms: ASE atoms, pymatgen structure or file with structure.
pseudos: List of pseudopotentials with cutoff hints.
xc_name: String defining the XC functional e.g. LDA or GGA.
corr_algo: Correction algorithm.
kppa: K-point per atom used for sampling the BZ.
relax_mode: String definining the relaxation mode e.g. "ions" or "cell"
fmax: Tolerance for structural relaxation in eV/Ang.
mpi_nprocs: Number of MPI procs used to run Abinit
steps: Max number of relaxation steps.
verbose: Verbosity level.
optimizer: String defining the ASE optimizer or Optimizer instance.
nn_name: String specifying the ML potential e.g. "m3gnet" or "chgnet".
mpi_runner:
"""
self.algorithm = algorithm
print (f"Algorithm: {self.algorithm}")
atoms = get_atoms(atoms)
self.initial_atoms = atoms.copy()
self.corr_algo = corr_algo
self.xc_name = xc_name
self.relax_mode = relax_mode
RX_MODE.validate(self.relax_mode)
self.fmax = fmax
self.steps = steps
self.verbose = verbose
self.ase_opt_cls = ase_optimizer_cls(optimizer)
self.nn_name = nn_name
self.scalar_pressure = 0.0
# Get pseudos and ecut
structure = Structure.as_structure(atoms)
pseudos = PseudoTable.as_table(pseudos).get_pseudos_for_structure(structure)
pp_paths = [p.filepath for p in pseudos]
hints = [p.hint_for_accuracy("normal") for p in pseudos]
ecut = max(h.ecut for h in hints) * 27.3 # In ASE this is in eV (don't know why!)
#pawecutdg = max(h.pawecutdg for h in hints) if pseudos.allpaw else None
# Automatic K-point sampling.
import pymatgen.io.abinit.abiobjects as aobj
kmesh = aobj.KSampling.automatic_density(structure, kppa, chksymbreak=0).to_abivars()
#print("Using kmesh:", kmesh)
self.gs_kwargs = dict(
ecut=ecut,
# Smoothing PW cutoff energy (mandatory for cell optimization)
ecutsm=0.5 if self.relax_mode == RX_MODE.cell else 0,
tolvrs=1e-8,
expert_user=1, # Ignore warnings (chksymbreak, chksymtnons, chkdilatmx)
autoparal=1,
paral_kgb=1,
#npfft=1,
rmm_diis=1 if all(p.isnc for p in pseudos) else 0,
nstep=100,
prtwf=0,
pseudos=pp_paths,
**kmesh,
)
# Run fully ab-initio relaxation with abinit.
# TODO: Fix issue with ixc set by ASE.
self.relax_kwargs = dict(
ionmov=2,
#ionmov=22,
optcell=0 if self.relax_mode == RX_MODE.ions else 2,
tolmxf=self.fmax * eV_Ha * Ang_Bohr,
ntime=200,
)
self.relax_kwargs.update(**self.gs_kwargs)
argv = f"{mpi_runner} -n {mpi_nprocs} abinit".split()
self.abinit_profile = AbinitProfile(argv)
#def __str__(self):
# return self.to_string()
#def to_string(self, verbose=0) -> str:
def _mkfilter(self, atoms: Atoms):
"""Apply a filter to input atoms depending on the relaxation mode."""
if self.relax_mode == RX_MODE.ions:
return atoms
elif self.relax_mode == RX_MODE.cell:
return ExpCellFilter(atoms, scalar_pressure=self.scalar_pressure)
raise ValueError(f"Invalid value of {self.relax_mode=}")
[docs]
def ml_relax_opt(self, directory):
"""
Relax structure with ML potential only. Return ASE optimizer.
"""
print(f"\nBegin {self.nn_name} relaxation in {str(directory)}")
print("relax_mode:", self.relax_mode, "with fmax:", self.fmax)
directory.mkdir()
ml_calc = CalcBuilder(self.nn_name).get_calculator()
atoms = self.initial_atoms.copy()
atoms.calc = ml_calc
opt_kws = dict(
trajectory=str(directory / f"opt.traj"),
#logfile=str(directory / f"log"),
)
opt = self.ase_opt_cls(self._mkfilter(atoms), **opt_kws)
with Timer() as timer:
opt.run(fmax=self.fmax, steps=self.steps)
if not opt.converged():
raise RuntimeError("ml_relax_opt didn't converge!")
print('%s relaxation completed in %.2f sec after nsteps: %d\n' % (self.nn_name, timer.time, opt.nsteps))
return opt
[docs]
def abi_relax_atoms(self, directory, atoms=None, header="Begin ABINIT relaxation"):
"""
Relax structure with ABINIT. Return namedtuple with results.
"""
print(f"\n{header} in {str(directory)}")
print("relax_mode:", self.relax_mode, "with tolmxf:", self.relax_kwargs["tolmxf"])
if atoms is None:
atoms = self.initial_atoms.copy()
abinit = Abinit(profile=self.abinit_profile, directory=directory, **self.relax_kwargs)
atoms.calc = abinit
with Timer() as timer:
forces = atoms.get_forces()
with HistFile(abinit.directory / "abinito_HIST.nc") as hist:
nsteps = hist.num_steps
energy = float(hist.final_energy)
atoms = get_atoms(hist.final_structure)
print('ABINIT relaxation completed in %.2f sec after nsteps: %d\n' % (timer.time, nsteps))
return dict2namedtuple(
atoms=atoms,
fmax=np.sqrt((forces ** 2).sum(axis=1).max()),
nsteps=nsteps,
energy=energy,
)
[docs]
def abi_relax_atoms_with_ase(self, directory, header="Begin ABINIT+ASE relaxation"):
"""
Relax structure with ABINIT. Return ASE Optimizer
"""
print(f"\n{header} in {str(directory)}")
print("relax_mode:", self.relax_mode, "with tolmxf:", self.relax_kwargs["tolmxf"])
atoms = self.initial_atoms.copy()
abinit = Abinit(profile=self.abinit_profile, directory=directory, **self.gs_kwargs)
atoms.calc = abinit
opt = self.ase_opt_cls(self._mkfilter(atoms))
with Timer() as timer:
opt.run(fmax=self.fmax, steps=self.steps)
if not opt.converged():
raise RuntimeError("Abinit+ASE optimizer didn't converge!")
print('%s relaxation completed in %.2f sec after nsteps: %d\n' % (self.nn_name, timer.time, opt.nsteps))
return opt
[docs]
def abinit_run_gs_atoms(self, directory, atoms):
"""
Perform a GS calculation with ABINIT. Return namedtuple with results in ASE units.
"""
with Timer(header=f"\nBegin ABINIT GS in {str(directory)}", footer="ABINIT GS") as timer:
abinit = Abinit(profile=self.abinit_profile, directory=directory, **self.gs_kwargs)
forces = abinit.get_forces(atoms=atoms)
stress = abinit.get_stress(atoms=atoms)
energy = abinit.get_potential_energy(atoms=atoms)
print('ABINIT GS completed in %.2f sec\n' % (timer.time))
return dict2namedtuple(abinit=abinit, forces=forces,
stress=voigt_6_to_full_3x3_stress(stress),
fmax=np.sqrt((forces ** 2).sum(axis=1).max()),
energy=energy,
)
[docs]
def run(self, workdir=None, prefix=None):
"""
Run the different steps of the benchmark.
"""
workdir = workdir_with_prefix(workdir, prefix)
# Run relaxation with ML potential.
ml_opt = self.ml_relax_opt(workdir / "ml_relax")
# Run fully ab-initio relaxation with abinit.
abi_relax = self.abi_relax_atoms(workdir / "abinit_relax")
if False:
# Run relaxation with ASE optimizer and Abinit forces.
abiase_opt = self.abi_relax_atoms_with_ase(workdir / f"abiase_relax")
# Compare structures
diff = StructDiff(["INITIAL", "ABINIT_RELAX", self.nn_name + "_RELAX"],
[self.initial_atoms, abi_relax.atoms, ml_opt.atoms])
diff.tabulate()
# Run hybrid relaxation (ML + abinit)
ml_calc = CalcBuilder(self.nn_name).get_calculator()
ml_calc.set_correct_forces_algo(self.corr_algo)
ml_calc.set_correct_stress_algo(self.corr_algo)
print(f"\nBegin ABINIT + {self.nn_name} hybrid relaxation")
if self.xc_name == "PBE":
print(f"Starting from ML-optimized Atoms as {self.xc_name=}")
atoms = ml_opt.atoms.copy()
#AA
#atoms = abisanitize_atoms(atoms)
else:
print(f"Starting from initial Atoms as {self.xc_name=}")
atoms = self.initial_atoms.copy()
count, abiml_nsteps, ml_nsteps = 0, 0, 0
count_max = 150 #AA: just playing with for testing delta_force convergance
t_start = time.time()
while count <= count_max:
count += 1
# Compute ab-initio forces/stress.
directory = workdir / f"abiml_gs_count_{count}"
gs = self.abinit_run_gs_atoms(directory, atoms)
abiml_nsteps += 1
print("Iteration:", count, "abi_fmax:", gs.fmax, ", fmax:", self.fmax)
print("abinit_stress_voigt", gs.stress_voigt)
#print_atoms(atoms, cart_forces=gs.forces)
print("abinit_energy", gs.energy )
# Store ab-initio forces/stresses in the ML calculator and attach it to atoms.
ml_calc.store_abi_forstr_atoms(gs.forces, gs.stress, atoms)
atoms.calc = ml_calc
opt_kws = dict(
trajectory=str(gs.abinit.directory / f"opt.traj"),
#logfile=str(abinit.directory / f"log_{count}"),
)
opt = self.ase_opt_cls(self._mkfilter(atoms), **opt_kws)
opt.run(fmax=self.fmax, steps=self.steps)
opt_converged = opt.converged()
ml_nsteps += opt.nsteps
atoms = opt.atoms.copy()
final_mlabi_relax = None
if self.algorithm =='old': do_final_relax = opt_converged and opt.nsteps <= 1
if self.algorithm == 'one-GS': do_final_relax = opt_converged
if do_final_relax:
# Sanitize atoms at each step to avoid possibile issues when relaxing with Abinit.
#atoms = abisanitize_atoms(opt.atoms.copy())
final_mlabi_relax = self.abi_relax_atoms(directory=workdir / "abiml_final_relax",
atoms=atoms,
header="Performing final structural relaxation with ABINIT",
)
abiml_nsteps += final_mlabi_relax.nsteps
break
print(f'ABINIT + {self.nn_name} relaxation completed in {time.time() - t_start :.2f} sec\n')
#print_atoms(atoms, title="Atoms after ABINIT + ML relaxation:")
diff = StructDiff(["INITIAL", self.nn_name + "_RELAX", "ABINIT_RELAX", "ABI_ML"],
[self.initial_atoms, ml_opt.atoms, abi_relax.atoms, final_mlabi_relax.atoms])
diff.tabulate()
print("With correction algo:", self.corr_algo)
print(f"GS steps to relax in pure ML-mode {ml_nsteps=}")
print(f"GS steps to relax in pure ABINIT mode {abi_relax.nsteps=}")
print(f"GS steps to relax in ABI + ML mode {abiml_nsteps=}")
print(f"Single point calculations performed in ABI + ML mode {count=}")
# Write json file with output results.
with open(workdir / "data.json", "wt") as fh:
data = dict(
corr_algo=self.corr_algo,
xc_name=self.xc_name,
gs_kwargs=self.gs_kwargs,
relax_kwargs=self.relax_kwargs,
ml_nsteps=ml_nsteps,
abiml_nsteps=abiml_nsteps,
abi_nsteps=abi_relax.nsteps,
ml_relaxed_structure=Structure.as_structure(ml_opt.atoms),
abi_relaxed_structure=Structure.as_structure(abi_relax.atoms),
abiml_relaxed_structure=Structure.as_structure(final_mlabi_relax.atoms),
)
json.dump(data, fh, indent=4, cls=MontyEncoder)
if __name__ == "__main__":
# Get pseudos
from abipy.flowtk.psrepos import get_oncvpsp_pseudos
xc_name = "PBE"
pseudos = get_oncvpsp_pseudos(xc_name=xc_name, version="0.4")
from ase.build import bulk
atoms = bulk('Si')
atoms.rattle(stdev=0.1, seed=42)
kppa = 200
corr_algo = CORRALGO.from_string("delta")
algorithm = "öld"
prof = RelaxationProfiler(atoms, pseudos, corr_algo, algorithm, xc_name, kppa,
relax_mode="ions", fmax=0.001, mpi_nprocs=2, verbose=1)
prof.run()