Coverage for ase / io / trajectory.py: 89.78%
274 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 08:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 08:22 +0000
1# fmt: off
3"""Trajectory"""
4import contextlib
5import io
6import warnings
8import numpy as np
10from ase import __version__
11from ase.atoms import Atoms
12from ase.calculators.calculator import PropertyNotImplementedError
13from ase.calculators.singlepoint import SinglePointCalculator, all_properties
14from ase.io.formats import is_compressed
15from ase.io.jsonio import decode, encode
16from ase.io.pickletrajectory import PickleTrajectory
17from ase.parallel import world
18from ase.utils import tokenize_version
20__all__ = ['Trajectory', 'PickleTrajectory']
23def Trajectory(filename, mode='r', atoms=None, properties=None, master=None,
24 comm=world):
25 """A Trajectory can be created in read, write or append mode.
27 Parameters
28 ----------
30 filename: str | Path
31 The name/path of the file. Traditionally ends in .traj.
32 mode: str
33 The mode. 'r' is read mode, the file should already exist, and
34 no atoms argument should be specified.
35 'w' is write mode. The atoms argument specifies the Atoms
36 object to be written to the file, if not given it must instead
37 be given as an argument to the write() method.
38 'a' is append mode. It acts as write mode, except that
39 data is appended to a preexisting file.
40 atoms: Atoms object
41 The Atoms object to be written in write or append mode.
42 properties: list of str
43 If specified, these calculator properties are saved in the
44 trajectory. If not specified, all supported quantities are
45 saved. Possible values: energy, forces, stress, dipole,
46 charges, magmom and magmoms.
47 master: bool
48 Controls which process does the actual writing. The
49 default is that process number 0 does this. If this
50 argument is given, processes where it is True will write.
51 comm: Communicator object
52 Communicator to handle parallel file reading and writing.
54 The atoms, properties and master arguments are ignored in read mode.
55 """
56 if mode == 'r':
57 return TrajectoryReader(filename)
58 return TrajectoryWriter(filename, mode, atoms, properties, master=master,
59 comm=comm)
62class TrajectoryWriter:
63 """Writes Atoms objects to a .traj file."""
65 def __init__(self, filename, mode='w', atoms=None, properties=None,
66 master=None, comm=world):
67 """A Trajectory writer, in write or append mode.
69 Parameters
70 ----------
72 filename: str | Path
73 The name of the file. Traditionally ends in .traj.
74 mode: str
75 The mode. 'r' is read mode, the file should already exist, and
76 no atoms argument should be specified.
77 'w' is write mode. The atoms argument specifies the Atoms
78 object to be written to the file, if not given it must instead
79 be given as an argument to the write() method.
80 'a' is append mode. It acts as write mode, except that
81 data is appended to a preexisting file.
82 atoms: Atoms object
83 The Atoms object to be written in write or append mode.
84 properties: list of str
85 If specified, these calculator properties are saved in the
86 trajectory. If not specified, all supported quantities are
87 saved. Possible values: energy, forces, stress, dipole,
88 charges, magmom and magmoms.
89 master: bool
90 Controls which process does the actual writing. The
91 default is that process number 0 does this. If this
92 argument is given, processes where it is True will write.
93 comm: MPI communicator
94 MPI communicator for this trajectory writer, by default world.
95 Passing a different communicator facilitates writing of
96 different trajectories on different MPI ranks.
97 """
98 if master is None:
99 master = comm.rank == 0
101 self.filename = filename
102 self.mode = mode
103 self.atoms = atoms
104 self.properties = properties
105 self.master = master
106 self.comm = comm
108 self.description = {}
109 self.header_data = None
110 self.multiple_headers = False
112 self._open(filename, mode)
114 def __enter__(self):
115 return self
117 def __exit__(self, exc_type, exc_value, tb):
118 self.close()
120 def set_description(self, description):
121 self.description.update(description)
123 def _open(self, filename, mode):
124 import ase.io.ulm as ulm
125 if mode not in 'aw':
126 raise ValueError('mode must be "w" or "a".')
127 if self.master:
128 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory')
129 if len(self.backend) > 0 and mode == 'a':
130 with Trajectory(filename) as traj:
131 atoms = traj[0]
132 self.header_data = get_header_data(atoms)
133 else:
134 self.backend = ulm.DummyWriter()
136 def write(self, atoms=None, **kwargs):
137 """Write the atoms to the file.
139 If the atoms argument is not given, the atoms object specified
140 when creating the trajectory object is used.
142 Use keyword arguments to add extra properties::
144 writer.write(atoms, energy=117, dipole=[0, 0, 1.0])
145 """
146 if atoms is None:
147 atoms = self.atoms
149 for image in atoms.iterimages():
150 self._write_atoms(image, **kwargs)
152 def _write_atoms(self, atoms, **kwargs):
153 b = self.backend
155 if self.header_data is None:
156 b.write(version=1, ase_version=__version__)
157 if self.description:
158 b.write(description=self.description)
159 # Atomic numbers and periodic boundary conditions are written
160 # in the header in the beginning.
161 #
162 # If an image later on has other numbers/pbc, we write a new
163 # header. All subsequent images will then have their own header
164 # whether or not their numbers/pbc change.
165 self.header_data = get_header_data(atoms)
166 write_header = True
167 else:
168 if not self.multiple_headers:
169 header_data = get_header_data(atoms)
170 self.multiple_headers = not headers_equal(self.header_data,
171 header_data)
172 write_header = self.multiple_headers
174 write_atoms(b, atoms, write_header=write_header)
176 calc = atoms.calc
178 if calc is None and len(kwargs) > 0:
179 calc = SinglePointCalculator(atoms)
181 if calc is not None:
182 if not hasattr(calc, 'get_property'):
183 calc = OldCalculatorWrapper(calc)
184 c = b.child('calculator')
185 c.write(name=calc.name)
186 if hasattr(calc, 'todict'):
187 c.write(parameters=calc.todict())
188 for prop in all_properties:
189 if prop in kwargs:
190 x = kwargs[prop]
191 else:
192 if self.properties is not None:
193 if prop in self.properties:
194 x = calc.get_property(prop, atoms)
195 else:
196 x = None
197 else:
198 try:
199 x = calc.get_property(prop, atoms,
200 allow_calculation=False)
201 except (PropertyNotImplementedError, KeyError):
202 # KeyError is needed for Jacapo.
203 # XXX We can perhaps remove this.
204 x = None
205 if x is not None:
206 if prop in ['stress', 'dipole']:
207 x = x.tolist()
208 c.write(prop, x)
210 info = {}
211 for key, value in atoms.info.items():
212 try:
213 encode(value)
214 except TypeError:
215 warnings.warn(f'Skipping "{key}" info.')
216 else:
217 info[key] = value
218 if info:
219 b.write(info=info)
221 b.sync()
223 def close(self):
224 """Close the trajectory file."""
225 self.backend.close()
227 def __len__(self):
228 return self.comm.sum_scalar(len(self.backend))
231class TrajectoryReader:
232 """Reads Atoms objects from a .traj file."""
234 def __init__(self, filename):
235 """A Trajectory in read mode.
237 The filename traditionally ends in .traj.
238 """
239 self.filename = filename
240 self.numbers = None
241 self.pbc = None
242 self.masses = None
244 self._open(filename)
246 def __enter__(self):
247 return self
249 def __exit__(self, exc_type, exc_value, tb):
250 self.close()
252 def _open(self, filename):
253 import ase.io.ulm as ulm
254 self.backend = ulm.open(filename, 'r')
255 self._read_header()
257 def _read_header(self):
258 b = self.backend
259 if b.get_tag() != 'ASE-Trajectory':
260 raise OSError('This is not a trajectory file!')
262 if len(b) > 0:
263 self.pbc = b.pbc
264 self.numbers = b.numbers
265 self.masses = b.get('masses')
266 self.constraints = b.get('constraints', '[]')
267 self.description = b.get('description')
268 self.version = b.version
269 self.ase_version = b.get('ase_version')
271 def close(self):
272 """Close the trajectory file."""
273 self.backend.close()
275 def __getitem__(self, i=-1):
276 if isinstance(i, slice):
277 return SlicedTrajectory(self, i)
278 b = self.backend[i]
279 if 'numbers' in b:
280 # numbers and other header info was written alongside the image:
281 atoms = read_atoms(b, traj=self)
282 else:
283 # header info was not written because they are the same:
284 atoms = read_atoms(b,
285 header=[self.pbc, self.numbers, self.masses,
286 self.constraints],
287 traj=self)
288 if 'calculator' in b:
289 results = {}
290 implemented_properties = []
291 c = b.calculator
292 for prop in all_properties:
293 if prop in c:
294 results[prop] = c.get(prop)
295 implemented_properties.append(prop)
296 calc = SinglePointCalculator(atoms, **results)
297 calc.name = b.calculator.name
298 calc.implemented_properties = implemented_properties
300 if 'parameters' in c:
301 calc.parameters.update(c.parameters)
302 atoms.calc = calc
304 return atoms
306 def __len__(self):
307 return len(self.backend)
309 def __iter__(self):
310 for i in range(len(self)):
311 yield self[i]
314class SlicedTrajectory:
315 """Wrapper to return a slice from a trajectory without loading
316 from disk. Initialize with a trajectory (in read mode) and the
317 desired slice object."""
319 def __init__(self, trajectory, sliced):
320 self.trajectory = trajectory
321 self.map = range(len(self.trajectory))[sliced]
323 def __getitem__(self, i):
324 if isinstance(i, slice):
325 # Map directly to the original traj, not recursively.
326 traj = SlicedTrajectory(self.trajectory, slice(0, None))
327 traj.map = self.map[i]
328 return traj
329 return self.trajectory[self.map[i]]
331 def __len__(self):
332 return len(self.map)
335def get_header_data(atoms):
336 return {'pbc': atoms.pbc.copy(),
337 'numbers': atoms.get_atomic_numbers(),
338 'masses': atoms.get_masses() if atoms.has('masses') else None,
339 'constraints': list(atoms.constraints)}
342def headers_equal(headers1, headers2):
343 assert len(headers1) == len(headers2)
344 eq = True
345 for key in headers1:
346 eq &= np.array_equal(headers1[key], headers2[key])
347 return eq
350class VersionTooOldError(Exception):
351 pass
354def read_atoms(backend,
355 header: tuple | None = None,
356 traj: TrajectoryReader | None = None,
357 _try_except: bool = True) -> Atoms:
358 from ase.constraints import dict2constraint
360 if _try_except:
361 try:
362 return read_atoms(backend, header, traj, False)
363 except Exception as ex:
364 if (traj is not None and tokenize_version(__version__) <
365 tokenize_version(traj.ase_version)):
366 msg = ('You are trying to read a trajectory file written '
367 f'by ASE-{traj.ase_version} from ASE-{__version__}. '
368 'It might help to update your ASE')
369 raise VersionTooOldError(msg) from ex
370 else:
371 raise
373 b = backend
374 if header:
375 pbc, numbers, masses, constraints = header
376 else:
377 pbc = b.pbc
378 numbers = b.numbers
379 masses = b.get('masses')
380 constraints = b.get('constraints', '[]')
382 atoms = Atoms(positions=b.positions,
383 numbers=numbers,
384 cell=b.cell,
385 masses=masses,
386 pbc=pbc,
387 info=b.get('info'),
388 constraint=[dict2constraint(d)
389 for d in decode(constraints)],
390 momenta=b.get('momenta'),
391 magmoms=b.get('magmoms'),
392 charges=b.get('charges'),
393 tags=b.get('tags'))
394 return atoms
397def write_atoms(backend, atoms, write_header=True):
398 b = backend
400 if write_header:
401 b.write(pbc=atoms.pbc.tolist(),
402 numbers=atoms.numbers)
403 if atoms.constraints:
404 if all(hasattr(c, 'todict') for c in atoms.constraints):
405 b.write(constraints=encode(atoms.constraints))
407 if atoms.has('masses'):
408 b.write(masses=atoms.get_masses())
410 b.write(positions=atoms.get_positions(),
411 cell=atoms.get_cell().tolist())
413 if atoms.has('tags'):
414 b.write(tags=atoms.get_tags())
415 if atoms.has('momenta'):
416 b.write(momenta=atoms.get_momenta())
417 if atoms.has('initial_magmoms'):
418 b.write(magmoms=atoms.get_initial_magnetic_moments())
419 if atoms.has('initial_charges'):
420 b.write(charges=atoms.get_initial_charges())
423def read_traj(fd, index):
424 trj = TrajectoryReader(fd)
425 for i in range(*index.indices(len(trj))):
426 yield trj[i]
429@contextlib.contextmanager
430def defer_compression(fd):
431 """Defer the file compression until all the configurations are read."""
432 # We do this because the trajectory and compressed-file
433 # internals do not play well together.
434 # Be advised not to defer compression of very long trajectories
435 # as they use a lot of memory.
436 if is_compressed(fd):
437 with io.BytesIO() as bytes_io:
438 try:
439 # write the uncompressed data to the buffer
440 yield bytes_io
441 finally:
442 # write the buffered data to the compressed file
443 bytes_io.seek(0)
444 fd.write(bytes_io.read())
445 else:
446 yield fd
449def write_traj(fd, images):
450 """Write image(s) to trajectory."""
451 if isinstance(images, Atoms):
452 images = [images]
453 with defer_compression(fd) as fd_uncompressed:
454 trj = TrajectoryWriter(fd_uncompressed)
455 for atoms in images:
456 trj.write(atoms)
459class OldCalculatorWrapper:
460 def __init__(self, calc):
461 self.calc = calc
462 try:
463 self.name = calc.name
464 except AttributeError:
465 self.name = calc.__class__.__name__.lower()
467 def get_property(self, prop, atoms, allow_calculation=True):
468 try:
469 if (not allow_calculation and
470 self.calc.calculation_required(atoms, [prop])):
471 return None
472 except AttributeError:
473 pass
475 method = 'get_' + {'energy': 'potential_energy',
476 'magmom': 'magnetic_moment',
477 'magmoms': 'magnetic_moments',
478 'dipole': 'dipole_moment'}.get(prop, prop)
479 try:
480 result = getattr(self.calc, method)(atoms)
481 except AttributeError:
482 raise PropertyNotImplementedError
483 return result
486def convert(name):
487 import os
488 t = TrajectoryWriter(name + '.new')
489 for atoms in PickleTrajectory(name, _warn=False):
490 t.write(atoms)
491 t.close()
492 os.rename(name, name + '.old')
493 os.rename(name + '.new', name)
496def main():
497 import optparse
498 parser = optparse.OptionParser(usage='python -m ase.io.trajectory '
499 'a1.traj [a2.traj ...]',
500 description='Convert old trajectory '
501 'file(s) to new format. '
502 'The old file is kept as a1.traj.old.')
503 _opts, args = parser.parse_args()
504 for name in args:
505 convert(name)
508if __name__ == '__main__':
509 main()