Coverage for /builds/ase/ase/ase/io/bundlemanipulate.py: 6.56%
122 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
1# fmt: off
3"""Functions for in-place manipulation of bundletrajectories.
5This module defines a number of functions that can be used to
6extract and delete data from BundleTrajectories directly on
7disk. The functions are intended for large-scale MD output,
8so they avoid copying the potentially large amounts of data.
9Instead, data is either directly deleted in-place; or copies
10are made by creating a new directory structure, but hardlinking
11the data files. Hard links make it possible to delete the
12original data without invalidating the copy.
14Usage from command line:
16python -m ase.io.bundlemanipulate inbundle outbundle [start [end [step]]]
17"""
19import json
20import os
21from typing import Optional
23import numpy as np
25from ase.io.bundletrajectory import UlmBundleBackend
def copy_frames(inbundle, outbundle, start=0, end=None, step=1,
                verbose=False):
    """Copy selected frames from one bundle into a newly created bundle.

    The frames ``range(start, end, step)`` of ``inbundle`` are written to
    ``outbundle`` and renumbered consecutively from 0.  Data files are
    hard-linked rather than copied.  If the first selected frame is not
    frame 0 of the input, the data that only exists in the input's frame 0
    is merged into the output's frame 0.

    Parameters:

    inbundle: path of the existing bundle directory to read from.
    outbundle: path of the bundle directory to create (``os.mkdir`` fails
        if it already exists).
    start: index of the first frame; negative values count from the end.
    end: index one past the last frame; ``None`` means the number of
        frames in the input, negative values count from the end.
    step: stride between selected frames; must not be zero.
    verbose: if true, print progress information.

    Raises:

    TypeError: if start, end or step is not an integer (end may be None).
    ValueError: if start/end/step select frames outside the input bundle.
    OSError: if the input bundle uses an unsupported or unknown backend
        (errors from ``read_bundle_info`` and the file system propagate).
    """
    if not (isinstance(start, int) and
            (isinstance(end, int) or end is None) and
            isinstance(step, int)):
        raise TypeError("copy_frames: start, end and step must be integers.")
    metadata, nframes = read_bundle_info(inbundle)

    if metadata['backend'] == 'ulm':
        backend = UlmBundleBackend(True, metadata['ulm.singleprecision'])
    elif metadata['backend'] == 'pickle':
        raise OSError("Input BundleTrajectory uses the 'pickle' backend. " +
                      "This is not longer supported for security reasons")
    else:
        raise OSError("Unknown backend type '{}'".format(metadata['backend']))

    # Normalise negative indices and the open-ended default.
    if start < 0:
        start += nframes
    if end is None:
        end = nframes
    if end < 0:
        end += nframes
    if start < 0 or (start > nframes - 1 and end > 0):
        raise ValueError("copy_frames: Invalid start value.")
    # The former second clause here (end > nframes - 1 and end < 0) could
    # never be true; out-of-range selections are rejected below instead.
    if end < 0:
        raise ValueError("copy_frames: Invalid end value.")
    if step == 0:
        raise ValueError("copy_frames: Invalid step value (zero)")

    # Validate the selection before anything is created on disk, so that a
    # bad request cannot fail halfway and leave a partial output bundle.
    # A range is monotonic, so checking its two ends is sufficient.
    selection = range(start, end, step)
    if selection:
        if not 0 <= selection[0] < nframes:
            raise ValueError("copy_frames: Invalid start value.")
        if not 0 <= selection[-1] < nframes:
            raise ValueError("copy_frames: Invalid end value.")
    frames = list(selection)
    if verbose:
        print("Copying the frames", frames)

    # Make the new bundle directory
    os.mkdir(outbundle)
    with open(os.path.join(outbundle, 'metadata.json'), 'w') as fd:
        json.dump(metadata, fd, indent=2)

    for nout, nin in enumerate(frames):
        if verbose:
            print("F%i -> F%i" % (nin, nout))
        indir = os.path.join(inbundle, "F" + str(nin))
        outdir = os.path.join(outbundle, "F" + str(nout))
        os.mkdir(outdir)
        # Hard-link every file of the input frame into the output frame.
        names = os.listdir(indir)
        for name in names:
            fromfile = os.path.join(indir, name)
            tofile = os.path.join(outdir, name)
            os.link(fromfile, tofile)
        if nout == 0 and nin != 0:
            # The new first frame was not the first frame of the input, so
            # whatever is only stored in the input's F0 must be added to it.
            if verbose:
                print("F0 -> F0 (supplemental)")
            # The smalldata.ulm stuff must be updated.
            # At the same time, read the number of fragments of both
            # frames; they are needed to read and write split arrays.
            data0 = backend.read_small(os.path.join(inbundle, "F0"))
            data1 = backend.read_small(indir)
            split_data = (metadata['subtype'] == 'split')
            if split_data:
                fragments0 = data0['fragments']
                fragments1 = data1['fragments']

            data0.update(data1)  # Data in frame overrides data from frame 0.
            # NOTE(review): the files of this frame were hard-linked into
            # outdir above; confirm that write_small replaces the file
            # instead of rewriting the inode shared with the input bundle.
            backend.write_small(outdir, data0)

            # If data is written in split mode, it must be reordered
            firstnames = os.listdir(os.path.join(inbundle, "F0"))
            if not split_data:
                # Simple linking
                for name in firstnames:
                    if name not in names:
                        if verbose:
                            print(" ", name, " (linking)")
                        fromfile = os.path.join(inbundle, "F0", name)
                        tofile = os.path.join(outdir, name)
                        os.link(fromfile, tofile)
            else:
                # Must read and rewrite data
                # First we read the ID's from frame 0 and N
                assert 'ID_0.ulm' in firstnames and 'ID_0.ulm' in names
                backend.nfrag = fragments0
                f0_id, _dummy = backend.read_split(
                    os.path.join(inbundle, "F0"), "ID"
                )
                backend.nfrag = fragments1
                fn_id, fn_sizes = backend.read_split(indir, "ID")
                for name in firstnames:
                    # Only look at each array, not each file
                    if '_0.' not in name:
                        continue
                    if name not in names:
                        # We need to load this array
                        arrayname = name.split('_')[0]
                        if verbose:
                            print(" Reading", arrayname)
                        backend.nfrag = fragments0
                        f0_data, _dummy = backend.read_split(
                            os.path.join(inbundle, "F0"), arrayname
                        )
                        # Sort data (the copy on the right-hand side keeps
                        # the assignment from reading what it overwrites)
                        f0_data[f0_id] = np.array(f0_data)
                        # Unsort with new ordering
                        f0_data = f0_data[fn_id]
                        # Write it, cut into the fragment sizes of frame N
                        if verbose:
                            print(" Writing reshuffled", arrayname)
                        pointer = 0
                        backend.nfrag = fragments1
                        for i, s in enumerate(fn_sizes):
                            segment = f0_data[pointer:pointer + s]
                            pointer += s
                            backend.write(outdir, f'{arrayname}_{i}',
                                          segment)
    # Finally, write the number of frames
    with open(os.path.join(outbundle, 'frames'), 'w') as fd:
        fd.write(str(len(frames)) + '\n')
