Coverage for ase / io / lammpsrun.py: 90.58%

276 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 08:22 +0000

1"""IO for LAMMPS dump files.""" 

2 

3import gzip 

4import struct 

5from collections.abc import Iterator 

6from os.path import splitext 

7from typing import IO, Any, TextIO 

8 

9import numpy as np 

10 

11from ase.atoms import Atoms 

12from ase.calculators.lammps import convert 

13from ase.calculators.singlepoint import SinglePointCalculator 

14from ase.data import atomic_masses, chemical_symbols 

15from ase.io.utils import ImageChunk, ImageIterator 

16from ase.parallel import paropen 

17from ase.utils import reader 

18 

19 

20def read_lammps_dump(infileobj, **kwargs): 

21 """Method which reads a LAMMPS dump file. 

22 

23 LAMMPS chooses output method depending on the given suffix: 

24 - .bin : binary file 

25 - .gz : output piped through gzip 

26 - .mpiio: using mpiio (should be like cleartext, 

27 with different ordering) 

28 - else : normal clear-text format 

29 

30 :param infileobj: string to file, opened file or file-like stream 

31 

32 """ 

33 # !TODO: add support for lammps-regex naming schemes (output per 

34 # processor and timestep wildcards) 

35 

36 opened = False 

37 if isinstance(infileobj, str): 

38 opened = True 

39 suffix = splitext(infileobj)[-1] 

40 if suffix == '.bin': 

41 fileobj = paropen(infileobj, 'rb') 

42 elif suffix == '.gz': 

43 # !TODO: save for parallel execution? 

44 fileobj = gzip.open(infileobj, 'rb') 

45 else: 

46 fileobj = paropen(infileobj) 

47 else: 

48 suffix = splitext(infileobj.name)[-1] 

49 fileobj = infileobj 

50 

51 if suffix == '.bin': 

52 out = read_lammps_dump_binary(fileobj, **kwargs) 

53 if opened: 

54 fileobj.close() 

55 return out 

56 

57 out = read_lammps_dump_text(fileobj, **kwargs) 

58 

59 if opened: 

60 fileobj.close() 

61 

62 return out 

63 

64 

65def _lammps_data_to_ase_atoms( 

66 data, 

67 colnames, 

68 cell, 

69 celldisp, 

70 pbc=False, 

71 order=True, 

72 specorder=None, 

73 prismobj=None, 

74 units='metal', 

75): 

76 """Extract positions and other per-atom parameters and create Atoms. 

77 

78 Parameters 

79 ---------- 

80 data : np.ndarray 

81 Structured array for `ITEM: ATOMS`. 

82 colnames: list[str] 

83 Column names for `ITEM: ATOMS`. 

84 cell : np.ndarray 

85 Cell. 

86 celldisp : np.ndarray 

87 Origin shift. 

88 pbc : bool | list[bool] 

89 Periodic boundary conditions. 

90 order : bool 

91 Sort atoms by `id`. Might be faster to turn off. 

92 Disregarded in case `id` column is not given in file. 

93 specorder : list[str] 

94 List of species to map LAMMPS types to ASE species. 

95 (Usually .dump files do not contain type to species mapping.) 

96 prismobj : Prism 

97 Coordinate transformation between LAMMPS and ASE. 

98 units : str 

99 LAMMPS units for unit transformation between LAMMPS and ASE. 

100 

101 Returns 

102 ------- 

103 :class:`~ase.Atoms` 

104 

105 """ 

106 # read IDs if given and order if needed 

107 if 'id' in colnames: 

108 ids = data['id'] 

109 if order: 

110 sort_order = np.argsort(ids) 

111 data = data[sort_order] 

112 

113 # determine the elements 

114 if 'element' in colnames: 

115 # priority to elements written in file 

116 elements = data['element'] 

117 elif 'mass' in colnames: 

118 # try to determine elements from masses 

119 elements = [_mass2element(m) for m in data['mass']] 

120 elif 'type' in colnames: 

