Coverage for /builds/ase/ase/ase/io/trajectory.py: 89.89%
277 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
1# fmt: off
3"""Trajectory"""
4import contextlib
5import io
6import warnings
7from typing import Tuple
9import numpy as np
11from ase import __version__
12from ase.atoms import Atoms
13from ase.calculators.calculator import PropertyNotImplementedError
14from ase.calculators.singlepoint import SinglePointCalculator, all_properties
15from ase.io.formats import is_compressed
16from ase.io.jsonio import decode, encode
17from ase.io.pickletrajectory import PickleTrajectory
18from ase.parallel import world
19from ase.utils import tokenize_version
21__all__ = ['Trajectory', 'PickleTrajectory']
24def Trajectory(filename, mode='r', atoms=None, properties=None, master=None,
25 comm=world):
26 """A Trajectory can be created in read, write or append mode.
28 Parameters:
30 filename: str | Path
31 The name/path of the file. Traditionally ends in .traj.
32 mode: str
33 The mode. 'r' is read mode, the file should already exist, and
34 no atoms argument should be specified.
35 'w' is write mode. The atoms argument specifies the Atoms
36 object to be written to the file, if not given it must instead
37 be given as an argument to the write() method.
38 'a' is append mode. It acts as write mode, except that
39 data is appended to a preexisting file.
40 atoms: Atoms object
41 The Atoms object to be written in write or append mode.
42 properties: list of str
43 If specified, these calculator properties are saved in the
44 trajectory. If not specified, all supported quantities are
45 saved. Possible values: energy, forces, stress, dipole,
46 charges, magmom and magmoms.
47 master: bool
48 Controls which process does the actual writing. The
49 default is that process number 0 does this. If this
50 argument is given, processes where it is True will write.
51 comm: Communicator object
52 Communicator to handle parallel file reading and writing.
54 The atoms, properties and master arguments are ignored in read mode.
55 """
56 if mode == 'r':
57 return TrajectoryReader(filename)
58 return TrajectoryWriter(filename, mode, atoms, properties, master=master,
59 comm=comm)
62class TrajectoryWriter:
63 """Writes Atoms objects to a .traj file."""
65 def __init__(self, filename, mode='w', atoms=None, properties=None,
66 master=None, comm=world):
67 """A Trajectory writer, in write or append mode.
69 Parameters:
71 filename: str | Path
72 The name of the file. Traditionally ends in .traj.
73 mode: str
74 The mode. 'r' is read mode, the file should already exist, and
75 no atoms argument should be specified.
76 'w' is write mode. The atoms argument specifies the Atoms
77 object to be written to the file, if not given it must instead
78 be given as an argument to the write() method.
79 'a' is append mode. It acts as write mode, except that
80 data is appended to a preexisting file.
81 atoms: Atoms object
82 The Atoms object to be written in write or append mode.
83 properties: list of str
84 If specified, these calculator properties are saved in the
85 trajectory. If not specified, all supported quantities are
86 saved. Possible values: energy, forces, stress, dipole,
87 charges, magmom and magmoms.
88 master: bool
89 Controls which process does the actual writing. The
90 default is that process number 0 does this. If this
91 argument is given, processes where it is True will write.
92 comm: MPI communicator
93 MPI communicator for this trajectory writer, by default world.
94 Passing a different communicator facilitates writing of
95 different trajectories on different MPI ranks.
96 """
97 if master is None:
98 master = comm.rank == 0
100 self.filename = filename
101 self.mode = mode
102 self.atoms = atoms
103 self.properties = properties
104 self.master = master
105 self.comm = comm
107 self.description = {}
108 self.header_data = None
109 self.multiple_headers = False
111 self._open(filename, mode)
113 def __enter__(self):
114 return self
116 def __exit__(self, exc_type, exc_value, tb):
117 self.close()
119 def set_description(self, description):
120 self.description.update(description)
122 def _open(self, filename, mode):
123 import ase.io.ulm as ulm
124 if mode not in 'aw':
125 raise ValueError('mode must be "w" or "a".')
126 if self.master:
127 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory')
128 if len(self.backend) > 0 and mode == 'a':
129 with Trajectory(filename) as traj:
130 atoms = traj[0]
131 self.header_data = get_header_data(atoms)
132 else:
133 self.backend = ulm.DummyWriter()
135 def write(self, atoms=None, **kwargs):
136 """Write the atoms to the file.
138 If the atoms argument is not given, the atoms object specified
139 when creating the trajectory object is used.
141 Use keyword arguments to add extra properties::
143 writer.write(atoms, energy=117, dipole=[0, 0, 1.0])
144 """
145 if atoms is None:
146 atoms = self.atoms
148 for image in atoms.iterimages():
149 self._write_atoms(image, **kwargs)
151 def _write_atoms(self, atoms, **kwargs):
152 b = self.backend
154 if self.header_data is None:
155 b.write(version=1, ase_version=__version__)
156 if self.description:
157 b.write(description=self.description)
158 # Atomic numbers and periodic boundary conditions are written
159 # in the header in the beginning.
160 #
161 # If an image later on has other numbers/pbc, we write a new
162 # header. All subsequent images will then have their own header
163 # whether or not their numbers/pbc change.
164 self.header_data = get_header_data(atoms)
165 write_header = True
166 else:
167 if not self.multiple_headers:
168 header_data = get_header_data(atoms)
169 self.multiple_headers = not headers_equal(self.header_data,
170 header_data)
171 write_header = self.multiple_headers
173 write_atoms(b, atoms, write_header=write_header)
175 calc = atoms.calc
177 if calc is None and len(kwargs) > 0:
178 calc = SinglePointCalculator(atoms)
180 if calc is not None:
181 if not hasattr(calc, 'get_property'):
182 calc = OldCalculatorWrapper(calc)
183 c = b.child('calculator')
184 c.write(name=calc.name)
185 if hasattr(calc, 'todict'):
186 c.write(parameters=calc.todict())
187 for prop in all_properties:
188 if prop in kwargs:
189 x = kwargs[prop]
190 else:
191 if self.properties is not None:
192 if prop in self.properties:
193 x = calc.get_property(prop, atoms)
194 else:
195 x = None
196 else:
197 try:
198 x = calc.get_property(prop, atoms,
199 allow_calculation=False)
200 except (PropertyNotImplementedError, KeyError):
201 # KeyError is needed for Jacapo.
202 # XXX We can perhaps remove this.
203 x = None
204 if x is not None:
205 if prop in ['stress', 'dipole']:
206 x = x.tolist()
207 c.write(prop, x)
209 info = {}
210 for key, value in atoms.info.items():
211 try:
212 encode(value)
213 except TypeError:
214 warnings.warn(f'Skipping "{key}" info.')
215 else:
216 info[key] = value
217 if info:
218 b.write(info=info)
220 b.sync()
222 def close(self):
223 """Close the trajectory file."""
224 self.backend.close()
226 def __len__(self):
227 return self.comm.sum_scalar(len(self.backend))
230class TrajectoryReader:
231 """Reads Atoms objects from a .traj file."""
233 def __init__(self, filename):
234 """A Trajectory in read mode.
236 The filename traditionally ends in .traj.
237 """
238 self.filename = filename
239 self.numbers = None
240 self.pbc = None
241 self.masses = None
243 self._open(filename)
245 def __enter__(self):
246 return self
248 def __exit__(self, exc_type, exc_value, tb):
249 self.close()
251 def _open(self, filename):
252 import ase.io.ulm as ulm
253 self.backend = ulm.open(filename, 'r')
254 self._read_header()
256 def _read_header(self):
257 b = self.backend
258 if b.get_tag() != 'ASE-Trajectory':
259 raise OSError('This is not a trajectory file!')
261 if len(b) > 0:
262 self.pbc = b.pbc
263 self.numbers = b.numbers
264 self.masses = b.get('masses')
265 self.constraints = b.get('constraints', '[]')
266 self.description = b.get('description')
267 self.version = b.version
268 self.ase_version = b.get('ase_version')
270 def close(self):
271 """Close the trajectory file."""
272 self.backend.close()
274 def __getitem__(self, i=-1):
275 if isinstance(i, slice):
276 return SlicedTrajectory(self, i)
277 b = self.backend[i]
278 if 'numbers' in b:
279 # numbers and other header info was written alongside the image:
280 atoms = read_atoms(b, traj=self)
281 else:
282 # header info was not written because they are the same:
283 atoms = read_atoms(b,
284 header=[self.pbc, self.numbers, self.masses,
285 self.constraints],
286 traj=self)
287 if 'calculator' in b:
288 results = {}
289 implemented_properties = []
290 c = b.calculator
291 for prop in all_properties:
292 if prop in c:
293 results[prop] = c.get(prop)
294 implemented_properties.append(prop)
295 calc = SinglePointCalculator(atoms, **results)
296 calc.name = b.calculator.name
297 calc.implemented_properties = implemented_properties
299 if 'parameters' in c:
300 calc.parameters.update(c.parameters)
301 atoms.calc = calc
303 return atoms
305 def __len__(self):
306 return len(self.backend)
308 def __iter__(self):
309 for i in range(len(self)):
310 yield self[i]
313class SlicedTrajectory:
314 """Wrapper to return a slice from a trajectory without loading
315 from disk. Initialize with a trajectory (in read mode) and the
316 desired slice object."""
318 def __init__(self, trajectory, sliced):
319 self.trajectory = trajectory
320 self.map = range(len(self.trajectory))[sliced]
322 def __getitem__(self, i):
323 if isinstance(i, slice):
324 # Map directly to the original traj, not recursively.
325 traj = SlicedTrajectory(self.trajectory, slice(0, None))
326 traj.map = self.map[i]
327 return traj
328 return self.trajectory[self.map[i]]
330 def __len__(self):
331 return len(self.map)
334def get_header_data(atoms):
335 return {'pbc': atoms.pbc.copy(),
336 'numbers': atoms.get_atomic_numbers(),
337 'masses': atoms.get_masses() if atoms.has('masses') else None,
338 'constraints': list(atoms.constraints)}
341def headers_equal(headers1, headers2):
342 assert len(headers1) == len(headers2)
343 eq = True
344 for key in headers1:
345 eq &= np.array_equal(headers1[key], headers2[key])
346 return eq
349class VersionTooOldError(Exception):
350 pass
353def read_atoms(backend,
354 header: Tuple = None,
355 traj: TrajectoryReader = None,
356 _try_except: bool = True) -> Atoms:
357 from ase.constraints import dict2constraint
359 if _try_except:
360 try:
361 return read_atoms(backend, header, traj, False)
362 except Exception as ex:
363 if (traj is not None and tokenize_version(__version__) <
364 tokenize_version(traj.ase_version)):
365 msg = ('You are trying to read a trajectory file written '
366 f'by ASE-{traj.ase_version} from ASE-{__version__}. '
367 'It might help to update your ASE')
368 raise VersionTooOldError(msg) from ex
369 else:
370 raise
372 b = backend
373 if header:
374 pbc, numbers, masses, constraints = header
375 else:
376 pbc = b.pbc
377 numbers = b.numbers
378 masses = b.get('masses')
379 constraints = b.get('constraints', '[]')
381 atoms = Atoms(positions=b.positions,
382 numbers=numbers,
383 cell=b.cell,
384 masses=masses,
385 pbc=pbc,
386 info=b.get('info'),
387 constraint=[dict2constraint(d)
388 for d in decode(constraints)],
389 momenta=b.get('momenta'),
390 magmoms=b.get('magmoms'),
391 charges=b.get('charges'),
392 tags=b.get('tags'))
393 return atoms
396def write_atoms(backend, atoms, write_header=True):
397 b = backend
399 if write_header:
400 b.write(pbc=atoms.pbc.tolist(),
401 numbers=atoms.numbers)
402 if atoms.constraints:
403 if all(hasattr(c, 'todict') for c in atoms.constraints):
404 b.write(constraints=encode(atoms.constraints))
406 if atoms.has('masses'):
407 b.write(masses=atoms.get_masses())
409 b.write(positions=atoms.get_positions(),
410 cell=atoms.get_cell().tolist())
412 if atoms.has('tags'):
413 b.write(tags=atoms.get_tags())
414 if atoms.has('momenta'):
415 b.write(momenta=atoms.get_momenta())
416 if atoms.has('initial_magmoms'):
417 b.write(magmoms=atoms.get_initial_magnetic_moments())
418 if atoms.has('initial_charges'):
419 b.write(charges=atoms.get_initial_charges())
422def read_traj(fd, index):
423 trj = TrajectoryReader(fd)
424 for i in range(*index.indices(len(trj))):
425 yield trj[i]
428@contextlib.contextmanager
429def defer_compression(fd):
430 """Defer the file compression until all the configurations are read."""
431 # We do this because the trajectory and compressed-file
432 # internals do not play well together.
433 # Be advised not to defer compression of very long trajectories
434 # as they use a lot of memory.
435 if is_compressed(fd):
436 with io.BytesIO() as bytes_io:
437 try:
438 # write the uncompressed data to the buffer
439 yield bytes_io
440 finally:
441 # write the buffered data to the compressed file
442 bytes_io.seek(0)
443 fd.write(bytes_io.read())
444 else:
445 yield fd
448def write_traj(fd, images):
449 """Write image(s) to trajectory."""
450 if isinstance(images, Atoms):
451 images = [images]
452 with defer_compression(fd) as fd_uncompressed:
453 trj = TrajectoryWriter(fd_uncompressed)
454 for atoms in images:
455 trj.write(atoms)
458class OldCalculatorWrapper:
459 def __init__(self, calc):
460 self.calc = calc
461 try:
462 self.name = calc.name
463 except AttributeError:
464 self.name = calc.__class__.__name__.lower()
466 def get_property(self, prop, atoms, allow_calculation=True):
467 try:
468 if (not allow_calculation and
469 self.calc.calculation_required(atoms, [prop])):
470 return None
471 except AttributeError:
472 pass
474 method = 'get_' + {'energy': 'potential_energy',
475 'magmom': 'magnetic_moment',
476 'magmoms': 'magnetic_moments',
477 'dipole': 'dipole_moment'}.get(prop, prop)
478 try:
479 result = getattr(self.calc, method)(atoms)
480 except AttributeError:
481 raise PropertyNotImplementedError
482 return result
485def convert(name):
486 import os
487 t = TrajectoryWriter(name + '.new')
488 for atoms in PickleTrajectory(name, _warn=False):
489 t.write(atoms)
490 t.close()
491 os.rename(name, name + '.old')
492 os.rename(name + '.new', name)
495def main():
496 import optparse
497 parser = optparse.OptionParser(usage='python -m ase.io.trajectory '
498 'a1.traj [a2.traj ...]',
499 description='Convert old trajectory '
500 'file(s) to new format. '
501 'The old file is kept as a1.traj.old.')
502 _opts, args = parser.parse_args()
503 for name in args:
504 convert(name)
507if __name__ == '__main__':
508 main()