Coverage for src/km3pipe/shell.py: 41%

128 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-19 03:14 +0000

1# Filename: shell.py 

2# pylint: disable=C0103 

3""" 

4Some shell helpers 

5 

6""" 

7import os 

8import subprocess 

9from warnings import warn 

10 

11from .tools import lstrip 

12from .logger import get_logger 

13 

14from subprocess import DEVNULL # py3k 

15 

# Module metadata
__author__ = "Tamas Gal"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = list()
__license__ = "MIT"
__maintainer__ = __author__
__email__ = "tgal@km3net.de"
__status__ = "Development"

# Logger shared by the helpers of this module, named after the module.
log = get_logger(__name__)  # pylint: disable=C0103

25 

# Batch system used by each supported cluster.
BATCH_TYPE = {"in2p3": "slurm", "woody": "torque"}
# Command that submits a job script (read from stdin) on each batch system.
SUBMIT_CMD = {"slurm": "sbatch", "torque": "qsub"}
# Option making the job inherit the submitting environment. This needs a
# per-system table because ``-V`` means ``--version`` for ``sbatch``.
SUBMIT_OPTIONS = {"slurm": "--export=ALL", "torque": "-V"}

JOB_TEMPLATES = {
    "in2p3": lstrip(
        """
        #SBATCH --partition=htc
        #SBATCH --job-name={job_name}
        #SBATCH --mail-user={email}
        #SBATCH --mail-type {send_mail}
        #SBATCH --output={log_path}/{job_name}{task_name}.log
        #SBATCH --account={group}
        #
        ## Walltime (HH:MM:SS), for different batch systems (IN2P3, ECAP, ...)
        #SBATCH --time={walltime}
        ## Memory (Units: G, M, K, B; Min 64M)
        #SBATCH --mem={memory}
        #
        ## Extra Resources (sps, irods, hpss, oracle, xrootd, dcache)
        #SBATCH --licenses={resources}
        {extra_options}

        echo "========================================================"
        echo "Job started on" $(date)
        echo "========================================================"

        {script}

        echo "========================================================"
        echo "JAWOLLJA! Job exited on" $(date)
        echo "========================================================"
        """
    ),
    "woody": lstrip(
        """
        #PBS -N {job_name}
        #PBS -M {email} -m a
        #PBS -o {log_path}/{job_name}{task_name}.out.log
        #PBS -e {log_path}/{job_name}{task_name}.err.log
        #PBS -l nodes={nodes}:ppn={ppn}{node_type}
        #PBS -l walltime={walltime}

        echo "========================================================"
        echo "Job started on" $(date)
        echo "========================================================"

        {script}

        echo "========================================================"
        echo "JAWOLLJA! Job exited on" $(date)
        echo "========================================================"
        """
    ),
}


def qsub(script, job_name, dryrun=False, silent=False, *args, **kwargs):
    """
    Submit a job via qsub.

    Deprecated alias of ``submit``; all arguments are forwarded unchanged.

    Returns the job script as string.
    """
    warn(
        "qsub is deprecated and will be removed in the next major version!",
        DeprecationWarning,
        stacklevel=2,
    )
    # Forward dryrun/silent positionally so that extra positional ``args``
    # line up after them instead of colliding with the keywords.
    return submit(script, job_name, dryrun, silent, *args, **kwargs)