121 # fall back to `types` otherwise 

122 elements = data['type'] 

123 

124 # reconstruct types from given specorder 

125 if specorder: 

126 elements = [specorder[t - 1] for t in elements] 

127 else: 

128 # todo: what if specorder give but no types? 

129 # in principle the masses could work for atoms, but that needs 

130 # lots of cases and new code I guess 

131 raise ValueError('Cannot determine atom types form LAMMPS dump file') 

132 

133 def get_quantity(labels, quantity=None): 

134 try: 

135 cols = np.column_stack([data[label] for label in labels]) 

136 if quantity: 

137 return convert(cols, quantity, units, 'ASE') 

138 return cols 

139 except ValueError: 

140 return None 

141 

142 # Positions 

143 positions = None 

144 scaled_positions = None 

145 if 'x' in colnames: 

146 # doc: x, y, z = unscaled atom coordinates 

147 positions = get_quantity(['x', 'y', 'z'], 'distance') 

148 elif 'xs' in colnames: 

149 # doc: xs,ys,zs = scaled atom coordinates 

150 scaled_positions = get_quantity(['xs', 'ys', 'zs']) 

151 elif 'xu' in colnames: 

152 # doc: xu,yu,zu = unwrapped atom coordinates 

153 positions = get_quantity(['xu', 'yu', 'zu'], 'distance') 

154 elif 'xsu' in colnames: 

155 # xsu,ysu,zsu = scaled unwrapped atom coordinates 

156 scaled_positions = get_quantity(['xsu', 'ysu', 'zsu']) 

157 else: 

158 raise ValueError('No atomic positions found in LAMMPS output') 

159 

160 velocities = get_quantity(['vx', 'vy', 'vz'], 'velocity') 

161 charges = get_quantity(['q'], 'charge') 

162 forces = get_quantity(['fx', 'fy', 'fz'], 'force') 

163 # !TODO: how need quaternions be converted? 

164 quaternions = get_quantity(['c_q[1]', 'c_q[2]', 'c_q[3]', 'c_q[4]']) 

165 

166 # convert cell 

167 cell = convert(cell, 'distance', units, 'ASE') 

168 celldisp = convert(celldisp, 'distance', units, 'ASE') 

169 if prismobj: 

170 celldisp = prismobj.vector_to_ase(celldisp) 

171 cell = prismobj.update_cell(cell) 

172 

173 if quaternions is not None: 

174 out_atoms = Atoms( 

175 symbols=elements, 

176 positions=positions, 

177 cell=cell, 

178 celldisp=celldisp, 

179 pbc=pbc, 

180 ) 

181 out_atoms.new_array('quaternions', quaternions, dtype=float) 

182 elif positions is not None: 

183 # reverse coordinations transform to lammps system 

184 # (for all vectors = pos, vel, force) 

185 if prismobj: 

186 positions = prismobj.vector_to_ase(positions, wrap=True) 

187 

188 out_atoms = Atoms( 

189 symbols=elements, 

190 positions=positions, 

191 pbc=pbc, 

192 celldisp=celldisp, 

193 cell=cell, 

194 ) 

195 elif scaled_positions is not None: 

196 out_atoms = Atoms( 

197 symbols=elements, 

198 scaled_positions=scaled_positions, 

199 pbc=pbc, 

200 celldisp=celldisp, 

201 cell=cell, 

202 ) 

203 else: 

204 raise RuntimeError('No atomsobj created from LAMMPS data!') 

205 

206 if velocities is not None: 

207 if prismobj: 

208 velocities = prismobj.vector_to_ase(velocities) 

209 out_atoms.set_velocities(velocities) 

210 if charges is not None: 

211 out_atoms.set_initial_charges([charge[0] for charge in charges]) 

212 if forces is not None: 

213 if prismobj: 

214 forces = prismobj.vector_to_ase(forces) 

215 # !TODO: use another calculator if available (or move forces 

216 # to atoms.property) (other problem: synchronizing 

217 # parallel runs) 

