Coverage for ase / io / trajectory.py: 89.78%

274 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 08:22 +0000

1# fmt: off 

2 

3"""Trajectory""" 

4import contextlib 

5import io 

6import warnings 

7 

8import numpy as np 

9 

10from ase import __version__ 

11from ase.atoms import Atoms 

12from ase.calculators.calculator import PropertyNotImplementedError 

13from ase.calculators.singlepoint import SinglePointCalculator, all_properties 

14from ase.io.formats import is_compressed 

15from ase.io.jsonio import decode, encode 

16from ase.io.pickletrajectory import PickleTrajectory 

17from ase.parallel import world 

18from ase.utils import tokenize_version 

19 

20__all__ = ['Trajectory', 'PickleTrajectory'] 

21 

22 

23def Trajectory(filename, mode='r', atoms=None, properties=None, master=None, 

24 comm=world): 

25 """A Trajectory can be created in read, write or append mode. 

26 

27 Parameters 

28 ---------- 

29 

30 filename: str | Path 

31 The name/path of the file. Traditionally ends in .traj. 

32 mode: str 

33 The mode. 'r' is read mode, the file should already exist, and 

34 no atoms argument should be specified. 

35 'w' is write mode. The atoms argument specifies the Atoms 

36 object to be written to the file, if not given it must instead 

37 be given as an argument to the write() method. 

38 'a' is append mode. It acts as write mode, except that 

39 data is appended to a preexisting file. 

40 atoms: Atoms object 

41 The Atoms object to be written in write or append mode. 

42 properties: list of str 

43 If specified, these calculator properties are saved in the 

44 trajectory. If not specified, all supported quantities are 

45 saved. Possible values: energy, forces, stress, dipole, 

46 charges, magmom and magmoms. 

47 master: bool 

48 Controls which process does the actual writing. The 

49 default is that process number 0 does this. If this 

50 argument is given, processes where it is True will write. 

51 comm: Communicator object 

52 Communicator to handle parallel file reading and writing. 

53 

54 The atoms, properties and master arguments are ignored in read mode. 

55 """ 

56 if mode == 'r': 

57 return TrajectoryReader(filename) 

58 return TrajectoryWriter(filename, mode, atoms, properties, master=master, 

59 comm=comm) 

60 

61 

62class TrajectoryWriter: 

63 """Writes Atoms objects to a .traj file.""" 

64 

65 def __init__(self, filename, mode='w', atoms=None, properties=None, 

66 master=None, comm=world): 

67 """A Trajectory writer, in write or append mode. 

68 

69 Parameters 

70 ---------- 

71 

72 filename: str | Path 

73 The name of the file. Traditionally ends in .traj. 

74 mode: str 

75 The mode. 'r' is read mode, the file should already exist, and 

76 no atoms argument should be specified. 

77 'w' is write mode. The atoms argument specifies the Atoms 

78 object to be written to the file, if not given it must instead 

79 be given as an argument to the write() method. 

80 'a' is append mode. It acts as write mode, except that 

81 data is appended to a preexisting file. 

82 atoms: Atoms object 

83 The Atoms object to be written in write or append mode. 

84 properties: list of str 

85 If specified, these calculator properties are saved in the 

86 trajectory. If not specified, all supported quantities are 

87 saved. Possible values: energy, forces, stress, dipole, 

88 charges, magmom and magmoms. 

89 master: bool 

90 Controls which process does the actual writing. The 

91 default is that process number 0 does this. If this 

92 argument is given, processes where it is True will write. 

93 comm: MPI communicator 

94 MPI communicator for this trajectory writer, by default world. 

95 Passing a different communicator facilitates writing of 

96 different trajectories on different MPI ranks. 

97 """ 

98 if master is None: 

99 master = comm.rank == 0 

100 

101 self.filename = filename 

102 self.mode = mode 

103 self.atoms = atoms 

104 self.properties = properties 

105 self.master = master 

106 self.comm = comm 

107 

108 self.description = {} 

109 self.header_data = None 

110 self.multiple_headers = False 

111 

112 self._open(filename, mode) 

113 

114 def __enter__(self): 

115 return self 

116 

117 def __exit__(self, exc_type, exc_value, tb): 

118 self.close() 

119 

120 def set_description(self, description): 

121 self.description.update(description) 

