Coverage for /builds/ase/ase/ase/calculators/vasp/interactive.py: 20.65%
92 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
1# fmt: off
3import os
4import time
5from subprocess import PIPE, Popen
7from ase.calculators.calculator import Calculator
8from ase.config import cfg
9from ase.io import read
11from .create_input import GenerateVaspInput
class VaspInteractive(GenerateVaspInput, Calculator):  # type: ignore[misc]
    """Calculator that drives one long-running VASP process over pipes.

    The input files are written once, when no process exists yet.  After
    that, each new geometry is sent to the running executable as scaled
    positions on its standard input, and the results of the step are read
    from ``vasprun.xml`` inside ``path``.
    """
    name = "VaspInteractive"
    implemented_properties = ['energy', 'forces', 'stress']

    # Settings this calculator forces; __init__ rejects conflicting values.
    mandatory_input = {'potim': 0.0,
                       'ibrion': -1,
                       'interactive': True,
                       }

    # Settings applied only when the caller did not provide them.
    default_input = {'nsw': 2000,
                     }

    def __init__(self, txt="interactive.log", print_log=False, process=None,
                 command=None, path="./", **kwargs):
        """Validate the settings and work out how VASP will be launched.

        Parameters
        ----------
        txt:
            Name of a log file opened in append mode, or ``None`` to
            disable logging to a file.
        print_log:
            If true, every logged line is also printed.
        process:
            Process object to use instead of starting a new one.
            NOTE(review): presumably a ``Popen``-like object with text-mode
            ``stdin``/``stdout`` -- confirm against callers.
        command:
            Command used to start VASP, as a string (split on whitespace)
            or a sequence.  When ``None``, ``cfg['VASP_COMMAND']`` and then
            ``cfg['VASP_SCRIPT']`` are tried.
        path:
            Directory where input is written, where VASP is started and
            where ``STOPCAR`` / ``vasprun.xml`` are looked up.
        **kwargs:
            Forwarded to ``self.set`` after the mandatory and default
            settings have been merged in.

        Raises
        ------
        ValueError
            If a keyword conflicts with ``mandatory_input``.
        RuntimeError
            If no command is given and none is found in ``cfg``.
        """
        GenerateVaspInput.__init__(self)

        for kw, val in self.mandatory_input.items():
            if kw in kwargs and val != kwargs[kw]:
                raise ValueError('Keyword {} cannot be overridden! '
                                 'It must have value {}, but {} '
                                 'was provided instead.'.format(kw, val,
                                                                kwargs[kw]))
        kwargs.update(self.mandatory_input)

        for kw, val in self.default_input.items():
            kwargs.setdefault(kw, val)

        self.set(**kwargs)

        self.process = process
        self.path = path

        # The log handle stays open for the lifetime of the calculator
        # because logging continues across close()/restart cycles; it is
        # released in __del__.
        if txt is not None:
            self.txt = open(txt, "a")
        else:
            self.txt = None
        self.print_log = print_log

        if command is not None:
            self.command = command
        elif 'VASP_COMMAND' in cfg:
            self.command = cfg['VASP_COMMAND']
        elif 'VASP_SCRIPT' in cfg:
            self.command = cfg['VASP_SCRIPT']
        else:
            raise RuntimeError('Please set either command in calculator'
                               ' or VASP_COMMAND environment variable')

        # Popen is given an argument list, so a string command is split.
        if isinstance(self.command, str):
            self.command = self.command.split()

        self.atoms = None

    def _stdin(self, text, ending="\n"):
        """Log ``text`` and send it, followed by ``ending``, to VASP."""
        if self.txt is not None:
            self.txt.write(text + ending)
        if self.print_log:
            print(text, end=ending)
        self.process.stdin.write(text + ending)
        # Flush so the child sees the line immediately instead of it
        # sitting in the pipe buffer.
        self.process.stdin.flush()

    def _stdout(self, text):
        """Record ``text`` in the log file and/or print it, as configured."""
        if self.txt is not None:
            self.txt.write(text)
        if self.print_log:
            print(text, end="")

    def _run_vasp(self, atoms):
        """Start VASP or feed it new positions, then wait for its prompt.

        When no process exists, a stale ``STOPCAR`` is removed, the input
        files are written and VASP is launched.  Otherwise the scaled
        positions of ``atoms`` are written to the running process, one
        atom per line.  In both cases output is then read line by line
        until VASP asks for positions again.

        Raises
        ------
        RuntimeError
            If the process ends before asking for new positions.
        """
        if self.process is None:
            # A STOPCAR left over from an earlier run would make the new
            # process stop right away.
            stopcar = os.path.join(self.path, 'STOPCAR')
            if os.path.isfile(stopcar):
                os.remove(stopcar)
            self._stdout("Writing VASP input files\n")
            self.initialize(atoms)
            self.write_input(atoms, directory=self.path)
            self._stdout("Starting VASP for initial step...\n")
            # NOTE(review): stderr is piped but never read in this class;
            # a child that writes a lot to stderr could block -- confirm.
            self.process = Popen(self.command, stdout=PIPE,
                                 stdin=PIPE, stderr=PIPE, cwd=self.path,
                                 universal_newlines=True)
        else:
            self._stdout("Inputting positions...\n")
            for atom in atoms.get_scaled_positions():
                self._stdin(' '.join(map('{:19.16f}'.format, atom)))

        while self.process.poll() is None:
            text = self.process.stdout.readline()
            self._stdout(text)
            if "POSITIONS: reading from stdin" in text:
                return

        # If we've reached this point, then VASP has exited without asking
        # for new positions, meaning it either exited without error
        # unexpectedly, or it exited with an error. Either way, we need to
        # raise an error.
        raise RuntimeError("VASP exited unexpectedly with exit code {}"
                           "".format(self.process.poll()))

    def close(self):
        """Ask the running VASP process to stop and wait until it exits.

        Does nothing when there is no process.  Otherwise a ``STOPCAR``
        requesting an abort is written, positions are fed twice more, and
        the process is polled once per second until it has terminated.
        """
        # getattr: __del__ may call this on an instance whose __init__
        # raised before self.process was assigned.
        if getattr(self, 'process', None) is None:
            return

        self._stdout('Attempting to close VASP cleanly\n')
        with open(os.path.join(self.path, 'STOPCAR'), 'w') as stopcar:
            stopcar.write('LABORT = .TRUE.')

        # NOTE(review): the two consecutive steps are kept from the
        # original; presumably VASP needs them before it acts on STOPCAR
        # -- confirm.
        self._run_vasp(self.atoms)
        self._run_vasp(self.atoms)
        while self.process.poll() is None:
            time.sleep(1)
        self._stdout("VASP has been closed\n")
        self.process = None

    # NOTE(review): the list defaults are kept so the signature stays
    # unchanged; this method only reads them.
    def calculate(self, atoms=None, properties=['energy'],
                  system_changes=['positions', 'numbers', 'cell']):
        """Run one VASP step for ``atoms`` and store the results.

        Returns early when ``system_changes`` is empty.  If the atomic
        numbers changed, the current process is closed first so that a new
        one is started.  ``self.results`` is filled with ``free_energy``,
        ``energy``, ``forces`` and ``stress`` taken from the last image in
        ``vasprun.xml``.
        """
        Calculator.calculate(self, atoms, properties, system_changes)

        if not system_changes:
            return

        if 'numbers' in system_changes:
            self.close()

        self._run_vasp(atoms)

        new = read(os.path.join(self.path, 'vasprun.xml'), index=-1)

        self.results = {
            'free_energy': new.get_potential_energy(force_consistent=True),
            'energy': new.get_potential_energy(),
            # NOTE(review): self.resort is defined outside this class;
            # presumably it maps VASP's atom order back to the caller's.
            'forces': new.get_forces()[self.resort],
            'stress': new.get_stress()}

    def __del__(self):
        """Shut VASP down and release the log file on garbage collection."""
        try:
            self.close()
        finally:
            # getattr: the attribute is missing if __init__ failed early.
            log = getattr(self, 'txt', None)
            if log is not None:
                log.close()
                self.txt = None