Coverage for /builds/ase/ase/ase/io/trajectory.py: 89.89%

277 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-08-02 00:12 +0000

1# fmt: off 

2 

3"""Trajectory""" 

4import contextlib 

5import io 

6import warnings 

7from typing import Tuple 

8 

9import numpy as np 

10 

11from ase import __version__ 

12from ase.atoms import Atoms 

13from ase.calculators.calculator import PropertyNotImplementedError 

14from ase.calculators.singlepoint import SinglePointCalculator, all_properties 

15from ase.io.formats import is_compressed 

16from ase.io.jsonio import decode, encode 

17from ase.io.pickletrajectory import PickleTrajectory 

18from ase.parallel import world 

19from ase.utils import tokenize_version 

20 

21__all__ = ['Trajectory', 'PickleTrajectory'] 

22 

23 

24def Trajectory(filename, mode='r', atoms=None, properties=None, master=None, 

25 comm=world): 

26 """A Trajectory can be created in read, write or append mode. 

27 

28 Parameters: 

29 

30 filename: str | Path 

31 The name/path of the file. Traditionally ends in .traj. 

32 mode: str 

33 The mode. 'r' is read mode, the file should already exist, and 

34 no atoms argument should be specified. 

35 'w' is write mode. The atoms argument specifies the Atoms 

36 object to be written to the file, if not given it must instead 

37 be given as an argument to the write() method. 

38 'a' is append mode. It acts as write mode, except that 

39 data is appended to a preexisting file. 

40 atoms: Atoms object 

41 The Atoms object to be written in write or append mode. 

42 properties: list of str 

43 If specified, these calculator properties are saved in the 

44 trajectory. If not specified, all supported quantities are 

45 saved. Possible values: energy, forces, stress, dipole, 

46 charges, magmom and magmoms. 

47 master: bool 

48 Controls which process does the actual writing. The 

49 default is that process number 0 does this. If this 

50 argument is given, processes where it is True will write. 

51 comm: Communicator object 

52 Communicator to handle parallel file reading and writing. 

53 

54 The atoms, properties and master arguments are ignored in read mode. 

55 """ 

56 if mode == 'r': 

57 return TrajectoryReader(filename) 

58 return TrajectoryWriter(filename, mode, atoms, properties, master=master, 

59 comm=comm) 

60 

61 

62class TrajectoryWriter: 

63 """Writes Atoms objects to a .traj file.""" 

64 

65 def __init__(self, filename, mode='w', atoms=None, properties=None, 

66 master=None, comm=world): 

67 """A Trajectory writer, in write or append mode. 

68 

69 Parameters: 

70 

71 filename: str | Path 

72 The name of the file. Traditionally ends in .traj. 

73 mode: str 

74 The mode. 'r' is read mode, the file should already exist, and 

75 no atoms argument should be specified. 

76 'w' is write mode. The atoms argument specifies the Atoms 

77 object to be written to the file, if not given it must instead 

78 be given as an argument to the write() method. 

79 'a' is append mode. It acts as write mode, except that 

80 data is appended to a preexisting file. 

81 atoms: Atoms object 

82 The Atoms object to be written in write or append mode. 

83 properties: list of str 

84 If specified, these calculator properties are saved in the 

85 trajectory. If not specified, all supported quantities are 

86 saved. Possible values: energy, forces, stress, dipole, 

87 charges, magmom and magmoms. 

88 master: bool 

89 Controls which process does the actual writing. The 

90 default is that process number 0 does this. If this 

91 argument is given, processes where it is True will write. 

92 comm: MPI communicator 

93 MPI communicator for this trajectory writer, by default world. 

94 Passing a different communicator facilitates writing of 

95 different trajectories on different MPI ranks. 

96 """ 

97 if master is None: 

98 master = comm.rank == 0 

99 

100 self.filename = filename 

101 self.mode = mode 

102 self.atoms = atoms 

103 self.properties = properties 

104 self.master = master 

105 self.comm = comm 

106 

107 self.description = {} 

108 self.header_data = None 

109 self.multiple_headers = False 

110 

111 self._open(filename, mode) 

112 

113 def __enter__(self): 

114 return self 

115 

116 def __exit__(self, exc_type, exc_value, tb): 

117 self.close() 

118 

119 def set_description(self, description): 