122 

123 def _open(self, filename, mode): 

124 import ase.io.ulm as ulm 

125 if mode not in 'aw': 

126 raise ValueError('mode must be "w" or "a".') 

127 if self.master: 

128 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory') 

129 if len(self.backend) > 0 and mode == 'a': 

130 with Trajectory(filename) as traj: 

131 atoms = traj[0] 

132 self.header_data = get_header_data(atoms) 

133 else: 

134 self.backend = ulm.DummyWriter() 

135 

136 def write(self, atoms=None, **kwargs): 

137 """Write the atoms to the file. 

138 

139 If the atoms argument is not given, the atoms object specified 

140 when creating the trajectory object is used. 

141 

142 Use keyword arguments to add extra properties:: 

143 

144 writer.write(atoms, energy=117, dipole=[0, 0, 1.0]) 

145 """ 

146 if atoms is None: 

147 atoms = self.atoms 

148 

149 for image in atoms.iterimages(): 

150 self._write_atoms(image, **kwargs) 

151 

152 def _write_atoms(self, atoms, **kwargs): 

153 b = self.backend 

154 

155 if self.header_data is None: 

156 b.write(version=1, ase_version=__version__) 

157 if self.description: 

158 b.write(description=self.description) 

159 # Atomic numbers and periodic boundary conditions are written 

160 # in the header in the beginning. 

161 # 

162 # If an image later on has other numbers/pbc, we write a new 

163 # header. All subsequent images will then have their own header 

164 # whether or not their numbers/pbc change. 

165 self.header_data = get_header_data(atoms) 

166 write_header = True 

167 else: 

168 if not self.multiple_headers: 

169 header_data = get_header_data(atoms) 

170 self.multiple_headers = not headers_equal(self.header_data, 

171 header_data) 

172 write_header = self.multiple_headers 

173 

174 write_atoms(b, atoms, write_header=write_header) 

175 

176 calc = atoms.calc 

177 

178 if calc is None and len(kwargs) > 0: 

179 calc = SinglePointCalculator(atoms) 

180 

181 if calc is not None: 

182 if not hasattr(calc, 'get_property'): 

183 calc = OldCalculatorWrapper(calc) 

184 c = b.child('calculator') 

185 c.write(name=calc.name) 

186 if hasattr(calc, 'todict'): 

187 c.write(parameters=calc.todict()) 

188 for prop in all_properties: 

189 if prop in kwargs: 

190 x = kwargs[prop] 

191 else: 

192 if self.properties is not None: 

193 if prop in self.properties: 

194 x = calc.get_property(prop, atoms) 

195 else: 

196 x = None 

197 else: 

198 try: 

199 x = calc.get_property(prop, atoms, 

200 allow_calculation=False) 

201 except (PropertyNotImplementedError, KeyError): 

202 # KeyError is needed for Jacapo. 

203 # XXX We can perhaps remove this. 

204 x = None 

205 if x is not None: 

206 if prop in ['stress', 'dipole']: 

207 x = x.tolist() 

208 c.write(prop, x) 

209 

210 info = {} 

211 for key, value in atoms.info.items(): 

212 try: 

213 encode(value) 

214 except TypeError: 

215 warnings.warn(f'Skipping "{key}" info.') 

216 else: 

217 info[key] = value 

218 if info: 

219 b.write(info=info) 

220 

221 b.sync() 

222 

223 def close(self): 

224 """Close the trajectory file.""" 

225 self.backend.close() 

226 

227 def __len__(self): 

228 return self.comm.sum_scalar(len(self.backend)) 

229 

230 

231class TrajectoryReader: 

232 """Reads Atoms objects from a .traj file.""" 

233 

234 def __init__(self, filename): 

235 """A Trajectory in read mode. 

236 

237 The filename traditionally ends in .traj. 

238 """ 

239 self.filename = filename 

240 self.numbers = None 

241 self.pbc = None 

242 self.masses = None 

243 

244 self._open(filename) 

245 

246 def __enter__(self): 

247 return self 

248 

249 def __exit__(self, exc_type, exc_value, tb): 

250 self.close() 

251 

252 def _open(self, filename): 

253 import ase.io.ulm as ulm 

254 self.backend = ulm.open(filename, 'r') 

255 self._read_header() 

256 

