Coverage for ase/optimize/optimize.py: 92.99%

214 statements

coverage.py v7.13.3, created at 2026-02-04 10:20 +0000

# fmt: off

"""Structure optimization."""

import time
import warnings
from collections.abc import Callable
from contextlib import contextmanager
from functools import cached_property
from os.path import isfile
from pathlib import Path
from typing import IO, Any

from ase import Atoms
from ase.calculators.calculator import PropertyNotImplementedError
from ase.filters import UnitCellFilter
from ase.parallel import world
from ase.utils import IOContext
from ase.utils.abc import Optimizable

DEFAULT_MAX_STEPS = 100_000_000


class RestartError(RuntimeError):
    pass


class OptimizableAtoms(Optimizable):
    def __init__(self, atoms):
        self.atoms = atoms

    def get_x(self):
        return self.atoms.get_positions().ravel()

    def set_x(self, x):
        self.atoms.set_positions(x.reshape(-1, 3))

    def get_gradient(self):
        return -self.atoms.get_forces().ravel()

    @cached_property
    def _use_force_consistent_energy(self):
        # This boolean is in principle invalidated if the
        # calculator changes.  This can lead to weird things
        # in multi-step optimizations.
        try:
            self.atoms.get_potential_energy(force_consistent=True)
        except PropertyNotImplementedError:
            # warnings.warn(
            #     'Could not get force consistent energy (\'free_energy\'). '
            #     'Please make sure calculator provides \'free_energy\', even '
            #     'if equal to the ordinary energy. '
            #     'This will raise an error in future versions of ASE.',
            #     FutureWarning)
            return False
        else:
            return True

    def get_value(self):
        force_consistent = self._use_force_consistent_energy
        return self.atoms.get_potential_energy(
            force_consistent=force_consistent)

    def iterimages(self):
        # XXX document purpose of iterimages
        return self.atoms.iterimages()

    def ndofs(self):
        return 3 * len(self.atoms)
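
# A minimal usage sketch (not part of this module; bulk() and EMT are
# standard ASE helpers, and the 0.01 step size is purely illustrative)
# showing how an optimizer drives an Atoms object through the adapter above:
#
#     from ase.build import bulk
#     from ase.calculators.emt import EMT
#
#     atoms = bulk('Cu')
#     atoms.calc = EMT()
#     target = OptimizableAtoms(atoms)
#     x = target.get_x()           # flat coordinates, len == target.ndofs()
#     g = target.get_gradient()    # -forces.ravel(), same shape as x
#     target.set_x(x - 0.01 * g)   # one steepest-descent update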



class Log:
    def __init__(self, logfile, comm):
        self._logfile = logfile
        self._comm = comm

    def write(self, msg):
        if self._logfile is None or self._comm.rank != 0:
            return

        if hasattr(self._logfile, 'write'):
            self._logfile.write(msg)
            return

        if self._logfile == '-':
            print(msg, end='')
            return

        with open(self._logfile, mode='a') as fd:
            fd.write(msg)

    def flush(self):
        warnings.warn('Log flushes by default now. Please do not call '
                      'flush(). If you want a different flushing behaviour, '
                      'please open the logfile yourself and choose a '
                      'buffering mode as appropriate. '
                      'This flush() method currently does nothing.',
                      FutureWarning)
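
# Illustrative only: Log accepts an already-open stream, '-' for stdout,
# or a path that is re-opened in append mode for each message; only rank 0
# of the communicator ever writes.
#
#     log = Log('-', comm=world)
#     log.write('step 0\n')    # printed to stdout on the master rank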



class BaseDynamics(IOContext):
    """Common superclass for optimization and MD.

    This class implements logfile, trajectory, and observer handling.
    It should do as little as possible other than that.

    We should try to remove the "atoms" parameter from this class,
    since Optimizers only require Optimizables.
    """
    def __init__(
        self,
        atoms: Atoms,
        logfile: IO | Path | str | None = None,
        trajectory: str | Path | None = None,
        append_trajectory: bool = False,
        master: bool | None = None,
        comm=world,
        *,
        loginterval: int = 1,
    ):
        self.atoms = atoms
        self.logfile = Log(logfile, comm=comm)
        self.observers: list[tuple[Callable, int, tuple, dict[str, Any]]] = []
        self.nsteps = 0
        self.max_steps = 0  # to be updated in run or irun
        self.comm = comm

        self._orig_trajectory = trajectory
        self._master = master

        if trajectory is None or hasattr(trajectory, 'write'):
            # The 'write' attribute is a duck-typed check for a Trajectory
            # (could be a BundleTrajectory); there is no unified way to
            # test for one.  In that case the trajectory is already open,
            # so we do not open it ourselves.
            pass
        else:
            trajectory = Path(trajectory)
            if comm.rank == 0 and not append_trajectory:
                trajectory.unlink(missing_ok=True)

        if trajectory is not None:
            self.attach(self._traj_write_image, interval=loginterval,
                        description={'interval': loginterval})

        self.trajectory = trajectory


    def todict(self) -> dict[str, Any]:
        raise NotImplementedError

    @contextmanager
    def _opentraj(self):
        from ase.io.trajectory import Trajectory
        assert self._orig_trajectory is not None

        if hasattr(self._orig_trajectory, 'write'):
            # Already an open trajectory, we are not responsible for open/close
            yield self._orig_trajectory
            return

        with Trajectory(
            self._orig_trajectory,
            master=self._master, mode='a', comm=self.comm,
        ) as traj:
            yield traj

    def _traj_write_image(self, description: dict):
        with self._opentraj() as traj:
            # The description is only written when we write a header,
            # and we probably only write the header when we need to
            # (in append mode) but I am not sure about that.
            traj.set_description(self.todict() | description)
            traj.write(self.atoms.__ase_optimizable__())

    def _traj_is_empty(self):
        with self._opentraj() as traj:
            return len(traj) == 0

    def _get_gradient(self, forces=None):
        if forces is not None:
            warnings.warn('Please do not pass forces to step(). '
                          'This argument will be removed in '
                          'ase 3.28.0.')
            return -forces.ravel()
        return self.optimizable.get_gradient()

    def get_number_of_steps(self):
        return self.nsteps


    def insert_observer(
        self, function, position=0, interval=1, *args, **kwargs
    ):
        """Insert an observer.

        This can be used for pre-processing before logging and dumping.

        Examples
        --------
        >>> from ase.build import bulk
        >>> from ase.calculators.emt import EMT
        >>> from ase.optimize import BFGS
        ...
        ...
        >>> def update_info(atoms, opt):
        ...     atoms.info["nsteps"] = opt.nsteps
        ...
        ...
        >>> atoms = bulk("Cu", cubic=True) * 2
        >>> atoms.rattle()
        >>> atoms.calc = EMT()
        >>> with BFGS(atoms, logfile=None, trajectory="opt.traj") as opt:
        ...     opt.insert_observer(update_info, atoms=atoms, opt=opt)
        ...     opt.run(fmax=0.05, steps=10)
        True
        """
        if not isinstance(function, Callable):
            function = function.write
        self.observers.insert(position, (function, interval, args, kwargs))


    def attach(self, function, interval=1, *args, **kwargs):
        """Attach callback function.

        If *interval > 0*, at every *interval* steps, call *function* with
        arguments *args* and keyword arguments *kwargs*.

        If *interval <= 0*, call *function* only after step *abs(interval)*.
        Steps are currently zero-indexed."""

        if not isinstance(function, Callable):
            function = function.write
        self.observers.append((function, interval, args, kwargs))

    def call_observers(self):
        for function, interval, args, kwargs in self.observers:
            call = False
            # Call every *interval* iterations
            if interval > 0:
                if (self.nsteps % interval) == 0:
                    call = True
            # Call only on iteration abs(interval)
            elif interval <= 0:
                if self.nsteps == abs(interval):
                    call = True
            if call:
                function(*args, **kwargs)
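
# Hypothetical attachments illustrating the two interval conventions above
# (the callback names are placeholders):
#
#     dyn.attach(write_checkpoint, interval=10)   # at steps 0, 10, 20, ...
#     dyn.attach(announce_start, interval=0)      # only at step 0
#     dyn.attach(announce_step5, interval=-5)     # only at step 5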



class Dynamics(BaseDynamics):
    """Historical base class for MD and structure-optimization classes.

    This inheritance level can probably be eliminated by merging it with
    Optimizer, since by now the only significant thing it provides
    is the implementation of irun() suitable for optimizations.

    This class is an implementation detail and may change or be removed.
    """

    def __init__(self, atoms: Atoms, *args, **kwargs):
        """Dynamics object.

        Parameters
        ----------
        atoms : Atoms object
            The Atoms object to operate on.

        logfile : file object, Path, or str
            If *logfile* is a string, a file with that name will be opened.
            Use '-' for stdout.

        trajectory : Trajectory object, str, or Path
            Attach a trajectory object.  If *trajectory* is a string or
            Path, a Trajectory will be constructed.  Use *None* for no
            trajectory.

        append_trajectory : bool
            Defaults to False, which causes the trajectory file to be
            overwritten each time the dynamics is restarted from scratch.
            If True, the new structures are appended to the trajectory
            file instead.

        master : bool
            Defaults to None, which causes only rank 0 to save files.
            If set to True, this rank will save files.

        comm : Communicator object
            Communicator to handle parallel file reading and writing.

        loginterval : int, default: 1
            Only write a log line for every *loginterval* time steps.
        """
        super().__init__(atoms, *args, **kwargs)
        self.atoms = atoms
        self.optimizable = atoms.__ase_optimizable__()


    def irun(self, steps=DEFAULT_MAX_STEPS):
        """Run dynamics algorithm as generator.

        Parameters
        ----------
        steps : int, default=DEFAULT_MAX_STEPS
            Number of dynamics steps to be run.

        Yields
        ------
        converged : bool
            True if the forces on atoms are converged.

        Examples
        --------
        This method allows, e.g., running two optimizers or MD thermostats
        at the same time.

        >>> opt1 = BFGS(atoms)
        >>> opt2 = BFGS(StrainFilter(atoms)).irun()
        >>> for _ in opt2:
        ...     opt1.run()
        """

        # update the maximum number of steps
        self.max_steps = self.nsteps + steps

        # compute the initial gradient (forces)
        gradient = self.optimizable.get_gradient()

        # log the initial step
        if self.nsteps == 0:
            self.log(gradient)

        # Always call the observers when no trajectory file is attached.
        if self.trajectory is None:
            self.call_observers()
        # On restart with an existing, non-empty trajectory file, skip the
        # call; writing here again would duplicate the last entry.
        elif self._traj_is_empty():
            self.call_observers()

        # check convergence
        gradient = self.optimizable.get_gradient()
        is_converged = self.converged(gradient)
        yield is_converged

        # run the algorithm until converged or max_steps reached
        while not is_converged and self.nsteps < self.max_steps:
            # compute the next step
            self.step()
            self.nsteps += 1

            # log the step
            gradient = self.optimizable.get_gradient()
            self.log(gradient)
            self.call_observers()

            # check convergence
            gradient = self.optimizable.get_gradient()
            is_converged = self.converged(gradient)
            yield is_converged
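
    # Sketch of consuming the generator step by step; BFGS is a concrete
    # subclass, and external_stop_requested() is a hypothetical user hook:
    #
    #     opt = BFGS(atoms, logfile='-')
    #     for converged in opt.irun(fmax=0.05, steps=100):
    #         if converged or external_stop_requested():
    #             break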


    def run(self, steps=DEFAULT_MAX_STEPS):
        """Run dynamics algorithm.

        This method will return when the forces on all individual
        atoms are less than *fmax* or when the number of steps exceeds
        *steps*.

        Parameters
        ----------
        steps : int, default=DEFAULT_MAX_STEPS
            Number of dynamics steps to be run.

        Returns
        -------
        converged : bool
            True if the forces on atoms are converged.
        """

        for converged in Dynamics.irun(self, steps=steps):
            pass
        return converged

    def converged(self, gradient):
        """A dummy placeholder for a real criterion, e.g. in Optimizer."""
        return False

    def log(self, *args, **kwargs):
        """A dummy placeholder for a real logger, e.g. in Optimizer."""
        return True

    def step(self):
        """This method must be implemented by subclasses."""
        raise RuntimeError("step not implemented.")



class Optimizer(Dynamics):
    """Base class for all structure optimization classes."""

    # default maxstep for all optimizers
    defaults = {'maxstep': 0.2}
    _deprecated = object()

    def __init__(
        self,
        atoms: Atoms,
        restart: str | Path | None = None,
        logfile: IO | str | Path | None = None,
        trajectory: str | Path | None = None,
        append_trajectory: bool = False,
        **kwargs,
    ):
        """
        Parameters
        ----------
        atoms: :class:`~ase.Atoms`
            The Atoms object to relax.

        restart: str | Path | None
            Filename for the restart file.  Default value is *None*.

        logfile: file object, Path, or str
            If *logfile* is a string, a file with that name will be opened.
            Use '-' for stdout.

        trajectory: Trajectory object, Path, or str
            Attach a trajectory object.  If *trajectory* is a string, a
            Trajectory will be constructed.  Use *None* for no
            trajectory.

        append_trajectory: bool
            If True, append to the trajectory file instead of
            overwriting it.

        kwargs : dict, optional
            Extra arguments passed to
            :class:`~ase.optimize.optimize.Dynamics`.
        """
        super().__init__(
            atoms=atoms,
            logfile=logfile,
            trajectory=trajectory,
            append_trajectory=append_trajectory,
            **kwargs,
        )

        self.restart = restart

        self.fmax = None

        if restart is None or not isfile(restart):
            self.initialize()
        else:
            self.read()
            self.comm.barrier()


    def read(self):
        raise NotImplementedError

    def todict(self):
        description = {
            'type': 'optimization',
            'optimizer': self.__class__.__name__,
            'restart': None if self.restart is None else str(self.restart),
        }
        # add custom attributes from subclasses
        for attr in ('maxstep', 'alpha', 'max_steps', 'fmax'):
            if hasattr(self, attr):
                description.update({attr: getattr(self, attr)})
        return description

    def initialize(self):
        pass

    def irun(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
        """Run optimizer as generator.

        Parameters
        ----------
        fmax : float
            Convergence criterion of the forces on atoms.
        steps : int, default=DEFAULT_MAX_STEPS
            Number of optimizer steps to be run.

        Yields
        ------
        converged : bool
            True if the forces on atoms are converged.
        """
        self.fmax = fmax
        return Dynamics.irun(self, steps=steps)

    def run(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
        """Run optimizer.

        Parameters
        ----------
        fmax : float
            Convergence criterion of the forces on atoms.
        steps : int, default=DEFAULT_MAX_STEPS
            Number of optimizer steps to be run.

        Returns
        -------
        converged : bool
            True if the forces on atoms are converged.
        """
        self.fmax = fmax
        return Dynamics.run(self, steps=steps)

    def converged(self, gradient):
        """Did the optimization converge?"""
        assert gradient.ndim == 1
        return self.optimizable.converged(gradient, self.fmax)
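
    # Rough sketch of the standard Cartesian convergence check, assuming a
    # plain Atoms optimizable (other Optimizables may define their own
    # norm); np is numpy:
    #
    #     g = opt.optimizable.get_gradient()     # shape (3 * natoms,)
    #     forces = -g.reshape(-1, 3)
    #     fmax_now = np.sqrt((forces ** 2).sum(axis=1)).max()
    #     done = fmax_now < opt.fmax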


    def log(self, gradient):
        fmax = self.optimizable.gradient_norm(gradient)
        e = self.optimizable.get_value()
        T = time.localtime()
        if self.logfile is not None:
            name = self.__class__.__name__
            if self.nsteps == 0:
                args = (" " * len(name), "Step", "Time", "Energy", "fmax")
                msg = "%s %4s %8s %15s %12s\n" % args
                self.logfile.write(msg)

            args = (name, self.nsteps, T[3], T[4], T[5], e, fmax)
            msg = "%s: %3d %02d:%02d:%02d %15.6f %15.6f\n" % args
            self.logfile.write(msg)


    def dump(self, data):
        from ase.io.jsonio import write_json
        if self.comm.rank == 0 and self.restart is not None:
            with open(self.restart, 'w') as fd:
                write_json(fd, data)

    def load(self):
        from ase.io.jsonio import read_json
        with open(self.restart) as fd:
            try:
                from ase.optimize import BFGS
                if not isinstance(self, BFGS) and isinstance(
                    self.atoms, UnitCellFilter
                ):
                    warnings.warn(
                        "WARNING: the restart function is untested and may "
                        "result in unintended behavior; in particular, "
                        "orig_cell is not loaded into the UnitCellFilter. "
                        "Please verify on your own that results are "
                        "consistent."
                    )
                return read_json(fd, always_array=False)
            except Exception as ex:
                msg = ('Could not decode restart file as JSON. '
                       'You may need to delete the restart file '
                       f'{self.restart}')
                raise RestartError(msg) from ex
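

# Sketch of the restart round trip, assuming a subclass that implements
# read()/initialize() (BFGS, for example, persists its state via dump()):
#
#     opt = BFGS(atoms, restart='opt.json')  # read() runs if opt.json exists
#     opt.run(fmax=0.05, steps=5)            # state is dumped along the way
#     # A later BFGS(atoms, restart='opt.json') resumes from the saved state.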