Skip to content

Commit 9966305

Browse files
sawenzel and davidrohr
authored and committed
Full system test as a DAG pipeline
This demonstrates how the full system test can be built and run as a DAG pipeline. We separate description of the workflow and runtime. One of the benefits coming with this are automatic parallel scheduling of QED + normal simulation and parallel RAW data creation (as long as this is possible in memory). Makes use of a recent development prototyped in O2DPG https://github.com/AliceO2Group/O2DPG/blob/master/MC/doc/WorkflowRunner.md The commit should be seen as a first demonstrator rather than a full conversion.
1 parent bea0eac commit 9966305

2 files changed

Lines changed: 218 additions & 0 deletions

File tree

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python3
#
# A script producing a consistent full-system-test workflow.
# Python used for convenient json handling but this is completely up to the workflow writer.
#
from os import environ
import json

# Global workflow description: a flat list of task dictionaries ('stages')
# that the O2DPG workflow runner schedules as a DAG.
workflow = {}
workflow['stages'] = []

# Running count of registered tasks (kept for bookkeeping/debugging).
taskcounter = 0


def Task(name='', needs=None, tf=-1, cmd='a', cwd='./', lab=[]):
    """Register a task in the global workflow and return its (mutable) dict.

    Args:
        name:  unique task name; other tasks reference it via their 'needs'.
        needs: list of task names that must complete before this task runs.
        tf:    timeframe id the task belongs to (-1 = not timeframe specific).
        cmd:   shell command line to execute for this task.
        cwd:   working directory in which the command is run.
        lab:   labels used to select task subsets (e.g. --target-labels RAW).
    """
    global taskcounter
    taskcounter = taskcounter + 1
    # FIX: the defaults used to be shared mutable lists (needs=[], lab=[]);
    # every task created with a default then aliased the *same* list, so
    # mutating one task's 'needs'/'labels' silently changed all of them.
    # Create a fresh list per call instead.
    if needs is None:
        needs = []
    if lab is None:
        lab = []
    t = { 'name': name, 'cmd': cmd, 'needs': needs, 'resources': { 'cpu': -1 , 'mem': -1 }, 'timeframe' : tf, 'labels' : lab, 'cwd' : cwd }
    workflow['stages'].append(t)
    return t
# ---- QED transport task -------
# QED background is simulated separately (in its own working directory) and
# later mixed into digitization with a weight derived from the x-section ratio.
QED = Task(
    name='qedsim', lab=["QED", "SIM"], cwd='qed',
    cmd='o2-sim --seed $O2SIMSEED -j $NJOBS -n$NEventsQED -m PIPE ITS MFT FT0 FV0 FDD -g extgen --configKeyValues \"GeneratorExternal.fileName=$O2_ROOT/share/Generators/external/QEDLoader.C;QEDGenParam.yMin=-7;QEDGenParam.yMax=7;QEDGenParam.ptMin=0.001;QEDGenParam.ptMax=1.;Diamond.width[2]=6.\"')

# Derive the ratio of QED to hadronic x-sections from the sim output.
QED2HAD = Task(
    name='qed2had', lab=["QED", "SIM"], cwd='qed', needs=["qedsim"],
    cmd='PbPbXSec="8." ; awk "BEGIN {printf \\"%.2f\\",`grep xSectionQED qedgenparam.ini | cut -d\'=\' -f 2`/$PbPbXSec}" > qed2had.log')
#echo "Obtained ratio of QED to hadronic x-sections = $QED2HAD" >> qedsim.log

# --- signal sim and digitization ----
SIM = Task(
    name="sim", lab=["SIM"],
    cmd='o2-sim --seed $O2SIMSEED -n $NEvents --skipModules ZDC --configKeyValues "Diamond.width[2]=6." -g pythia8hi -j $NJOBS')

# Digitization of everything except TRD.
DIGI1 = Task(
    name="digi", lab=["DIGI"], needs=["sim", "qed2had"],
    cmd='QED2HAD=`cat qed/qed2had.log`; echo ${QED2HAD}; o2-sim-digitizer-workflow -n $NEvents --simPrefixQED qed/o2sim --qed-x-section-ratio ${QED2HAD} ${NOMCLABELS} --firstOrbit 0 --firstBC 0 --skipDet TRD --tpc-lanes $((NJOBS < 36 ? NJOBS : 36)) --shm-segment-size $SHMSIZE ${GLOBALDPLOPT}')

