Coverage for /builds/ase/ase/ase/ga/parallellocalrun.py: 19.15%

47 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-08-02 00:12 +0000

1# fmt: off 

2 

3""" Class for handling several simultaneous jobs. 

4 The class has been tested on linux and Mac OS X. 

5""" 

6import os 

7import time 

8from subprocess import PIPE, Popen 

9 

10from ase.io import read, write 

11 

12 

class ParallelLocalRun:
    """ Class that allows for the simultaneous relaxation of
        several candidates on the same computer.

        Each relaxation is started as an external python script.  The
        started processes are tracked by pid, and once a process is no
        longer listed by ``ps`` the resulting structure is read from the
        temporary folder and added to the database.

        Parameters:

        data_connection: DataConnection object.
        tmp_folder: Folder for temporary files
        n_simul: The number of simultaneous relaxations.
        calc_script: Reference to the relaxation script.
    """

    def __init__(self, data_connection, tmp_folder,
                 n_simul, calc_script):
        self.dc = data_connection
        self.n_simul = n_simul
        self.calc_script = calc_script
        self.tmp_folder = tmp_folder
        # One [confid, pid] entry per relaxation still considered running.
        self.running_pids = []

    def get_number_of_jobs_running(self):
        """ Returns the number of jobs running.
            It is a good idea to check that this is 0 before
            terminating the main program. """
        # Refresh the bookkeeping first so finished jobs are not counted.
        self.__cleanup__()
        return len(self.running_pids)

    def relax(self, a):
        """ Relax the input atoms object a. If n_simul relaxations
            are already running the function sleeps until a processor
            becomes available.
        """
        self.__cleanup__()

        # Wait until a slot is available, polling every two seconds.
        while len(self.running_pids) >= self.n_simul:
            time.sleep(2.)
            self.__cleanup__()

        # Mark the structure as queued and run the external py script.
        self.dc.mark_as_queued(a)
        # makedirs(exist_ok=True) copes with nested folders and with the
        # folder appearing between a check and the creation.
        os.makedirs(self.tmp_folder, exist_ok=True)
        fname = '{}/cand{}.traj'.format(self.tmp_folder,
                                        a.info['confid'])
        write(fname, a)
        p = Popen(['python', self.calc_script, fname])
        self.running_pids.append([a.info['confid'], p.pid])

    @staticmethod
    def _live_pids(ps_lines):
        """ Return the set of pids (as strings) found in ps output lines.

            Only the first whitespace separated column is considered, so
            numbers appearing elsewhere on a line (other pids sharing a
            prefix, times, command line arguments) can never be mistaken
            for a pid.  Lines containing 'defunct' and lines whose first
            column is not numeric (the header) are ignored.
        """
        pids = set()
        for line in ps_lines:
            if 'defunct' in line:
                continue
            columns = line.split(None, 1)
            if columns and columns[0].isdigit():
                pids.add(columns[0])
        return pids

    def __cleanup__(self):
        """ Checks if any relaxations are done and load in the structure
            from the traj file. """
        # The context manager together with communicate() makes sure the
        # ps process is waited for and all three pipes are closed.
        with Popen(['ps -x -U `whoami`'], shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True,
                   universal_newlines=True) as p:
            fout, _ = p.communicate()
        alive = self._live_pids(fout.splitlines())

        # Newest job first, matching the order in which finished runs
        # have always been handed to the database.
        stopped_runs = [run for run in reversed(self.running_pids)
                        if str(run[1]) not in alive]
        # Update in place so references to the list stay valid.
        self.running_pids[:] = [run for run in self.running_pids
                                if str(run[1]) in alive]

        # All processes not running any more must be complete and should
        # be loaded in.
        for (confid, _) in stopped_runs:
            try:
                tf = self.tmp_folder
                a = read('{}/cand{}_done.traj'.format(tf,
                                                      confid))
                self.dc.add_relaxed_step(a)
            except OSError as e:
                # A missing/unreadable result file is reported, not fatal.
                print(e)