Coverage for /builds/ase/ase/ase/ga/pbs_queue_run.py: 17.74%
62 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-08-02 00:12 +0000
1# fmt: off
3""" Class for handling interaction with the PBS queuing system."""
4import os
5import time
6from subprocess import PIPE, Popen
8from ase.io import write
9from ase.io.trajectory import Trajectory
class PBSQueueRun:
    """Class for communicating with the commonly used PBS queuing system
    at a computer cluster.

    The user needs to supply a job file generator which takes
    as input a job name and the relative path to the traj
    file which is to be locally optimized. The function returns
    the job script as text.
    If the traj file is called f the job must write a file
    f[:-5] + '_done.traj' which is then read by this object.

    Parameters:

    data_connection: The DataConnection object.
    tmp_folder: Temporary folder for all calculations
    job_prefix: Prefix of the job submitted. This identifier is used
        to determine how many jobs are currently running.
    n_simul: The number of simultaneous jobs to keep in the queuing system.
    job_template_generator: The function generating the job file.
        This function should return the content of the job file as a
        string.
    qsub_command: The name of the qsub command (default qsub).
    qstat_command: The name of the qstat command (default qstat).
    find_neighbors: Forwarded unchanged as the ``find_neighbors`` keyword
        of ``data_connection.add_relaxed_step`` (default None).
    perform_parametrization: Forwarded unchanged as the
        ``perform_parametrization`` keyword of
        ``data_connection.add_relaxed_step`` (default None).
    """

    def __init__(self, data_connection, tmp_folder, job_prefix,
                 n_simul, job_template_generator,
                 qsub_command='qsub', qstat_command='qstat',
                 find_neighbors=None, perform_parametrization=None):
        self.dc = data_connection
        self.job_prefix = job_prefix
        self.n_simul = n_simul
        self.job_template_generator = job_template_generator
        self.qsub_command = qsub_command
        self.qstat_command = qstat_command
        self.tmp_folder = tmp_folder
        self.find_neighbors = find_neighbors
        self.perform_parametrization = perform_parametrization
        # Pick up results of jobs that finished before this object existed.
        self.__cleanup__()

    def relax(self, a):
        """Add a structure to the queue.

        This method does not fail if sufficient jobs are already
        running, but simply submits the job.

        The structure ``a`` must carry ``a.info['confid']``; it is used
        to name both the trajectory file and the submitted job.
        """
        self.__cleanup__()
        self.dc.mark_as_queued(a)
        # exist_ok avoids the check-then-create race between concurrent
        # runs and also creates missing parent directories.
        os.makedirs(self.tmp_folder, exist_ok=True)
        confid = a.info['confid']
        fname = f'{self.tmp_folder}/cand{confid}.traj'
        write(fname, a)
        job_name = f'{self.job_prefix}_{confid}'
        # The job script is written to the current working directory and
        # overwritten on every submission.
        with open('tmp_job_file.job', 'w') as fd:
            fd.write(self.job_template_generator(job_name, fname))
        os.system(f'{self.qsub_command} tmp_job_file.job')

    def enough_jobs_running(self):
        """Determines if sufficient jobs are running.

        Returns True when the number of running jobs is at least
        ``n_simul``.
        """
        return self.number_of_jobs_running() >= self.n_simul

    def number_of_jobs_running(self):
        """Determines how many jobs are running.

        The user should use this or the enough_jobs_running method
        to verify that a job needs to be started before
        calling the relax method.

        Returns the number of lines in the qstat output (restricted to
        the current user) that contain ``job_prefix``.
        """
        self.__cleanup__()
        command = f'`which {self.qstat_command}` -u `whoami`'
        # communicate() drains stdout and stderr together and waits for the
        # child; the context manager then closes every pipe. This avoids
        # leaked descriptors, zombie processes and a blocked child when
        # stderr fills up.
        with Popen([command],
                   shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                   close_fds=True, universal_newlines=True) as proc:
            stdout, _ = proc.communicate()
        return sum(1 for line in stdout.splitlines()
                   if self.job_prefix in line)

    def __cleanup__(self):
        """Tries to load in structures previously
        submitted to the queuing system.

        For every candidate reported as queued by the data connection,
        a non-empty ``cand<confid>_done.traj`` file in the temporary
        folder is read and its last image is handed to
        ``add_relaxed_step``. An OSError while doing so is printed and
        the candidate is skipped.
        """
        for confid in self.dc.get_all_candidates_in_queue():
            fdone = f'{self.tmp_folder}/cand{confid}_done.traj'
            if not (os.path.isfile(fdone) and os.path.getsize(fdone) > 0):
                continue
            try:
                images = []
                # The file may still be in the middle of being written;
                # retry a few times before giving up.
                for _ in range(5):
                    # Close the reader after every attempt so that retries
                    # do not accumulate open file handles.
                    with Trajectory(fdone, 'r') as traj:
                        images = list(traj)
                    if images:
                        break
                    time.sleep(1.)
                if not images:
                    raise OSError(
                        f'Could not read candidate {confid} '
                        'from the filesystem')
                relaxed = images[-1]
                relaxed.info['confid'] = confid
                self.dc.add_relaxed_step(
                    relaxed,
                    find_neighbors=self.find_neighbors,
                    perform_parametrization=self.perform_parametrization)
            except OSError as e:
                # Best effort: report and move on to the next candidate.
                print(e)