Coverage for ase / io / lammpsrun.py: 88.85%
260 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 10:20 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 10:20 +0000
1"""IO for LAMMPS dump files."""
3import gzip
4import struct
5from collections.abc import Iterator
6from os.path import splitext
7from typing import Any, TextIO
9import numpy as np
11from ase.atoms import Atoms
12from ase.calculators.lammps import convert
13from ase.calculators.singlepoint import SinglePointCalculator
14from ase.data import atomic_masses, chemical_symbols
15from ase.io.utils import ImageChunk, ImageIterator
16from ase.parallel import paropen
17from ase.utils import reader
20def read_lammps_dump(infileobj, **kwargs):
21 """Method which reads a LAMMPS dump file.
23 LAMMPS chooses output method depending on the given suffix:
24 - .bin : binary file
25 - .gz : output piped through gzip
26 - .mpiio: using mpiio (should be like cleartext,
27 with different ordering)
28 - else : normal clear-text format
30 :param infileobj: string to file, opened file or file-like stream
32 """
33 # !TODO: add support for lammps-regex naming schemes (output per
34 # processor and timestep wildcards)
36 opened = False
37 if isinstance(infileobj, str):
38 opened = True
39 suffix = splitext(infileobj)[-1]
40 if suffix == '.bin':
41 fileobj = paropen(infileobj, 'rb')
42 elif suffix == '.gz':
43 # !TODO: save for parallel execution?
44 fileobj = gzip.open(infileobj, 'rb')
45 else:
46 fileobj = paropen(infileobj)
47 else:
48 suffix = splitext(infileobj.name)[-1]
49 fileobj = infileobj
51 if suffix == '.bin':
52 out = read_lammps_dump_binary(fileobj, **kwargs)
53 if opened:
54 fileobj.close()
55 return out
57 out = read_lammps_dump_text(fileobj, **kwargs)
59 if opened:
60 fileobj.close()
62 return out
65def _lammps_data_to_ase_atoms(
66 data,
67 colnames,
68 cell,
69 celldisp,
70 pbc=False,
71 order=True,
72 specorder=None,
73 prismobj=None,
74 units='metal',
75):
76 """Extract positions and other per-atom parameters and create Atoms.
78 Parameters
79 ----------
80 data : np.ndarray
81 Structured array for `ITEM: ATOMS`.
82 colnames: list[str]
83 Column names for `ITEM: ATOMS`.
84 cell : np.ndarray
85 Cell.
86 celldisp : np.ndarray
87 Origin shift.
88 pbc : bool | list[bool]
89 Periodic boundary conditions.
90 order : bool
91 Sort atoms by `id`. Might be faster to turn off.
92 Disregarded in case `id` column is not given in file.
93 specorder : list[str]
94 List of species to map LAMMPS types to ASE species.
95 (Usually .dump files do not contain type to species mapping.)
96 prismobj : Prism
97 Coordinate transformation between LAMMPS and ASE.
98 units : str
99 LAMMPS units for unit transformation between LAMMPS and ASE.
101 Returns
102 -------
103 :class:`~ase.Atoms`
105 """
106 # read IDs if given and order if needed
107 if 'id' in colnames:
108 ids = data['id']
109 if order:
110 sort_order = np.argsort(ids)
111 data = data[sort_order]
113 # determine the elements
114 if 'element' in colnames:
115 # priority to elements written in file
116 elements = data['element']
117 elif 'mass' in colnames:
118 # try to determine elements from masses
119 elements = [_mass2element(m) for m in data['mass']]
120 elif 'type' in colnames:
121 # fall back to `types` otherwise
122 elements = data['type']
124 # reconstruct types from given specorder
125 if specorder:
126 elements = [specorder[t - 1] for t in elements]
127 else:
128 # todo: what if specorder give but no types?
129 # in principle the masses could work for atoms, but that needs
130 # lots of cases and new code I guess
131 raise ValueError('Cannot determine atom types form LAMMPS dump file')
133 def get_quantity(labels, quantity=None):
134 try:
135 cols = np.column_stack([data[label] for label in labels])
136 if quantity:
137 return convert(cols, quantity, units, 'ASE')
138 return cols
139 except ValueError:
140 return None
142 # Positions
143 positions = None
144 scaled_positions = None
145 if 'x' in colnames:
146 # doc: x, y, z = unscaled atom coordinates
147 positions = get_quantity(['x', 'y', 'z'], 'distance')
148 elif 'xs' in colnames:
149 # doc: xs,ys,zs = scaled atom coordinates
150 scaled_positions = get_quantity(['xs', 'ys', 'zs'])
151 elif 'xu' in colnames:
152 # doc: xu,yu,zu = unwrapped atom coordinates
153 positions = get_quantity(['xu', 'yu', 'zu'], 'distance')
154 elif 'xsu' in colnames:
155 # xsu,ysu,zsu = scaled unwrapped atom coordinates
156 scaled_positions = get_quantity(['xsu', 'ysu', 'zsu'])
157 else:
158 raise ValueError('No atomic positions found in LAMMPS output')
160 velocities = get_quantity(['vx', 'vy', 'vz'], 'velocity')
161 charges = get_quantity(['q'], 'charge')
162 forces = get_quantity(['fx', 'fy', 'fz'], 'force')
163 # !TODO: how need quaternions be converted?
164 quaternions = get_quantity(['c_q[1]', 'c_q[2]', 'c_q[3]', 'c_q[4]'])
166 # convert cell
167 cell = convert(cell, 'distance', units, 'ASE')
168 celldisp = convert(celldisp, 'distance', units, 'ASE')
169 if prismobj:
170 celldisp = prismobj.vector_to_ase(celldisp)
171 cell = prismobj.update_cell(cell)
173 if quaternions is not None:
174 out_atoms = Atoms(
175 symbols=elements,
176 positions=positions,
177 cell=cell,
178 celldisp=celldisp,
179 pbc=pbc,
180 )
181 out_atoms.new_array('quaternions', quaternions, dtype=float)
182 elif positions is not None:
183 # reverse coordinations transform to lammps system
184 # (for all vectors = pos, vel, force)
185 if prismobj:
186 positions = prismobj.vector_to_ase(positions, wrap=True)
188 out_atoms = Atoms(
189 symbols=elements,
190 positions=positions,
191 pbc=pbc,
192 celldisp=celldisp,
193 cell=cell,
194 )
195 elif scaled_positions is not None:
196 out_atoms = Atoms(
197 symbols=elements,
198 scaled_positions=scaled_positions,
199 pbc=pbc,
200 celldisp=celldisp,
201 cell=cell,
202 )
203 else:
204 raise RuntimeError('No atomsobj created from LAMMPS data!')
206 if velocities is not None:
207 if prismobj:
208 velocities = prismobj.vector_to_ase(velocities)
209 out_atoms.set_velocities(velocities)
210 if charges is not None:
211 out_atoms.set_initial_charges([charge[0] for charge in charges])
212 if forces is not None:
213 if prismobj:
214 forces = prismobj.vector_to_ase(forces)
215 # !TODO: use another calculator if available (or move forces
216 # to atoms.property) (other problem: synchronizing
217 # parallel runs)
218 calculator = SinglePointCalculator(out_atoms, energy=0.0, forces=forces)
219 out_atoms.calc = calculator
221 # process the extra columns of fixes, variables and computes
222 # that can be dumped, add as additional arrays to atoms object
223 for colname in colnames:
224 # determine if it is a compute, fix or
225 # custom property/atom (but not the quaternian)
226 if (
227 colname.startswith('f_')
228 or colname.startswith('v_')
229 or colname.startswith('d_')
230 or colname.startswith('d2_')
231 or (colname.startswith('c_') and not colname.startswith('c_q['))
232 ):
233 out_atoms.new_array(colname, data[colname], dtype='float')
235 elif colname.startswith('i_') or colname.startswith('i2_'):
236 out_atoms.new_array(colname, data[colname], dtype='int')
237 elif colname == 'type':
238 try:
239 out_atoms.new_array(colname, data['type'], dtype='int')
240 except ValueError:
241 pass # in case type is not integer
243 return out_atoms
246def construct_cell(diagdisp, offdiag):
247 """Help function to create an ASE-cell with displacement vector from
248 the lammps coordination system parameters.
250 :param diagdisp: cell dimension convoluted with the displacement vector
251 :param offdiag: off-diagonal cell elements
252 :returns: cell and cell displacement vector
253 :rtype: tuple
254 """
255 xlo, xhi, ylo, yhi, zlo, zhi = diagdisp
256 xy, xz, yz = offdiag
258 # create ase-cell from lammps-box
259 xhilo = (xhi - xlo) - abs(xy) - abs(xz)
260 yhilo = (yhi - ylo) - abs(yz)
261 zhilo = zhi - zlo
262 celldispx = xlo - min(0, xy) - min(0, xz)
263 celldispy = ylo - min(0, yz)
264 celldispz = zlo
265 cell = np.array([[xhilo, 0, 0], [xy, yhilo, 0], [xz, yz, zhilo]])
266 celldisp = np.array([celldispx, celldispy, celldispz])
268 return cell, celldisp
271def _parse_pbc(tilt_items: list[str]) -> list[bool]:
272 """Handle pbc conditions."""
273 pbc_items = tilt_items[-3:] if len(tilt_items) >= 3 else ['f', 'f', 'f']
274 return ['p' in d.lower() for d in pbc_items]
277def _parse_box_bound(line: str, cell_lines: list[str]) -> tuple:
278 # save labels behind "ITEM: BOX BOUNDS" in triclinic case
279 # (>=lammps-7Jul09)
280 tilt_items = line.split()[3:]
281 cell_data = np.loadtxt(cell_lines)
283 # general triclinic boxes (>=patch_17Apr2024)
284 if tilt_items[0] == 'abc':
285 cell = cell_data[:, :3]
286 celldisp = cell_data[:, 3]
287 pbc = _parse_pbc(tilt_items)
288 return cell, celldisp, pbc
290 diagdisp = cell_data[:, :2].flatten()
292 # determine cell tilt (triclinic case!)
293 if len(cell_data[0]) > 2:
294 # for >=lammps-7Jul09 use labels behind "ITEM: BOX BOUNDS"
295 # to assign tilt (vector) elements ...
296 offdiag = cell_data[:, 2]
297 # ... otherwise assume default order in 3rd column
298 # (if the latter was present)
299 if len(tilt_items) >= 3:
300 sort_index = [tilt_items.index(i) for i in ['xy', 'xz', 'yz']]
301 offdiag = offdiag[sort_index]
302 else:
303 offdiag = np.zeros(3)
305 cell, celldisp = construct_cell(diagdisp, offdiag)
307 pbc = _parse_pbc(tilt_items)
309 return cell, celldisp, pbc
312def get_max_index(index):
313 if np.isscalar(index):
314 return index
315 elif isinstance(index, slice):
316 return index.stop if (index.stop is not None) else float('inf')
319def _colnames2dtypes(colnames: list[str]) -> list[tuple[str, Any]]:
320 # Determine the data types for each column
321 dtype: list[tuple[str, Any]] = []
322 for colname in colnames:
323 if (
324 colname in {'id', 'type'}
325 or colname.startswith('i_')
326 or colname.startswith('i2_')
327 ):
328 dtype.append((colname, int))
329 elif colname == 'element':
330 # 'U10' for strings with a max length of 10 characters
331 dtype.append((colname, 'U10'))
332 else:
333 dtype.append((colname, float))
334 return dtype
337def _read_lammps_dump_text_frame(fd: TextIO, n: int, **kwargs) -> Atoms:
338 # avoid references before assignment in case of incorrect file structure
339 cell, celldisp, pbc, info = None, None, False, {}
341 fd.seek(n) # jump to the position just after 'ITEM: TIMESTEP'
342 line = fd.readline()
343 info['timestep'] = int(line.split()[0])
345 while line := fd.readline():
346 if 'ITEM: NUMBER OF ATOMS' in line:
347 line = fd.readline()
348 n_atoms = int(line.split()[0])
350 if 'ITEM: BOX BOUNDS' in line:
351 cell_lines = [fd.readline() for _ in range(3)]
352 cell, celldisp, pbc = _parse_box_bound(line, cell_lines)
354 if 'ITEM: ATOMS' in line:
355 colnames = line.split()[2:]
356 dtype = _colnames2dtypes(colnames)
357 datarows = [fd.readline() for _ in range(n_atoms)]
358 data = np.loadtxt(datarows, dtype=dtype, ndmin=1)
359 out_atoms = _lammps_data_to_ase_atoms(
360 data=data,
361 colnames=colnames,
362 cell=cell,
363 celldisp=celldisp,
364 pbc=pbc,
365 **kwargs,
366 )
367 out_atoms.info.update(info)
368 return out_atoms
370 raise RuntimeError('Incomplete LAMMPS dump text chunk')
373class _LAMMPSDumpTextChunk(ImageChunk):
374 def __init__(self, fd: TextIO, pos: int) -> None:
375 self.fd = fd
376 self.pos = pos
378 def build(self, **kwargs) -> Atoms:
379 return _read_lammps_dump_text_frame(self.fd, self.pos, **kwargs)
382def _i_lammps_dump_text_chunks(fd: TextIO) -> Iterator[_LAMMPSDumpTextChunk]:
383 while line := fd.readline():
384 if 'ITEM: TIMESTEP' in line:
385 pos = fd.tell() # position just after 'ITEM: TIMESTEP'
386 yield _LAMMPSDumpTextChunk(fd, pos)
389iread_lammps_dump_text = ImageIterator(_i_lammps_dump_text_chunks)
392@reader
393def read_lammps_dump_text(fd, index=-1, **kwargs):
394 """Read a LAMMPS text dump file."""
395 g = iread_lammps_dump_text(fd, index=index, **kwargs)
396 return list(g) if isinstance(index, (slice, str)) else next(g)
399def read_lammps_dump_binary(
400 fileobj, index=-1, colnames=None, intformat='SMALLBIG', **kwargs
401):
402 """Read binary dump-files (after binary2txt.cpp from lammps/tools).
404 :param fileobj: file-stream containing the binary lammps data
405 :param index: integer or slice object (default: get the last timestep)
406 :param colnames: data is columns and identified by a header
407 :param intformat: lammps support different integer size. Parameter set \
408 at compile-time and can unfortunately not derived from data file
409 :returns: list of Atoms-objects
410 :rtype: list
411 """
412 # depending on the chosen compilation flag lammps uses either normal
413 # integers or long long for its id or timestep numbering
414 # !TODO: tags are cast to double -> missing/double ids (add check?)
415 _tagformat, bigformat = {
416 'SMALLSMALL': ('i', 'i'),
417 'SMALLBIG': ('i', 'q'),
418 'BIGBIG': ('q', 'q'),
419 }[intformat]
421 index_end = get_max_index(index)
423 # Standard columns layout from lammpsrun
424 if not colnames:
425 colnames = [
426 'id',
427 'type',
428 'x',
429 'y',
430 'z',
431 'vx',
432 'vy',
433 'vz',
434 'fx',
435 'fy',
436 'fz',
437 ]
439 images = []
441 # wrap struct.unpack to raise EOFError
442 def read_variables(string):
443 obj_len = struct.calcsize(string)
444 data_obj = fileobj.read(obj_len)
445 if obj_len != len(data_obj):
446 raise EOFError
447 return struct.unpack(string, data_obj)
449 while True:
450 try:
451 # Assume that the binary dump file is in the old (pre-29Oct2020)
452 # format
453 magic_string = None
455 # read header
456 (ntimestep,) = read_variables('=' + bigformat)
458 # In the new LAMMPS binary dump format (version 29Oct2020 and
459 # onward), a negative timestep is used to indicate that the next
460 # few bytes will contain certain metadata
461 if ntimestep < 0:
462 # First bigint was actually encoding the negative of the format
463 # name string length (we call this 'magic_string' to
464 magic_string_len = -ntimestep
466 # The next `magic_string_len` bytes will hold a string
467 # indicating the format of the dump file
468 magic_string = b''.join(
469 read_variables('=' + str(magic_string_len) + 'c')
470 )
472 # Read endianness (integer). For now, we'll disregard the value
473 # and simply use the host machine's endianness (via '='
474 # character used with struct.calcsize).
475 #
476 # TODO: Use the endianness of the dump file in subsequent
477 # read_variables rather than just assuming it will match
478 # that of the host
479 read_variables('=i')
481 # Read revision number (integer)
482 (revision,) = read_variables('=i')
484 # Finally, read the actual timestep (bigint)
485 (ntimestep,) = read_variables('=' + bigformat)
487 _n_atoms, triclinic = read_variables('=' + bigformat + 'i')
488 boundary = read_variables('=6i')
490 if triclinic == 0:
491 diagdisp = read_variables('=6d')
492 offdiag = (0.0,) * 3
493 cell, celldisp = construct_cell(diagdisp, offdiag)
494 elif triclinic == 1:
495 diagdisp = read_variables('=6d')
496 offdiag = read_variables('=3d')
497 cell, celldisp = construct_cell(diagdisp, offdiag)
498 elif triclinic == 2: # general triclinic boxes (>=patch_17Apr2024)
499 cell = np.array(read_variables('=9d')).reshape(3, 3)
500 celldisp = np.array(read_variables('=3d'))
501 else:
502 raise ValueError(triclinic)
504 (size_one,) = read_variables('=i')
506 if len(colnames) != size_one:
507 raise ValueError('Provided columns do not match binary file')
509 if magic_string and revision > 1:
510 # New binary dump format includes units string,
511 # columns string, and time
512 (units_str_len,) = read_variables('=i')
514 if units_str_len > 0:
515 # Read lammps units style
516 _ = b''.join(read_variables('=' + str(units_str_len) + 'c'))
518 (flag,) = read_variables('=c')
519 if flag != b'\x00':
520 # Flag was non-empty string
521 read_variables('=d')
523 # Length of column string
524 (columns_str_len,) = read_variables('=i')
526 # Read column string (e.g., "id type x y z vx vy vz fx fy fz")
527 _ = b''.join(read_variables('=' + str(columns_str_len) + 'c'))
529 (nchunk,) = read_variables('=i')
531 # lammps cells/boxes can have different boundary conditions on each
532 # sides (makes mainly sense for different non-periodic conditions
533 # (e.g. [f]ixed and [s]hrink for a irradiation simulation))
534 # periodic case: b 0 = 'p'
535 # non-peridic cases 1: 'f', 2 : 's', 3: 'm'
536 pbc = np.sum(np.array(boundary).reshape((3, 2)), axis=1) == 0
538 data = []
539 for _ in range(nchunk):
540 # number-of-data-entries
541 (n_data,) = read_variables('=i')
542 # retrieve per atom data
543 data += read_variables('=' + str(n_data) + 'd')
544 data = np.array(data).reshape((-1, size_one))
546 # convert the 2D float array to the structured array
547 data = np.rec.fromarrays(data.T, dtype=_colnames2dtypes(colnames))
549 # map data-chunk to ase atoms
550 out_atoms = _lammps_data_to_ase_atoms(
551 data=data,
552 colnames=colnames,
553 cell=cell,
554 celldisp=celldisp,
555 pbc=pbc,
556 **kwargs,
557 )
559 images.append(out_atoms)
561 # stop if requested index has been found
562 if len(images) > index_end >= 0: # type: ignore[comparison-overlap]
563 break
565 except EOFError:
566 break
568 return images[index]
571def _mass2element(mass):
572 """
573 Guess the element corresponding to a given atomic mass.
575 :param mass: Atomic mass for searching.
576 :return: Element symbol as a string.
577 """
578 min_idx = np.argmin(np.abs(atomic_masses - mass))
579 element = chemical_symbols[min_idx]
580 return element