Coverage for src/km3pipe/shell.py: 41%
128 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: shell.py
2# pylint: disable=C0103
3"""
4Some shell helpers
6"""
7import os
8import subprocess
9from warnings import warn
11from .tools import lstrip
12from .logger import get_logger
14from subprocess import DEVNULL # py3k
__author__ = "Tamas Gal"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal"
__email__ = "tgal@km3net.de"
__status__ = "Development"

log = get_logger(__name__)  # pylint: disable=C0103

# Cluster name -> batch system running on that cluster.
BATCH_TYPE = {"in2p3": "slurm", "woody": "torque"}

# Batch system -> command used to submit a job script fed via stdin.
# Slurm jobs are submitted with ``sbatch``; ``squeue`` only lists the queue.
SUBMIT_CMD = {"slurm": "sbatch", "torque": "qsub"}

# Cluster name -> job script template, rendered with ``str.format`` by the
# ``_gen_job_*`` helpers. ``lstrip`` removes the source indentation.
JOB_TEMPLATES = {
    # sbatch refuses scripts whose first line is not a ``#!`` interpreter line,
    # hence the explicit shebang before the #SBATCH directives.
    "in2p3": lstrip(
        """
        #!/bin/bash
        #SBATCH --partition=htc
        #SBATCH --job-name={job_name}
        #SBATCH --mail-user={email}
        #SBATCH --mail-type {send_mail}
        #SBATCH --output={log_path}/{job_name}{task_name}.log
        #SBATCH --account={group}
        #
        ## Walltime (HH:MM:SS), for different batch systems (IN2P3, ECAP, ...)
        #SBATCH --time={walltime}
        ## Memory (Units: G, M, K, B; Min 64M)
        #SBATCH --mem={memory}
        #
        ## Extra Resources (sps, irods, hpss, oracle, xrootd, dcache)
        #SBATCH --licenses={resources}
        {extra_options}

        echo "========================================================"
        echo "Job started on" $(date)
        echo "========================================================"

        {script}

        echo "========================================================"
        echo "JAWOLLJA! Job exited on" $(date)
        echo "========================================================"
        """
    ),
    "woody": lstrip(
        """
        #PBS -N {job_name}
        #PBS -M {email} -m a
        #PBS -o {log_path}/{job_name}{task_name}.out.log
        #PBS -e {log_path}/{job_name}{task_name}.err.log
        #PBS -l nodes={nodes}:ppn={ppn}{node_type}
        #PBS -l walltime={walltime}

        echo "========================================================"
        echo "Job started on" $(date)
        echo "========================================================"

        {script}

        echo "========================================================"
        echo "JAWOLLJA! Job exited on" $(date)
        echo "========================================================"
        """
    ),
}
def qsub(script, job_name, dryrun=False, silent=False, *args, **kwargs):
    """
    Submit a job via qsub.

    Deprecated alias of :func:`submit`; emits a ``DeprecationWarning`` and
    forwards all arguments unchanged.

    Returns the job script as string.
    """
    warn(
        "qsub is deprecated and will be removed in the next major version!",
        DeprecationWarning,
        stacklevel=2,
    )
    # Pass dryrun/silent positionally so extra positional ``args`` land on the
    # parameters after them instead of colliding with the keywords.
    return submit(script, job_name, dryrun, silent, *args, **kwargs)
def submit(script, job_name, dryrun=False, silent=False, *args, **kwargs):
    """
    Submit a job.

    The job script is rendered with :func:`gen_job` (``args``/``kwargs`` are
    forwarded to it) and piped to the submit command of the cluster's batch
    system.

    Parameters
    ----------
    script: str or Script
        Shell code to run inside the job.
    job_name: str
        Name of the job.
    dryrun: bool (default: False)
        Only print the generated job script, do not submit it.
    silent: bool (default: False)
        Suppress progress messages and the submit command's stdout.

    Returns the job script as string.
    """
    # Same default as ``gen_job`` so callers may omit ``cluster``.
    cluster = kwargs.get("cluster", "in2p3")
    batch_type = BATCH_TYPE[cluster]
    submit_cmd = SUBMIT_CMD[batch_type]

    if not silent:
        print("Preparing job script...")
    job_string = gen_job(script, job_name, *args, **kwargs)

    if dryrun:
        print(
            "This is a dry run! Here is the generated job file, which will "
            "not be submitted:"
        )
        print(job_string)
        return job_string

    # qsub needs -V to export the caller's environment; for sbatch, -V means
    # --version (sbatch exports the environment by default).
    command = submit_cmd + " -V" if batch_type == "torque" else submit_cmd

    if not silent:
        print("Calling {} with the generated job script.".format(submit_cmd))
        out_pipe = None  # let the scheduler's reply (e.g. the job ID) through
    else:
        out_pipe = DEVNULL

    proc = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        env=os.environ.copy(),
        shell=True,
        stdout=out_pipe,
    )
    proc.communicate(input=job_string.encode("utf-8"))
    if proc.returncode != 0:
        log.error(
            "'{}' exited with status {}, the job may not have been "
            "submitted.".format(command, proc.returncode)
        )

    return job_string