def submit(script, job_name, dryrun=False, silent=False, *args, **kwargs):
    """
    Submit a job.

    The job script is generated by ``gen_job`` (extra ``args``/``kwargs`` are
    forwarded to it) and piped to the submit command of the cluster's batch
    system (``SUBMIT_CMD``), together with ``SUBMIT_OPTIONS``.

    Parameters
    ----------
    script: str or Script
        Commands to run inside the job.
    job_name: str
        Name of the job.
    dryrun: bool
        Only print the generated job script; do not submit it.
    silent: bool
        Suppress progress messages and the submit command's stdout.

    Returns the job script as string.
    """
    # gen_job() defaults ``cluster`` to "in2p3", so this lookup must as well.
    cluster = kwargs.get("cluster", "in2p3")
    batch_type = BATCH_TYPE[cluster]
    submit_cmd = SUBMIT_CMD[batch_type]

    if not silent:
        print("Preparing job script...")
    job_string = gen_job(script, job_name, *args, **kwargs)

    if dryrun:
        print(
            "This is a dry run! Here is the generated job file, which will "
            "not be submitted:"
        )
        print(job_string)
        return job_string

    if not silent:
        print("Calling {} with the generated job script.".format(submit_cmd))
    process = subprocess.Popen(
        [submit_cmd, SUBMIT_OPTIONS[batch_type]],
        stdin=subprocess.PIPE,
        stdout=DEVNULL if silent else subprocess.PIPE,
        env=os.environ.copy(),
    )
    stdout, _ = process.communicate(input=job_string.encode("utf-8"))
    if stdout:
        # Show what the scheduler reported (typically the job ID) instead of
        # capturing it and throwing it away.
        print(stdout.decode("utf-8", errors="replace").rstrip())
    if process.returncode != 0:
        log.error(
            "{} exited with status {}".format(submit_cmd, process.returncode)
        )

    return job_string

130 

131 

# Torque/PBS ``-m`` mail letters and their closest Slurm ``--mail-type``
# equivalents. gen_job() defaults ``send_mail`` to the torque-style "n".
_TORQUE_TO_SLURM_MAIL_TYPE = {"n": "NONE", "a": "FAIL", "b": "BEGIN", "e": "END"}


def _slurm_mail_type(send_mail):
    """Translate torque-style mail letters to a Slurm ``--mail-type`` value.

    ``"n"`` becomes ``"NONE"`` and e.g. ``"abe"`` becomes ``"FAIL,BEGIN,END"``.
    Any value not made up exclusively of the letters n/a/b/e (such as
    ``"END"`` or ``"ALL"``) is returned unchanged.
    """
    letters = str(send_mail).lower()
    if not letters or any(c not in _TORQUE_TO_SLURM_MAIL_TYPE for c in letters):
        return send_mail
    mail_types = []
    for letter in letters:
        mail_type = _TORQUE_TO_SLURM_MAIL_TYPE[letter]
        if mail_type not in mail_types:
            mail_types.append(mail_type)
    return ",".join(mail_types)


def _gen_job_slurm(**kwargs):
    """Render the Slurm job script ``JOB_TEMPLATES[kwargs["cluster"]]``.

    Expects every parameter of ``gen_job`` as a keyword argument. Entries not
    consumed here are forwarded to ``str.format`` to fill the template.

    Returns the job script as string.
    """
    # Tolerate (and drop) a stray "kwargs" entry from a captured locals().
    kwargs.pop("kwargs", None)

    resources = ",".join(
        name
        for name in ("sps", "irods", "hpss", "xrootd", "dcache", "oracle")
        if kwargs[name]
    )

    # Popped (not just read) so that format() does not receive them twice,
    # which would raise "got multiple values for keyword argument".
    email = kwargs.pop("email")
    if email is None:
        email = os.environ["USER"] + "@km3net.de"
    log_path = os.path.abspath(kwargs.pop("log_path"))

    if kwargs["job_array_stop"] is not None:
        job_array_option = "#SBATCH --array {}-{}:{}".format(
            kwargs["job_array_start"],
            kwargs["job_array_stop"],
            kwargs["job_array_step"],
        )
    else:
        job_array_option = "#"
    extra_options = "\n".join([job_array_option])

    # "%a" is Slurm's output-filename pattern for the array task index.
    task_name = "_%a" if kwargs["split_array_logs"] else ""

    kwargs["send_mail"] = _slurm_mail_type(kwargs["send_mail"])

    return JOB_TEMPLATES[kwargs["cluster"]].format(
        resources=resources,
        email=email,
        extra_options=extra_options,
        log_path=log_path,
        task_name=task_name,
        **kwargs
    )

178 

179 

