Coverage for ase / io / lammpsrun.py: 88.85%

260 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-04 10:20 +0000

1"""IO for LAMMPS dump files.""" 

2 

3import gzip 

4import struct 

5from collections.abc import Iterator 

6from os.path import splitext 

7from typing import Any, TextIO 

8 

9import numpy as np 

10 

11from ase.atoms import Atoms 

12from ase.calculators.lammps import convert 

13from ase.calculators.singlepoint import SinglePointCalculator 

14from ase.data import atomic_masses, chemical_symbols 

15from ase.io.utils import ImageChunk, ImageIterator 

16from ase.parallel import paropen 

17from ase.utils import reader 

18 

19 

20def read_lammps_dump(infileobj, **kwargs): 

21 """Method which reads a LAMMPS dump file. 

22 

23 LAMMPS chooses output method depending on the given suffix: 

24 - .bin : binary file 

25 - .gz : output piped through gzip 

26 - .mpiio: using mpiio (should be like cleartext, 

27 with different ordering) 

28 - else : normal clear-text format 

29 

30 :param infileobj: string to file, opened file or file-like stream 

31 

32 """ 

33 # !TODO: add support for lammps-regex naming schemes (output per 

34 # processor and timestep wildcards) 

35 

36 opened = False 

37 if isinstance(infileobj, str): 

38 opened = True 

39 suffix = splitext(infileobj)[-1] 

40 if suffix == '.bin': 

41 fileobj = paropen(infileobj, 'rb') 

42 elif suffix == '.gz': 

43 # !TODO: save for parallel execution? 

44 fileobj = gzip.open(infileobj, 'rb') 

45 else: 

46 fileobj = paropen(infileobj) 

47 else: 

48 suffix = splitext(infileobj.name)[-1] 

49 fileobj = infileobj 

50 

51 if suffix == '.bin': 

52 out = read_lammps_dump_binary(fileobj, **kwargs) 

53 if opened: 

54 fileobj.close() 

55 return out 

56 

57 out = read_lammps_dump_text(fileobj, **kwargs) 

58 

59 if opened: 

60 fileobj.close() 

61 

62 return out 

63 

64 

65def _lammps_data_to_ase_atoms( 

66 data, 

67 colnames, 

68 cell, 

69 celldisp, 

70 pbc=False, 

71 order=True, 

72 specorder=None, 

73 prismobj=None, 

74 units='metal', 

75): 

76 """Extract positions and other per-atom parameters and create Atoms. 

77 

78 Parameters 

79 ---------- 

80 data : np.ndarray 

81 Structured array for `ITEM: ATOMS`. 

82 colnames: list[str] 

83 Column names for `ITEM: ATOMS`. 

84 cell : np.ndarray 

85 Cell. 

86 celldisp : np.ndarray 

87 Origin shift. 

88 pbc : bool | list[bool] 

89 Periodic boundary conditions. 

90 order : bool 

91 Sort atoms by `id`. Might be faster to turn off. 

92 Disregarded in case `id` column is not given in file. 

93 specorder : list[str] 

94 List of species to map LAMMPS types to ASE species. 

95 (Usually .dump files do not contain type to species mapping.) 

96 prismobj : Prism 

97 Coordinate transformation between LAMMPS and ASE. 

98 units : str 

99 LAMMPS units for unit transformation between LAMMPS and ASE. 

100 

101 Returns 

102 ------- 

103 :class:`~ase.Atoms` 

104 

105 """ 

106 # read IDs if given and order if needed 

107 if 'id' in colnames: 

108 ids = data['id'] 

109 if order: 

110 sort_order = np.argsort(ids) 

111 data = data[sort_order] 

112 

113 # determine the elements 

114 if 'element' in colnames: 

115 # priority to elements written in file 

116 elements = data['element'] 

117 elif 'mass' in colnames: 

118 # try to determine elements from masses 

119 elements = [_mass2element(m) for m in data['mass']] 

120 elif 'type' in colnames: 

121 # fall back to `types` otherwise 

122 elements = data['type'] 

123 

124 # reconstruct types from given specorder 

125 if specorder: 

126 elements = [specorder[t - 1] for t in elements] 

127 else: 

128 # todo: what if specorder give but no types? 

129 # in principle the masses could work for atoms, but that needs 

130 # lots of cases and new code I guess 

131 raise ValueError('Cannot determine atom types form LAMMPS dump file') 

132 

133 def get_quantity(labels, quantity=None): 

134 try: 