218 calculator = SinglePointCalculator(out_atoms, energy=0.0, forces=forces) 

219 out_atoms.calc = calculator 

220 

221 # process the extra columns of fixes, variables and computes 

222 # that can be dumped, add as additional arrays to atoms object 

223 for colname in colnames: 

224 # determine if it is a compute, fix or 

225 # custom property/atom (but not the quaternian) 

226 if ( 

227 colname.startswith('f_') 

228 or colname.startswith('v_') 

229 or colname.startswith('d_') 

230 or colname.startswith('d2_') 

231 or (colname.startswith('c_') and not colname.startswith('c_q[')) 

232 ): 

233 out_atoms.new_array(colname, data[colname], dtype='float') 

234 

235 elif colname.startswith('i_') or colname.startswith('i2_'): 

236 out_atoms.new_array(colname, data[colname], dtype='int') 

237 elif colname == 'type': 

238 try: 

239 out_atoms.new_array(colname, data['type'], dtype='int') 

240 except ValueError: 

241 pass # in case type is not integer 

242 

243 return out_atoms 

244 

245 

246def construct_cell(diagdisp, offdiag): 

247 """Help function to create an ASE-cell with displacement vector from 

248 the lammps coordination system parameters. 

249 

250 :param diagdisp: cell dimension convoluted with the displacement vector 

251 :param offdiag: off-diagonal cell elements 

252 :returns: cell and cell displacement vector 

253 :rtype: tuple 

254 """ 

255 xlo, xhi, ylo, yhi, zlo, zhi = diagdisp 

256 xy, xz, yz = offdiag 

257 

258 # create ase-cell from lammps-box 

259 xhilo = (xhi - xlo) - abs(xy) - abs(xz) 

260 yhilo = (yhi - ylo) - abs(yz) 

261 zhilo = zhi - zlo 

262 celldispx = xlo - min(0, xy) - min(0, xz) 

263 celldispy = ylo - min(0, yz) 

264 celldispz = zlo 

265 cell = np.array([[xhilo, 0, 0], [xy, yhilo, 0], [xz, yz, zhilo]]) 

266 celldisp = np.array([celldispx, celldispy, celldispz]) 

267 

268 return cell, celldisp 

269 

270 

271def _parse_pbc(tilt_items: list[str]) -> list[bool]: 

272 """Handle pbc conditions.""" 

273 pbc_items = tilt_items[-3:] if len(tilt_items) >= 3 else ['f', 'f', 'f'] 

274 return ['p' in d.lower() for d in pbc_items] 

275 

276 

277def _parse_box_bound(line: str, cell_lines: list[str]) -> tuple: 

278 # save labels behind "ITEM: BOX BOUNDS" in triclinic case 

279 # (>=lammps-7Jul09) 

280 tilt_items = line.split()[3:] 

281 cell_data = np.loadtxt(cell_lines) 

282 

283 # general triclinic boxes (>=patch_17Apr2024) 

284 if tilt_items[0] == 'abc': 

285 cell = cell_data[:, :3] 

286 celldisp = cell_data[:, 3] 

287 pbc = _parse_pbc(tilt_items) 

288 return cell, celldisp, pbc 

289 

290 diagdisp = cell_data[:, :2].flatten() 

291 

292 # determine cell tilt (triclinic case!) 

293 if len(cell_data[0]) > 2: 

294 # for >=lammps-7Jul09 use labels behind "ITEM: BOX BOUNDS" 

295 # to assign tilt (vector) elements ... 

296 offdiag = cell_data[:, 2] 

297 # ... otherwise assume default order in 3rd column 

298 # (if the latter was present) 

299 if len(tilt_items) >= 3: 

300 sort_index = [tilt_items.index(i) for i in ['xy', 'xz', 'yz']] 

301 offdiag = offdiag[sort_index] 

302 else: 

303 offdiag = np.zeros(3) 

304 

305 cell, celldisp = construct_cell(diagdisp, offdiag) 

306 