120 self.description.update(description) 

121 

122 def _open(self, filename, mode): 

123 import ase.io.ulm as ulm 

124 if mode not in 'aw': 

125 raise ValueError('mode must be "w" or "a".') 

126 if self.master: 

127 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory') 

128 if len(self.backend) > 0 and mode == 'a': 

129 with Trajectory(filename) as traj: 

130 atoms = traj[0] 

131 self.header_data = get_header_data(atoms) 

132 else: 

133 self.backend = ulm.DummyWriter() 

134 

135 def write(self, atoms=None, **kwargs): 

136 """Write the atoms to the file. 

137 

138 If the atoms argument is not given, the atoms object specified 

139 when creating the trajectory object is used. 

140 

141 Use keyword arguments to add extra properties:: 

142 

143 writer.write(atoms, energy=117, dipole=[0, 0, 1.0]) 

144 """ 

145 if atoms is None: 

146 atoms = self.atoms 

147 

148 for image in atoms.iterimages(): 

149 self._write_atoms(image, **kwargs) 

150 

151 def _write_atoms(self, atoms, **kwargs): 

152 b = self.backend 

153 

154 if self.header_data is None: 

155 b.write(version=1, ase_version=__version__) 

156 if self.description: 

157 b.write(description=self.description) 

158 # Atomic numbers and periodic boundary conditions are written 

159 # in the header in the beginning. 

160 # 

161 # If an image later on has other numbers/pbc, we write a new 

162 # header. All subsequent images will then have their own header 

163 # whether or not their numbers/pbc change. 

164 self.header_data = get_header_data(atoms) 

165 write_header = True 

166 else: 

167 if not self.multiple_headers: 

168 header_data = get_header_data(atoms) 

169 self.multiple_headers = not headers_equal(self.header_data, 

170 header_data) 

171 write_header = self.multiple_headers 

172 

173 write_atoms(b, atoms, write_header=write_header) 

174 

175 calc = atoms.calc 

176 

177 if calc is None and len(kwargs) > 0: 

178 calc = SinglePointCalculator(atoms) 

179 

180 if calc is not None: 

181 if not hasattr(calc, 'get_property'): 

182 calc = OldCalculatorWrapper(calc) 

183 c = b.child('calculator') 

184 c.write(name=calc.name) 

185 if hasattr(calc, 'todict'): 

186 c.write(parameters=calc.todict()) 

187 for prop in all_properties: 

188 if prop in kwargs: 

189 x = kwargs[prop] 

190 else: 

191 if self.properties is not None: 

192 if prop in self.properties: 

193 x = calc.get_property(prop, atoms) 

194 else: 

195 x = None 

196 else: 

197 try: 

198 x = calc.get_property(prop, atoms, 

199 allow_calculation=False) 

200 except (PropertyNotImplementedError, KeyError): 

201 # KeyError is needed for Jacapo. 

202 # XXX We can perhaps remove this. 

203 x = None 

204 if x is not None: 

205 if prop in ['stress', 'dipole']: 

206 x = x.tolist() 

207 c.write(prop, x) 

208 

209 info = {} 

210 for key, value in atoms.info.items(): 

211 try: 

212 encode(value) 

213 except TypeError: 

214 warnings.warn(f'Skipping "{key}" info.') 

215 else: 

216 info[key] = value 

217 if info: 

218 b.write(info=info) 

219 

220 b.sync() 

221 

222 def close(self): 

223 """Close the trajectory file.""" 

224 self.backend.close() 

225 

226 def __len__(self): 

227 return self.comm.sum_scalar(len(self.backend)) 

228 

229 

230class TrajectoryReader: 

231 """Reads Atoms objects from a .traj file.""" 

232 

233 def __init__(self, filename): 

234 """A Trajectory in read mode. 

235 

236 The filename traditionally ends in .traj. 

237 """ 

238 self.filename = filename 

239 self.numbers = None 

240 self.pbc = None 

241 self.masses = None 

242 

243 self._open(filename) 

244 

245 def __enter__(self): 

246 return self 

247 

248 def __exit__(self, exc_type, exc_value, tb): 

249 self.close() 

250 

251 def _open(self, filename): 

252 import ase.io.ulm as ulm 

253 self.backend = ulm.open(filename, 'r') 

254 self._read_header() 