135 cols = np.column_stack([data[label] for label in labels]) 

136 if quantity: 

137 return convert(cols, quantity, units, 'ASE') 

138 return cols 

139 except ValueError: 

140 return None 

141 

142 # Positions 

143 positions = None 

144 scaled_positions = None 

145 if 'x' in colnames: 

146 # doc: x, y, z = unscaled atom coordinates 

147 positions = get_quantity(['x', 'y', 'z'], 'distance') 

148 elif 'xs' in colnames: 

149 # doc: xs,ys,zs = scaled atom coordinates 

150 scaled_positions = get_quantity(['xs', 'ys', 'zs']) 

151 elif 'xu' in colnames: 

152 # doc: xu,yu,zu = unwrapped atom coordinates 

153 positions = get_quantity(['xu', 'yu', 'zu'], 'distance') 

154 elif 'xsu' in colnames: 

155 # xsu,ysu,zsu = scaled unwrapped atom coordinates 

156 scaled_positions = get_quantity(['xsu', 'ysu', 'zsu']) 

157 else: 

158 raise ValueError('No atomic positions found in LAMMPS output') 

159 

160 velocities = get_quantity(['vx', 'vy', 'vz'], 'velocity') 

161 charges = get_quantity(['q'], 'charge') 

162 forces = get_quantity(['fx', 'fy', 'fz'], 'force') 

163 # !TODO: how need quaternions be converted? 

164 quaternions = get_quantity(['c_q[1]', 'c_q[2]', 'c_q[3]', 'c_q[4]']) 

165 

166 # convert cell 

167 cell = convert(cell, 'distance', units, 'ASE') 

168 celldisp = convert(celldisp, 'distance', units, 'ASE') 

169 if prismobj: 

170 celldisp = prismobj.vector_to_ase(celldisp) 

171 cell = prismobj.update_cell(cell) 

172 

173 if quaternions is not None: 

174 out_atoms = Atoms( 

175 symbols=elements, 

176 positions=positions, 

177 cell=cell, 

178 celldisp=celldisp, 

179 pbc=pbc, 

180 ) 

181 out_atoms.new_array('quaternions', quaternions, dtype=float) 

182 elif positions is not None: 

183 # reverse coordinations transform to lammps system 

184 # (for all vectors = pos, vel, force) 

185 if prismobj: 

186 positions = prismobj.vector_to_ase(positions, wrap=True) 

187 

188 out_atoms = Atoms( 

189 symbols=elements, 

190 positions=positions, 

191 pbc=pbc, 

192 celldisp=celldisp, 

193 cell=cell, 

194 ) 

195 elif scaled_positions is not None: 

196 out_atoms = Atoms( 

197 symbols=elements, 

198 scaled_positions=scaled_positions, 

199 pbc=pbc, 

200 celldisp=celldisp, 

201 cell=cell, 

202 ) 

203 else: 

204 raise RuntimeError('No atomsobj created from LAMMPS data!') 

205 

206 if velocities is not None: 

207 if prismobj: 

208 velocities = prismobj.vector_to_ase(velocities) 

209 out_atoms.set_velocities(velocities) 

210 if charges is not None: 

211 out_atoms.set_initial_charges([charge[0] for charge in charges]) 

212 if forces is not None: 

213 if prismobj: 

214 forces = prismobj.vector_to_ase(forces) 

215 # !TODO: use another calculator if available (or move forces 

216 # to atoms.property) (other problem: synchronizing 

217 # parallel runs) 

218 calculator = SinglePointCalculator(out_atoms, energy=0.0, forces=forces) 

219 out_atoms.calc = calculator 

220 

221 # process the extra columns of fixes, variables and computes 

222 # that can be dumped, add as additional arrays to atoms object 

223 for colname in colnames: 

224 # determine if it is a compute, fix or 

225 # custom property/atom (but not the quaternian) 

226 if ( 

227 colname.startswith('f_') 

228 or colname.startswith('v_') 

229 or colname.startswith('d_') 

230 or colname.startswith('d2_') 

231 or (colname.startswith('c_') and not colname.startswith('c_q[')) 

232 ): 

233 out_atoms.new_array(colname, data[colname], dtype='float') 

234 

235 elif colname.startswith('i_') or colname.startswith('i2_'): 

236 out_atoms.new_array(colname, data[colname], dtype='int') 

237 elif colname == 'type': 

238 try: 

239 out_atoms.new_array(colname, data['type'], dtype='int') 