def _slurm_mail_type(send_mail):
    """Translate PBS/SGE-style mail letters (e.g. ``"n"``) to Slurm keywords."""
    letters = {"n": "NONE", "a": "FAIL", "b": "BEGIN", "e": "END"}
    value = str(send_mail)
    if value and all(char in letters for char in value):
        return ",".join(letters[char] for char in value)
    # Anything else (e.g. "END,FAIL") is assumed to be Slurm syntax already.
    return value


def _gen_job_slurm(**kwargs):
    """Render the Slurm job script for ``kwargs["cluster"]``.

    ``kwargs`` are the arguments of :func:`gen_job`. Boolean resource flags
    are collected into the ``--licenses`` list. A missing ``email`` defaults
    to ``$USER@km3net.de``. ``log_path`` is made absolute, and a job array
    directive is added when ``job_array_stop`` is set.

    Returns the job script as string.
    """
    resource_names = ("sps", "irods", "hpss", "xrootd", "dcache", "oracle")
    resources = ",".join(name for name in resource_names if kwargs.get(name))

    # Removed from kwargs so it is not passed to ``format`` twice.
    email = kwargs.pop("email", None)
    if email is None:
        email = os.environ["USER"] + "@km3net.de"

    log_path = os.path.abspath(kwargs.pop("log_path"))

    if kwargs["job_array_stop"] is not None:
        job_array_option = "#SBATCH --array {}-{}:{}".format(
            kwargs["job_array_start"],
            kwargs["job_array_stop"],
            kwargs["job_array_step"],
        )
    else:
        job_array_option = "#"

    extra_options = "\n".join([job_array_option])

    # ``%a`` is Slurm's array-task-index placeholder in --output file names.
    task_name = "_%a" if kwargs.get("split_array_logs") else ""

    kwargs["send_mail"] = _slurm_mail_type(kwargs["send_mail"])

    return JOB_TEMPLATES[kwargs["cluster"]].format(
        resources=resources,
        email=email,
        extra_options=extra_options,
        log_path=log_path,
        task_name=task_name,
        **kwargs
    )
def _gen_job_torque(**kwargs):
    """Generate a job script.

    ``kwargs`` are the arguments of :func:`gen_job`. A missing ``email``
    defaults to ``$USER@km3net.de``, ``log_path`` is made absolute and a
    ``Script`` instance is rendered to text before the template is filled.

    Returns the job script as string.
    """
    # Removed from kwargs so it is not passed to ``format`` twice.
    email = kwargs.pop("email", None)
    if email is None:
        email = os.environ["USER"] + "@km3net.de"

    # Optional CPU-time value, falling back to the walltime. The built-in
    # template does not use it; it is only offered to ``format``.
    cpu = kwargs.pop("cpu", None)
    if cpu is None:
        cpu = kwargs["walltime"]

    script = kwargs.pop("script")
    if isinstance(script, Script):
        script = str(script)

    log_path = os.path.abspath(kwargs.pop("log_path"))

    # NOTE(review): "#$ -t" is SGE syntax and the "woody" template does not
    # reference {job_array_option}; kept as-is for custom templates.
    if kwargs["job_array_stop"] is not None:
        job_array_option = "#$ -t {}-{}:{}".format(
            kwargs["job_array_start"],
            kwargs["job_array_stop"],
            kwargs["job_array_step"],
        )
    else:
        job_array_option = "#"

    task_name = "_$TASK_ID" if kwargs["split_array_logs"] else ""

    node_type = kwargs.pop("node_type")
    node_type = "" if node_type is None else ":" + str(node_type)

    return JOB_TEMPLATES[kwargs["cluster"]].format(
        email=email,
        log_path=log_path,
        cpu=cpu,
        script=script,
        job_array_option=job_array_option,
        task_name=task_name,
        node_type=node_type,
        **kwargs
    )
