Coverage for /builds/ase/ase/ase/ga/multiprocessingrun.py: 29.63%

27 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-08-02 00:12 +0000

1# fmt: off 

2 

3""" Class for handling several simultaneous jobs. 

4The class has been tested on Niflheim-opteron4. 

5""" 

6import time 

7from multiprocessing import Pool 

8 

9from ase.io import read, write 

10 

11 

class MultiprocessingRun:
    """Class that allows for the simultaneous relaxation of
    several candidates on a cluster. Best used if each individual
    calculation is too small for using a queueing system.

    Parameters:

    data_connection: DataConnection object.

    tmp_folder: Folder for temporary files.

    n_simul: The number of simultaneous relaxations. Passed straight
        to multiprocessing.Pool; None selects the Pool default.

    relax_function: The relaxation function. It is called in a worker
        process with the filename of the unrelaxed candidate as its
        only argument. This needs to return the filename of the
        relaxed structure.
    """

    def __init__(self, data_connection, relax_function,
                 tmp_folder, n_simul=None):
        self.dc = data_connection
        self.pool = Pool(n_simul)
        self.relax_function = relax_function
        self.tmp_folder = tmp_folder
        # AsyncResult handles of jobs that have been submitted but whose
        # outcome has not yet been collected by _cleanup().
        self.results = []

    def relax(self, a):
        """Relax the atoms object a by submitting the relaxation
        to the pool of cpus.

        The candidate is marked as queued in the data connection and
        written to '<tmp_folder>/cand<confid>.traj'; that filename is
        what the relaxation function receives. Already finished jobs
        are collected as a side effect."""
        self.dc.mark_as_queued(a)
        fname = '{}/cand{}.traj'.format(self.tmp_folder,
                                        a.info['confid'])
        write(fname, a)
        self.results.append(self.pool.apply_async(self.relax_function,
                                                  [fname]))
        self._cleanup()

    def _cleanup(self):
        """Collect every finished job and drop it from self.results.

        Successful jobs are read back from the file they returned and
        added to the data connection as relaxed steps. Failed jobs are
        dropped with a RuntimeWarning carrying the worker's exception;
        leaving them in the list would make finish_all() wait forever.
        Jobs that are still running are left untouched."""
        import warnings

        # Iterate over a snapshot: removing from the list that is being
        # iterated would skip the element following each removed one.
        for r in list(self.results):
            if not r.ready():
                continue
            try:
                # get() re-raises whatever the worker raised.
                fname = r.get()
            except Exception as err:
                self.results.remove(r)
                warnings.warn(
                    'Relaxation job failed and was discarded: '
                    '{!r}'.format(err), RuntimeWarning)
                continue
            a = read(fname)
            self.dc.add_relaxed_step(a)
            self.results.remove(r)

    def finish_all(self):
        """Checks that all calculations are finished, if not
        wait and check again. Return when all are finished."""
        self._cleanup()
        # Only sleep while something is actually still pending, so we do
        # not wait an extra two seconds after the last job was collected.
        while self.results:
            time.sleep(2.)
            self._cleanup()