Coverage for ase / calculators / calculator.py: 84.23%
539 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 08:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 08:22 +0000
1# fmt: off
3import copy
4import os
5import shlex
6import subprocess
7import warnings
8from abc import abstractmethod
9from collections.abc import Sequence
10from dataclasses import dataclass, field
11from math import pi, sqrt
12from pathlib import Path
13from typing import Any
15import numpy as np
17from ase.calculators.abc import GetPropertiesMixin
18from ase.cell import Cell
19from ase.config import cfg as _cfg
20from ase.outputs import Properties, all_outputs
21from ase.utils import deprecated, jsonable
23from .names import names
26class CalculatorError(RuntimeError):
27 """Base class of error types related to ASE calculators."""
30class CalculatorSetupError(CalculatorError):
31 """Calculation cannot be performed with the given parameters.
33 Reasons to raise this error are:
34 * The calculator is not properly configured
35 (missing executable, environment variables, ...)
36 * The given atoms object is not supported
37 * Calculator parameters are unsupported
39 Typically raised before a calculation."""
42class EnvironmentError(CalculatorSetupError):
43 """Raised if calculator is not properly set up with ASE.
45 May be missing an executable or environment variables."""
48class InputError(CalculatorSetupError):
49 """Raised if inputs given to the calculator were incorrect.
51 Bad input keywords or values, or missing pseudopotentials.
53 This may be raised before or during calculation, depending on
54 when the problem is detected."""
57class CalculationFailed(CalculatorError):
58 """Calculation failed unexpectedly.
60 Reasons to raise this error are:
61 * Calculation did not converge
62 * Calculation ran out of memory
63 * Segmentation fault or other abnormal termination
64 * Arithmetic trouble (singular matrices, NaN, ...)
66 Typically raised during calculation."""
69class SCFError(CalculationFailed):
70 """SCF loop did not converge."""
73class ReadError(CalculatorError):
74 """Unexpected irrecoverable error while reading calculation results."""
77class PropertyNotImplementedError(NotImplementedError):
78 """Raised if a calculator does not implement the requested property."""
81class PropertyNotPresent(CalculatorError):
82 """Requested property is missing.
84 Maybe it was never calculated, or for some reason was not extracted
85 with the rest of the results, without being a fatal ReadError."""
88def compare_atoms(atoms1, atoms2, tol=1e-15, excluded_properties=None):
89 """Check for system changes since last calculation. Properties in
90 ``excluded_properties`` are not checked."""
91 if atoms1 is None:
92 system_changes = all_changes[:]
93 else:
94 system_changes = []
96 properties_to_check = set(all_changes)
97 if excluded_properties:
98 properties_to_check -= set(excluded_properties)
100 # Check properties that aren't in Atoms.arrays but are attributes of
101 # Atoms objects
102 for prop in ['cell', 'pbc']:
103 if prop in properties_to_check:
104 properties_to_check.remove(prop)
105 if not equal(
106 getattr(atoms1, prop), getattr(atoms2, prop), atol=tol
107 ):
108 system_changes.append(prop)
110 arrays1 = set(atoms1.arrays)
111 arrays2 = set(atoms2.arrays)
113 # Add any properties that are only in atoms1.arrays or only in
114 # atoms2.arrays (and aren't excluded). Note that if, e.g. arrays1 has
115 # `initial_charges` which is merely zeros and arrays2 does not have
116 # this array, we'll still assume that the system has changed. However,
117 # this should only occur rarely.
118 system_changes += properties_to_check & (arrays1 ^ arrays2)
120 # Finally, check all of the non-excluded properties shared by the atoms
121 # arrays.
122 for prop in properties_to_check & arrays1 & arrays2:
123 if not equal(atoms1.arrays[prop], atoms2.arrays[prop], atol=tol):
124 system_changes.append(prop)
126 return system_changes
129all_properties = [
130 'energy',
131 'forces',
132 'stress',
133 'stresses',
134 'dipole',
135 'charges',
136 'magmom',
137 'magmoms',
138 'free_energy',
139 'energies',
140 'dielectric_tensor',
141 'born_effective_charges',
142 'polarization',
143]
146all_changes = [
147 'positions',
148 'numbers',
149 'cell',
150 'pbc',
151 'initial_charges',
152 'initial_magmoms',
153]
156special = {
157 'cp2k': 'CP2K',
158 'demonnano': 'DemonNano',
159 'dftd3': 'DFTD3',
160 'dmol': 'DMol3',
161 'eam': 'EAM',
162 'elk': 'ELK',
163 'emt': 'EMT',
164 'exciting': 'ExcitingGroundStateCalculator',
165 'crystal': 'CRYSTAL',
166 'ff': 'ForceField',
167 'gamess_us': 'GAMESSUS',
168 'gulp': 'GULP',
169 'kim': 'KIM',
170 'lammpsrun': 'LAMMPS',
171 'lammpslib': 'LAMMPSlib',
172 'lj': 'LennardJones',
173 'mopac': 'MOPAC',
174 'morse': 'MorsePotential',
175 'nwchem': 'NWChem',
176 'openmx': 'OpenMX',
177 'orca': 'ORCA',
178 'qchem': 'QChem',
179 'tip3p': 'TIP3P',
180 'tip4p': 'TIP4P',
181}
184external_calculators = {}
187def register_calculator_class(name, cls):
188 """Add the class into the database."""
189 assert name not in external_calculators
190 external_calculators[name] = cls
191 names.append(name)
192 names.sort()
195def get_calculator_class(name):
196 """Return calculator class."""
197 if name == 'asap':
198 from asap3 import EMT as Calculator
199 elif name == 'gpaw':
200 from gpaw import GPAW as Calculator
201 elif name == 'hotbit':
202 from hotbit import Calculator
203 elif name == 'vasp':
204 from ase.calculators.vasp import Vasp as Calculator
205 elif name == 'ace':
206 from ase.calculators.acemolecule import ACE as Calculator
207 elif name == 'Psi4':
208 from ase.calculators.psi4 import Psi4 as Calculator
209 elif name == 'mattersim':
210 from mattersim.forcefield import MatterSimCalculator as Calculator
211 elif name == 'mace_mp':
212 from mace.calculators import mace_mp as Calculator
213 elif name in external_calculators:
214 Calculator = external_calculators[name]
215 else:
216 classname = special.get(name, name.title())
217 module = __import__('ase.calculators.' + name, {}, None, [classname])
218 Calculator = getattr(module, classname)
219 return Calculator
222def equal(a, b, tol=None, rtol=None, atol=None):
223 """ndarray-enabled comparison function."""
224 # XXX Known bugs:
225 # * Comparing cell objects (pbc not part of array representation)
226 # * Infinite recursion for cyclic dicts
227 # * Can of worms is open
228 if tol is not None:
229 msg = 'Use `equal(a, b, rtol=..., atol=...)` instead of `tol=...`'
230 warnings.warn(msg, DeprecationWarning)
231 assert (
232 rtol is None and atol is None
233 ), 'Do not use deprecated `tol` with `atol` and/or `rtol`'
234 rtol = tol
235 atol = tol
237 a_is_dict = isinstance(a, dict)
238 b_is_dict = isinstance(b, dict)
239 if a_is_dict or b_is_dict:
240 # Check that both a and b are dicts
241 if not (a_is_dict and b_is_dict):
242 return False
243 if a.keys() != b.keys():
244 return False
245 return all(equal(a[key], b[key], rtol=rtol, atol=atol) for key in a)
247 if np.shape(a) != np.shape(b):
248 return False
250 if rtol is None and atol is None:
251 return np.array_equal(a, b)
253 if rtol is None:
254 rtol = 0
255 if atol is None:
256 atol = 0
258 return np.allclose(a, b, rtol=rtol, atol=atol)
261def kptdensity2monkhorstpack(atoms, kptdensity=3.5, even=True):
262 """Convert k-point density to Monkhorst-Pack grid size.
264 atoms: Atoms object
265 Contains unit cell and information about boundary conditions.
266 kptdensity: float
267 Required k-point density. Default value is 3.5 point per Ang^-1.
268 even: bool
269 Round up to even numbers.
270 """
272 recipcell = atoms.cell.reciprocal()
273 kpts = []
274 for i in range(3):
275 if atoms.pbc[i]:
276 k = 2 * pi * sqrt((recipcell[i] ** 2).sum()) * kptdensity
277 if even:
278 kpts.append(2 * int(np.ceil(k / 2)))
279 else:
280 kpts.append(int(np.ceil(k)))
281 else:
282 kpts.append(1)
283 return np.array(kpts)
286def kpts2mp(atoms, kpts, even=False):
287 if kpts is None:
288 return np.array([1, 1, 1])
289 if isinstance(kpts, (float, int)):
290 return kptdensity2monkhorstpack(atoms, kpts, even)
291 else:
292 return kpts
295def kpts2sizeandoffsets(
296 size=None, density=None, gamma=None, even=None, atoms=None
297):
298 """Helper function for selecting k-points.
300 Use either size or density.
302 size: 3 ints
303 Number of k-points.
304 density: float
305 K-point density in units of k-points per Ang^-1.
306 gamma: None or bool
307 Should the Gamma-point be included? Yes / no / don't care:
308 True / False / None.
309 even: None or bool
310 Should the number of k-points be even? Yes / no / don't care:
311 True / False / None.
312 atoms: Atoms object
313 Needed for calculating k-point density.
315 """
317 if size is not None and density is not None:
318 raise ValueError(
319 'Cannot specify k-point mesh size and ' 'density simultaneously'
320 )
321 elif density is not None and atoms is None:
322 raise ValueError(
323 'Cannot set k-points from "density" unless '
324 'Atoms are provided (need BZ dimensions).'
325 )
327 if size is None:
328 if density is None:
329 size = [1, 1, 1]
330 else:
331 size = kptdensity2monkhorstpack(atoms, density, None)
333 # Not using the rounding from kptdensity2monkhorstpack as it doesn't do
334 # rounding to odd numbers
335 if even is not None:
336 size = np.array(size)
337 remainder = size % 2
338 if even:
339 size += remainder
340 else: # Round up to odd numbers
341 size += 1 - remainder
343 offsets = [0, 0, 0]
344 if atoms is None:
345 pbc = [True, True, True]
346 else:
347 pbc = atoms.pbc
349 if gamma is not None:
350 for i, s in enumerate(size):
351 if pbc[i] and s % 2 != bool(gamma):
352 offsets[i] = 0.5 / s
354 return size, offsets
357@jsonable('kpoints')
358class KPoints:
359 def __init__(self, kpts=None):
360 if kpts is None:
361 kpts = np.zeros((1, 3))
362 self.kpts = kpts
364 def todict(self):
365 return vars(self)
368def kpts2kpts(kpts, atoms=None):
369 from ase.dft.kpoints import monkhorst_pack
371 if kpts is None:
372 return KPoints()
374 if hasattr(kpts, 'kpts'):
375 return kpts
377 if isinstance(kpts, dict):
378 if 'kpts' in kpts:
379 return KPoints(kpts['kpts'])
380 if 'path' in kpts:
381 cell = Cell.ascell(atoms.cell)
382 return cell.bandpath(pbc=atoms.pbc, **kpts)
383 size, offsets = kpts2sizeandoffsets(atoms=atoms, **kpts)
384 return KPoints(monkhorst_pack(size) + offsets)
386 if isinstance(kpts[0], int):
387 return KPoints(monkhorst_pack(kpts))
389 return KPoints(np.array(kpts))
392def kpts2ndarray(kpts, atoms=None):
393 """Convert kpts keyword to 2-d ndarray of scaled k-points."""
394 return kpts2kpts(kpts, atoms=atoms).kpts
397class Parameters(dict):
398 """Dictionary for parameters.
400 Special feature: If param is a Parameters instance, then param.xc
401 is a shorthand for param['xc'].
402 """
404 def __getattr__(self, key):
405 if key not in self:
406 return dict.__getattribute__(self, key)
407 return self[key]
409 def __setattr__(self, key, value):
410 self[key] = value
412 @classmethod
413 def read(cls, filename):
414 """Read parameters from file."""
415 # We use ast to evaluate literals, avoiding eval()
416 # for security reasons.
417 import ast
419 with open(filename) as fd:
420 txt = fd.read().strip()
421 assert txt.startswith('dict(')
422 assert txt.endswith(')')
423 txt = txt[5:-1]
425 # The tostring() representation "dict(...)" is not actually
426 # a literal, so we manually parse that along with the other
427 # formatting that we did manually:
428 dct = {}
429 for line in txt.splitlines():
430 key, val = line.split('=', 1)
431 key = key.strip()
432 val = val.strip()
433 if val[-1] == ',':
434 val = val[:-1]
435 dct[key] = ast.literal_eval(val)
437 parameters = cls(dct)
438 return parameters
440 def tostring(self):
441 keys = sorted(self)
442 return (
443 'dict('
444 + ',\n '.join(f'{key}={self[key]!r}' for key in keys)
445 + ')\n'
446 )
448 def write(self, filename):
449 Path(filename).write_text(self.tostring())
452class BaseCalculator(GetPropertiesMixin):
453 implemented_properties: list[str] = []
454 'Properties calculator can handle (energy, forces, ...)'
456 # Placeholder object for deprecated arguments. Let deprecated keywords
457 # default to _deprecated and then issue a warning if the user passed
458 # any other object (such as None).
459 _deprecated = object()
461 def __init__(self, parameters=None, use_cache=True):
462 if parameters is None:
463 parameters = {}
465 self.parameters = dict(parameters)
466 self.atoms = None
467 self.results = {}
468 self.use_cache = use_cache
470 def calculate_properties(self, atoms, properties):
471 """This method is experimental; currently for internal use."""
472 for name in properties:
473 if name not in all_outputs:
474 raise ValueError(f'No such property: {name}')
476 # We ignore system changes for now.
477 self.calculate(atoms, properties, system_changes=all_changes)
479 props = self.export_properties()
481 for name in properties:
482 if name not in props:
483 raise PropertyNotPresent(name)
484 return props
486 @abstractmethod
487 def calculate(self, atoms, properties, system_changes):
488 ...
490 def check_state(self, atoms, tol=1e-15):
491 """Check for any system changes since last calculation."""
492 if self.use_cache:
493 return compare_atoms(self.atoms, atoms, tol=tol)
494 else:
495 return all_changes
497 def get_property(self, name, atoms=None, allow_calculation=True):
498 if name not in self.implemented_properties:
499 raise PropertyNotImplementedError(
500 f'{name} property not implemented'
501 )
503 if atoms is None:
504 atoms = self.atoms
505 system_changes = []
506 else:
507 system_changes = self.check_state(atoms)
509 if system_changes:
510 self.atoms = None
511 self.results = {}
513 if name not in self.results:
514 if not allow_calculation:
515 return None
517 if self.use_cache:
518 self.atoms = atoms.copy()
520 self.calculate(atoms, [name], system_changes)
522 if name not in self.results:
523 # For some reason the calculator was not able to do what we want,
524 # and that is OK.
525 raise PropertyNotImplementedError(
526 '{} not present in this ' 'calculation'.format(name)
527 )
529 result = self.results[name]
530 if isinstance(result, np.ndarray):
531 result = result.copy()
532 return result
534 def calculation_required(self, atoms, properties):
535 assert not isinstance(properties, str)
536 system_changes = self.check_state(atoms)
537 if system_changes:
538 return True
539 for name in properties:
540 if name not in self.results:
541 return True
542 return False
544 def export_properties(self):
545 return Properties(self.results)
547 def _get_name(self) -> str: # child class can override this
548 return self.__class__.__name__.lower()
550 @property
551 def name(self) -> str:
552 return self._get_name()
554 def todict(self) -> dict[str, Any]:
555 """Obtain a dictionary of parameter information"""
556 return {}
559class Calculator(BaseCalculator):
560 """Base-class for all ASE calculators.
562 A calculator must raise PropertyNotImplementedError if asked for a
563 property that it can't calculate. So, if calculation of the
564 stress tensor has not been implemented, get_stress(atoms) should
565 raise PropertyNotImplementedError. This can be achieved simply by not
566 including the string 'stress' in the list implemented_properties
567 which is a class member. These are the names of the standard
568 properties: 'energy', 'forces', 'stress', 'dipole', 'charges',
569 'magmom' and 'magmoms'.
570 """
572 default_parameters: dict[str, Any] = {}
573 'Default parameters'
575 ignored_changes: set[str] = set()
576 'Properties of Atoms which we ignore for the purposes of cache '
577 'invalidation with check_state().'
579 discard_results_on_any_change = False
580 'Whether we purge the results following any change in the set() method. '
581 'Most (file I/O) calculators will probably want this.'
583 def __init__(
584 self,
585 restart=None,
586 ignore_bad_restart_file=BaseCalculator._deprecated,
587 label=None,
588 atoms=None,
589 directory='.',
590 **kwargs,
591 ):
592 """Basic calculator implementation.
594 restart: str
595 Prefix for restart file. May contain a directory. Default
596 is None: don't restart.
597 ignore_bad_restart_file: bool
598 Deprecated, please do not use.
599 Passing more than one positional argument to Calculator()
600 is deprecated and will stop working in the future.
601 Ignore broken or missing restart file. By default, it is an
602 error if the restart file is missing or broken.
603 directory: str or PurePath
604 Working directory in which to read and write files and
605 perform calculations.
606 label: str
607 Name used for all files. Not supported by all calculators.
608 May contain a directory, but please use the directory parameter
609 for that instead.
610 atoms: Atoms object
611 Optional Atoms object to which the calculator will be
612 attached. When restarting, atoms will get its positions and
613 unit-cell updated from file.
614 """
615 self.atoms = None # copy of atoms object from last calculation
616 self.results = {} # calculated properties (energy, forces, ...)
617 self.parameters = None # calculational parameters
618 self._directory = None # Initialize
620 if ignore_bad_restart_file is self._deprecated:
621 ignore_bad_restart_file = False
622 else:
623 warnings.warn(
624 FutureWarning(
625 'The keyword "ignore_bad_restart_file" is deprecated and '
626 'will be removed in a future version of ASE. Passing more '
627 'than one positional argument to Calculator is also '
628 'deprecated and will stop functioning in the future. '
629 'Please pass arguments by keyword (key=value) except '
630 'optionally the "restart" keyword.'
631 )
632 )
634 if restart is not None:
635 try:
636 self.read(restart) # read parameters, atoms and results
637 except ReadError:
638 if ignore_bad_restart_file:
639 self.reset()
640 else:
641 raise
643 self.directory = directory
644 self.prefix = None
645 if label is not None:
646 if self.directory == '.' and '/' in label:
647 # We specified directory in label, and nothing in the diretory
648 # key
649 self.label = label
650 elif '/' not in label:
651 # We specified our directory in the directory keyword
652 # or not at all
653 self.label = '/'.join((self.directory, label))
654 else:
655 raise ValueError(
656 'Directory redundantly specified though '
657 'directory="{}" and label="{}". '
658 'Please omit "/" in label.'.format(self.directory, label)
659 )
661 if self.parameters is None:
662 # Use default parameters if they were not read from file:
663 self.parameters = self.get_default_parameters()
665 if atoms is not None:
666 atoms.calc = self
667 if self.atoms is not None:
668 # Atoms were read from file. Update atoms:
669 if not (
670 equal(atoms.numbers, self.atoms.numbers)
671 and (atoms.pbc == self.atoms.pbc).all()
672 ):
673 raise CalculatorError('Atoms not compatible with file')
674 atoms.positions = self.atoms.positions
675 atoms.cell = self.atoms.cell
677 self.set(**kwargs)
679 if not hasattr(self, 'get_spin_polarized'):
680 self.get_spin_polarized = self._deprecated_get_spin_polarized
681 # XXX We are very naughty and do not call super constructor!
683 # For historical reasons we have a particular caching protocol.
684 # We disable the superclass' optional cache.
685 self.use_cache = False
687 @property
688 def directory(self) -> str:
689 return self._directory
691 @directory.setter
692 def directory(self, directory: str | os.PathLike):
693 self._directory = str(Path(directory)) # Normalize path.
695 @property
696 def label(self):
697 if self.directory == '.':
698 return self.prefix
700 # Generally, label ~ directory/prefix
701 #
702 # We use '/' rather than os.pathsep because
703 # 1) directory/prefix does not represent any actual path
704 # 2) We want the same string to work the same on all platforms
705 if self.prefix is None:
706 return self.directory + '/'
708 return f'{self.directory}/{self.prefix}'
710 @label.setter
711 def label(self, label):
712 if label is None:
713 self.directory = '.'
714 self.prefix = None
715 return
717 tokens = label.rsplit('/', 1)
718 if len(tokens) == 2:
719 directory, prefix = tokens
720 else:
721 assert len(tokens) == 1
722 directory = '.'
723 prefix = tokens[0]
724 if prefix == '':
725 prefix = None
726 self.directory = directory
727 self.prefix = prefix
729 def set_label(self, label):
730 """Set label and convert label to directory and prefix.
732 Examples
733 --------
735 * label='abc': (directory='.', prefix='abc')
736 * label='dir1/abc': (directory='dir1', prefix='abc')
737 * label=None: (directory='.', prefix=None)
738 """
739 self.label = label
741 def get_default_parameters(self):
742 return Parameters(copy.deepcopy(self.default_parameters))
744 def todict(self, skip_default=True):
745 defaults = self.get_default_parameters()
746 dct = {}
747 for key, value in self.parameters.items():
748 if hasattr(value, 'todict'):
749 value = value.todict()
750 if skip_default:
751 default = defaults.get(key, '_no_default_')
752 if default != '_no_default_' and equal(value, default):
753 continue
754 dct[key] = value
755 return dct
757 def reset(self):
758 """Clear all information from old calculation."""
760 self.atoms = None
761 self.results = {}
763 def read(self, label):
764 """Read atoms, parameters and calculated properties from output file.
766 Read result from self.label file. Raise ReadError if the file
767 is not there. If the file is corrupted or contains an error
768 message from the calculation, a ReadError should also be
769 raised. In case of success, these attributes must set:
771 atoms: Atoms object
772 The state of the atoms from last calculation.
773 parameters: Parameters object
774 The parameter dictionary.
775 results: dict
776 Calculated properties like energy and forces.
778 The FileIOCalculator.read() method will typically read atoms
779 and parameters and get the results dict by calling the
780 read_results() method."""
782 self.set_label(label)
784 def get_atoms(self):
785 if self.atoms is None:
786 raise ValueError('Calculator has no atoms')
787 atoms = self.atoms.copy()
788 atoms.calc = self
789 return atoms
791 @classmethod
792 def read_atoms(cls, restart, **kwargs):
793 return cls(restart=restart, label=restart, **kwargs).get_atoms()
795 def set(self, **kwargs):
796 """Set parameters like set(key1=value1, key2=value2, ...).
798 A dictionary containing the parameters that have been changed
799 is returned.
801 Subclasses must implement a set() method that will look at the
802 chaneged parameters and decide if a call to reset() is needed.
803 If the changed parameters are harmless, like a change in
804 verbosity, then there is no need to call reset().
806 The special keyword 'parameters' can be used to read
807 parameters from a file."""
809 if 'parameters' in kwargs:
810 filename = kwargs.pop('parameters')
811 parameters = Parameters.read(filename)
812 parameters.update(kwargs)
813 kwargs = parameters
815 changed_parameters = {}
817 for key, value in kwargs.items():
818 oldvalue = self.parameters.get(key)
819 if key not in self.parameters or not equal(value, oldvalue):
820 changed_parameters[key] = value
821 self.parameters[key] = value
823 if self.discard_results_on_any_change and changed_parameters:
824 self.reset()
825 return changed_parameters
827 def check_state(self, atoms, tol=1e-15):
828 """Check for any system changes since last calculation."""
829 return compare_atoms(
830 self.atoms,
831 atoms,
832 tol=tol,
833 excluded_properties=set(self.ignored_changes),
834 )
836 def calculate(
837 self, atoms=None, properties=['energy'], system_changes=all_changes
838 ):
839 """Do the calculation.
841 properties: list of str
842 List of what needs to be calculated. Can be any combination
843 of 'energy', 'forces', 'stress', 'dipole', 'charges', 'magmom'
844 and 'magmoms'.
845 system_changes: list of str
846 List of what has changed since last calculation. Can be
847 any combination of these six: 'positions', 'numbers', 'cell',
848 'pbc', 'initial_charges' and 'initial_magmoms'.
850 Subclasses need to implement this, but can ignore properties
851 and system_changes if they want. Calculated properties should
852 be inserted into results dictionary like shown in this dummy
853 example::
855 self.results = {'energy': 0.0,
856 'forces': np.zeros((len(atoms), 3)),
857 'stress': np.zeros(6),
858 'dipole': np.zeros(3),
859 'charges': np.zeros(len(atoms)),
860 'magmom': 0.0,
861 'magmoms': np.zeros(len(atoms))}
863 The subclass implementation should first call this
864 implementation to set the atoms attribute and create any missing
865 directories.
866 """
867 if atoms is not None:
868 self.atoms = atoms.copy()
869 if not os.path.isdir(self._directory):
870 try:
871 os.makedirs(self._directory)
872 except FileExistsError as e:
873 # We can only end up here in case of a race condition if
874 # multiple Calculators are running concurrently *and* use the
875 # same _directory, which cannot be expected to work anyway.
876 msg = (
877 'Concurrent use of directory '
878 + self._directory
879 + 'by multiple Calculator instances detected. Please '
880 'use one directory per instance.'
881 )
882 raise RuntimeError(msg) from e
884 @deprecated('Please use `ase.calculators.fd.FiniteDifferenceCalculator`.')
885 def calculate_numerical_forces(self, atoms, d=0.001):
886 """Calculate numerical forces using finite difference.
888 All atoms will be displaced by +d and -d in all directions.
890 .. deprecated:: 3.24.0
891 """
892 from ase.calculators.fd import calculate_numerical_forces
894 return calculate_numerical_forces(atoms, eps=d)
896 @deprecated('Please use `ase.calculators.fd.FiniteDifferenceCalculator`.')
897 def calculate_numerical_stress(self, atoms, d=1e-6, voigt=True):
898 """Calculate numerical stress using finite difference.
900 .. deprecated:: 3.24.0
901 """
902 from ase.calculators.fd import calculate_numerical_stress
904 return calculate_numerical_stress(atoms, eps=d, voigt=voigt)
906 def _deprecated_get_spin_polarized(self):
907 msg = (
908 'This calculator does not implement get_spin_polarized(). '
909 'In the future, calc.get_spin_polarized() will work only on '
910 'calculator classes that explicitly implement this method or '
911 'inherit the method via specialized subclasses.'
912 )
913 warnings.warn(msg, FutureWarning)
914 return False
916 def band_structure(self):
917 """Create band-structure object for plotting."""
918 from ase.spectrum.band_structure import get_band_structure
920 # XXX This calculator is supposed to just have done a band structure
921 # calculation, but the calculator may not have the correct Fermi level
922 # if it updated the Fermi level after changing k-points.
923 # This will be a problem with some calculators (currently GPAW), and
924 # the user would have to override this by providing the Fermi level
925 # from the selfconsistent calculation.
926 return get_band_structure(calc=self)
929class OldShellProfile:
930 def __init__(self, command):
931 self.command = command
932 self.configvars = {}
934 def execute(self, calc):
935 if self.command is None:
936 raise EnvironmentError(
937 'Please set ${} environment variable '.format(
938 'ASE_' + self.calc.upper() + '_COMMAND'
939 )
940 + 'or supply the command keyword'
941 )
942 command = self.command
943 if 'PREFIX' in command:
944 command = command.replace('PREFIX', calc.prefix)
946 try:
947 proc = subprocess.Popen(command, shell=True, cwd=calc.directory)
948 except OSError as err:
949 # Actually this may never happen with shell=True, since
950 # probably the shell launches successfully. But we soon want
951 # to allow calling the subprocess directly, and then this
952 # distinction (failed to launch vs failed to run) is useful.
953 msg = f'Failed to execute "{command}"'
954 raise EnvironmentError(msg) from err
956 errorcode = proc.wait()
958 if errorcode:
959 path = os.path.abspath(calc.directory)
960 msg = (
961 'Calculator "{}" failed with command "{}" failed in '
962 '{} with error code {}'.format(
963 calc.name, command, path, errorcode
964 )
965 )
966 raise CalculationFailed(msg)
969@dataclass
970class FileIORules:
971 """Rules for controlling streams options to external command.
973 FileIOCalculator will direct stdin and stdout and append arguments
974 to the calculator command using the specifications on this class.
976 Currently names can contain "{prefix}" which will be substituted by
977 calc.prefix. This will go away if/when we can remove prefix."""
978 extend_argv: Sequence[str] = tuple()
979 stdin_name: str | None = None
980 stdout_name: str | None = None
982 configspec: dict[str, Any] = field(default_factory=dict)
984 def load_config(self, section):
985 dct = {}
986 for key, value in self.configspec.items():
987 if key in section:
988 value = section[key]
989 dct[key] = value
990 return dct
993class BadConfiguration(Exception):
994 pass
997def _validate_command(command: str) -> str:
998 # We like to store commands as strings (and call shlex.split() later),
999 # but we also like to validate them early. This will error out if
1000 # command contains syntax problems and will also normalize e.g.
1001 # multiple spaces:
1002 try:
1003 return shlex.join(shlex.split(command))
1004 except ValueError as err:
1005 raise BadConfiguration('Cannot parse command string') from err
1008@dataclass
1009class StandardProfile:
1010 command: str
1011 configvars: dict[str, Any] = field(default_factory=dict)
1013 def __post_init__(self):
1014 self.command = _validate_command(self.command)
1016 def execute(self, calc):
1017 try:
1018 self._call(calc, subprocess.check_call)
1019 except subprocess.CalledProcessError as err:
1020 directory = Path(calc.directory).resolve()
1021 msg = (f'Calculator {calc.name} failed with args {err.args} '
1022 f'in directory {directory}')
1023 raise CalculationFailed(msg) from err
1025 def execute_nonblocking(self, calc):
1026 return self._call(calc, subprocess.Popen)
1028 @property
1029 def _split_command(self):
1030 # XXX Unduplicate common stuff between StandardProfile and
1031 # that of GenericFileIO
1032 return shlex.split(self.command)
1034 def _call(self, calc, subprocess_function):
1035 from contextlib import ExitStack
1037 directory = Path(calc.directory).resolve()
1038 fileio_rules = calc.fileio_rules
1040 with ExitStack() as stack:
1042 def _maybe_open(name, mode):
1043 if name is None:
1044 return None
1046 name = name.format(prefix=calc.prefix)
1047 directory = Path(calc.directory)
1048 return stack.enter_context(open(directory / name, mode))
1050 stdout_fd = _maybe_open(fileio_rules.stdout_name, 'wb')
1051 stdin_fd = _maybe_open(fileio_rules.stdin_name, 'rb')
1053 argv = [*self._split_command, *fileio_rules.extend_argv]
1054 argv = [arg.format(prefix=calc.prefix) for arg in argv]
1055 return subprocess_function(
1056 argv, cwd=directory,
1057 stdout=stdout_fd,
1058 stdin=stdin_fd)
1061class FileIOCalculator(Calculator):
1062 """Base class for calculators that write/read input/output files."""
1064 # Static specification of rules for this calculator:
1065 fileio_rules: FileIORules | None = None
1067 # command: Optional[str] = None
1068 # 'Command used to start calculation'
1070 # Fallback command when nothing else is specified.
1071 # There will be no fallback in the future; it must be explicitly
1072 # configured.
1073 _legacy_default_command: str | None = None
1075 cfg = _cfg # Ensure easy access to config for subclasses
1077 @classmethod
1078 def ruleset(cls, *args, **kwargs):
1079 """Helper for subclasses to define FileIORules."""
1080 return FileIORules(*args, **kwargs)
1082 def __init__(
1083 self,
1084 restart=None,
1085 ignore_bad_restart_file=Calculator._deprecated,
1086 label=None,
1087 atoms=None,
1088 command=None,
1089 profile=None,
1090 **kwargs,
1091 ):
1092 """File-IO calculator.
1094 command: str
1095 Command used to start calculation.
1096 """
1098 super().__init__(restart, ignore_bad_restart_file, label, atoms,
1099 **kwargs)
1101 if profile is None:
1102 profile = self._initialize_profile(command)
1103 self.profile = profile
1105 @property
1106 def command(self):
1107 # XXX deprecate me
1108 #
1109 # This is for calculators that invoke Popen directly on
1110 # self.command instead of letting us (superclass) do it.
1111 return self.profile.command
1113 @command.setter
1114 def command(self, command):
1115 self.profile.command = command
1117 @classmethod
1118 def load_argv_profile(cls, cfg, section_name):
1119 # Helper method to load configuration.
1120 # This is used by the tests, do not rely on this as it will change.
1121 try:
1122 section = cfg.parser[section_name]
1123 except KeyError:
1124 raise BadConfiguration(f'No {section_name!r} section')
1126 if cls.fileio_rules is not None:
1127 configvars = cls.fileio_rules.load_config(section)
1128 else:
1129 configvars = {}
1131 try:
1132 command = section['command']
1133 except KeyError:
1134 raise BadConfiguration(
1135 f'No command field in {section_name!r} section')
1137 return StandardProfile(command, configvars)
1139 def _initialize_profile(self, command):
1140 if command is None:
1141 name = 'ASE_' + self.name.upper() + '_COMMAND'
1142 command = self.cfg.get(name)
1144 if command is None and self.name in self.cfg.parser:
1145 return self.load_argv_profile(self.cfg, self.name)
1147 if command is None:
1148 # XXX issue a FutureWarning if this causes the command
1149 # to no longer be None
1150 command = self._legacy_default_command
1152 if command is None:
1153 raise EnvironmentError(
1154 f'No configuration of {self.name}. '
1155 f'Missing section [{self.name}] in configuration')
1157 return OldShellProfile(command)
1159 def calculate(
1160 self, atoms=None, properties=['energy'], system_changes=all_changes
1161 ):
1162 Calculator.calculate(self, atoms, properties, system_changes)
1163 self.write_input(self.atoms, properties, system_changes)
1164 self.execute()
1165 self.read_results()
1167 def execute(self):
1168 self.profile.execute(self)
1170 def write_input(self, atoms, properties=None, system_changes=None):
1171 """Write input file(s).
1173 Call this method first in subclasses so that directories are
1174 created automatically."""
1176 absdir = os.path.abspath(self.directory)
1177 if absdir != os.curdir and not os.path.isdir(self.directory):
1178 os.makedirs(self.directory)
1180 def read_results(self):
1181 """Read energy, forces, ... from output file(s)."""