240 except ValueError: 

241 pass # in case type is not integer 

242 

243 return out_atoms 

244 

245 

246def construct_cell(diagdisp, offdiag): 

247 """Help function to create an ASE-cell with displacement vector from 

248 the lammps coordination system parameters. 

249 

250 :param diagdisp: cell dimension convoluted with the displacement vector 

251 :param offdiag: off-diagonal cell elements 

252 :returns: cell and cell displacement vector 

253 :rtype: tuple 

254 """ 

255 xlo, xhi, ylo, yhi, zlo, zhi = diagdisp 

256 xy, xz, yz = offdiag 

257 

258 # create ase-cell from lammps-box 

259 xhilo = (xhi - xlo) - abs(xy) - abs(xz) 

260 yhilo = (yhi - ylo) - abs(yz) 

261 zhilo = zhi - zlo 

262 celldispx = xlo - min(0, xy) - min(0, xz) 

263 celldispy = ylo - min(0, yz) 

264 celldispz = zlo 

265 cell = np.array([[xhilo, 0, 0], [xy, yhilo, 0], [xz, yz, zhilo]]) 

266 celldisp = np.array([celldispx, celldispy, celldispz]) 

267 

268 return cell, celldisp 

269 

270 

271def _parse_pbc(tilt_items: list[str]) -> list[bool]: 

272 """Handle pbc conditions.""" 

273 pbc_items = tilt_items[-3:] if len(tilt_items) >= 3 else ['f', 'f', 'f'] 

274 return ['p' in d.lower() for d in pbc_items] 

275 

276 

277def _parse_box_bound(line: str, cell_lines: list[str]) -> tuple: 

278 # save labels behind "ITEM: BOX BOUNDS" in triclinic case 

279 # (>=lammps-7Jul09) 

280 tilt_items = line.split()[3:] 

281 cell_data = np.loadtxt(cell_lines) 

282 

283 # general triclinic boxes (>=patch_17Apr2024) 

284 if tilt_items[0] == 'abc': 

285 cell = cell_data[:, :3] 

286 celldisp = cell_data[:, 3] 

287 pbc = _parse_pbc(tilt_items) 

288 return cell, celldisp, pbc 

289 

290 diagdisp = cell_data[:, :2].flatten() 

291 

292 # determine cell tilt (triclinic case!) 

293 if len(cell_data[0]) > 2: 

294 # for >=lammps-7Jul09 use labels behind "ITEM: BOX BOUNDS" 

295 # to assign tilt (vector) elements ... 

296 offdiag = cell_data[:, 2] 

297 # ... otherwise assume default order in 3rd column 

298 # (if the latter was present) 

299 if len(tilt_items) >= 3: 

300 sort_index = [tilt_items.index(i) for i in ['xy', 'xz', 'yz']] 

301 offdiag = offdiag[sort_index] 

302 else: 

303 offdiag = np.zeros(3) 

304 

305 cell, celldisp = construct_cell(diagdisp, offdiag) 

306 

307 pbc = _parse_pbc(tilt_items) 

308 

309 return cell, celldisp, pbc 

310 

311 

312def get_max_index(index): 

313 if np.isscalar(index): 

314 return index 

315 elif isinstance(index, slice): 

316 return index.stop if (index.stop is not None) else float('inf') 

317 

318 

319def _colnames2dtypes(colnames: list[str]) -> list[tuple[str, Any]]: 

320 # Determine the data types for each column 

321 dtype: list[tuple[str, Any]] = [] 

322 for colname in colnames: 

323 if ( 

324 colname in {'id', 'type'} 

325 or colname.startswith('i_') 

326 or colname.startswith('i2_') 

327 ): 

328 dtype.append((colname, int)) 

329 elif colname == 'element': 

330 # 'U10' for strings with a max length of 10 characters 

331 dtype.append((colname, 'U10')) 

332 else: 

333 dtype.append((colname, float)) 

334 return dtype 

335 

336 

337def _read_lammps_dump_text_frame(fd: TextIO, n: int, **kwargs) -> Atoms: 

338 # avoid references before assignment in case of incorrect file structure 

339 cell, celldisp, pbc, info = None, None, False, {} 

340 

341 fd.seek(n) # jump to the position just after 'ITEM: TIMESTEP' 

342 line = fd.readline() 

343 info['timestep'] = int(line.split()[0]) 

344 

345 while line := fd.readline(): 

346 if 'ITEM: NUMBER OF ATOMS' in line: 