257 def _read_header(self): 

258 b = self.backend 

259 if b.get_tag() != 'ASE-Trajectory': 

260 raise OSError('This is not a trajectory file!') 

261 

262 if len(b) > 0: 

263 self.pbc = b.pbc 

264 self.numbers = b.numbers 

265 self.masses = b.get('masses') 

266 self.constraints = b.get('constraints', '[]') 

267 self.description = b.get('description') 

268 self.version = b.version 

269 self.ase_version = b.get('ase_version') 

270 

271 def close(self): 

272 """Close the trajectory file.""" 

273 self.backend.close() 

274 

275 def __getitem__(self, i=-1): 

276 if isinstance(i, slice): 

277 return SlicedTrajectory(self, i) 

278 b = self.backend[i] 

279 if 'numbers' in b: 

280 # numbers and other header info was written alongside the image: 

281 atoms = read_atoms(b, traj=self) 

282 else: 

283 # header info was not written because they are the same: 

284 atoms = read_atoms(b, 

285 header=[self.pbc, self.numbers, self.masses, 

286 self.constraints], 

287 traj=self) 

288 if 'calculator' in b: 

289 results = {} 

290 implemented_properties = [] 

291 c = b.calculator 

292 for prop in all_properties: 

293 if prop in c: 

294 results[prop] = c.get(prop) 

295 implemented_properties.append(prop) 

296 calc = SinglePointCalculator(atoms, **results) 

297 calc.name = b.calculator.name 

298 calc.implemented_properties = implemented_properties 

299 

300 if 'parameters' in c: 

301 calc.parameters.update(c.parameters) 

302 atoms.calc = calc 

303 

304 return atoms 

305 

306 def __len__(self): 

307 return len(self.backend) 

308 

309 def __iter__(self): 

310 for i in range(len(self)): 

311 yield self[i] 

312 

313 

314class SlicedTrajectory: 

315 """Wrapper to return a slice from a trajectory without loading 

316 from disk. Initialize with a trajectory (in read mode) and the 

317 desired slice object.""" 

318 

319 def __init__(self, trajectory, sliced): 

320 self.trajectory = trajectory 

321 self.map = range(len(self.trajectory))[sliced] 

322 

323 def __getitem__(self, i): 

324 if isinstance(i, slice): 

325 # Map directly to the original traj, not recursively. 

326 traj = SlicedTrajectory(self.trajectory, slice(0, None)) 

327 traj.map = self.map[i] 

328 return traj 

329 return self.trajectory[self.map[i]] 

330 

331 def __len__(self): 

332 return len(self.map) 

333 

334 

335def get_header_data(atoms): 

336 return {'pbc': atoms.pbc.copy(), 

337 'numbers': atoms.get_atomic_numbers(), 

338 'masses': atoms.get_masses() if atoms.has('masses') else None, 

339 'constraints': list(atoms.constraints)} 

340 

341 

342def headers_equal(headers1, headers2): 

343 assert len(headers1) == len(headers2) 

344 eq = True 

345 for key in headers1: 

346 eq &= np.array_equal(headers1[key], headers2[key]) 

347 return eq 

348 

349 

350class VersionTooOldError(Exception): 

351 pass 

352 

353 

354def read_atoms(backend, 

355 header: tuple | None = None, 

356 traj: TrajectoryReader | None = None, 

357 _try_except: bool = True) -> Atoms: 

358 from ase.constraints import dict2constraint 

359 

360 if _try_except: 

361 try: 

362 return read_atoms(backend, header, traj, False) 

363 except Exception as ex: 

364 if (traj is not None and tokenize_version(__version__) < 

365 tokenize_version(traj.ase_version)): 

366 msg = ('You are trying to read a trajectory file written ' 

367 f'by ASE-{traj.ase_version} from ASE-{__version__}. ' 

368 'It might help to update your ASE') 

369 raise VersionTooOldError(msg) from ex 

370 else: 

371 raise 

372 

373 b = backend 

374 if header: 

375 pbc, numbers, masses, constraints = header 

376 else: 

377 pbc = b.pbc 

378 numbers = b.numbers 

379 masses = b.get('masses') 

380 constraints = b.get('constraints', '[]') 

381 

