Coverage for ase / io / bundlemanipulate.py: 5.71%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 08:22 +0000

1# fmt: off 

2 

3"""Functions for in-place manipulation of bundletrajectories. 

4 

5This module defines a number of functions that can be used to 

6extract and delete data from BundleTrajectories directly on 

7disk. The functions are intended for large-scale MD output, 

8so they avoid copying the potentially large amounts of data. 

9Instead, data is either directly deleted in-place; or copies 

10are made by creating a new directory structure, but hardlinking 

11the data files. Hard links make it possible to delete the 

12original data without invalidating the copy. 

13 

14Usage from command line: 

15 

16python -m ase.io.bundlemanipulate inbundle outbundle [start [end [step]]] 

17""" 

18 

19import json 

20import os 

21 

22import numpy as np 

23 

24from ase.io.bundletrajectory import UlmBundleBackend 

25 

26 

def copy_frames(inbundle, outbundle, start=0, end=None, step=1,
                verbose=False):
    """Copy selected frames from one bundle into a newly created bundle.

    The frames ``range(start, end, step)`` of ``inbundle`` are written to
    ``outbundle`` and renumbered consecutively from F0.  The data files
    are hard-linked (``os.link``) instead of being copied.

    If the first copied frame is not frame 0 of the input, the new frame 0
    is supplemented with the data that is only stored in the input's
    frame 0: the small data of both frames is merged, and arrays missing
    from the copied frame are linked (or, for 'split' bundles, read,
    reordered and rewritten).

    Parameters
    ----------
    inbundle : str
        Directory of the existing bundle.
    outbundle : str
        Directory of the bundle to create.  It is created with
        ``os.mkdir``, so it must not exist already.
    start, end, step : int
        Frame selection with the meaning of ``range(start, end, step)``.
        Negative ``start`` / ``end`` count from the end of the bundle;
        ``end=None`` selects up to and including the last frame.
    verbose : bool
        Print progress information.

    Raises
    ------
    TypeError
        If start, end or step is not an integer (end may be None).
    ValueError
        If start or end lies outside the bundle, or step is zero.
    OSError
        If the bundle uses the 'pickle' backend or an unknown backend.
    """
    if not (isinstance(start, int) and
            (isinstance(end, int) or end is None) and
            isinstance(step, int)):
        raise TypeError("copy_frames: start, end and step must be integers.")
    metadata, nframes = read_bundle_info(inbundle)

    if metadata['backend'] == 'ulm':
        backend = UlmBundleBackend(True, metadata['ulm.singleprecision'])
    elif metadata['backend'] == 'pickle':
        raise OSError("Input BundleTrajectory uses the 'pickle' backend. " +
                      "This is not longer supported for security reasons")
    else:
        raise OSError("Unknown backend type '{}'".format(metadata['backend']))

    # Translate negative / missing limits into absolute frame numbers.
    if start < 0:
        start += nframes
    if end is None:
        end = nframes
    if end < 0:
        end += nframes
    if start < 0 or (start > nframes - 1 and end > 0):
        raise ValueError("copy_frames: Invalid start value.")
    # 'end' is exclusive, so nframes itself is the largest valid value.
    # Anything larger would select frames that do not exist and make the
    # copy fail half-way, after the output bundle has been created.
    if end < 0 or end > nframes:
        raise ValueError("copy_frames: Invalid end value.")
    if step == 0:
        raise ValueError("copy_frames: Invalid step value (zero)")
    frames = list(range(start, end, step))
    if verbose:
        print("Copying the frames", frames)

    # Make the new bundle directory
    os.mkdir(outbundle)
    with open(os.path.join(outbundle, 'metadata.json'), 'w') as fd:
        json.dump(metadata, fd, indent=2)

    firstdir = os.path.join(inbundle, "F0")
    for nout, nin in enumerate(frames):
        if verbose:
            print("F%i -> F%i" % (nin, nout))
        indir = os.path.join(inbundle, "F" + str(nin))
        outdir = os.path.join(outbundle, "F" + str(nout))
        os.mkdir(outdir)
        names = os.listdir(indir)
        for name in names:
            fromfile = os.path.join(indir, name)
            tofile = os.path.join(outdir, name)
            os.link(fromfile, tofile)
        if nout == 0 and nin != 0:
            if verbose:
                print("F0 -> F0 (supplemental)")
            # The smalldata.ulm stuff must be updated.
            # At the same time, check if the number of fragments
            # has not changed.
            data0 = backend.read_small(firstdir)
            data1 = backend.read_small(indir)
            split_data = (metadata['subtype'] == 'split')
            if split_data:
                fragments0 = data0['fragments']
                fragments1 = data1['fragments']

            data0.update(data1)  # Data in frame overrides data from frame 0.
            backend.write_small(outdir, data0)

            # If data is written in split mode, it must be reordered
            firstnames = os.listdir(firstdir)
            if not split_data:
                # Simple linking of the files that only exist in frame 0.
                for name in firstnames:
                    if name not in names:
                        if verbose:
                            print(" ", name, " (linking)")
                        fromfile = os.path.join(firstdir, name)
                        tofile = os.path.join(outdir, name)
                        os.link(fromfile, tofile)
            else:
                # Must read and rewrite data
                # First we read the ID's from frame 0 and N
                assert 'ID_0.ulm' in firstnames and 'ID_0.ulm' in names
                # backend.nfrag is set before each read_split / write call
                # to the fragment count of the frame being accessed.
                backend.nfrag = fragments0
                f0_id, _dummy = backend.read_split(firstdir, "ID")
                backend.nfrag = fragments1
                fn_id, fn_sizes = backend.read_split(indir, "ID")
                for name in firstnames:
                    # Only look at each array, not each file
                    if '_0.' not in name:
                        continue
                    if name not in names:
                        # We need to load this array
                        arrayname = name.split('_')[0]
                        if verbose:
                            print(" Reading", arrayname)
                        backend.nfrag = fragments0
                        f0_data, _dummy = backend.read_split(
                            firstdir, arrayname
                        )
                        # Sort data: row i goes to position f0_id[i].
                        # The copy on the right-hand side keeps the
                        # assignment from reading rows already overwritten.
                        # NOTE(review): presumably the IDs are a permutation
                        # of 0..N-1 -- confirm.
                        f0_data[f0_id] = np.array(f0_data)
                        # Unsort with new ordering
                        f0_data = f0_data[fn_id]
                        # Write it, cut into the fragment sizes of frame N
                        if verbose:
                            print(" Writing reshuffled", arrayname)
                        pointer = 0
                        backend.nfrag = fragments1
                        for i, s in enumerate(fn_sizes):
                            segment = f0_data[pointer:pointer + s]
                            pointer += s
                            backend.write(outdir, f'{arrayname}_{i}',
                                          segment)
    # Finally, write the number of frames
    with open(os.path.join(outbundle, 'frames'), 'w') as fd:
        fd.write(str(len(frames)) + '\n')