347 line = fd.readline() 

348 n_atoms = int(line.split()[0]) 

349 

350 if 'ITEM: BOX BOUNDS' in line: 

351 cell_lines = [fd.readline() for _ in range(3)] 

352 cell, celldisp, pbc = _parse_box_bound(line, cell_lines) 

353 

354 if 'ITEM: ATOMS' in line: 

355 colnames = line.split()[2:] 

356 dtype = _colnames2dtypes(colnames) 

357 datarows = [fd.readline() for _ in range(n_atoms)] 

358 data = np.loadtxt(datarows, dtype=dtype, ndmin=1) 

359 out_atoms = _lammps_data_to_ase_atoms( 

360 data=data, 

361 colnames=colnames, 

362 cell=cell, 

363 celldisp=celldisp, 

364 pbc=pbc, 

365 **kwargs, 

366 ) 

367 out_atoms.info.update(info) 

368 return out_atoms 

369 

370 raise RuntimeError('Incomplete LAMMPS dump text chunk') 

371 

372 

373class _LAMMPSDumpTextChunk(ImageChunk): 

374 def __init__(self, fd: TextIO, pos: int) -> None: 

375 self.fd = fd 

376 self.pos = pos 

377 

378 def build(self, **kwargs) -> Atoms: 

379 return _read_lammps_dump_text_frame(self.fd, self.pos, **kwargs) 

380 

381 

382def _i_lammps_dump_text_chunks(fd: TextIO) -> Iterator[_LAMMPSDumpTextChunk]: 

383 while line := fd.readline(): 

384 if 'ITEM: TIMESTEP' in line: 

385 pos = fd.tell() # position just after 'ITEM: TIMESTEP' 

386 yield _LAMMPSDumpTextChunk(fd, pos) 

387 

388 

389iread_lammps_dump_text = ImageIterator(_i_lammps_dump_text_chunks) 

390 

391 

392@reader 

393def read_lammps_dump_text(fd, index=-1, **kwargs): 

394 """Read a LAMMPS text dump file.""" 

395 g = iread_lammps_dump_text(fd, index=index, **kwargs) 

396 return list(g) if isinstance(index, (slice, str)) else next(g) 

397 

398 

399def read_lammps_dump_binary( 

400 fileobj, index=-1, colnames=None, intformat='SMALLBIG', **kwargs 

401): 

402 """Read binary dump-files (after binary2txt.cpp from lammps/tools). 

403 

404 :param fileobj: file-stream containing the binary lammps data 

405 :param index: integer or slice object (default: get the last timestep) 

406 :param colnames: data is columns and identified by a header 

407 :param intformat: lammps support different integer size. Parameter set \ 

408 at compile-time and can unfortunately not derived from data file 

409 :returns: list of Atoms-objects 

410 :rtype: list 

411 """ 

412 # depending on the chosen compilation flag lammps uses either normal 

413 # integers or long long for its id or timestep numbering 

414 # !TODO: tags are cast to double -> missing/double ids (add check?) 

415 _tagformat, bigformat = { 

416 'SMALLSMALL': ('i', 'i'), 

417 'SMALLBIG': ('i', 'q'), 

418 'BIGBIG': ('q', 'q'), 

419 }[intformat] 

420 

421 index_end = get_max_index(index) 

422 

423 # Standard columns layout from lammpsrun 

424 if not colnames: 

425 colnames = [ 

426 'id', 

427 'type', 

428 'x', 

429 'y', 

430 'z', 

431 'vx', 

432 'vy', 

433 'vz', 

434 'fx', 

435 'fy', 

436 'fz', 

437 ] 

438 

439 images = [] 

440 

441 # wrap struct.unpack to raise EOFError 

442 def read_variables(string): 

443 obj_len = struct.calcsize(string) 

444 data_obj = fileobj.read(obj_len) 

445 if obj_len != len(data_obj): 

446 raise EOFError 

447 return struct.unpack(string, data_obj) 

448 

449 while True: 

450 try: 

451 # Assume that the binary dump file is in the old (pre-29Oct2020) 

452 # format 

453 magic_string = None 

454 

455 # read header 

456 (ntimestep,) = read_variables('=' + bigformat) 

457 

458 # In the new LAMMPS binary dump format (version 29Oct2020 and 

459 # onward), a negative timestep is used to indicate that the next 

460 # few bytes will contain certain metadata 

461 if ntimestep < 0: 

462 # First bigint was actually encoding the negative of the format 

