Coverage for /builds/ase/ase/ase/io/cp2k.py: 89.68%

155 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-08-02 00:12 +0000

1# fmt: off 

2 

3""" 

4Reader for CP2Ks DCD_ALIGNED_CELL format and restart files. 

5 

6Based on [pwtools](https://github.com/elcorto/pwtools). 

7All information about the dcd format is taken from there. 

8The way of reading it is also copied from pwtools. 

9Thanks to Steve for agreeing to this. 

10 

11Some information also comes directly from the CP2K source, 

12so if they decide to change anything this here might break. 

13 

14Some parts are adapted from the extxyz reader. 

15 

16Contributed by Patrick Melix <chemistry@melix.me> 

17""" 

18 

19import os 

20from itertools import islice 

21 

22import numpy as np 

23 

24from ase.atoms import Atom, Atoms 

25from ase.cell import Cell 

26from ase.data import atomic_numbers 

27from ase.io.formats import index2range 

28 

29__all__ = ['read_cp2k_dcd', 'iread_cp2k_dcd', 'read_cp2k_restart'] 

30 

31# DCD file header 

32# (name, dtype, shape) 

33# numpy dtypes: 

34# i4 = int32 

35# f4 = float32 (single precision) 

36# f8 = float64 (double precision) 

37# S80 = string of length 80 (80 chars) 

38_HEADER_TYPES = [ 

39 ('blk0-0', 'i4'), # 84 (start of first block, size=84 bytes) 

40 ('hdr', 'S4'), # 'CORD' 

41 ('9int', ('i4', 9)), # 9 ints, mostly 0 

42 ('timestep', 'f4'), # timestep (float32) 

43 ('10int', ('i4', 10)), # 10 ints, mostly 0, last is 24 

44 ('blk0-1', 'i4'), # 84 

45 ('blk1-0', 'i4'), # 164 

46 ('ntitle', 'i4'), # 2 

47 ('remark1', 'S80'), # remark1 

48 ('remark2', 'S80'), # remark2 

49 ('blk1-1', 'i4'), # 164 

50 ('blk2-0', 'i4'), # 4 (4 bytes = int32) 

51 ('natoms', 'i4'), # natoms (int32) 

52 ('blk2-1', 'i4'), # 4 

53] 

54 

55_HEADER_DTYPE = np.dtype(_HEADER_TYPES) 

56 

57 

58def _bytes_per_timestep(natoms): 

59 return (4 + 6 * 8 + 7 * 4 + 3 * 4 * natoms) 

60 

61 

62def _read_metainfo(fileobj): 

63 if not hasattr(fileobj, 'seek'): 

64 raise TypeError("You should have passed a fileobject opened in binary " 

65 "mode, it seems you did not.") 

66 fileobj.seek(0) 

67 header = np.fromfile(fileobj, _HEADER_DTYPE, 1)[0] 

68 natoms = header['natoms'] 

69 remark1 = str(header['remark1']) # taken from CP2Ks source/motion_utils.F 

70 if 'CP2K' not in remark1: 

71 raise ValueError("Header should contain mention of CP2K, are you sure " 

72 "this file was created by CP2K?") 

73 

74 # dtype for fromfile: nStep times dtype of a timestep data block 

75 dtype = np.dtype([('x0', 'i4'), 

76 ('x1', 'f8', (6,)), 

77 ('x2', 'i4', (2,)), 

78 ('x3', 'f4', (natoms,)), 

79 ('x4', 'i4', (2,)), 

80 ('x5', 'f4', (natoms,)), 

81 ('x6', 'i4', (2,)), 

82 ('x7', 'f4', (natoms,)), 

83 ('x8', 'i4')]) 

84 

85 fd_pos = fileobj.tell() 

86 header_end = fd_pos 

87 # seek to end 

88 fileobj.seek(0, os.SEEK_END) 

89 # number of bytes between fd_pos and end 

90 fd_rest = fileobj.tell() - fd_pos 

91 # reset to pos after header 

92 fileobj.seek(fd_pos) 

93 # calculate nstep: fd_rest / bytes_per_timestep 

94 # 4 - initial 48 

95 # 6*8 - cryst_const_dcd 

96 # 7*4 - markers between x,y,z and at the end of the block 

97 # 3*4*natoms - float32 cartesian coords 

98 nsteps = fd_rest / _bytes_per_timestep(natoms) 