# TRD digitization; the dependency on digi is because of collisioncontext.
DIGI2 = Task(
    name='digiTRD', lab=["DIGI"], needs=["sim", "digi"],
    cmd='o2-sim-digitizer-workflow -n $NEvents ${NOMCLABELS} --firstOrbit 0 --firstBC 0 --onlyDet TRD --shm-segment-size $SHMSIZE ${GLOBALDPLOPT} --incontext collisioncontext.root --configKeyValues "TRDSimParams.digithreads=${NJOBS}"')
# Names of all registered RAW-creation tasks, collected so that a later
# task can depend on the full set.
allrawtasknames = []


def RAWTask(name, command):
    """Register a RAW-creation task (label RAW, depends on 'digi') and
    record its name in allrawtasknames. Returns the task dict."""
    task = Task(name=name, cmd=command, lab=["RAW"], needs=["digi"])
    allrawtasknames.append(name)
    return task
# --- per-detector raw data creation (each depends on 'digi') ---
ITSRAW=RAWTask('itsraw', 'o2-its-digi2raw --file-for link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -o raw/ITS')
MFTRAW=RAWTask('mftraw', 'o2-mft-digi2raw --file-for link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -o raw/MFT')
FT0RAW=RAWTask('ft0raw', 'o2-ft0-digi2raw --file-per-link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -o raw/FT0')
FV0RAW=RAWTask('fv0raw', 'o2-fv0-digi2raw --file-per-link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -o raw/FV0')
FDDRAW=RAWTask('fddraw', 'o2-fdd-digit2raw --file-per-link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -o raw/FDD')
TPCRAW=RAWTask('tpcraw', 'o2-tpc-digits-to-rawzs --file-for link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -i tpcdigits.root -o raw/TPC')
TOFRAW=RAWTask('tofraw', 'o2-tof-reco-workflow ${GLOBALDPLOPT} --tof-raw-file-for link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" --output-type raw --tof-raw-outdir raw/TOF')
# FIX: this was assigned to TOFRAW by copy-paste, silently shadowing the
# TOF task's handle; it is the MID raw-creation task.
MIDRAW=RAWTask('midraw', 'o2-mid-digits-to-raw-workflow ${GLOBALDPLOPT} --mid-raw-outdir raw/MID --mid-raw-perlink --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\"')
EMCRAW=RAWTask('emcraw', 'o2-emcal-rawcreator --file-for link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -o raw/EMC')
PHSRAW=RAWTask('phsraw', 'o2-phos-digi2raw --file-for link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -o raw/PHS')
CPVRAW=RAWTask('cpvraw', 'o2-cpv-digi2raw --file-for link --configKeyValues \"HBFUtils.nHBFPerTF=128;HBFUtils.orbitFirst=0\" -o raw/CPV')

# make configuration -> this depends on all previous raws
Task('rawAllConfig', cmd='cat raw/*/*.cfg > rawAll.cfg', needs=allrawtasknames, lab=["RAW"])
# Per-variant environment for the reconstruction workflow (WITHGPU, NOGPU,
# ASYNC). Same keys, different values; consumed by dpl-workflow.sh.
RECO_ENV = {
    "WITHGPU": {
        "CREATECTFDICT": "0",
        "GPUTYPE": "CUDA",
        "GPUMEMSIZE": "6000000000",
        "HOSTMEMSIZE": "1000000000",
        "SYNCMODE": "1",
        "CTFINPUT": "0",
        "SAVECTF": "0",
    },
    "ASYNC": {
        "CREATECTFDICT": "0",
        "GPUTYPE": "CPU",
        "SYNCMODE": "0",
        "HOSTMEMSIZE": "$TPCTRACKERSCRATCHMEMORY",
        "CTFINPUT": "1",
        "SAVECTF": "0",
    },
    "NOGPU": {
        "CREATECTFDICT": "1",
        "GPUTYPE": "CPU",
        "SYNCMODE": "0",
        "HOSTMEMSIZE": "$TPCTRACKERSCRATCHMEMORY",
        "CTFINPUT": "0",
        "SAVECTF": "1",
    },
}
# Emit one reconstruction task per software configuration; each depends on
# the assembled raw configuration and carries its variant environment.
for variant in ["NOGPU", "WITHGPU", "ASYNC"]:
    reco = Task(name='reco_' + variant, needs=['rawAllConfig'], lab=["RECO"])
    reco['env'] = RECO_ENV[variant]
    # The NOGPU pass (re)creates the CTF dictionary, so remove a stale one.
    prefix = "rm -f ctf_dictionary.root;" if variant == "NOGPU" else ""
    reco['cmd'] = prefix + "$O2_ROOT/prodtests/full-system-test/dpl-workflow.sh"