463 # name string length (we call this 'magic_string' to 

464 magic_string_len = -ntimestep 

465 

466 # The next `magic_string_len` bytes will hold a string 

467 # indicating the format of the dump file 

468 magic_string = b''.join( 

469 read_variables('=' + str(magic_string_len) + 'c') 

470 ) 

471 

472 # Read endianness (integer). For now, we'll disregard the value 

473 # and simply use the host machine's endianness (via '=' 

474 # character used with struct.calcsize). 

475 # 

476 # TODO: Use the endianness of the dump file in subsequent 

477 # read_variables rather than just assuming it will match 

478 # that of the host 

479 read_variables('=i') 

480 

481 # Read revision number (integer) 

482 (revision,) = read_variables('=i') 

483 

484 # Finally, read the actual timestep (bigint) 

485 (ntimestep,) = read_variables('=' + bigformat) 

486 

487 _n_atoms, triclinic = read_variables('=' + bigformat + 'i') 

488 boundary = read_variables('=6i') 

489 

490 if triclinic == 0: 

491 diagdisp = read_variables('=6d') 

492 offdiag = (0.0,) * 3 

493 cell, celldisp = construct_cell(diagdisp, offdiag) 

494 elif triclinic == 1: 

495 diagdisp = read_variables('=6d') 

496 offdiag = read_variables('=3d') 

497 cell, celldisp = construct_cell(diagdisp, offdiag) 

498 elif triclinic == 2: # general triclinic boxes (>=patch_17Apr2024) 

499 cell = np.array(read_variables('=9d')).reshape(3, 3) 

500 celldisp = np.array(read_variables('=3d')) 

501 else: 

502 raise ValueError(triclinic) 

503 

504 (size_one,) = read_variables('=i') 

505 

506 if len(colnames) != size_one: 

507 raise ValueError('Provided columns do not match binary file') 

508 

509 if magic_string and revision > 1: 

510 # New binary dump format includes units string, 

511 # columns string, and time 

512 (units_str_len,) = read_variables('=i') 

513 

514 if units_str_len > 0: 

515 # Read lammps units style 

516 _ = b''.join(read_variables('=' + str(units_str_len) + 'c')) 

517 

518 (flag,) = read_variables('=c') 

519 if flag != b'\x00': 

520 # Flag was non-empty string 

521 read_variables('=d') 

522 

523 # Length of column string 

524 (columns_str_len,) = read_variables('=i') 

525 

526 # Read column string (e.g., "id type x y z vx vy vz fx fy fz") 

527 _ = b''.join(read_variables('=' + str(columns_str_len) + 'c')) 

528 

529 (nchunk,) = read_variables('=i') 

530 

531 # lammps cells/boxes can have different boundary conditions on each 

532 # sides (makes mainly sense for different non-periodic conditions 

533 # (e.g. [f]ixed and [s]hrink for a irradiation simulation)) 

534 # periodic case: b 0 = 'p' 

535 # non-peridic cases 1: 'f', 2 : 's', 3: 'm' 

536 pbc = np.sum(np.array(boundary).reshape((3, 2)), axis=1) == 0 

537 

538 data = [] 

539 for _ in range(nchunk): 

540 # number-of-data-entries 

541 (n_data,) = read_variables('=i') 

542 # retrieve per atom data 

543 data += read_variables('=' + str(n_data) + 'd') 

544 data = np.array(data).reshape((-1, size_one)) 

545 

546 # convert the 2D float array to the structured array 

547 data = np.rec.fromarrays(data.T, dtype=_colnames2dtypes(colnames)) 

548 

549 # map data-chunk to ase atoms 

550 out_atoms = _lammps_data_to_ase_atoms( 

551 data=data, 

552 colnames=colnames, 

553 cell=cell, 

554 celldisp=celldisp, 

555 pbc=pbc, 

556 **kwargs, 

557 ) 

558 

559 images.append(out_atoms) 

560 

561 # stop if requested index has been found 

562 if len(images) > index_end >= 0: # type: ignore[comparison-overlap] 

563 break 

564 

565 except EOFError: 

566 break 

567 

568 return images[index] 

569 

570 

571def _mass2element(mass): 

572 """ 

573 Guess the element corresponding to a given atomic mass. 

574 

575 :param mass: Atomic mass for searching. 

576 :return: Element symbol as a string. 

577 """ 

578 min_idx = np.argmin(np.abs(atomic_masses - mass)) 

579 element = chemical_symbols[min_idx] 

580 return element