307 pbc = _parse_pbc(tilt_items) 

308 

309 return cell, celldisp, pbc 

310 

311 

312def _colnames2dtypes(colnames: list[str]) -> list[tuple[str, Any]]: 

313 # Determine the data types for each column 

314 dtype: list[tuple[str, Any]] = [] 

315 for colname in colnames: 

316 if ( 

317 colname in {'id', 'type'} 

318 or colname.startswith('i_') 

319 or colname.startswith('i2_') 

320 ): 

321 dtype.append((colname, int)) 

322 elif colname == 'element': 

323 # 'U10' for strings with a max length of 10 characters 

324 dtype.append((colname, 'U10')) 

325 else: 

326 dtype.append((colname, float)) 

327 return dtype 

328 

329 

330def _read_lammps_dump_text_frame(fd: TextIO, n: int, **kwargs) -> Atoms: 

331 # avoid references before assignment in case of incorrect file structure 

332 cell, celldisp, pbc, info = None, None, False, {} 

333 

334 fd.seek(n) # jump to the position just after 'ITEM: TIMESTEP' 

335 line = fd.readline() 

336 info['timestep'] = int(line.split()[0]) 

337 

338 while line := fd.readline(): 

339 if 'ITEM: NUMBER OF ATOMS' in line: 

340 line = fd.readline() 

341 n_atoms = int(line.split()[0]) 

342 

343 if 'ITEM: BOX BOUNDS' in line: 

344 cell_lines = [fd.readline() for _ in range(3)] 

345 cell, celldisp, pbc = _parse_box_bound(line, cell_lines) 

346 

347 if 'ITEM: ATOMS' in line: 

348 colnames = line.split()[2:] 

349 dtype = _colnames2dtypes(colnames) 

350 datarows = [fd.readline() for _ in range(n_atoms)] 

351 data = np.loadtxt(datarows, dtype=dtype, ndmin=1) 

352 out_atoms = _lammps_data_to_ase_atoms( 

353 data=data, 

354 colnames=colnames, 

355 cell=cell, 

356 celldisp=celldisp, 

357 pbc=pbc, 

358 **kwargs, 

359 ) 

360 out_atoms.info.update(info) 

361 return out_atoms 

362 

363 raise RuntimeError('Incomplete LAMMPS dump text chunk') 

364 

365 

366class _LAMMPSDumpTextChunk(ImageChunk): 

367 def __init__(self, fd: TextIO, pos: int) -> None: 

368 self.fd = fd 

369 self.pos = pos 

370 

371 def build(self, **kwargs) -> Atoms: 

372 return _read_lammps_dump_text_frame(self.fd, self.pos, **kwargs) 

373 

374 

375def _i_lammps_dump_text_chunks(fd: TextIO) -> Iterator[_LAMMPSDumpTextChunk]: 

376 while line := fd.readline(): 

377 if 'ITEM: TIMESTEP' in line: 

378 pos = fd.tell() # position just after 'ITEM: TIMESTEP' 

379 yield _LAMMPSDumpTextChunk(fd, pos) 

380 

381 

382iread_lammps_dump_text = ImageIterator(_i_lammps_dump_text_chunks) 

383 

384 

385@reader 

386def read_lammps_dump_text(fd, index=-1, **kwargs): 

387 """Read a LAMMPS text dump file.""" 

388 g = iread_lammps_dump_text(fd, index=index, **kwargs) 

389 return list(g) if isinstance(index, (slice, str)) else next(g) 

390 

391 

392def _read_lammps_dump_binary_data(fd, /, colnames=None, intformat='SMALLBIG'): 

393 # depending on the chosen compilation flag lammps uses either normal 

394 # integers or long long for its id or timestep numbering 

395 # !TODO: tags are cast to double -> missing/double ids (add check?) 

396 _tagformat, bigformat = { 

397 'SMALLSMALL': ('i', 'i'), 

398 'SMALLBIG': ('i', 'q'), 

399 'BIGBIG': ('q', 'q'), 

400 }[intformat] 