def _gen_job_torque(**kwargs):
    """Render the Torque/PBS job script ``JOB_TEMPLATES[kwargs["cluster"]]``.

    Expects every parameter of ``gen_job`` as a keyword argument. Entries not
    consumed here are forwarded to ``str.format`` to fill the template.

    Returns the job script as string.
    """
    # Entries that are not gen_job() parameters and that no template uses;
    # popping with a default tolerates their absence.
    kwargs.pop("kwargs", None)
    kwargs.pop("shell", None)

    # Popped (not just read) so that format() does not receive them twice,
    # which would raise "got multiple values for keyword argument".
    email = kwargs.pop("email")
    if email is None:
        email = os.environ["USER"] + "@km3net.de"

    # gen_job() has no ``cpu`` parameter: fall back to the walltime. The
    # ``walltime`` entry itself stays in kwargs for the ``{walltime}`` field.
    cpu = kwargs.pop("cpu", None)
    if cpu is None:
        cpu = kwargs["walltime"]

    script = kwargs.pop("script")
    if isinstance(script, Script):
        script = str(script)

    log_path = os.path.abspath(kwargs.pop("log_path"))

    # NOTE(review): "#$ -t" is SGE syntax, and the "woody" template has no
    # field for this option, so it does not reach the generated script.
    if kwargs["job_array_stop"] is not None:
        job_array_option = "#$ -t {}-{}:{}".format(
            kwargs["job_array_start"],
            kwargs["job_array_stop"],
            kwargs["job_array_step"],
        )
    else:
        job_array_option = "#"

    task_name = "_$TASK_ID" if kwargs["split_array_logs"] else ""

    node_type = kwargs.pop("node_type")
    node_type = "" if node_type is None else ":" + str(node_type)

    return JOB_TEMPLATES[kwargs["cluster"]].format(
        email=email,
        log_path=log_path,
        cpu=cpu,
        script=script,
        job_array_option=job_array_option,
        task_name=task_name,
        node_type=node_type,
        **kwargs
    )

235 

236 

def gen_job(
    script,
    job_name,
    log_path="qlogs",
    group="km3net",
    walltime="00:10:00",
    nodes=1,
    ppn=4,
    node_type=None,
    cluster="in2p3",
    memory="3G",
    email=None,
    send_mail="n",
    job_array_start=1,
    job_array_stop=None,
    job_array_step=1,
    irods=False,
    sps=True,
    hpss=False,
    xrootd=False,
    dcache=False,
    oracle=False,
    split_array_logs=False,
):
    """Generate a batch job script for ``cluster``.

    The batch system is looked up in ``BATCH_TYPE``, and the matching
    ``JOB_TEMPLATES`` entry is filled in by the slurm or torque helper.

    Parameters
    ----------
    script: str or Script
        Commands to run inside the job.
    job_name: str
        Name of the job, also used for the log file names.
    log_path: str
        Directory of the job logs (converted to an absolute path).
    group: str
        Slurm ``--account``.
    walltime: str
        Maximum run time, ``HH:MM:SS``.
    nodes, ppn, node_type:
        Torque ``-l nodes=...:ppn=...[:node_type]`` request.
    cluster: str
        Key of ``BATCH_TYPE`` and ``JOB_TEMPLATES``.
    memory: str
        Slurm ``--mem``.
    email: str or None
        Notification address. Defaults to ``$USER@km3net.de``.
    send_mail: str
        Slurm ``--mail-type``.
    job_array_start, job_array_stop, job_array_step: int
        Job array range. No array is requested if ``job_array_stop`` is None.
    irods, sps, hpss, xrootd, dcache, oracle: bool
        Slurm ``--licenses`` to request.
    split_array_logs: bool
        Write a separate log file per array task.

    Returns
    -------
    str
        The rendered job script.

    Raises
    ------
    KeyError
        If ``cluster`` is not a key of ``BATCH_TYPE``.
    """
    # Must be the first statement: snapshot only the arguments as a real dict
    # (``locals().items()`` cannot be unpacked with ``**``).
    kwargs = dict(locals())
    batch_type = BATCH_TYPE[cluster]
    if batch_type == "slurm":
        return _gen_job_slurm(**kwargs)
    if batch_type == "torque":
        return _gen_job_torque(**kwargs)
    raise ValueError(
        "Unsupported batch system '{}' for cluster '{}'".format(batch_type, cluster)
    )