141 

142 

143# Helper functions 

def read_bundle_info(name):
    """Load the bundle-wide information of the bundle directory *name*.

    Returns the tuple ``(metadata, nframes)``: the decoded content of
    ``metadata.json`` and the integer stored in the ``frames`` file.

    Raises OSError if *name* is not a directory, if ``metadata.json`` is
    missing, if the metadata does not describe a version 1
    BundleTrajectory, or if the bundle contains no frames.
    """
    if not os.path.isdir(name):
        raise OSError(f"No directory (bundle) named '{name}' found.")

    metaname = os.path.join(name, 'metadata.json')
    if not os.path.isfile(metaname):
        # A bare 'metadata' file is the old pickle-based format.
        legacy_metadata = os.path.join(name, 'metadata')
        if os.path.isfile(legacy_metadata):
            reason = ("Found obsolete metadata in unsecure Pickle format. "
                      "Refusing to load.")
        else:
            reason = (f"'{name}' does not appear to be a BundleTrajectory "
                      f"(no {metaname})")
        raise OSError(reason)

    with open(metaname) as metafile:
        mdata = json.load(metafile)

    is_bundle = 'format' in mdata and mdata['format'] == 'BundleTrajectory'
    if not is_bundle:
        raise OSError(f"'{name}' does not appear to be a BundleTrajectory")

    version = mdata['version']
    if version != 1:
        raise OSError("Cannot manipulate BundleTrajectories with version "
                      f"number {version}")

    # The 'frames' file holds the number of frames as text.
    frames_path = os.path.join(name, "frames")
    with open(frames_path) as framefile:
        nframes = int(framefile.read())
    if nframes == 0:
        raise OSError(f"'{name}' is an empty BundleTrajectory")

    return mdata, nframes

176 

177 

if __name__ == '__main__':
    # Command line entry point:
    #   python -m ase.io.bundlemanipulate inbundle outbundle [start [end [step]]]
    import sys

    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        # Both bundle names are required; show the usage text otherwise.
        print(__doc__)
        sys.exit()
    inname, outname = cli_args[:2]

    # Up to three optional integers follow: start, end and step.
    # Anything after the fifth argument is ignored.
    limits = [int(text) for text in cli_args[2:5]]
    start = limits[0] if len(limits) > 0 else 0
    end: int | None = limits[1] if len(limits) > 1 else None
    step = limits[2] if len(limits) > 2 else 1

    copy_frames(inname, outname, start, end, step, verbose=1)