Note
Go to the end to download the full example code.
K40 Calibration Batch Processing
Standalone job submitter for K40 offline calibrations with KM3Pipe.
Usage:
qk40calib.py OUTPUT_PATH [options]
qk40calib.py (-h | --help)
Options:
OUTPUT_PATH Folder to store the calibration data.
-d DET_ID Detector ID [default: 29].
-t TMAX Coincidence time window [default: 10].
-n N_RUNS Number of runs to process per job [default: 10].
-e ET Estimated walltime per run in minutes [default: 8].
-m VMEM Estimated vmem for a job [default: 8G].
-s RUNSETUP Pattern the run setup name must contain [default: PHYS.1710v5-TUNED.HRV19.3D_T_S_MX.NBMODULE].
-j JOBNAME The name of the submitted jobs [default: k40calib].
-l LOG_PATH Path of the job log files [default: qlogs].
-q Dryrun: don't submit jobs, just print the first job script.
-h --help Show this screen.
import os
import re
from glob import glob
import time
import km3db
from km3pipe.shell import qsub, Script
import km3pipe as kp
from docopt import docopt
def _format_walltime(seconds):
    """Format a duration given in seconds as ``HH:MM:SS``.

    The hour field is not wrapped at 24 (as it would be when going through
    ``time.gmtime``/``time.strftime``), so 90000 s yields ``25:00:00``
    rather than ``01:00:00``.
    """
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    return "{:02d}:{:02d}:{:02d}".format(hours, minutes, secs)


def _processed_runs(calib_path):
    """Return the run numbers which already have a ``*.k40_cal.p`` file.

    The run number is the second 8-digit group in the file name
    (``..._<8 digits>_<8 digits>...``). Files which do not follow that
    naming scheme are reported and skipped instead of aborting the script.
    """
    pattern = re.compile(r"_\d{8}_(\d{8})")
    runs = set()
    for filepath in glob(os.path.join(calib_path, "*.k40_cal.p")):
        # Only look at the file name so that digits in the directory part
        # of the path can never be mistaken for a run number.
        match = pattern.search(os.path.basename(filepath))
        if match is None:
            print("Ignoring unexpected calibration file: {}".format(filepath))
            continue
        runs.add(int(match.group(1)))
    return runs


def main():
    """Submit batch jobs which calibrate all not yet processed runs.

    Reads the command line options via docopt, looks up the runs of the
    detector whose run setup name matches the ``-s`` pattern, removes those
    for which a calibration file already exists in the output folder and
    submits one job per chunk of ``-n`` runs. With ``-q`` only the first
    job is handed to ``qsub`` (in dryrun mode).
    """
    args = docopt(__doc__)

    DET_ID = int(args["-d"])
    TMAX = int(args["-t"])
    ET_PER_RUN = int(args["-e"]) * 60  # [s]
    RUNS_PER_JOB = int(args["-n"])
    VMEM = args["-m"]
    CWD = os.getcwd()
    LOG_PATH = args["-l"]
    JOB_NAME = args["-j"]
    CALIB_PATH = os.path.join(CWD, args["OUTPUT_PATH"])
    RUN_SUBSTR = args["-s"]
    DRYRUN = args["-q"]

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(CALIB_PATH, exist_ok=True)

    run_table = km3db.StreamDS(container="pd").get("run_table", det_id=DET_ID)
    # na=False: rows without a run setup name count as "no match" instead of
    # putting NaN into the boolean mask.
    is_phys_run = run_table.RUNSETUPNAME.str.contains(RUN_SUBSTR, na=False)
    phys_runs = set(run_table[is_phys_run].RUN)

    # Sorted, so that the content of each job is reproducible between calls.
    remaining_runs = sorted(phys_runs - _processed_runs(CALIB_PATH))
    print("Remaining runs: {}".format(remaining_runs))

    s = Script()

    for job_id, runs_chunk in enumerate(kp.tools.chunks(remaining_runs, RUNS_PER_JOB)):
        n_runs = len(runs_chunk)
        print("Preparing batch script for a chunk of {} runs.".format(n_runs))
        s.add("cd $TMPDIR; mkdir -p $USER; cd $USER")
        for run in runs_chunk:
            s.add("echo Processing {}:".format(run))
            irods_path = kp.tools.irods_path(DET_ID, run)
            root_filename = os.path.basename(irods_path)
            calib_filename = root_filename + ".k40_cal.p"
            s.add("iget -v {}".format(irods_path))
            # Take the second field of the JPrint output line starting with
            # "ctMin" and pass it on to k40calib via -c.
            s.add(
                "CTMIN=$(JPrint -f {}|grep '^ctMin'|awk '{{print $2}}')".format(
                    root_filename
                )
            )
            s.add(
                "k40calib {} {} -t {} -c $CTMIN -o {}".format(
                    root_filename, DET_ID, TMAX, calib_filename
                )
            )
            s.add("cp {} {}".format(calib_filename, CALIB_PATH))
            # Remove the downloaded and the produced file from the work dir.
            s.add("rm -f {}".format(root_filename))
            s.add("rm -f {}".format(calib_filename))
            s.add("echo Run {} processed.".format(run))
            s.add("echo " + 42 * "=")

        # Estimates of 24 h and more must not wrap around to a small value.
        walltime = _format_walltime(ET_PER_RUN * n_runs)

        qsub(
            s,
            "{}_{}".format(JOB_NAME, job_id),
            walltime=walltime,
            vmem=VMEM,
            log_path=LOG_PATH,
            irods=True,
            dryrun=DRYRUN,
        )

        if DRYRUN:
            # Only the first job script is of interest in a dryrun.
            break

        # The Script instance is reused, so empty it for the next chunk.
        s.clear()
# Run the job submitter only when executed as a script, not on import.
if __name__ == "__main__":
    main()