266 

267 

def get_jpp_env(jpp_dir):
    """Return the environment dict of a loaded Jpp env.

    ``<jpp_dir>/setenv.sh`` is sourced in a ``bash`` subshell and the
    environment printed by ``env`` afterwards is parsed into a dict. bash is
    used explicitly because ``source`` is a bash builtin, which a POSIX
    ``/bin/sh`` (e.g. dash) does not provide.

    The returned env can be passed to `subprocess.Popen("J...", env=env)`
    to execute Jpp commands.

    Parameters
    ----------
    jpp_dir: str
        Path of the Jpp installation containing ``setenv.sh``.

    Returns
    -------
    dict
        Variable name -> value. Empty (with a warning) if sourcing failed.
    """
    command = "source {0}/setenv.sh {0} && env".format(jpp_dir)
    result = subprocess.run(
        ["bash", "-c", command],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    if result.returncode != 0:
        warn(
            "Loading the Jpp environment from '{}' failed (exit status {})".format(
                jpp_dir, result.returncode
            )
        )

    env = {}
    # NOTE(review): values spanning several lines (e.g. exported bash
    # functions) are not reconstructed by this line-based parsing.
    for line in result.stdout.split("\n"):
        if "=" in line:
            # Split at the first "=" only: values may contain "=" themselves.
            name, _, value = line.partition("=")
            env[name] = value
    return env

286 

287 

class Script(object):
    """A shell script which can be built line by line for `qsub`.

    Lines are collected in ``self.lines`` and joined with newlines by
    ``str()``.
    """

    def __init__(self):
        self.lines = []

    def add(self, line):
        """Add a new line"""
        self.lines.append(line)

    def echo(self, text):
        """Add an echo command. The given text will be double quoted."""
        self.lines.append('echo "{}"'.format(text))

    def separator(self, character="=", length=42):
        """Add a visual separator."""
        self.echo(character * length)

    def cp(self, source, target):
        """Add a new copy instruction"""
        self._add_two_argument_command("cp", source, target)

    def mv(self, source, target):
        """Add a new move instruction"""
        self._add_two_argument_command("mv", source, target)

    def mkdir(self, folder_path):
        """Add a new 'mkdir -p' instruction"""
        self.add('mkdir -p "{}"'.format(folder_path))

    def iget(self, irods_path, attempts=1, pause=15):
        """Add an iget command to retrieve a file from iRODS.

        Parameters
        ----------
        irods_path: str
            Filepath which should be fetched using iget
        attempts: int (default: 1)
            Number of retries, if iRODS access fails
        pause: int (default: 15)
            Pause between two access attempts in seconds
        """
        if attempts > 1:
            # Bash retry loop. A failure is detected by "ERROR" appearing in
            # iget's combined stdout/stderr. The doubled braces survive
            # str.format as the literal bash range {1..N}.
            cmd = """ for i in {{1..{0}}}; do
                ret=$(iget -v "{1}" 2>&1)
                echo "$ret"
                if [[ $ret == *"ERROR"* ]]; then
                    echo "Attempt $i failed"
                else
                    break
                fi
                sleep {2}s
            done """
            cmd = lstrip(cmd)
            cmd = cmd.format(attempts, irods_path, pause)
            self.add(cmd)
        else:
            self.add('iget -v "{}"'.format(irods_path))

    def _add_two_argument_command(self, command, arg1, arg2):
        """Helper function for two-argument commands.

        The arguments are inserted unquoted, so shell globs still expand.
        """
        self.lines.append("{} {} {}".format(command, arg1, arg2))

    def clear(self):
        """Remove all lines."""
        self.lines = []

    def __add__(self, other):
        """Return a new script with the lines of ``self`` followed by ``other``."""
        if not isinstance(other, Script):
            return NotImplemented
        new_script = Script()
        new_script.lines = self.lines + other.lines
        return new_script

    def __str__(self):
        return "\n".join(self.lines)

    def __repr__(self):
        return "# Shell script\n" + str(self)