def trimString(cmd):
    """Normalize a command string: collapse every whitespace run (spaces,
    tabs, newlines) into a single space and strip the ends."""
    tokens = cmd.split()
    return " ".join(tokens)
# Wrap every command in the taskwrapper (logging / monitoring / skip-done
# control from jobutils.sh) and normalize its whitespace in one pass.
for stage in workflow['stages']:
    wrapped = ('. ${O2_ROOT}/share/scripts/jobutils.sh; taskwrapper '
               + stage['name'] + '.log \'' + stage['cmd'] + '\'')
    stage['cmd'] = trimString(wrapped)

# Persist the workflow description for the O2DPG workflow runner.
workflowfile = 'workflow.json'
with open(workflowfile, 'w') as outfile:
    json.dump(workflow, outfile, indent=2)

exit(0)
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#!/bin/bash
#
# A workflow performing a full system test:
#  - simulation of digits
#  - creation of raw data
#  - reconstruction of raw data
#
# Note that this might require a production server to run.
#
# This script can use additional binary objects which can be optionally provided:
#  - matbud.root + ITSdictionary.bin
#
# authors: D. Rohr / S. Wenzel

# Pull in jobutils, which notably provides:
#  --> taskwrapper, a simple control and monitoring tool
#      (see the documentation inside jobutils.sh)
#  --> helpers to query the physical CPU count
. ${O2_ROOT}/share/scripts/jobutils.sh
export NEvents=${NEvents:-10} #550 for full TF (the number of PbPb events)
export NEventsQED=${NEventsQED:-1000} #35000 for full TF
export NCPUS=$(getNumberOfPhysicalCPUCores)
echo "Found ${NCPUS} physical CPU cores"
export NJOBS=${NJOBS:-"${NCPUS}"}
export SHMSIZE=${SHMSIZE:-8000000000} # Size of shared memory for messages (use 128 GB for 550 event full TF)
# FIX: default from TPCTRACKERSCRATCHMEMORY itself, not from SHMSIZE — the
# previous ${SHMSIZE:-4000000000} always copied SHMSIZE, so the documented
# 4 GB default was never used and a user-provided value was ignored.
export TPCTRACKERSCRATCHMEMORY=${TPCTRACKERSCRATCHMEMORY:-4000000000} # Size of memory allocated by TPC tracker. (Use 24 GB for 550 event full TF)
export ENABLE_GPU_TEST=${ENABLE_GPU_TEST:-0} # Run the full system test also on the GPU
export NTIMEFRAMES=${NTIMEFRAMES:-1} # Number of time frames to process
export TFDELAY=${TFDELAY:-100} # Delay in seconds between publishing time frames
export NOMCLABELS="--disable-mc"
export O2SIMSEED=${O2SIMSEED:--1}

# Exported so that the taskwrapper instances spawned as child processes by
# the DAG workflow runner see them as well (plain assignments would only be
# visible in this shell).
export JOBUTILS_SKIPDONE=ON    # allow skipping tasks that are already done
export JOBUTILS_MONITORMEM=ON  # memory monitoring (independent of whether DPL or not)
# CPU monitoring: JOBUTILS_MONITORCPU=ON
# Prepare a metrics file for the monitoring system.
METRICFILE=metrics.dat
CONFIG="full_system_test_N${NEvents}"
HOST=$(hostname)