255 

256 def _read_header(self): 

257 b = self.backend 

258 if b.get_tag() != 'ASE-Trajectory': 

259 raise OSError('This is not a trajectory file!') 

260 

261 if len(b) > 0: 

262 self.pbc = b.pbc 

263 self.numbers = b.numbers 

264 self.masses = b.get('masses') 

265 self.constraints = b.get('constraints', '[]') 

266 self.description = b.get('description') 

267 self.version = b.version 

268 self.ase_version = b.get('ase_version') 

269 

270 def close(self): 

271 """Close the trajectory file.""" 

272 self.backend.close() 

273 

274 def __getitem__(self, i=-1): 

275 if isinstance(i, slice): 

276 return SlicedTrajectory(self, i) 

277 b = self.backend[i] 

278 if 'numbers' in b: 

279 # numbers and other header info was written alongside the image: 

280 atoms = read_atoms(b, traj=self) 

281 else: 

282 # header info was not written because they are the same: 

283 atoms = read_atoms(b, 

284 header=[self.pbc, self.numbers, self.masses, 

285 self.constraints], 

286 traj=self) 

287 if 'calculator' in b: 

288 results = {} 

289 implemented_properties = [] 

290 c = b.calculator 

291 for prop in all_properties: 

292 if prop in c: 

293 results[prop] = c.get(prop) 

294 implemented_properties.append(prop) 

295 calc = SinglePointCalculator(atoms, **results) 

296 calc.name = b.calculator.name 

297 calc.implemented_properties = implemented_properties 

298 

299 if 'parameters' in c: 

300 calc.parameters.update(c.parameters) 

301 atoms.calc = calc 

302 

303 return atoms 

304 

305 def __len__(self): 

306 return len(self.backend) 

307 

308 def __iter__(self): 

309 for i in range(len(self)): 

310 yield self[i] 

311 

312 

313class SlicedTrajectory: 

314 """Wrapper to return a slice from a trajectory without loading 

315 from disk. Initialize with a trajectory (in read mode) and the 

316 desired slice object.""" 

317 

318 def __init__(self, trajectory, sliced): 

319 self.trajectory = trajectory 

320 self.map = range(len(self.trajectory))[sliced] 

321 

322 def __getitem__(self, i): 

323 if isinstance(i, slice): 

324 # Map directly to the original traj, not recursively. 

325 traj = SlicedTrajectory(self.trajectory, slice(0, None)) 

326 traj.map = self.map[i] 

327 return traj 

328 return self.trajectory[self.map[i]] 

329 

330 def __len__(self): 

331 return len(self.map) 

332 

333 

334def get_header_data(atoms): 

335 return {'pbc': atoms.pbc.copy(), 

336 'numbers': atoms.get_atomic_numbers(), 

337 'masses': atoms.get_masses() if atoms.has('masses') else None, 

338 'constraints': list(atoms.constraints)} 

339 

340 

341def headers_equal(headers1, headers2): 

342 assert len(headers1) == len(headers2) 

343 eq = True 

344 for key in headers1: 

345 eq &= np.array_equal(headers1[key], headers2[key]) 

346 return eq 

347 

348 

349class VersionTooOldError(Exception): 

350 pass 

351 

352 

353def read_atoms(backend, 

354 header: Tuple = None, 

355 traj: TrajectoryReader = None, 

356 _try_except: bool = True) -> Atoms: 

357 from ase.constraints import dict2constraint 

358 

359 if _try_except: 

360 try: 

361 return read_atoms(backend, header, traj, False) 

362 except Exception as ex: 

363 if (traj is not None and tokenize_version(__version__) < 

364 tokenize_version(traj.ase_version)): 

365 msg = ('You are trying to read a trajectory file written ' 

366 f'by ASE-{traj.ase_version} from ASE-{__version__}. ' 

367 'It might help to update your ASE') 

368 raise VersionTooOldError(msg) from ex 

369 else: 

370 raise 

371 

372 b = backend 

373 if header: 

374 pbc, numbers, masses, constraints = header 

375 else: 

376 pbc = b.pbc 

377 numbers = b.numbers 

378 masses = b.get('masses') 

379 constraints = b.get('constraints', '[]') 

380 