99 if fd_rest % _bytes_per_timestep(natoms) != 0: 

100 raise ValueError("Calculated number of steps is not int, " 

101 "cannot read file") 

102 nsteps = int(nsteps) 

103 return dtype, natoms, nsteps, header_end 

104 

105 

106class DCDChunk: 

107 def __init__(self, chunk, dtype, natoms, symbols, aligned): 

108 self.chunk = chunk 

109 self.dtype = dtype 

110 self.natoms = natoms 

111 self.symbols = symbols 

112 self.aligned = aligned 

113 

114 def build(self): 

115 """Convert unprocessed chunk into Atoms.""" 

116 return _read_cp2k_dcd_frame(self.chunk, self.dtype, self.natoms, 

117 self.symbols, self.aligned) 

118 

119 

120def idcdchunks(fd, ref_atoms, aligned): 

121 """Yield unprocessed chunks for each image.""" 

122 if ref_atoms: 

123 symbols = ref_atoms.get_chemical_symbols() 

124 else: 

125 symbols = None 

126 dtype, natoms, nsteps, header_end = _read_metainfo(fd) 

127 bytes_per_step = _bytes_per_timestep(natoms) 

128 fd.seek(header_end) 

129 for i in range(nsteps): 

130 fd.seek(bytes_per_step * i + header_end) 

131 yield DCDChunk(fd, dtype, natoms, symbols, aligned) 

132 

133 

134class DCDImageIterator: 

135 """""" 

136 

137 def __init__(self, ichunks): 

138 self.ichunks = ichunks 

139 

140 def __call__(self, fd, indices=-1, ref_atoms=None, aligned=False): 

141 self.ref_atoms = ref_atoms 

142 self.aligned = aligned 

143 if not hasattr(indices, 'start'): 

144 if indices < 0: 

145 indices = slice(indices - 1, indices) 

146 else: 

147 indices = slice(indices, indices + 1) 

148 

149 for chunk in self._getslice(fd, indices): 

150 yield chunk.build() 

151 

152 def _getslice(self, fd, indices): 

153 try: 

154 iterator = islice(self.ichunks(fd, self.ref_atoms, self.aligned), 

155 indices.start, indices.stop, indices.step) 

156 except ValueError: 

157 # Negative indices. Adjust slice to positive numbers. 

158 _dtype, _natoms, nsteps, _header_end = _read_metainfo(fd) 

159 indices_tuple = indices.indices(nsteps + 1) 

160 iterator = islice(self.ichunks(fd, self.ref_atoms, self.aligned), 

161 *indices_tuple) 

162 return iterator 

163 

164 

165iread_cp2k_dcd = DCDImageIterator(idcdchunks) 

166 

167 

168def read_cp2k_dcd(fileobj, index=-1, ref_atoms=None, aligned=False): 

169 """ Read a DCD file created by CP2K. 

170 

171 To yield one Atoms object at a time use ``iread_cp2k_dcd``. 

172 

173 Note: Other programs use other formats, they are probably not compatible. 

174 

175 If ref_atoms is not given, all atoms will have symbol 'X'. 

176 

177 To make sure that this is a dcd file generated with the *DCD_ALIGNED_CELL* key 

178 in the CP2K input, you need to set ``aligned`` to *True* to get cell information. 

179 Make sure you do not set it otherwise, the cell will not match the atomic coordinates! 

180 See the following url for details: 

181 https://groups.google.com/forum/#!searchin/cp2k/Outputting$20cell$20information$20and$20fractional$20coordinates%7Csort:relevance/cp2k/72MhYykrSrQ/5c9Jaw7S9yQJ. 

182 """ # noqa: E501 

183 dtype, natoms, nsteps, header_end = _read_metainfo(fileobj) 

184 bytes_per_timestep = _bytes_per_timestep(natoms) 

185 if ref_atoms: 

186 symbols = ref_atoms.get_chemical_symbols() 

187 else: 

188 symbols = ['X' for _ in range(natoms)] 

189 if natoms != len(symbols): 

190 raise ValueError("Length of ref_atoms does not match natoms " 

191 "from dcd file") 

192 trbl = index2range(index, nsteps) 

193 

194 for index in trbl: 

195 frame_pos = int(header_end + index * bytes_per_timestep) 

196 fileobj.seek(frame_pos) 

197 yield _read_cp2k_dcd_frame(fileobj, dtype, natoms, symbols, 

198 aligned=aligned) 

