Coverage for /builds/ase/ase/ase/ga/multiprocessingrun.py: 29.63%
27 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
1# fmt: off
3""" Class for handling several simultaneous jobs.
4The class has been tested on Niflheim-opteron4.
5"""
6import time
7from multiprocessing import Pool
9from ase.io import read, write
12class MultiprocessingRun:
13 """Class that allows for the simultaneous relaxation of
14 several candidates on a cluster. Best used if each individual
15 calculation is too small for using a queueing system.
17 Parameters:
19 data_connection: DataConnection object.
21 tmp_folder: Folder for temporary files.
23 n_simul: The number of simultaneous relaxations.
25 relax_function: The relaxation function. This needs to return
26 the filename of the relaxed structure.
27 """
29 def __init__(self, data_connection, relax_function,
30 tmp_folder, n_simul=None):
31 self.dc = data_connection
32 self.pool = Pool(n_simul)
33 self.relax_function = relax_function
34 self.tmp_folder = tmp_folder
35 self.results = []
37 def relax(self, a):
38 """Relax the atoms object a by submitting the relaxation
39 to the pool of cpus."""
40 self.dc.mark_as_queued(a)
41 fname = '{}/cand{}.traj'.format(self.tmp_folder,
42 a.info['confid'])
43 write(fname, a)
44 self.results.append(self.pool.apply_async(self.relax_function,
45 [fname]))
46 self._cleanup()
48 def _cleanup(self):
49 for r in self.results:
50 if r.ready() and r.successful():
51 fname = r.get()
52 a = read(fname)
53 self.dc.add_relaxed_step(a)
54 self.results.remove(r)
56 def finish_all(self):
57 """Checks that all calculations are finished, if not
58 wait and check again. Return when all are finished."""
59 while len(self.results) > 0:
60 self._cleanup()
61 time.sleep(2.)