401 

402 # wrap struct.unpack to raise EOFError 

403 def read_variables(string): 

404 obj_len = struct.calcsize(string) 

405 data_obj = fd.read(obj_len) 

406 if obj_len != len(data_obj): 

407 raise EOFError 

408 return struct.unpack(string, data_obj) 

409 

410 # Assume that the binary dump file is in the old (pre-29Oct2020) 

411 # format 

412 magic_string = None 

413 

414 # read header 

415 (ntimestep,) = read_variables('=' + bigformat) 

416 

417 # In the new LAMMPS binary dump format (version 29Oct2020 and 

418 # onward), a negative timestep is used to indicate that the next 

419 # few bytes will contain certain metadata 

420 if ntimestep < 0: 

421 # First bigint was actually encoding the negative of the format 

422 # name string length (we call this 'magic_string' to 

423 magic_string_len = -ntimestep 

424 

425 # The next `magic_string_len` bytes will hold a string 

426 # indicating the format of the dump file 

427 magic_string = b''.join( 

428 read_variables('=' + str(magic_string_len) + 'c') 

429 ) 

430 

431 # Read endianness (integer). For now, we'll disregard the value 

432 # and simply use the host machine's endianness (via '=' 

433 # character used with struct.calcsize). 

434 # 

435 # TODO: Use the endianness of the dump file in subsequent 

436 # read_variables rather than just assuming it will match 

437 # that of the host 

438 read_variables('=i') 

439 

440 # Read revision number (integer) 

441 (revision,) = read_variables('=i') 

442 

443 # Finally, read the actual timestep (bigint) 

444 (ntimestep,) = read_variables('=' + bigformat) 

445 

446 _n_atoms, triclinic = read_variables('=' + bigformat + 'i') 

447 boundary = read_variables('=6i') 

448 

449 if triclinic == 0: 

450 diagdisp = read_variables('=6d') 

451 offdiag = (0.0,) * 3 

452 cell, celldisp = construct_cell(diagdisp, offdiag) 

453 elif triclinic == 1: 

454 diagdisp = read_variables('=6d') 

455 offdiag = read_variables('=3d') 

456 cell, celldisp = construct_cell(diagdisp, offdiag) 

457 elif triclinic == 2: # general triclinic boxes (>=patch_17Apr2024) 

458 cell = np.array(read_variables('=9d')).reshape(3, 3) 

459 celldisp = np.array(read_variables('=3d')) 

460 else: 

461 raise ValueError(triclinic) 

462 

463 (size_one,) = read_variables('=i') 

464 

465 if len(colnames) != size_one: 

466 raise ValueError('Provided columns do not match binary file') 

467 

468 if magic_string and revision > 1: 

469 # New binary dump format includes units string, 

470 # columns string, and time 

471 (units_str_len,) = read_variables('=i') 

472 

473 if units_str_len > 0: 

474 # Read lammps units style 

475 _ = b''.join(read_variables('=' + str(units_str_len) + 'c')) 

476 

477 (flag,) = read_variables('=c') 

478 if flag != b'\x00': 

479 # Flag was non-empty string 

480 read_variables('=d') 

481 

482 # Length of column string 

483 (columns_str_len,) = read_variables('=i') 

484 

485 # Read column string (e.g., "id type x y z vx vy vz fx fy fz") 

486 _ = b''.join(read_variables('=' + str(columns_str_len) + 'c')) 

487 

488 (nchunk,) = read_variables('=i') 

489 

490 # lammps cells/boxes can have different boundary conditions on each 

491 # sides (makes mainly sense for different non-periodic conditions 

492 # (e.g. [f]ixed and [s]hrink for a irradiation simulation)) 

493 # periodic case: b 0 = 'p' 

494 # non-peridic cases 1: 'f', 2 : 's', 3: 'm' 

495 pbc = np.sum(np.array(boundary).reshape((3, 2)), axis=1) == 0 

496 

497 data = [] 

498 for _ in range(nchunk): 

