Coverage for ase / mep / autoneb.py: 75.07%
341 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 08:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 08:22 +0000
1# fmt: off
3import os
4import shutil
5import types
6from contextlib import ExitStack
7from math import exp, log
8from pathlib import Path
9from typing import Any
11import numpy as np
13import ase.parallel as mpi
14from ase.calculators.singlepoint import SinglePointCalculator
15from ase.io import Trajectory, read
16from ase.mep.neb import NEB
17from ase.optimize import BFGS, FIRE
18from ase.utils import deprecated
21def _forbid_optimizer_string(args: list, kwargs: dict[str, Any]) -> bool:
22 """Replace optimizer string with Optimizer object."""
23 arg_position = 11
24 try:
25 if (
26 len(args) >= arg_position + 1
27 and isinstance(args[arg_position], str)
28 ):
29 args[11] = {'BFGS': BFGS, 'FIRE': FIRE}[args[arg_position]]
31 elif isinstance(kwargs.get("optimizer", None), str):
32 kwargs["optimizer"] = {
33 'BFGS': BFGS, 'FIRE': FIRE}[kwargs["optimizer"]]
34 else:
35 return False
36 except KeyError as err:
37 msg = (
38 '"optimizer" must be "BFGS" or "FIRE" if string is passed, '
39 'but passing a string is deprecated. Use an Optimizer object '
40 'instead'
41 )
42 raise ValueError(msg) from err
44 return True
47class AutoNEB:
48 """AutoNEB object.
50 The AutoNEB algorithm streamlines the execution of NEB and CI-NEB
51 calculations following the algorithm described in:
53 E. L. Kolsbjerg, M. N. Groves, and B. Hammer, J. Chem. Phys,
54 145, 094107, 2016. (doi: 10.1063/1.4961868)
56 The user supplies at minimum the two end-points and possibly also some
57 intermediate images.
59 The stages are:
60 1) Define a set of images and name them sequentially.
61 Must at least have a relaxed starting and ending image
62 User can supply intermediate guesses which do not need to
63 have previously determined energies (probably from another
64 NEB calculation with a lower level of theory)
65 2) AutoNEB will first evaluate the user provided intermediate images
66 3) AutoNEB will then add additional images dynamically until n_max
67 is reached
68 4) A climbing image will attempt to locate the saddle point
69 5) All the images between the highest point and the starting point
70 are further relaxed to smooth the path
71 6) All the images between the highest point and the ending point are
72 further relaxed to smooth the path
74 Step 4 and 5-6 are optional steps!
76 Parameters
77 ----------
79 attach_calculators:
80 Function which adds valid calculators to the list of images supplied.
81 prefix: string or path
82 All files that the AutoNEB method reads and writes are prefixed with
83 prefix
84 n_simul: int
85 The number of relaxations run in parallel.
86 n_max: int
87 The number of images along the NEB path when done.
88 This number includes the two end-points.
89 Important: due to the dynamic adding of images around the peak n_max
90 must be updated if the NEB is restarted.
91 climb: boolean
92 Should a CI-NEB calculation be done at the top-point
93 fmax: float or list of floats
94 The maximum force along the NEB path
95 maxsteps: int
96 The maximum number of steps in each NEB relaxation.
97 If a list is given the first number of steps is used in the build-up
98 and final scan phase;
99 the second number of steps is used in the CI step after all images
100 have been inserted.
101 k: float
102 The spring constant along the NEB path
103 method: str (see neb.py)
104 Choice betweeen three method:
105 'aseneb', legacy ase NEB implementation
106 'improvedtangent', published NEB implementation
107 'eb', full spring force implementation (default)
108 optimizer: object
109 Optimizer object, defaults to FIRE
110 space_energy_ratio: float
111 The preference for new images to be added in a big energy gab
112 with a preference around the peak or in the biggest geometric gab.
113 A space_energy_ratio set to 1 will only considder geometric gabs
114 while one set to 0 will result in only images for energy
115 resolution.
117 Notes
118 -----
119 The AutoNEB method uses a fixed file-naming convention.
120 The initial images should have the naming prefix000.traj, prefix001.traj,
121 ... up until the final image in prefix00N.traj
122 Images are dynamically added in between the first and last image until
123 n_max images have been reached.
124 When doing the i'th NEB optimization a set of files
125 prefixXXXiter00i.traj exists with XXX ranging from 000 to the N images
126 currently in the NEB.
128 The most recent NEB path can always be monitored by:
130 ..code-block:: bash
132 $ ase-gui -n -1 neb???.traj
134 .. deprecated:: 3.22.0
135 Passing ``optimizer`` as a string is deprecated. Use an ``Optimizer``
136 object instead.
137 """
139 @deprecated(
140 "Passing 'optimizer' as a string is deprecated. "
141 "Use an 'Optimizer' object instead.",
142 callback=_forbid_optimizer_string,
143 )
144 def __init__(self, attach_calculators, prefix, n_simul, n_max,
145 iter_folder='AutoNEB_iter',
146 fmax=0.025, maxsteps=10000, k=0.1, climb=True, method='eb',
147 optimizer=FIRE,
148 remove_rotation_and_translation=False, space_energy_ratio=0.5,
149 world=None,
150 parallel=True, smooth_curve=False, interpolate_method='idpp'):
151 self.attach_calculators = attach_calculators
152 self.prefix = Path(prefix)
153 self.n_simul = n_simul
154 self.n_max = n_max
155 self.climb = climb
156 self.all_images = []
158 self.parallel = parallel
159 self.maxsteps = maxsteps
160 self.fmax = fmax
161 self.k = k
162 self.method = method
163 self.remove_rotation_and_translation = remove_rotation_and_translation
164 self.space_energy_ratio = space_energy_ratio
165 if interpolate_method not in ['idpp', 'linear']:
166 self.interpolate_method = 'idpp'
167 print('Interpolation method not implementet.',
168 'Using the IDPP method.')
169 else:
170 self.interpolate_method = interpolate_method
171 if world is None:
172 world = mpi.world
173 self.world = world
174 self.smooth_curve = smooth_curve
176 self.optimizer = optimizer
178 self.iter_folder = Path(self.prefix.parent) / iter_folder
179 self.iter_folder.mkdir(exist_ok=True)
181 def execute_one_neb(self, n_cur, to_run, climb=False, many_steps=False):
182 with ExitStack() as exitstack:
183 self._execute_one_neb(exitstack, n_cur, to_run,
184 climb=climb, many_steps=many_steps)
186 def iter_trajpath(self, i, iiter):
187 """When doing the i'th NEB optimization a set of files
188 prefixXXXiter00i.traj exists with XXX ranging from 000 to the N images
189 currently in the NEB."""
190 return (self.iter_folder /
191 (self.prefix.name + f'{i:03d}iter{iiter:03d}.traj'))
193 def _execute_one_neb(self, exitstack, n_cur, to_run,
194 climb=False, many_steps=False):
195 '''Internal method which executes one NEB optimization.'''
197 closelater = exitstack.enter_context
199 self.iteration += 1
200 # First we copy around all the images we are not using in this
201 # neb (for reproducability purposes)
202 if self.world.rank == 0:
203 for i in range(n_cur):
204 if i not in to_run[1: -1]:
205 filename = '%s%03d.traj' % (self.prefix, i)
206 with Trajectory(filename, mode='w',
207 atoms=self.all_images[i]) as traj:
208 traj.write()
209 filename_ref = self.iter_trajpath(i, self.iteration)
210 if os.path.isfile(filename):
211 shutil.copy2(filename, filename_ref)
212 if self.world.rank == 0:
213 print('Now starting iteration %d on ' % self.iteration, to_run)
214 # Attach calculators to all the images we will include in the NEB
215 self.attach_calculators([self.all_images[i] for i in to_run[1: -1]])
216 neb = NEB([self.all_images[i] for i in to_run],
217 k=[self.k[i] for i in to_run[0:-1]],
218 method=self.method,
219 parallel=self.parallel,
220 remove_rotation_and_translation=self
221 .remove_rotation_and_translation,
222 climb=climb)
224 # Do the actual NEB calculation
225 logpath = (self.iter_folder
226 / f'{self.prefix.name}_log_iter{self.iteration:03d}.log')
227 qn = closelater(self.optimizer(neb, logfile=logpath))
229 # Find the ranks which are masters for each their calculation
230 if self.parallel:
231 nneb = to_run[0]
232 nim = len(to_run) - 2
233 n = self.world.size // nim # number of cpu's per image
234 j = 1 + self.world.rank // n # my image number
235 assert nim * n == self.world.size
236 traj = closelater(Trajectory(
237 '%s%03d.traj' % (self.prefix, j + nneb), 'w',
238 self.all_images[j + nneb],
239 master=(self.world.rank % n == 0)
240 ))
241 filename_ref = self.iter_trajpath(j + nneb, self.iteration)
242 trajhist = closelater(Trajectory(
243 filename_ref, 'w',
244 self.all_images[j + nneb],
245 master=(self.world.rank % n == 0)
246 ))
247 qn.attach(traj)
248 qn.attach(trajhist)
249 else:
250 num = 1
251 for i, j in enumerate(to_run[1: -1]):
252 filename_ref = self.iter_trajpath(j, self.iteration)
253 trajhist = closelater(Trajectory(
254 filename_ref, 'w', self.all_images[j]
255 ))
256 qn.attach(seriel_writer(trajhist, i, num).write)
258 traj = closelater(Trajectory(
259 '%s%03d.traj' % (self.prefix, j), 'w',
260 self.all_images[j]
261 ))
262 qn.attach(seriel_writer(traj, i, num).write)
263 num += 1
265 if isinstance(self.maxsteps, (list, tuple)) and many_steps:
266 steps = self.maxsteps[1]
267 elif isinstance(self.maxsteps, (list, tuple)) and not many_steps:
268 steps = self.maxsteps[0]
269 else:
270 steps = self.maxsteps
272 if isinstance(self.fmax, (list, tuple)) and many_steps:
273 fmax = self.fmax[1]
274 elif isinstance(self.fmax, (list, tuple)) and not many_steps:
275 fmax = self.fmax[0]
276 else:
277 fmax = self.fmax
278 qn.run(fmax=fmax, steps=steps)
280 # Remove the calculators and replace them with single
281 # point calculators and update all the nodes for
282 # preperration for next iteration
283 neb.distribute = types.MethodType(store_E_and_F_in_spc, neb)
284 neb.distribute()
286 def run(self):
287 '''Run the AutoNEB optimization algorithm.'''
288 n_cur = self.__initialize__()
289 while len(self.all_images) < self.n_simul + 2:
290 if isinstance(self.k, (float, int)):
291 self.k = [self.k] * (len(self.all_images) - 1)
292 if self.world.rank == 0:
293 print('Now adding images for initial run')
294 # Insert a new image where the distance between two images is
295 # the largest
296 spring_lengths = []
297 for j in range(n_cur - 1):
298 spring_vec = self.all_images[j + 1].get_positions() - \
299 self.all_images[j].get_positions()
300 spring_lengths.append(np.linalg.norm(spring_vec))
301 jmax = np.argmax(spring_lengths)
303 if self.world.rank == 0:
304 print('Max length between images is at ', jmax)
306 # The interpolation used to make initial guesses
307 # If only start and end images supplied make all img at ones
308 if len(self.all_images) == 2:
309 n_between = self.n_simul
310 else:
311 n_between = 1
313 toInterpolate = [self.all_images[jmax]]
314 for _ in range(n_between):
315 toInterpolate += [toInterpolate[0].copy()]
316 toInterpolate += [self.all_images[jmax + 1]]
318 neb = NEB(toInterpolate)
319 neb.interpolate(method=self.interpolate_method)
321 tmp = self.all_images[:jmax + 1]
322 tmp += toInterpolate[1:-1]
323 tmp.extend(self.all_images[jmax + 1:])
325 self.all_images = tmp
327 # Expect springs to be in equilibrium
328 k_tmp = self.k[:jmax]
329 k_tmp += [self.k[jmax] * (n_between + 1)] * (n_between + 1)
330 k_tmp.extend(self.k[jmax + 1:])
331 self.k = k_tmp
333 # Run the NEB calculation with the new image included
334 n_cur += n_between
336 # Determine if any images do not have a valid energy yet
337 energies = self.get_energies()
339 n_non_valid_energies = len([e for e in energies if e != e])
341 if self.world.rank == 0:
342 print('Start of evaluation of the initial images')
344 while n_non_valid_energies != 0:
345 if isinstance(self.k, (float, int)):
346 self.k = [self.k] * (len(self.all_images) - 1)
348 # First do one run since some energie are non-determined
349 to_run, climb_safe = self.which_images_to_run_on()
350 self.execute_one_neb(n_cur, to_run, climb=False)
352 energies = self.get_energies()
353 n_non_valid_energies = len([e for e in energies if e != e])
355 if self.world.rank == 0:
356 print('Finished initialisation phase.')
358 # Then add one image at a time until we have n_max images
359 while n_cur < self.n_max:
360 if isinstance(self.k, (float, int)):
361 self.k = [self.k] * (len(self.all_images) - 1)
362 # Insert a new image where the distance between two images
363 # is the largest OR where a higher energy reselution is needed
364 if self.world.rank == 0:
365 print('****Now adding another image until n_max is reached',
366 f'({n_cur}/{self.n_max})****')
367 spring_lengths = []
368 for j in range(n_cur - 1):
369 spring_vec = self.all_images[j + 1].get_positions() - \
370 self.all_images[j].get_positions()
371 spring_lengths.append(np.linalg.norm(spring_vec))
373 total_vec = self.all_images[0].get_positions() - \
374 self.all_images[-1].get_positions()
375 tl = np.linalg.norm(total_vec)
377 fR = max(spring_lengths) / tl
379 e = self.get_energies()
380 ed = []
381 emin = min(e)
382 enorm = max(e) - emin
383 for j in range(n_cur - 1):
384 delta_E = (e[j + 1] - e[j]) * (e[j + 1] + e[j] - 2 *
385 emin) / 2 / enorm
386 ed.append(abs(delta_E))
388 gR = max(ed) / enorm
390 if fR / gR > self.space_energy_ratio:
391 jmax = np.argmax(spring_lengths)
392 t = 'spring length!'
393 else:
394 jmax = np.argmax(ed)
395 t = 'energy difference between neighbours!'
397 if self.world.rank == 0:
398 print(f'Adding image between {jmax} and',
399 f'{jmax + 1}. New image point is selected',
400 'on the basis of the biggest ' + t)
402 toInterpolate = [self.all_images[jmax]]
403 toInterpolate += [toInterpolate[0].copy()]
404 toInterpolate += [self.all_images[jmax + 1]]
406 neb = NEB(toInterpolate)
407 neb.interpolate(method=self.interpolate_method)
409 tmp = self.all_images[:jmax + 1]
410 tmp += toInterpolate[1:-1]
411 tmp.extend(self.all_images[jmax + 1:])
413 self.all_images = tmp
415 # Expect springs to be in equilibrium
416 k_tmp = self.k[:jmax]
417 k_tmp += [self.k[jmax] * 2] * 2
418 k_tmp.extend(self.k[jmax + 1:])
419 self.k = k_tmp
421 # Run the NEB calculation with the new image included
422 n_cur += 1
423 to_run, climb_safe = self.which_images_to_run_on()
425 self.execute_one_neb(n_cur, to_run, climb=False)
427 if self.world.rank == 0:
428 print('n_max images has been reached')
430 # Do a single climb around the top-point if requested
431 if self.climb:
432 if isinstance(self.k, (float, int)):
433 self.k = [self.k] * (len(self.all_images) - 1)
434 if self.world.rank == 0:
435 print('****Now doing the CI-NEB calculation****')
436 to_run, climb_safe = self.which_images_to_run_on()
438 assert climb_safe, 'climb_safe should be true at this point!'
439 self.execute_one_neb(n_cur, to_run, climb=True, many_steps=True)
441 if not self.smooth_curve:
442 return self.all_images
444 # If a smooth_curve is requsted ajust the springs to follow two
445 # gaussian distributions
446 e = self.get_energies()
447 peak = self.get_highest_energy_index()
448 k_max = 10
450 d1 = np.linalg.norm(self.all_images[peak].get_positions() -
451 self.all_images[0].get_positions())
452 d2 = np.linalg.norm(self.all_images[peak].get_positions() -
453 self.all_images[-1].get_positions())
454 l1 = -d1 ** 2 / log(0.2)
455 l2 = -d2 ** 2 / log(0.2)
457 x1 = []
458 x2 = []
459 for i in range(peak):
460 v = (self.all_images[i].get_positions() +
461 self.all_images[i + 1].get_positions()) / 2 - \
462 self.all_images[0].get_positions()
463 x1.append(np.linalg.norm(v))
465 for i in range(peak, len(self.all_images) - 1):
466 v = (self.all_images[i].get_positions() +
467 self.all_images[i + 1].get_positions()) / 2 - \
468 self.all_images[0].get_positions()
469 x2.append(np.linalg.norm(v))
470 k_tmp = []
471 for x in x1:
472 k_tmp.append(k_max * exp(-((x - d1) ** 2) / l1))
473 for x in x2:
474 k_tmp.append(k_max * exp(-((x - d1) ** 2) / l2))
476 self.k = k_tmp
477 # Roll back to start from the top-point
478 if self.world.rank == 0:
479 print('Now moving from top to start')
480 highest_energy_index = self.get_highest_energy_index()
481 nneb = highest_energy_index - self.n_simul - 1
482 while nneb >= 0:
483 self.execute_one_neb(n_cur, range(nneb, nneb + self.n_simul + 2),
484 climb=False)
485 nneb -= 1
487 # Roll forward from the top-point until the end
488 nneb = self.get_highest_energy_index()
490 if self.world.rank == 0:
491 print('Now moving from top to end')
492 while nneb <= self.n_max - self.n_simul - 2:
493 self.execute_one_neb(n_cur, range(nneb, nneb + self.n_simul + 2),
494 climb=False)
495 nneb += 1
496 return self.all_images
498 def __initialize__(self):
499 '''Load files from the filesystem.'''
500 if not os.path.isfile(f'{self.prefix}000.traj'):
501 raise OSError(f'No file with name {self.prefix}000.traj',
502 'was found. Should contain initial image')
504 # Find the images that exist
505 index_exists = [i for i in range(self.n_max) if
506 os.path.isfile('%s%03d.traj' % (self.prefix, i))]
508 n_cur = index_exists[-1] + 1
510 if self.world.rank == 0:
511 print('The NEB initially has %d images ' % len(index_exists),
512 '(including the end-points)')
513 if len(index_exists) == 1:
514 raise Exception('Only a start point exists')
516 for i in range(len(index_exists)):
517 if i != index_exists[i]:
518 raise Exception('Files must be ordered sequentially',
519 'without gaps.')
520 if self.world.rank == 0:
521 for i in index_exists:
522 filename_ref = self.iter_trajpath(i, 0)
523 if os.path.isfile(filename_ref):
524 try:
525 os.rename(filename_ref, str(filename_ref) + '.bak')
526 except OSError:
527 pass
528 filename = '%s%03d.traj' % (self.prefix, i)
529 try:
530 shutil.copy2(filename, filename_ref)
531 except OSError:
532 pass
533 # Wait for file system on all nodes is syncronized
534 self.world.barrier()
535 # And now lets read in the configurations
536 for i in range(n_cur):
537 if i in index_exists:
538 filename = '%s%03d.traj' % (self.prefix, i)
539 newim = read(filename)
540 self.all_images.append(newim)
541 else:
542 self.all_images.append(self.all_images[0].copy())
544 self.iteration = 0
545 return n_cur
547 def get_energies(self):
548 """Utility method to extract all energies and insert np.NaN at
549 invalid images."""
550 energies = []
551 for a in self.all_images:
552 try:
553 energies.append(a.get_potential_energy())
554 except RuntimeError:
555 energies.append(np.nan)
556 return energies
558 def get_energies_one_image(self, image):
559 """Utility method to extract energy of an image and return np.NaN
560 if invalid."""
561 try:
562 energy = image.get_potential_energy()
563 except RuntimeError:
564 energy = np.nan
565 return energy
567 def get_highest_energy_index(self):
568 """Find the index of the image with the highest energy."""
569 energies = self.get_energies()
570 valid_entries = [(i, e) for i, e in enumerate(energies) if e == e]
571 highest_energy_index = max(valid_entries, key=lambda x: x[1])[0]
572 return highest_energy_index
574 def which_images_to_run_on(self):
575 """Determine which set of images to do a NEB at.
576 The priority is to first include all images without valid energies,
577 secondly include the highest energy image."""
578 n_cur = len(self.all_images)
579 energies = self.get_energies()
580 # Find out which image is the first one missing the energy and
581 # which is the last one missing the energy
582 first_missing = n_cur
583 last_missing = 0
584 n_missing = 0
585 for i in range(1, n_cur - 1):
586 if energies[i] != energies[i]:
587 n_missing += 1
588 first_missing = min(first_missing, i)
589 last_missing = max(last_missing, i)
591 highest_energy_index = self.get_highest_energy_index()
593 nneb = highest_energy_index - 1 - self.n_simul // 2
594 nneb = max(nneb, 0)
595 nneb = min(nneb, n_cur - self.n_simul - 2)
596 nneb = min(nneb, first_missing - 1)
597 nneb = max(nneb + self.n_simul, last_missing) - self.n_simul
598 to_use = list(range(nneb, nneb + self.n_simul + 2))
600 while self.get_energies_one_image(self.all_images[to_use[0]]) != \
601 self.get_energies_one_image(self.all_images[to_use[0]]):
602 to_use[0] -= 1
603 while self.get_energies_one_image(self.all_images[to_use[-1]]) != \
604 self.get_energies_one_image(self.all_images[to_use[-1]]):
605 to_use[-1] += 1
607 return to_use, (highest_energy_index in to_use[1: -1])
610class seriel_writer:
611 def __init__(self, traj, i, num):
612 self.traj = traj
613 self.i = i
614 self.num = num
616 def write(self):
617 if self.num % (self.i + 1) == 0:
618 self.traj.write()
621def store_E_and_F_in_spc(self):
622 """Collect the energies and forces on all nodes and store as
623 single point calculators"""
624 # Make sure energies and forces are known on all nodes
625 self.get_forces()
626 images = self.images
627 if self.parallel:
628 energy = np.empty(1)
629 forces = np.empty((self.natoms, 3))
631 for i in range(1, self.nimages - 1):
632 # Determine which node is the leading for image i
633 root = (i - 1) * self.world.size // (self.nimages - 2)
634 # If on this node, extract the calculated numbers
635 if self.world.rank == root:
636 forces = images[i].get_forces()
637 energy[0] = images[i].get_potential_energy()
638 # Distribute these numbers to other nodes
639 self.world.broadcast(energy, root)
640 self.world.broadcast(forces, root)
641 # On all nodes, remove the calculator, keep only energy
642 # and force in single point calculator
643 self.images[i].calc = SinglePointCalculator(
644 self.images[i],
645 energy=energy[0],
646 forces=forces)