def gen_job(
    script,
    job_name,
    log_path="qlogs",
    group="km3net",
    walltime="00:10:00",
    nodes=1,
    ppn=4,
    node_type=None,
    cluster="in2p3",
    memory="3G",
    email=None,
    send_mail="n",
    job_array_start=1,
    job_array_stop=None,
    job_array_step=1,
    irods=False,
    sps=True,
    hpss=False,
    xrootd=False,
    dcache=False,
    oracle=False,
    split_array_logs=False,
):
    """Generate a batch job script for ``cluster``.

    The work is dispatched on ``BATCH_TYPE[cluster]`` to the Slurm or Torque
    generator, which fill ``JOB_TEMPLATES[cluster]`` with these arguments.

    Parameters
    ----------
    script: str or Script
        Shell code to run inside the job.
    job_name: str
        Name of the job, also used for the log file names.
    log_path: str (default: "qlogs")
        Directory of the scheduler log files (converted to an absolute path).
    cluster: str (default: "in2p3")
        Key into ``BATCH_TYPE`` / ``JOB_TEMPLATES``.
    email: str or None
        Notification address; ``None`` means ``$USER@km3net.de``.
    job_array_start, job_array_stop, job_array_step: int
        Job array range; no array directive when ``job_array_stop`` is None.
    irods, sps, hpss, xrootd, dcache, oracle: bool
        Resource flags (used by the Slurm template's ``--licenses``).
    split_array_logs: bool
        Append the array task id to the log file names.

    The remaining arguments are substituted into the template verbatim.

    Returns the job script as string.

    Raises
    ------
    KeyError
        If ``cluster`` is not a key of ``BATCH_TYPE``.
    ValueError
        If the cluster's batch system has no generator.
    """
    # Snapshot the arguments before any other local name is bound.
    kwargs = dict(locals())
    batch_type = BATCH_TYPE[cluster]
    if batch_type == "slurm":
        return _gen_job_slurm(**kwargs)
    if batch_type == "torque":
        return _gen_job_torque(**kwargs)
    raise ValueError(
        "No job generator for batch system '{}' (cluster '{}')".format(
            batch_type, cluster
        )
    )
def get_jpp_env(jpp_dir):
    """Return the environment dict of a loaded Jpp env.

    The returned env can be passed to `subprocess.Popen("J...", env=env)`
    to execute Jpp commands.

    ``<jpp_dir>/setenv.sh`` is sourced in a bash shell, with ``jpp_dir`` as
    its argument, and the output of ``env`` is parsed. Each line is split at
    its first ``=``, so values may themselves contain ``=``. If sourcing
    fails, ``env`` does not run and an empty dict is returned.
    """
    # jpp_dir is passed as a positional parameter ($1) instead of being
    # interpolated into the command, so spaces or shell metacharacters in the
    # path are harmless. ``source`` is a bash builtin, hence bash, not /bin/sh.
    proc = subprocess.run(
        ["bash", "-c", 'source "$1/setenv.sh" "$1" && env', "bash", str(jpp_dir)],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    env = {}
    for line in proc.stdout.split("\n"):
        if "=" in line:
            key, _, value = line.partition("=")
            env[key] = value
    return env
class Script(object):
    """A shell script which can be built line by line for `qsub`.

    The lines are kept in ``self.lines``; ``str(script)`` joins them with
    newlines.
    """

    def __init__(self):
        self.lines = []

    def add(self, line):
        """Add a new line"""
        self.lines.append(line)

    def echo(self, text):
        """Add an echo command. The given text will be double quoted."""
        self.lines.append('echo "{}"'.format(text))

    def separator(self, character="=", length=42):
        """Add a visual separator."""
        self.echo(character * length)

    def cp(self, source, target):
        """Add a new copy instruction"""
        # Arguments stay unquoted so that shell globs keep working.
        self._add_two_argument_command("cp", source, target)

    def mv(self, source, target):
        """Add a new move instruction"""
        self._add_two_argument_command("mv", source, target)

    def mkdir(self, folder_path):
        """Add a new 'mkdir -p' instruction"""
        self.add('mkdir -p "{}"'.format(folder_path))

    def iget(self, irods_path, attempts=1, pause=15):
        """Add an iget command to retrieve a file from iRODS.

        Parameters
        ----------
        irods_path: str
            Filepath which should be fetched using iget
        attempts: int (default: 1)
            Number of retries, if iRODS access fails
        pause: int (default: 15)
            Pause between two access attempts in seconds
        """
        if attempts > 1:
            # Retry loop: an attempt counts as failed when iget's combined
            # output contains "ERROR". The path is quoted like in the
            # single-attempt branch; {{ }} survive format() as literal braces.
            cmd = """ for i in {{1..{0}}}; do
                ret=$(iget -v "{1}" 2>&1)
                echo "$ret"
                if [[ $ret == *"ERROR"* ]]; then
                    echo "Attempt $i failed"
                else
                    break
                fi
                sleep {2}s
            done """
            cmd = lstrip(cmd)
            cmd = cmd.format(attempts, irods_path, pause)
            self.add(cmd)
        else:
            self.add('iget -v "{}"'.format(irods_path))

    def _add_two_argument_command(self, command, arg1, arg2):
        """Helper function for two-argument commands"""
        self.lines.append("{} {} {}".format(command, arg1, arg2))

    def clear(self):
        """Remove all lines."""
        self.lines = []

    def __add__(self, other):
        """Return a new script with the lines of ``self`` followed by ``other``."""
        try:
            other_lines = other.lines
        except AttributeError:
            # Let Python raise the usual TypeError for unsupported operands.
            return NotImplemented
        new_script = Script()
        new_script.lines = self.lines + other_lines
        return new_script

    def __str__(self):
        return "\n".join(self.lines)

    def __repr__(self):
        return "# Shell script\n" + str(self)