Coverage for /builds/ase/ase/ase/io/cp2k.py: 89.68%
155 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
1# fmt: off
3"""
4Reader for CP2Ks DCD_ALIGNED_CELL format and restart files.
6Based on [pwtools](https://github.com/elcorto/pwtools).
7All information about the dcd format is taken from there.
8The way of reading it is also copied from pwtools.
9Thanks to Steve for agreeing to this.
11Some information also comes directly from the CP2K source,
12so if they decide to change anything this here might break.
14Some parts are adapted from the extxyz reader.
16Contributed by Patrick Melix <chemistry@melix.me>
17"""
19import os
20from itertools import islice
22import numpy as np
24from ase.atoms import Atom, Atoms
25from ase.cell import Cell
26from ase.data import atomic_numbers
27from ase.io.formats import index2range
29__all__ = ['read_cp2k_dcd', 'iread_cp2k_dcd', 'read_cp2k_restart']
31# DCD file header
32# (name, dtype, shape)
33# numpy dtypes:
34# i4 = int32
35# f4 = float32 (single precision)
36# f8 = float64 (double precision)
37# S80 = string of length 80 (80 chars)
38_HEADER_TYPES = [
39 ('blk0-0', 'i4'), # 84 (start of first block, size=84 bytes)
40 ('hdr', 'S4'), # 'CORD'
41 ('9int', ('i4', 9)), # 9 ints, mostly 0
42 ('timestep', 'f4'), # timestep (float32)
43 ('10int', ('i4', 10)), # 10 ints, mostly 0, last is 24
44 ('blk0-1', 'i4'), # 84
45 ('blk1-0', 'i4'), # 164
46 ('ntitle', 'i4'), # 2
47 ('remark1', 'S80'), # remark1
48 ('remark2', 'S80'), # remark2
49 ('blk1-1', 'i4'), # 164
50 ('blk2-0', 'i4'), # 4 (4 bytes = int32)
51 ('natoms', 'i4'), # natoms (int32)
52 ('blk2-1', 'i4'), # 4
53]
55_HEADER_DTYPE = np.dtype(_HEADER_TYPES)
58def _bytes_per_timestep(natoms):
59 return (4 + 6 * 8 + 7 * 4 + 3 * 4 * natoms)
62def _read_metainfo(fileobj):
63 if not hasattr(fileobj, 'seek'):
64 raise TypeError("You should have passed a fileobject opened in binary "
65 "mode, it seems you did not.")
66 fileobj.seek(0)
67 header = np.fromfile(fileobj, _HEADER_DTYPE, 1)[0]
68 natoms = header['natoms']
69 remark1 = str(header['remark1']) # taken from CP2Ks source/motion_utils.F
70 if 'CP2K' not in remark1:
71 raise ValueError("Header should contain mention of CP2K, are you sure "
72 "this file was created by CP2K?")
74 # dtype for fromfile: nStep times dtype of a timestep data block
75 dtype = np.dtype([('x0', 'i4'),
76 ('x1', 'f8', (6,)),
77 ('x2', 'i4', (2,)),
78 ('x3', 'f4', (natoms,)),
79 ('x4', 'i4', (2,)),
80 ('x5', 'f4', (natoms,)),
81 ('x6', 'i4', (2,)),
82 ('x7', 'f4', (natoms,)),
83 ('x8', 'i4')])
85 fd_pos = fileobj.tell()
86 header_end = fd_pos
87 # seek to end
88 fileobj.seek(0, os.SEEK_END)
89 # number of bytes between fd_pos and end
90 fd_rest = fileobj.tell() - fd_pos
91 # reset to pos after header
92 fileobj.seek(fd_pos)
93 # calculate nstep: fd_rest / bytes_per_timestep
94 # 4 - initial 48
95 # 6*8 - cryst_const_dcd
96 # 7*4 - markers between x,y,z and at the end of the block
97 # 3*4*natoms - float32 cartesian coords
98 nsteps = fd_rest / _bytes_per_timestep(natoms)
99 if fd_rest % _bytes_per_timestep(natoms) != 0:
100 raise ValueError("Calculated number of steps is not int, "
101 "cannot read file")
102 nsteps = int(nsteps)
103 return dtype, natoms, nsteps, header_end
106class DCDChunk:
107 def __init__(self, chunk, dtype, natoms, symbols, aligned):
108 self.chunk = chunk
109 self.dtype = dtype
110 self.natoms = natoms
111 self.symbols = symbols
112 self.aligned = aligned
114 def build(self):
115 """Convert unprocessed chunk into Atoms."""
116 return _read_cp2k_dcd_frame(self.chunk, self.dtype, self.natoms,
117 self.symbols, self.aligned)
120def idcdchunks(fd, ref_atoms, aligned):
121 """Yield unprocessed chunks for each image."""
122 if ref_atoms:
123 symbols = ref_atoms.get_chemical_symbols()
124 else:
125 symbols = None
126 dtype, natoms, nsteps, header_end = _read_metainfo(fd)
127 bytes_per_step = _bytes_per_timestep(natoms)
128 fd.seek(header_end)
129 for i in range(nsteps):
130 fd.seek(bytes_per_step * i + header_end)
131 yield DCDChunk(fd, dtype, natoms, symbols, aligned)
134class DCDImageIterator:
135 """"""
137 def __init__(self, ichunks):
138 self.ichunks = ichunks
140 def __call__(self, fd, indices=-1, ref_atoms=None, aligned=False):
141 self.ref_atoms = ref_atoms
142 self.aligned = aligned
143 if not hasattr(indices, 'start'):
144 if indices < 0:
145 indices = slice(indices - 1, indices)
146 else:
147 indices = slice(indices, indices + 1)
149 for chunk in self._getslice(fd, indices):
150 yield chunk.build()
152 def _getslice(self, fd, indices):
153 try:
154 iterator = islice(self.ichunks(fd, self.ref_atoms, self.aligned),
155 indices.start, indices.stop, indices.step)
156 except ValueError:
157 # Negative indices. Adjust slice to positive numbers.
158 _dtype, _natoms, nsteps, _header_end = _read_metainfo(fd)
159 indices_tuple = indices.indices(nsteps + 1)
160 iterator = islice(self.ichunks(fd, self.ref_atoms, self.aligned),
161 *indices_tuple)
162 return iterator
165iread_cp2k_dcd = DCDImageIterator(idcdchunks)
168def read_cp2k_dcd(fileobj, index=-1, ref_atoms=None, aligned=False):
169 """ Read a DCD file created by CP2K.
171 To yield one Atoms object at a time use ``iread_cp2k_dcd``.
173 Note: Other programs use other formats, they are probably not compatible.
175 If ref_atoms is not given, all atoms will have symbol 'X'.
177 To make sure that this is a dcd file generated with the *DCD_ALIGNED_CELL* key
178 in the CP2K input, you need to set ``aligned`` to *True* to get cell information.
179 Make sure you do not set it otherwise, the cell will not match the atomic coordinates!
180 See the following url for details:
181 https://groups.google.com/forum/#!searchin/cp2k/Outputting$20cell$20information$20and$20fractional$20coordinates%7Csort:relevance/cp2k/72MhYykrSrQ/5c9Jaw7S9yQJ.
182 """ # noqa: E501
183 dtype, natoms, nsteps, header_end = _read_metainfo(fileobj)
184 bytes_per_timestep = _bytes_per_timestep(natoms)
185 if ref_atoms:
186 symbols = ref_atoms.get_chemical_symbols()
187 else:
188 symbols = ['X' for _ in range(natoms)]
189 if natoms != len(symbols):
190 raise ValueError("Length of ref_atoms does not match natoms "
191 "from dcd file")
192 trbl = index2range(index, nsteps)
194 for index in trbl:
195 frame_pos = int(header_end + index * bytes_per_timestep)
196 fileobj.seek(frame_pos)
197 yield _read_cp2k_dcd_frame(fileobj, dtype, natoms, symbols,
198 aligned=aligned)
201def _read_cp2k_dcd_frame(fileobj, dtype, natoms, symbols, aligned=False):
202 arr = np.fromfile(fileobj, dtype, 1)
203 cryst_const = np.empty(6, dtype=np.float64)
204 cryst_const[0] = arr['x1'][0, 0]
205 cryst_const[1] = arr['x1'][0, 2]
206 cryst_const[2] = arr['x1'][0, 5]
207 cryst_const[3] = arr['x1'][0, 4]
208 cryst_const[4] = arr['x1'][0, 3]
209 cryst_const[5] = arr['x1'][0, 1]
210 coords = np.empty((natoms, 3), dtype=np.float32)
211 coords[..., 0] = arr['x3'][0, ...]
212 coords[..., 1] = arr['x5'][0, ...]
213 coords[..., 2] = arr['x7'][0, ...]
214 assert len(coords) == len(symbols)
215 if aligned:
216 # convention of the cell is (see cp2ks src/particle_methods.F):
217 # A is in x
218 # B lies in xy plane
219 # luckily this is also the ASE convention for Atoms.set_cell()
220 atoms = Atoms(symbols, coords, cell=cryst_const, pbc=True)
221 else:
222 atoms = Atoms(symbols, coords)
223 return atoms
226def read_cp2k_restart(fileobj):
227 """Read Atoms and Cell from Restart File.
229 Reads the elements, coordinates and cell information from the
230 '&SUBSYS' section of a CP2K restart file.
232 Tries to convert atom names to elements, if this fails element is set to X.
234 Returns an Atoms object.
235 """
237 def _parse_section(inp):
238 """Helper to parse structure to nested dict"""
239 ret = {'content': []}
240 while inp:
241 line = inp.readline().strip()
242 if line.startswith('&END'):
243 return ret
244 elif line.startswith('&'):
245 key = line.replace('&', '')
246 ret[key] = _parse_section(inp)
247 else:
248 ret['content'].append(line)
249 return ret
251 def _fast_forward_to(fileobj, section_header):
252 """Helper to forward to a section"""
253 found = False
254 while fileobj:
255 line = fileobj.readline()
256 if section_header in line:
257 found = True
258 break
259 if not found:
260 raise RuntimeError(f"No {section_header} section found!")
262 def _read_cell(data):
263 """Helper to read cell data, returns cell and pbc"""
264 cell = None
265 pbc = [False, False, False]
266 if 'CELL' in data:
267 content = data['CELL']['content']
268 cell = Cell([[0, 0, 0] for i in range(3)])
269 char2idx = {'A ': 0, 'B ': 1, 'C ': 2}
270 for line in content:
271 # lines starting with A/B/C<whitespace> have cell
272 if line[:2] in char2idx:
273 idx = char2idx[line[:2]]
274 cell[idx] = [float(x) for x in line.split()[1:]]
275 pbc[idx] = True
276 if {len(v) for v in cell} != {3}:
277 raise RuntimeError("Bad Cell Definition found.")
278 return cell, pbc
280 def _read_geometry(content):
281 """Helper to read geometry, returns a list of Atoms"""
282 atom_list = []
283 for entry in content:
284 entry = entry.split()
285 # Get letters for element symbol
286 el = [char.lower() for char in entry[0] if char.isalpha()]
287 el = "".join(el).capitalize()
288 # Get positions
289 pos = [float(x) for x in entry[1:4]]
290 if el in atomic_numbers.keys():
291 atom_list.append(Atom(el, pos))
292 else:
293 atom_list.append(Atom('X', pos))
294 return atom_list
296 # fast forward to &SUBSYS section
297 _fast_forward_to(fileobj, '&SUBSYS')
298 # parse sections into dictionary
299 data = _parse_section(fileobj)
300 # look for &CELL
301 cell, pbc = _read_cell(data)
302 # get the atoms
303 atom_list = _read_geometry(data['COORD']['content'])
304 return Atoms(atom_list, cell=cell, pbc=pbc)