# Header line carrying the tested alidist tag and O2 tag (when available).
TAG="conf=${CONFIG},host=${HOST}${ALIDISTCOMMIT:+,alidist=$ALIDISTCOMMIT}${O2COMMIT:+,o2=$O2COMMIT}"
echo "versions,${TAG} alidist=\"${ALIDISTCOMMIT}\",O2=\"${O2COMMIT}\" " > ${METRICFILE}

# Make sure we can open sufficiently many files.
ulimit -n 4096

# --monitoring-backend no-op:// is currently removed due to
# https://alice.its.cern.ch/jira/browse/O2-1887
export GLOBALDPLOPT="-b"

mkdir -p raw
# create the full workflow as json
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
${DIR}/full-system-test/create_full_system_pipeline.py

# create a visualization of the pipeline-workflow (in workflow.gv.pdf) and print the list of tasks
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --visualize-workflow --list-tasks

# run the workflow (not constraining in parallelism) until RAW creation is finished
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --target-labels RAW

# We run the reconstruction workflow in both CPU-only and With-GPU mode
STAGES="NOGPU"
# FIX: quote the expansion — the unquoted [ $ENABLE_GPU_TEST != "0" ] was a
# test-syntax error ("unary operator expected") whenever the variable was
# unset or empty.
if [ "$ENABLE_GPU_TEST" != "0" ]; then
  STAGES+=" WITHGPU"
fi
STAGES+=" ASYNC"
for STAGE in $STAGES; do
  logfile=reco_${STAGE}.log

  # Environment consumed by dpl-workflow.sh, forwarded to the tasks below.
  export SHMSIZE
  export NTIMEFRAMES
  export TFDELAY
  export GLOBALDPLOPT

  # run each of the reco variants, but in strictly serial mode (-jmax 1)
  ${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt reco_${STAGE} -jmax 1
  RC=$?  # capture immediately so later commands cannot clobber the status

  # --- record interesting metrics to monitor ----
  # boolean flag indicating if workflow completed successfully at all
  # (taskwrapper drops a "<log>_done" marker on success)
  SUCCESS=0
  [ -f "${logfile}_done" ] && [ "$RC" = 0 ] && SUCCESS=1
  echo "success_${STAGE},${TAG} value=${SUCCESS}" >> "${METRICFILE}"

  if [ "${SUCCESS}" = "1" ]; then
    # runtime, from the taskwrapper timing file
    walltime=$(grep "#walltime" "${logfile}_time" | awk '//{print $2}')
    echo "walltime_${STAGE},${TAG} value=${walltime}" >> "${METRICFILE}"

    # GPU reconstruction (also in CPU version) processing time
    # FIX: read the current stage's log — this previously always parsed
    # reco_NOGPU.log while labeling the value with ${STAGE}.
    gpurecotime=$(grep "tpc-tracker" "${logfile}" | grep -e "Total Wall Time:" | awk '//{printf "%f", $6/1000000}')
    echo "gpurecotime_${STAGE},${TAG} value=${gpurecotime}" >> "${METRICFILE}"

    # memory
    maxmem=$(awk '/PROCESS MAX MEM/{print $5}' "${logfile}") # in MB
    avgmem=$(awk '/PROCESS AVG MEM/{print $5}' "${logfile}") # in MB
    echo "maxmem_${STAGE},${TAG} value=${maxmem}" >> "${METRICFILE}"
    echo "avgmem_${STAGE},${TAG} value=${avgmem}" >> "${METRICFILE}"

    # some physics quantities
    tpctracks=$(grep "tpc-tracker" "${logfile}" | grep -e "found.*track" | awk '//{print $4}')
    echo "tpctracks_${STAGE},${TAG} value=${tpctracks}" >> "${METRICFILE}"
    tpcclusters=$(grep -e "Event has.*TPC Clusters" "${logfile}" | awk '//{print $5}')
    echo "tpcclusters_${STAGE},${TAG} value=${tpcclusters}" >> "${METRICFILE}"
  fi
done

0 commit comments

Comments
 (0)