Coverage for ase / optimize / optimize.py: 92.99%
214 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 10:20 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 10:20 +0000
1# fmt: off
3"""Structure optimization. """
4import time
5import warnings
6from collections.abc import Callable
7from contextlib import contextmanager
8from functools import cached_property
9from os.path import isfile
10from pathlib import Path
11from typing import IO, Any
13from ase import Atoms
14from ase.calculators.calculator import PropertyNotImplementedError
15from ase.filters import UnitCellFilter
16from ase.parallel import world
17from ase.utils import IOContext
18from ase.utils.abc import Optimizable
20DEFAULT_MAX_STEPS = 100_000_000
23class RestartError(RuntimeError):
24 pass
27class OptimizableAtoms(Optimizable):
28 def __init__(self, atoms):
29 self.atoms = atoms
31 def get_x(self):
32 return self.atoms.get_positions().ravel()
34 def set_x(self, x):
35 self.atoms.set_positions(x.reshape(-1, 3))
37 def get_gradient(self):
38 return -self.atoms.get_forces().ravel()
40 @cached_property
41 def _use_force_consistent_energy(self):
42 # This boolean is in principle invalidated if the
43 # calculator changes. This can lead to weird things
44 # in multi-step optimizations.
45 try:
46 self.atoms.get_potential_energy(force_consistent=True)
47 except PropertyNotImplementedError:
48 # warnings.warn(
49 # 'Could not get force consistent energy (\'free_energy\'). '
50 # 'Please make sure calculator provides \'free_energy\', even '
51 # 'if equal to the ordinary energy. '
52 # 'This will raise an error in future versions of ASE.',
53 # FutureWarning)
54 return False
55 else:
56 return True
58 def get_value(self):
59 force_consistent = self._use_force_consistent_energy
60 return self.atoms.get_potential_energy(
61 force_consistent=force_consistent)
63 def iterimages(self):
64 # XXX document purpose of iterimages
65 return self.atoms.iterimages()
67 def ndofs(self):
68 return 3 * len(self.atoms)
71class Log:
72 def __init__(self, logfile, comm):
73 self._logfile = logfile
74 self._comm = comm
76 def write(self, msg):
77 if self._logfile is None or self._comm.rank != 0:
78 return
80 if hasattr(self._logfile, 'write'):
81 self._logfile.write(msg)
82 return
84 if self._logfile == '-':
85 print(msg, end='')
86 return
88 with open(self._logfile, mode='a') as fd:
89 fd.write(msg)
91 def flush(self):
92 warnings.warn('Log flushes by default now. Please do not call '
93 'flush(). If you want a different flushing behaviour '
94 'please open logfile yourself and choose '
95 'buffering mode as appropriate. '
96 'This flush() method currently does nothing.',
97 FutureWarning)
100class BaseDynamics(IOContext):
101 """Common superclass for optimization and MD.
103 This class implements logfile, trajectory, and observer handling.
104 It should do as little as possible other than that.
106 We should try to remove the "atoms" parameter from this class,
107 since Optimizers only require Optimizables.
108 """
109 def __init__(
110 self,
111 atoms: Atoms,
112 logfile: IO | Path | str | None = None,
113 trajectory: str | Path | None = None,
114 append_trajectory: bool = False,
115 master: bool | None = None,
116 comm=world,
117 *,
118 loginterval: int = 1,
119 ):
120 self.atoms = atoms
121 self.logfile = Log(logfile, comm=comm)
122 self.observers: list[tuple[Callable, int, tuple, dict[str, Any]]] = []
123 self.nsteps = 0
124 self.max_steps = 0 # to be updated in run or irun
125 self.comm = comm
127 self._orig_trajectory = trajectory
128 self._master = master
130 if trajectory is None or hasattr(trajectory, 'write'):
131 # 'write' attribute is meant to check for a Trajectory
132 # (could be BundleTrajectory).
133 # There is no unified way to check for Trajectory.
134 # In this case the trajectory is already open so we do not
135 # open it.
136 pass
137 else:
138 trajectory = Path(trajectory)
139 if comm.rank == 0 and not append_trajectory:
140 trajectory.unlink(missing_ok=True)
142 if trajectory is not None:
143 self.attach(self._traj_write_image, interval=loginterval,
144 description={'interval': loginterval})
146 self.trajectory = trajectory
148 def todict(self) -> dict[str, Any]:
149 raise NotImplementedError
151 @contextmanager
152 def _opentraj(self):
153 from ase.io.trajectory import Trajectory
154 assert self._orig_trajectory is not None
156 if hasattr(self._orig_trajectory, 'write'):
157 # Already an open trajectory, we are not responsible for open/close
158 yield self._orig_trajectory
159 return
161 with Trajectory(
162 self._orig_trajectory,
163 master=self._master, mode='a', comm=self.comm,
164 ) as traj:
165 yield traj
167 def _traj_write_image(self, description: dict):
168 with self._opentraj() as traj:
169 # The description is only written when we write a header,
170 # and we probably only write the header when we need to
171 # (in append mode) but I am not sure about that.
172 traj.set_description(self.todict() | description)
173 traj.write(self.atoms.__ase_optimizable__())
175 def _traj_is_empty(self):
176 with self._opentraj() as traj:
177 return len(traj) == 0
179 def _get_gradient(self, forces=None):
180 if forces is not None:
181 warnings.warn('Please do not pass forces to step(). '
182 'This argument will be removed in '
183 'ase 3.28.0.')
184 return -forces.ravel()
185 return self.optimizable.get_gradient()
187 def get_number_of_steps(self):
188 return self.nsteps
190 def insert_observer(
191 self, function, position=0, interval=1, *args, **kwargs
192 ):
193 """Insert an observer.
195 This can be used for pre-processing before logging and dumping.
197 Examples
198 --------
199 >>> from ase.build import bulk
200 >>> from ase.calculators.emt import EMT
201 >>> from ase.optimize import BFGS
202 ...
203 ...
204 >>> def update_info(atoms, opt):
205 ... atoms.info["nsteps"] = opt.nsteps
206 ...
207 ...
208 >>> atoms = bulk("Cu", cubic=True) * 2
209 >>> atoms.rattle()
210 >>> atoms.calc = EMT()
211 >>> with BFGS(atoms, logfile=None, trajectory="opt.traj") as opt:
212 ... opt.insert_observer(update_info, atoms=atoms, opt=opt)
213 ... opt.run(fmax=0.05, steps=10)
214 True
215 """
216 if not isinstance(function, Callable):
217 function = function.write
218 self.observers.insert(position, (function, interval, args, kwargs))
220 def attach(self, function, interval=1, *args, **kwargs):
221 """Attach callback function.
223 If *interval > 0*, at every *interval* steps, call *function* with
224 arguments *args* and keyword arguments *kwargs*.
226 If *interval <= 0*, after step *interval*, call *function* with
227 arguments *args* and keyword arguments *kwargs*. This is
228 currently zero indexed."""
230 if not isinstance(function, Callable):
231 function = function.write
232 self.observers.append((function, interval, args, kwargs))
234 def call_observers(self):
235 for function, interval, args, kwargs in self.observers:
236 call = False
237 # Call every interval iterations
238 if interval > 0:
239 if (self.nsteps % interval) == 0:
240 call = True
241 # Call only on iteration interval
242 elif interval <= 0:
243 if self.nsteps == abs(interval):
244 call = True
245 if call:
246 function(*args, **kwargs)
249class Dynamics(BaseDynamics):
250 """Historical base-class for MD and structure optimization classes.
252 This inheritance level can probably be eliminated by merging it with
253 Optimizer, since by now, the only significant thing it provides
254 is the implementation of irun() suitable for optimizations.
256 This class is an implementation detail and may change or be removed.
257 """
259 def __init__(self, atoms: Atoms, *args, **kwargs):
260 """Dynamics object.
262 Parameters
263 ----------
264 atoms : Atoms object
265 The Atoms object to operate on.
267 logfile : file object, Path, or str
268 If *logfile* is a string, a file with that name will be opened.
269 Use '-' for stdout.
271 trajectory : Trajectory object, str, or Path
272 Attach a trajectory object. If *trajectory* is a string/Path, a
273 Trajectory will be constructed. Use *None* for no trajectory.
275 append_trajectory : bool
276 Defaults to False, which causes the trajectory file to be
277 overwriten each time the dynamics is restarted from scratch.
278 If True, the new structures are appended to the trajectory
279 file instead.
281 master : bool
282 Defaults to None, which causes only rank 0 to save files. If set to
283 true, this rank will save files.
285 comm : Communicator object
286 Communicator to handle parallel file reading and writing.
288 loginterval : int, default: 1
289 Only write a log line for every *loginterval* time steps.
290 """
291 super().__init__(atoms, *args, **kwargs)
292 self.atoms = atoms
293 self.optimizable = atoms.__ase_optimizable__()
295 def irun(self, steps=DEFAULT_MAX_STEPS):
296 """Run dynamics algorithm as generator.
298 Parameters
299 ----------
300 steps : int, default=DEFAULT_MAX_STEPS
301 Number of dynamics steps to be run.
303 Yields
304 ------
305 converged : bool
306 True if the forces on atoms are converged.
308 Examples
309 --------
310 This method allows, e.g., to run two optimizers or MD thermostats at
311 the same time.
312 >>> opt1 = BFGS(atoms)
313 >>> opt2 = BFGS(StrainFilter(atoms)).irun()
314 >>> for _ in opt2:
315 ... opt1.run()
316 """
318 # update the maximum number of steps
319 self.max_steps = self.nsteps + steps
321 # compute the initial step
322 gradient = self.optimizable.get_gradient()
324 # log the initial step
325 if self.nsteps == 0:
326 self.log(gradient)
328 # we write a trajectory file if it is None
329 if self.trajectory is None:
330 self.call_observers()
331 # We do not write on restart w/ an existing trajectory file
332 # present. This duplicates the same entry twice
333 elif self._traj_is_empty():
334 self.call_observers()
336 # check convergence
337 gradient = self.optimizable.get_gradient()
338 is_converged = self.converged(gradient)
339 yield is_converged
341 # run the algorithm until converged or max_steps reached
342 while not is_converged and self.nsteps < self.max_steps:
343 # compute the next step
344 self.step()
345 self.nsteps += 1
347 # log the step
348 gradient = self.optimizable.get_gradient()
349 self.log(gradient)
350 self.call_observers()
352 # check convergence
353 gradient = self.optimizable.get_gradient()
354 is_converged = self.converged(gradient)
355 yield is_converged
357 def run(self, steps=DEFAULT_MAX_STEPS):
358 """Run dynamics algorithm.
360 This method will return when the forces on all individual
361 atoms are less than *fmax* or when the number of steps exceeds
362 *steps*.
364 Parameters
365 ----------
366 steps : int, default=DEFAULT_MAX_STEPS
367 Number of dynamics steps to be run.
369 Returns
370 -------
371 converged : bool
372 True if the forces on atoms are converged.
373 """
375 for converged in Dynamics.irun(self, steps=steps):
376 pass
377 return converged
379 def converged(self, gradient):
380 """" a dummy function as placeholder for a real criterion, e.g. in
381 Optimizer """
382 return False
384 def log(self, *args, **kwargs):
385 """ a dummy function as placeholder for a real logger, e.g. in
386 Optimizer """
387 return True
389 def step(self):
390 """this needs to be implemented by subclasses"""
391 raise RuntimeError("step not implemented.")
394class Optimizer(Dynamics):
395 """Base-class for all structure optimization classes."""
397 # default maxstep for all optimizers
398 defaults = {'maxstep': 0.2}
399 _deprecated = object()
401 def __init__(
402 self,
403 atoms: Atoms,
404 restart: str | Path | None = None,
405 logfile: IO | str | Path | None = None,
406 trajectory: str | Path | None = None,
407 append_trajectory: bool = False,
408 **kwargs,
409 ):
410 """
412 Parameters
413 ----------
414 atoms: :class:`~ase.Atoms`
415 The Atoms object to relax.
417 restart: str | Path | None
418 Filename for restart file. Default value is *None*.
420 logfile: file object, Path, or str
421 If *logfile* is a string, a file with that name will be opened.
422 Use '-' for stdout.
424 trajectory: Trajectory object, Path, or str
425 Attach trajectory object. If *trajectory* is a string a
426 Trajectory will be constructed. Use *None* for no
427 trajectory.
429 append_trajectory: bool
430 Appended to the trajectory file instead of overwriting it.
432 kwargs : dict, optional
433 Extra arguments passed to :class:`~ase.optimize.optimize.Dynamics`.
435 """
436 super().__init__(
437 atoms=atoms,
438 logfile=logfile,
439 trajectory=trajectory,
440 append_trajectory=append_trajectory,
441 **kwargs,
442 )
444 self.restart = restart
446 self.fmax = None
448 if restart is None or not isfile(restart):
449 self.initialize()
450 else:
451 self.read()
452 self.comm.barrier()
454 def read(self):
455 raise NotImplementedError
457 def todict(self):
458 description = {
459 'type': 'optimization',
460 'optimizer': self.__class__.__name__,
461 'restart': None if self.restart is None else str(self.restart),
462 }
463 # add custom attributes from subclasses
464 for attr in ('maxstep', 'alpha', 'max_steps', 'fmax'):
465 if hasattr(self, attr):
466 description.update({attr: getattr(self, attr)})
467 return description
469 def initialize(self):
470 pass
472 def irun(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
473 """Run optimizer as generator.
475 Parameters
476 ----------
477 fmax : float
478 Convergence criterion of the forces on atoms.
479 steps : int, default=DEFAULT_MAX_STEPS
480 Number of optimizer steps to be run.
482 Yields
483 ------
484 converged : bool
485 True if the forces on atoms are converged.
486 """
487 self.fmax = fmax
488 return Dynamics.irun(self, steps=steps)
490 def run(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
491 """Run optimizer.
493 Parameters
494 ----------
495 fmax : float
496 Convergence criterion of the forces on atoms.
497 steps : int, default=DEFAULT_MAX_STEPS
498 Number of optimizer steps to be run.
500 Returns
501 -------
502 converged : bool
503 True if the forces on atoms are converged.
504 """
505 self.fmax = fmax
506 return Dynamics.run(self, steps=steps)
508 def converged(self, gradient):
509 """Did the optimization converge?"""
510 assert gradient.ndim == 1
511 return self.optimizable.converged(gradient, self.fmax)
513 def log(self, gradient):
514 fmax = self.optimizable.gradient_norm(gradient)
515 e = self.optimizable.get_value()
516 T = time.localtime()
517 if self.logfile is not None:
518 name = self.__class__.__name__
519 if self.nsteps == 0:
520 args = (" " * len(name), "Step", "Time", "Energy", "fmax")
521 msg = "%s %4s %8s %15s %12s\n" % args
522 self.logfile.write(msg)
524 args = (name, self.nsteps, T[3], T[4], T[5], e, fmax)
525 msg = "%s: %3d %02d:%02d:%02d %15.6f %15.6f\n" % args
526 self.logfile.write(msg)
528 def dump(self, data):
529 from ase.io.jsonio import write_json
530 if self.comm.rank == 0 and self.restart is not None:
531 with open(self.restart, 'w') as fd:
532 write_json(fd, data)
534 def load(self):
535 from ase.io.jsonio import read_json
536 with open(self.restart) as fd:
537 try:
538 from ase.optimize import BFGS
539 if not isinstance(self, BFGS) and isinstance(
540 self.atoms, UnitCellFilter
541 ):
542 warnings.warn(
543 "WARNING: restart function is untested and may result "
544 "in unintended behavior. Namely orig_cell is not "
545 "loaded in the UnitCellFilter. Please test on your own"
546 " to ensure consistent results."
547 )
548 return read_json(fd, always_array=False)
549 except Exception as ex:
550 msg = ('Could not decode restart file as JSON. '
551 'You may need to delete the restart file '
552 f'{self.restart}')
553 raise RestartError(msg) from ex