382 atoms = Atoms(positions=b.positions, 

383 numbers=numbers, 

384 cell=b.cell, 

385 masses=masses, 

386 pbc=pbc, 

387 info=b.get('info'), 

388 constraint=[dict2constraint(d) 

389 for d in decode(constraints)], 

390 momenta=b.get('momenta'), 

391 magmoms=b.get('magmoms'), 

392 charges=b.get('charges'), 

393 tags=b.get('tags')) 

394 return atoms 

395 

396 

397def write_atoms(backend, atoms, write_header=True): 

398 b = backend 

399 

400 if write_header: 

401 b.write(pbc=atoms.pbc.tolist(), 

402 numbers=atoms.numbers) 

403 if atoms.constraints: 

404 if all(hasattr(c, 'todict') for c in atoms.constraints): 

405 b.write(constraints=encode(atoms.constraints)) 

406 

407 if atoms.has('masses'): 

408 b.write(masses=atoms.get_masses()) 

409 

410 b.write(positions=atoms.get_positions(), 

411 cell=atoms.get_cell().tolist()) 

412 

413 if atoms.has('tags'): 

414 b.write(tags=atoms.get_tags()) 

415 if atoms.has('momenta'): 

416 b.write(momenta=atoms.get_momenta()) 

417 if atoms.has('initial_magmoms'): 

418 b.write(magmoms=atoms.get_initial_magnetic_moments()) 

419 if atoms.has('initial_charges'): 

420 b.write(charges=atoms.get_initial_charges()) 

421 

422 

423def read_traj(fd, index): 

424 trj = TrajectoryReader(fd) 

425 for i in range(*index.indices(len(trj))): 

426 yield trj[i] 

427 

428 

429@contextlib.contextmanager 

430def defer_compression(fd): 

431 """Defer the file compression until all the configurations are read.""" 

432 # We do this because the trajectory and compressed-file 

433 # internals do not play well together. 

434 # Be advised not to defer compression of very long trajectories 

435 # as they use a lot of memory. 

436 if is_compressed(fd): 

437 with io.BytesIO() as bytes_io: 

438 try: 

439 # write the uncompressed data to the buffer 

440 yield bytes_io 

441 finally: 

442 # write the buffered data to the compressed file 

443 bytes_io.seek(0) 

444 fd.write(bytes_io.read()) 

445 else: 

446 yield fd 

447 

448 

449def write_traj(fd, images): 

450 """Write image(s) to trajectory.""" 

451 if isinstance(images, Atoms): 

452 images = [images] 

453 with defer_compression(fd) as fd_uncompressed: 

454 trj = TrajectoryWriter(fd_uncompressed) 

455 for atoms in images: 

456 trj.write(atoms) 

457 

458 

459class OldCalculatorWrapper: 

460 def __init__(self, calc): 

461 self.calc = calc 

462 try: 

463 self.name = calc.name 

464 except AttributeError: 

465 self.name = calc.__class__.__name__.lower() 

466 

467 def get_property(self, prop, atoms, allow_calculation=True): 

468 try: 

469 if (not allow_calculation and 

470 self.calc.calculation_required(atoms, [prop])): 

471 return None 

472 except AttributeError: 

473 pass 

474 

475 method = 'get_' + {'energy': 'potential_energy', 

476 'magmom': 'magnetic_moment', 

477 'magmoms': 'magnetic_moments', 

478 'dipole': 'dipole_moment'}.get(prop, prop) 

479 try: 

480 result = getattr(self.calc, method)(atoms) 

481 except AttributeError: 

482 raise PropertyNotImplementedError 

483 return result 

484 

485 

486def convert(name): 

487 import os 

488 t = TrajectoryWriter(name + '.new') 

489 for atoms in PickleTrajectory(name, _warn=False): 

490 t.write(atoms) 

491 t.close() 

492 os.rename(name, name + '.old') 

493 os.rename(name + '.new', name) 

494 

495 

496def main(): 

497 import optparse 

498 parser = optparse.OptionParser(usage='python -m ase.io.trajectory ' 

499 'a1.traj [a2.traj ...]', 

500 description='Convert old trajectory ' 

501 'file(s) to new format. ' 

502 'The old file is kept as a1.traj.old.') 

503 _opts, args = parser.parse_args() 

504 for name in args: 

505 convert(name) 

506 

507 

508if __name__ == '__main__': 

509 main()