144# Helper functions
def read_bundle_info(name):
    """Validate a bundle directory and return its global information.

    ``name`` is the path of the bundle directory.  The JSON metadata file
    and the ``frames`` file inside it are read.

    Returns the tuple (metadata, nframes).

    Raises OSError if the directory or its metadata is missing, if the
    metadata is in the obsolete pickle format, if it does not describe a
    version 1 BundleTrajectory, or if the bundle contains no frames.
    """
    if not os.path.isdir(name):
        raise OSError(f"No directory (bundle) named '{name}' found.")

    metaname = os.path.join(name, 'metadata.json')
    if not os.path.isfile(metaname):
        # Tell an old pickle-based bundle apart from a non-bundle directory.
        legacy_metadata = os.path.join(name, 'metadata')
        if os.path.isfile(legacy_metadata):
            raise OSError(
                "Found obsolete metadata in unsecure Pickle format. "
                "Refusing to load.")
        raise OSError(f"'{name}' does not appear to be a BundleTrajectory "
                      f"(no {metaname})")

    with open(metaname) as metafile:
        mdata = json.load(metafile)

    is_bundle = 'format' in mdata and mdata['format'] == 'BundleTrajectory'
    if not is_bundle:
        raise OSError(f"'{name}' does not appear to be a BundleTrajectory")

    version = mdata['version']
    if version != 1:
        raise OSError("Cannot manipulate BundleTrajectories with version "
                      f"number {version}")

    with open(os.path.join(name, "frames")) as framefile:
        nframes = int(framefile.read())
    if nframes == 0:
        raise OSError(f"'{name}' is an empty BundleTrajectory")

    return mdata, nframes
if __name__ == '__main__':
    # Command-line entry point:
    #   python -m ase.io.bundlemanipulate inbundle outbundle [start [end [step]]]
    import sys

    argv = sys.argv
    if len(argv) < 3:
        # Too few arguments: show the module documentation and stop.
        print(__doc__)
        sys.exit()

    inname, outname = argv[1:3]
    # The optional positional arguments fall back to the same defaults
    # that copy_frames itself uses.
    start = int(argv[3]) if len(argv) > 3 else 0
    end: Optional[int] = int(argv[4]) if len(argv) > 4 else None
    step = int(argv[5]) if len(argv) > 5 else 1
    copy_frames(inname, outname, start, end, step, verbose=1)