Coverage for ase / io / lammpsrun.py: 90.58%
276 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 08:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 08:22 +0000
1"""IO for LAMMPS dump files."""
3import gzip
4import struct
5from collections.abc import Iterator
6from os.path import splitext
7from typing import IO, Any, TextIO
9import numpy as np
11from ase.atoms import Atoms
12from ase.calculators.lammps import convert
13from ase.calculators.singlepoint import SinglePointCalculator
14from ase.data import atomic_masses, chemical_symbols
15from ase.io.utils import ImageChunk, ImageIterator
16from ase.parallel import paropen
17from ase.utils import reader
20def read_lammps_dump(infileobj, **kwargs):
21 """Method which reads a LAMMPS dump file.
23 LAMMPS chooses output method depending on the given suffix:
24 - .bin : binary file
25 - .gz : output piped through gzip
26 - .mpiio: using mpiio (should be like cleartext,
27 with different ordering)
28 - else : normal clear-text format
30 :param infileobj: string to file, opened file or file-like stream
32 """
33 # !TODO: add support for lammps-regex naming schemes (output per
34 # processor and timestep wildcards)
36 opened = False
37 if isinstance(infileobj, str):
38 opened = True
39 suffix = splitext(infileobj)[-1]
40 if suffix == '.bin':
41 fileobj = paropen(infileobj, 'rb')
42 elif suffix == '.gz':
43 # !TODO: save for parallel execution?
44 fileobj = gzip.open(infileobj, 'rb')
45 else:
46 fileobj = paropen(infileobj)
47 else:
48 suffix = splitext(infileobj.name)[-1]
49 fileobj = infileobj
51 if suffix == '.bin':
52 out = read_lammps_dump_binary(fileobj, **kwargs)
53 if opened:
54 fileobj.close()
55 return out
57 out = read_lammps_dump_text(fileobj, **kwargs)
59 if opened:
60 fileobj.close()
62 return out
65def _lammps_data_to_ase_atoms(
66 data,
67 colnames,
68 cell,
69 celldisp,
70 pbc=False,
71 order=True,
72 specorder=None,
73 prismobj=None,
74 units='metal',
75):
76 """Extract positions and other per-atom parameters and create Atoms.
78 Parameters
79 ----------
80 data : np.ndarray
81 Structured array for `ITEM: ATOMS`.
82 colnames: list[str]
83 Column names for `ITEM: ATOMS`.
84 cell : np.ndarray
85 Cell.
86 celldisp : np.ndarray
87 Origin shift.
88 pbc : bool | list[bool]
89 Periodic boundary conditions.
90 order : bool
91 Sort atoms by `id`. Might be faster to turn off.
92 Disregarded in case `id` column is not given in file.
93 specorder : list[str]
94 List of species to map LAMMPS types to ASE species.
95 (Usually .dump files do not contain type to species mapping.)
96 prismobj : Prism
97 Coordinate transformation between LAMMPS and ASE.
98 units : str
99 LAMMPS units for unit transformation between LAMMPS and ASE.
101 Returns
102 -------
103 :class:`~ase.Atoms`
105 """
106 # read IDs if given and order if needed
107 if 'id' in colnames:
108 ids = data['id']
109 if order:
110 sort_order = np.argsort(ids)
111 data = data[sort_order]
113 # determine the elements
114 if 'element' in colnames:
115 # priority to elements written in file
116 elements = data['element']
117 elif 'mass' in colnames:
118 # try to determine elements from masses
119 elements = [_mass2element(m) for m in data['mass']]
120 elif 'type' in colnames:
121 # fall back to `types` otherwise
122 elements = data['type']
124 # reconstruct types from given specorder
125 if specorder:
126 elements = [specorder[t - 1] for t in elements]
127 else:
128 # todo: what if specorder give but no types?
129 # in principle the masses could work for atoms, but that needs
130 # lots of cases and new code I guess
131 raise ValueError('Cannot determine atom types form LAMMPS dump file')
133 def get_quantity(labels, quantity=None):
134 try:
135 cols = np.column_stack([data[label] for label in labels])
136 if quantity:
137 return convert(cols, quantity, units, 'ASE')
138 return cols
139 except ValueError:
140 return None
142 # Positions
143 positions = None
144 scaled_positions = None
145 if 'x' in colnames:
146 # doc: x, y, z = unscaled atom coordinates
147 positions = get_quantity(['x', 'y', 'z'], 'distance')
148 elif 'xs' in colnames:
149 # doc: xs,ys,zs = scaled atom coordinates
150 scaled_positions = get_quantity(['xs', 'ys', 'zs'])
151 elif 'xu' in colnames:
152 # doc: xu,yu,zu = unwrapped atom coordinates
153 positions = get_quantity(['xu', 'yu', 'zu'], 'distance')
154 elif 'xsu' in colnames:
155 # xsu,ysu,zsu = scaled unwrapped atom coordinates
156 scaled_positions = get_quantity(['xsu', 'ysu', 'zsu'])
157 else:
158 raise ValueError('No atomic positions found in LAMMPS output')
160 velocities = get_quantity(['vx', 'vy', 'vz'], 'velocity')
161 charges = get_quantity(['q'], 'charge')
162 forces = get_quantity(['fx', 'fy', 'fz'], 'force')
163 # !TODO: how need quaternions be converted?
164 quaternions = get_quantity(['c_q[1]', 'c_q[2]', 'c_q[3]', 'c_q[4]'])
166 # convert cell
167 cell = convert(cell, 'distance', units, 'ASE')
168 celldisp = convert(celldisp, 'distance', units, 'ASE')
169 if prismobj:
170 celldisp = prismobj.vector_to_ase(celldisp)
171 cell = prismobj.update_cell(cell)
173 if quaternions is not None:
174 out_atoms = Atoms(
175 symbols=elements,
176 positions=positions,
177 cell=cell,
178 celldisp=celldisp,
179 pbc=pbc,
180 )
181 out_atoms.new_array('quaternions', quaternions, dtype=float)
182 elif positions is not None:
183 # reverse coordinations transform to lammps system
184 # (for all vectors = pos, vel, force)
185 if prismobj:
186 positions = prismobj.vector_to_ase(positions, wrap=True)
188 out_atoms = Atoms(
189 symbols=elements,
190 positions=positions,
191 pbc=pbc,
192 celldisp=celldisp,
193 cell=cell,
194 )
195 elif scaled_positions is not None:
196 out_atoms = Atoms(
197 symbols=elements,
198 scaled_positions=scaled_positions,
199 pbc=pbc,
200 celldisp=celldisp,
201 cell=cell,
202 )
203 else:
204 raise RuntimeError('No atomsobj created from LAMMPS data!')
206 if velocities is not None:
207 if prismobj:
208 velocities = prismobj.vector_to_ase(velocities)
209 out_atoms.set_velocities(velocities)
210 if charges is not None:
211 out_atoms.set_initial_charges([charge[0] for charge in charges])
212 if forces is not None:
213 if prismobj:
214 forces = prismobj.vector_to_ase(forces)
215 # !TODO: use another calculator if available (or move forces
216 # to atoms.property) (other problem: synchronizing
217 # parallel runs)
218 calculator = SinglePointCalculator(out_atoms, energy=0.0, forces=forces)
219 out_atoms.calc = calculator
221 # process the extra columns of fixes, variables and computes
222 # that can be dumped, add as additional arrays to atoms object
223 for colname in colnames:
224 # determine if it is a compute, fix or
225 # custom property/atom (but not the quaternian)
226 if (
227 colname.startswith('f_')
228 or colname.startswith('v_')
229 or colname.startswith('d_')
230 or colname.startswith('d2_')
231 or (colname.startswith('c_') and not colname.startswith('c_q['))
232 ):
233 out_atoms.new_array(colname, data[colname], dtype='float')
235 elif colname.startswith('i_') or colname.startswith('i2_'):
236 out_atoms.new_array(colname, data[colname], dtype='int')
237 elif colname == 'type':
238 try:
239 out_atoms.new_array(colname, data['type'], dtype='int')
240 except ValueError:
241 pass # in case type is not integer
243 return out_atoms
246def construct_cell(diagdisp, offdiag):
247 """Help function to create an ASE-cell with displacement vector from
248 the lammps coordination system parameters.
250 :param diagdisp: cell dimension convoluted with the displacement vector
251 :param offdiag: off-diagonal cell elements
252 :returns: cell and cell displacement vector
253 :rtype: tuple
254 """
255 xlo, xhi, ylo, yhi, zlo, zhi = diagdisp
256 xy, xz, yz = offdiag
258 # create ase-cell from lammps-box
259 xhilo = (xhi - xlo) - abs(xy) - abs(xz)
260 yhilo = (yhi - ylo) - abs(yz)
261 zhilo = zhi - zlo
262 celldispx = xlo - min(0, xy) - min(0, xz)
263 celldispy = ylo - min(0, yz)
264 celldispz = zlo
265 cell = np.array([[xhilo, 0, 0], [xy, yhilo, 0], [xz, yz, zhilo]])
266 celldisp = np.array([celldispx, celldispy, celldispz])
268 return cell, celldisp
271def _parse_pbc(tilt_items: list[str]) -> list[bool]:
272 """Handle pbc conditions."""
273 pbc_items = tilt_items[-3:] if len(tilt_items) >= 3 else ['f', 'f', 'f']
274 return ['p' in d.lower() for d in pbc_items]
277def _parse_box_bound(line: str, cell_lines: list[str]) -> tuple:
278 # save labels behind "ITEM: BOX BOUNDS" in triclinic case
279 # (>=lammps-7Jul09)
280 tilt_items = line.split()[3:]
281 cell_data = np.loadtxt(cell_lines)
283 # general triclinic boxes (>=patch_17Apr2024)
284 if tilt_items[0] == 'abc':
285 cell = cell_data[:, :3]
286 celldisp = cell_data[:, 3]
287 pbc = _parse_pbc(tilt_items)
288 return cell, celldisp, pbc
290 diagdisp = cell_data[:, :2].flatten()
292 # determine cell tilt (triclinic case!)
293 if len(cell_data[0]) > 2:
294 # for >=lammps-7Jul09 use labels behind "ITEM: BOX BOUNDS"
295 # to assign tilt (vector) elements ...
296 offdiag = cell_data[:, 2]
297 # ... otherwise assume default order in 3rd column
298 # (if the latter was present)
299 if len(tilt_items) >= 3:
300 sort_index = [tilt_items.index(i) for i in ['xy', 'xz', 'yz']]
301 offdiag = offdiag[sort_index]
302 else:
303 offdiag = np.zeros(3)
305 cell, celldisp = construct_cell(diagdisp, offdiag)
307 pbc = _parse_pbc(tilt_items)
309 return cell, celldisp, pbc
312def _colnames2dtypes(colnames: list[str]) -> list[tuple[str, Any]]:
313 # Determine the data types for each column
314 dtype: list[tuple[str, Any]] = []
315 for colname in colnames:
316 if (
317 colname in {'id', 'type'}
318 or colname.startswith('i_')
319 or colname.startswith('i2_')
320 ):
321 dtype.append((colname, int))
322 elif colname == 'element':
323 # 'U10' for strings with a max length of 10 characters
324 dtype.append((colname, 'U10'))
325 else:
326 dtype.append((colname, float))
327 return dtype
330def _read_lammps_dump_text_frame(fd: TextIO, n: int, **kwargs) -> Atoms:
331 # avoid references before assignment in case of incorrect file structure
332 cell, celldisp, pbc, info = None, None, False, {}
334 fd.seek(n) # jump to the position just after 'ITEM: TIMESTEP'
335 line = fd.readline()
336 info['timestep'] = int(line.split()[0])
338 while line := fd.readline():
339 if 'ITEM: NUMBER OF ATOMS' in line:
340 line = fd.readline()
341 n_atoms = int(line.split()[0])
343 if 'ITEM: BOX BOUNDS' in line:
344 cell_lines = [fd.readline() for _ in range(3)]
345 cell, celldisp, pbc = _parse_box_bound(line, cell_lines)
347 if 'ITEM: ATOMS' in line:
348 colnames = line.split()[2:]
349 dtype = _colnames2dtypes(colnames)
350 datarows = [fd.readline() for _ in range(n_atoms)]
351 data = np.loadtxt(datarows, dtype=dtype, ndmin=1)
352 out_atoms = _lammps_data_to_ase_atoms(
353 data=data,
354 colnames=colnames,
355 cell=cell,
356 celldisp=celldisp,
357 pbc=pbc,
358 **kwargs,
359 )
360 out_atoms.info.update(info)
361 return out_atoms
363 raise RuntimeError('Incomplete LAMMPS dump text chunk')
366class _LAMMPSDumpTextChunk(ImageChunk):
367 def __init__(self, fd: TextIO, pos: int) -> None:
368 self.fd = fd
369 self.pos = pos
371 def build(self, **kwargs) -> Atoms:
372 return _read_lammps_dump_text_frame(self.fd, self.pos, **kwargs)
375def _i_lammps_dump_text_chunks(fd: TextIO) -> Iterator[_LAMMPSDumpTextChunk]:
376 while line := fd.readline():
377 if 'ITEM: TIMESTEP' in line:
378 pos = fd.tell() # position just after 'ITEM: TIMESTEP'
379 yield _LAMMPSDumpTextChunk(fd, pos)
382iread_lammps_dump_text = ImageIterator(_i_lammps_dump_text_chunks)
385@reader
386def read_lammps_dump_text(fd, index=-1, **kwargs):
387 """Read a LAMMPS text dump file."""
388 g = iread_lammps_dump_text(fd, index=index, **kwargs)
389 return list(g) if isinstance(index, (slice, str)) else next(g)
392def _read_lammps_dump_binary_data(fd, /, colnames=None, intformat='SMALLBIG'):
393 # depending on the chosen compilation flag lammps uses either normal
394 # integers or long long for its id or timestep numbering
395 # !TODO: tags are cast to double -> missing/double ids (add check?)
396 _tagformat, bigformat = {
397 'SMALLSMALL': ('i', 'i'),
398 'SMALLBIG': ('i', 'q'),
399 'BIGBIG': ('q', 'q'),
400 }[intformat]
402 # wrap struct.unpack to raise EOFError
403 def read_variables(string):
404 obj_len = struct.calcsize(string)
405 data_obj = fd.read(obj_len)
406 if obj_len != len(data_obj):
407 raise EOFError
408 return struct.unpack(string, data_obj)
410 # Assume that the binary dump file is in the old (pre-29Oct2020)
411 # format
412 magic_string = None
414 # read header
415 (ntimestep,) = read_variables('=' + bigformat)
417 # In the new LAMMPS binary dump format (version 29Oct2020 and
418 # onward), a negative timestep is used to indicate that the next
419 # few bytes will contain certain metadata
420 if ntimestep < 0:
421 # First bigint was actually encoding the negative of the format
422 # name string length (we call this 'magic_string' to
423 magic_string_len = -ntimestep
425 # The next `magic_string_len` bytes will hold a string
426 # indicating the format of the dump file
427 magic_string = b''.join(
428 read_variables('=' + str(magic_string_len) + 'c')
429 )
431 # Read endianness (integer). For now, we'll disregard the value
432 # and simply use the host machine's endianness (via '='
433 # character used with struct.calcsize).
434 #
435 # TODO: Use the endianness of the dump file in subsequent
436 # read_variables rather than just assuming it will match
437 # that of the host
438 read_variables('=i')
440 # Read revision number (integer)
441 (revision,) = read_variables('=i')
443 # Finally, read the actual timestep (bigint)
444 (ntimestep,) = read_variables('=' + bigformat)
446 _n_atoms, triclinic = read_variables('=' + bigformat + 'i')
447 boundary = read_variables('=6i')
449 if triclinic == 0:
450 diagdisp = read_variables('=6d')
451 offdiag = (0.0,) * 3
452 cell, celldisp = construct_cell(diagdisp, offdiag)
453 elif triclinic == 1:
454 diagdisp = read_variables('=6d')
455 offdiag = read_variables('=3d')
456 cell, celldisp = construct_cell(diagdisp, offdiag)
457 elif triclinic == 2: # general triclinic boxes (>=patch_17Apr2024)
458 cell = np.array(read_variables('=9d')).reshape(3, 3)
459 celldisp = np.array(read_variables('=3d'))
460 else:
461 raise ValueError(triclinic)
463 (size_one,) = read_variables('=i')
465 if len(colnames) != size_one:
466 raise ValueError('Provided columns do not match binary file')
468 if magic_string and revision > 1:
469 # New binary dump format includes units string,
470 # columns string, and time
471 (units_str_len,) = read_variables('=i')
473 if units_str_len > 0:
474 # Read lammps units style
475 _ = b''.join(read_variables('=' + str(units_str_len) + 'c'))
477 (flag,) = read_variables('=c')
478 if flag != b'\x00':
479 # Flag was non-empty string
480 read_variables('=d')
482 # Length of column string
483 (columns_str_len,) = read_variables('=i')
485 # Read column string (e.g., "id type x y z vx vy vz fx fy fz")
486 _ = b''.join(read_variables('=' + str(columns_str_len) + 'c'))
488 (nchunk,) = read_variables('=i')
490 # lammps cells/boxes can have different boundary conditions on each
491 # sides (makes mainly sense for different non-periodic conditions
492 # (e.g. [f]ixed and [s]hrink for a irradiation simulation))
493 # periodic case: b 0 = 'p'
494 # non-peridic cases 1: 'f', 2 : 's', 3: 'm'
495 pbc = np.sum(np.array(boundary).reshape((3, 2)), axis=1) == 0
497 data = []
498 for _ in range(nchunk):
499 # number-of-data-entries
500 (n_data,) = read_variables('=i')
501 # retrieve per atom data
502 data += read_variables('=' + str(n_data) + 'd')
504 return data, cell, celldisp, pbc
507class _LAMMPSDumpBinaryChunk(ImageChunk):
508 def __init__(
509 self,
510 fd: IO,
511 colnames: list[str] | None,
512 intformat: str,
513 ) -> None:
514 # Standard columns layout from lammpsrun
515 colnames_default = [
516 'id',
517 'type',
518 'x',
519 'y',
520 'z',
521 'vx',
522 'vy',
523 'vz',
524 'fx',
525 'fy',
526 'fz',
527 ]
528 self.colnames = colnames if colnames else colnames_default
530 _ = _read_lammps_dump_binary_data(fd, self.colnames, intformat)
531 self.data = _[0]
532 self.cell = _[1]
533 self.celldisp = _[2]
534 self.pbc = _[3]
536 def build(self, **kwargs) -> Atoms:
537 data = np.array(self.data).reshape((-1, len(self.colnames)))
539 # convert the 2D float array to the structured array
540 dtype = _colnames2dtypes(self.colnames)
541 data = np.rec.fromarrays(data.T, dtype=dtype)
543 # map data-chunk to ase atoms
544 return _lammps_data_to_ase_atoms(
545 data=data,
546 colnames=self.colnames,
547 cell=self.cell,
548 celldisp=self.celldisp,
549 pbc=self.pbc,
550 **kwargs,
551 )
554class _LAMMPSDumpBinaryChunkIterator:
555 def __init__(self) -> None:
556 self.colnames = None
557 self.intformat = ''
559 def __call__(self, fd: IO) -> Iterator[_LAMMPSDumpBinaryChunk]:
560 while True:
561 try:
562 yield _LAMMPSDumpBinaryChunk(fd, self.colnames, self.intformat)
563 except EOFError:
564 break
567class _LAMMPSDumpBinaryImageIterator(ImageIterator):
568 def __call__(self, fd: IO, index=None, **kwargs) -> Iterator[Atoms]:
569 _kwargs = kwargs.copy()
570 self.ichunks: _LAMMPSDumpBinaryChunkIterator
571 self.ichunks.colnames = _kwargs.pop('colnames', None)
572 self.ichunks.intformat = _kwargs.pop('intformat', 'SMALLBIG')
573 return super().__call__(fd, index, **_kwargs)
576iread_lammps_dump_binary = _LAMMPSDumpBinaryImageIterator(
577 _LAMMPSDumpBinaryChunkIterator()
578)
581def read_lammps_dump_binary(fd, /, index=-1, **kwargs):
582 """Read binary dump-files (after binary2txt.cpp from lammps/tools).
584 :param fileobj: file-stream containing the binary lammps data
585 :param index: integer or slice object (default: get the last timestep)
586 :param colnames: data is columns and identified by a header
587 :param intformat: lammps support different integer size. Parameter set \
588 at compile-time and can unfortunately not derived from data file
589 :returns: list of Atoms-objects
590 :rtype: list
591 """
592 g = iread_lammps_dump_binary(fd, index=index, **kwargs)
593 return list(g) if isinstance(index, (slice, str)) else next(g)
596def _mass2element(mass):
597 """
598 Guess the element corresponding to a given atomic mass.
600 :param mass: Atomic mass for searching.
601 :return: Element symbol as a string.
602 """
603 min_idx = np.argmin(np.abs(atomic_masses - mass))
604 element = chemical_symbols[min_idx]
605 return element