199 

200 

201def _read_cp2k_dcd_frame(fileobj, dtype, natoms, symbols, aligned=False): 

202 arr = np.fromfile(fileobj, dtype, 1) 

203 cryst_const = np.empty(6, dtype=np.float64) 

204 cryst_const[0] = arr['x1'][0, 0] 

205 cryst_const[1] = arr['x1'][0, 2] 

206 cryst_const[2] = arr['x1'][0, 5] 

207 cryst_const[3] = arr['x1'][0, 4] 

208 cryst_const[4] = arr['x1'][0, 3] 

209 cryst_const[5] = arr['x1'][0, 1] 

210 coords = np.empty((natoms, 3), dtype=np.float32) 

211 coords[..., 0] = arr['x3'][0, ...] 

212 coords[..., 1] = arr['x5'][0, ...] 

213 coords[..., 2] = arr['x7'][0, ...] 

214 assert len(coords) == len(symbols) 

215 if aligned: 

216 # convention of the cell is (see cp2ks src/particle_methods.F): 

217 # A is in x 

218 # B lies in xy plane 

219 # luckily this is also the ASE convention for Atoms.set_cell() 

220 atoms = Atoms(symbols, coords, cell=cryst_const, pbc=True) 

221 else: 

222 atoms = Atoms(symbols, coords) 

223 return atoms 

224 

225 

226def read_cp2k_restart(fileobj): 

227 """Read Atoms and Cell from Restart File. 

228 

229 Reads the elements, coordinates and cell information from the 

230 '&SUBSYS' section of a CP2K restart file. 

231 

232 Tries to convert atom names to elements, if this fails element is set to X. 

233 

234 Returns an Atoms object. 

235 """ 

236 

237 def _parse_section(inp): 

238 """Helper to parse structure to nested dict""" 

239 ret = {'content': []} 

240 while inp: 

241 line = inp.readline().strip() 

242 if line.startswith('&END'): 

243 return ret 

244 elif line.startswith('&'): 

245 key = line.replace('&', '') 

246 ret[key] = _parse_section(inp) 

247 else: 

248 ret['content'].append(line) 

249 return ret 

250 

251 def _fast_forward_to(fileobj, section_header): 

252 """Helper to forward to a section""" 

253 found = False 

254 while fileobj: 

255 line = fileobj.readline() 

256 if section_header in line: 

257 found = True 

258 break 

259 if not found: 

260 raise RuntimeError(f"No {section_header} section found!") 

261 

262 def _read_cell(data): 

263 """Helper to read cell data, returns cell and pbc""" 

264 cell = None 

265 pbc = [False, False, False] 

266 if 'CELL' in data: 

267 content = data['CELL']['content'] 

268 cell = Cell([[0, 0, 0] for i in range(3)]) 

269 char2idx = {'A ': 0, 'B ': 1, 'C ': 2} 

270 for line in content: 

271 # lines starting with A/B/C<whitespace> have cell 

272 if line[:2] in char2idx: 

273 idx = char2idx[line[:2]] 

274 cell[idx] = [float(x) for x in line.split()[1:]] 

275 pbc[idx] = True 

276 if {len(v) for v in cell} != {3}: 

277 raise RuntimeError("Bad Cell Definition found.") 

278 return cell, pbc 

279 

280 def _read_geometry(content): 

281 """Helper to read geometry, returns a list of Atoms""" 

282 atom_list = [] 

283 for entry in content: 

284 entry = entry.split() 

285 # Get letters for element symbol 

286 el = [char.lower() for char in entry[0] if char.isalpha()] 

287 el = "".join(el).capitalize() 

288 # Get positions 

289 pos = [float(x) for x in entry[1:4]] 

290 if el in atomic_numbers.keys(): 

291 atom_list.append(Atom(el, pos)) 

292 else: 

293 atom_list.append(Atom('X', pos)) 

294 return atom_list 

295 

296 # fast forward to &SUBSYS section 

297 _fast_forward_to(fileobj, '&SUBSYS') 

298 # parse sections into dictionary 

299 data = _parse_section(fileobj) 

300 # look for &CELL 

301 cell, pbc = _read_cell(data) 

302 # get the atoms 

303 atom_list = _read_geometry(data['COORD']['content']) 

304 return Atoms(atom_list, cell=cell, pbc=pbc)