381 atoms = Atoms(positions=b.positions, 

382 numbers=numbers, 

383 cell=b.cell, 

384 masses=masses, 

385 pbc=pbc, 

386 info=b.get('info'), 

387 constraint=[dict2constraint(d) 

388 for d in decode(constraints)], 

389 momenta=b.get('momenta'), 

390 magmoms=b.get('magmoms'), 

391 charges=b.get('charges'), 

392 tags=b.get('tags')) 

393 return atoms 

394 

395 

396def write_atoms(backend, atoms, write_header=True): 

397 b = backend 

398 

399 if write_header: 

400 b.write(pbc=atoms.pbc.tolist(), 

401 numbers=atoms.numbers) 

402 if atoms.constraints: 

403 if all(hasattr(c, 'todict') for c in atoms.constraints): 

404 b.write(constraints=encode(atoms.constraints)) 

405 

406 if atoms.has('masses'): 

407 b.write(masses=atoms.get_masses()) 

408 

409 b.write(positions=atoms.get_positions(), 

410 cell=atoms.get_cell().tolist()) 

411 

412 if atoms.has('tags'): 

413 b.write(tags=atoms.get_tags()) 

414 if atoms.has('momenta'): 

415 b.write(momenta=atoms.get_momenta()) 

416 if atoms.has('initial_magmoms'): 

417 b.write(magmoms=atoms.get_initial_magnetic_moments()) 

418 if atoms.has('initial_charges'): 

419 b.write(charges=atoms.get_initial_charges()) 

420 

421 

422def read_traj(fd, index): 

423 trj = TrajectoryReader(fd) 

424 for i in range(*index.indices(len(trj))): 

425 yield trj[i] 

426 

427 

428@contextlib.contextmanager 

429def defer_compression(fd): 

430 """Defer the file compression until all the configurations are read.""" 

431 # We do this because the trajectory and compressed-file 

432 # internals do not play well together. 

433 # Be advised not to defer compression of very long trajectories 

434 # as they use a lot of memory. 

435 if is_compressed(fd): 

436 with io.BytesIO() as bytes_io: 

437 try: 

438 # write the uncompressed data to the buffer 

439 yield bytes_io 

440 finally: 

441 # write the buffered data to the compressed file 

442 bytes_io.seek(0) 

443 fd.write(bytes_io.read()) 

444 else: 

445 yield fd 

446 

447 

448def write_traj(fd, images): 

449 """Write image(s) to trajectory.""" 

450 if isinstance(images, Atoms): 

451 images = [images] 

452 with defer_compression(fd) as fd_uncompressed: 

453 trj = TrajectoryWriter(fd_uncompressed) 

454 for atoms in images: 

455 trj.write(atoms) 

456 

457 

458class OldCalculatorWrapper: 

459 def __init__(self, calc): 

460 self.calc = calc 

461 try: 

462 self.name = calc.name 

463 except AttributeError: 

464 self.name = calc.__class__.__name__.lower() 

465 

466 def get_property(self, prop, atoms, allow_calculation=True): 

467 try: 

468 if (not allow_calculation and 

469 self.calc.calculation_required(atoms, [prop])): 

470 return None 

471 except AttributeError: 

472 pass 

473 

474 method = 'get_' + {'energy': 'potential_energy', 

475 'magmom': 'magnetic_moment', 

476 'magmoms': 'magnetic_moments', 

477 'dipole': 'dipole_moment'}.get(prop, prop) 

478 try: 

479 result = getattr(self.calc, method)(atoms) 

480 except AttributeError: 

481 raise PropertyNotImplementedError 

482 return result 

483 

484 

485def convert(name): 

486 import os 

487 t = TrajectoryWriter(name + '.new') 

488 for atoms in PickleTrajectory(name, _warn=False): 

489 t.write(atoms) 

490 t.close() 

491 os.rename(name, name + '.old') 

492 os.rename(name + '.new', name) 

493 

494 

495def main(): 

496 import optparse 

497 parser = optparse.OptionParser(usage='python -m ase.io.trajectory ' 

498 'a1.traj [a2.traj ...]', 

499 description='Convert old trajectory ' 

500 'file(s) to new format. ' 

501 'The old file is kept as a1.traj.old.') 

502 _opts, args = parser.parse_args() 

503 for name in args: 

504 convert(name) 

505 

506 

507if __name__ == '__main__': 

508 main()