499 # number-of-data-entries 

500 (n_data,) = read_variables('=i') 

501 # retrieve per atom data 

502 data += read_variables('=' + str(n_data) + 'd') 

503 

504 return data, cell, celldisp, pbc 

505 

506 

507class _LAMMPSDumpBinaryChunk(ImageChunk): 

508 def __init__( 

509 self, 

510 fd: IO, 

511 colnames: list[str] | None, 

512 intformat: str, 

513 ) -> None: 

514 # Standard columns layout from lammpsrun 

515 colnames_default = [ 

516 'id', 

517 'type', 

518 'x', 

519 'y', 

520 'z', 

521 'vx', 

522 'vy', 

523 'vz', 

524 'fx', 

525 'fy', 

526 'fz', 

527 ] 

528 self.colnames = colnames if colnames else colnames_default 

529 

530 _ = _read_lammps_dump_binary_data(fd, self.colnames, intformat) 

531 self.data = _[0] 

532 self.cell = _[1] 

533 self.celldisp = _[2] 

534 self.pbc = _[3] 

535 

536 def build(self, **kwargs) -> Atoms: 

537 data = np.array(self.data).reshape((-1, len(self.colnames))) 

538 

539 # convert the 2D float array to the structured array 

540 dtype = _colnames2dtypes(self.colnames) 

541 data = np.rec.fromarrays(data.T, dtype=dtype) 

542 

543 # map data-chunk to ase atoms 

544 return _lammps_data_to_ase_atoms( 

545 data=data, 

546 colnames=self.colnames, 

547 cell=self.cell, 

548 celldisp=self.celldisp, 

549 pbc=self.pbc, 

550 **kwargs, 

551 ) 

552 

553 

554class _LAMMPSDumpBinaryChunkIterator: 

555 def __init__(self) -> None: 

556 self.colnames = None 

557 self.intformat = '' 

558 

559 def __call__(self, fd: IO) -> Iterator[_LAMMPSDumpBinaryChunk]: 

560 while True: 

561 try: 

562 yield _LAMMPSDumpBinaryChunk(fd, self.colnames, self.intformat) 

563 except EOFError: 

564 break 

565 

566 

567class _LAMMPSDumpBinaryImageIterator(ImageIterator): 

568 def __call__(self, fd: IO, index=None, **kwargs) -> Iterator[Atoms]: 

569 _kwargs = kwargs.copy() 

570 self.ichunks: _LAMMPSDumpBinaryChunkIterator 

571 self.ichunks.colnames = _kwargs.pop('colnames', None) 

572 self.ichunks.intformat = _kwargs.pop('intformat', 'SMALLBIG') 

573 return super().__call__(fd, index, **_kwargs) 

574 

575 

576iread_lammps_dump_binary = _LAMMPSDumpBinaryImageIterator( 

577 _LAMMPSDumpBinaryChunkIterator() 

578) 

579 

580 

581def read_lammps_dump_binary(fd, /, index=-1, **kwargs): 

582 """Read binary dump-files (after binary2txt.cpp from lammps/tools). 

583 

584 :param fileobj: file-stream containing the binary lammps data 

585 :param index: integer or slice object (default: get the last timestep) 

586 :param colnames: data is columns and identified by a header 

587 :param intformat: lammps support different integer size. Parameter set \ 

588 at compile-time and can unfortunately not derived from data file 

589 :returns: list of Atoms-objects 

590 :rtype: list 

591 """ 

592 g = iread_lammps_dump_binary(fd, index=index, **kwargs) 

593 return list(g) if isinstance(index, (slice, str)) else next(g) 

594 

595 

596def _mass2element(mass): 

597 """ 

598 Guess the element corresponding to a given atomic mass. 

599 

600 :param mass: Atomic mass for searching. 

601 :return: Element symbol as a string. 

602 """ 

603 min_idx = np.argmin(np.abs(atomic_masses - mass)) 

604 element = chemical_symbols[min_idx] 

605 return element