-
Notifications
You must be signed in to change notification settings - Fork 192
Expand file tree
/
Copy patho2dpg_qc_finalization_workflow.py
More file actions
executable file
·139 lines (107 loc) · 6.5 KB
/
o2dpg_qc_finalization_workflow.py
File metadata and controls
executable file
·139 lines (107 loc) · 6.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
#
# A script producing the QC finalization workflow.
# If run as main, it will dump the workflow to the specified output file and tasks will not have dependencies.
# For example:
# ${O2DPG_ROOT}/MC/bin/o2dpg_qc_finalization_workflow.py -o qc_workflow.json
# The script can also be imported.
# In that case, use include_all_QC_finalization to get the QC finalization tasks from within another workflow script.
import sys
import argparse
from os import environ, mkdir
from os.path import join, dirname, isdir
sys.path.append(join(dirname(__file__), '.', 'o2dpg_workflow_utils'))
from o2dpg_workflow_utils import createTask, dump_workflow
def getDPL_global_options(bigshm=False, noIPC=None):
    """Return the DPL command-line options shared by the QC workflows.

    Args:
        bigshm: If true, append a ``--shm-segment-size`` option whose value
            is the shell expression ``${SHMSIZE:-50000000000}`` (left
            unexpanded in the returned string).
        noIPC: If not ``None`` (any value, even a falsy one), append
            ``--no-IPC``; this takes precedence over ``bigshm``.

    Returns:
        The option string, always starting with the common options.
    """
    common = "-b --run --driver-client-backend ws:// "
    # Identity comparison: only a literal None means "option not given".
    if noIPC is not None:
        return common + " --no-IPC "
    if bigshm:
        return common + " --shm-segment-size ${SHMSIZE:-50000000000} "
    return common
def QC_finalize_name(name):
    """Return the name of the finalization task belonging to QC workflow *name*.

    The result is *name* with the ``_finalize`` suffix appended.
    """
    suffix = "_finalize"
    return name + suffix
# Working directory of the QC tasks: passed as cwd to createTask and created by main() if it does not exist.
qcdir = "QC"
def include_all_QC_finalization(ntimeframes, standalone, run, productionTag):
    """Build the QC finalization ('remote-batch') and post-processing tasks.

    Args:
        ntimeframes: Number of timeframes. Used to derive the names of the
            per-timeframe ``<task>_local_<tf>`` tasks (tf = 1..ntimeframes)
            that a finalization task waits for.
        standalone: If true, every finalization task is created without
            dependencies, even when an explicit list is supplied.
            Post-processing tasks keep the dependencies they are given.
        run: Run number written to ``qc.config.Activity.number``.
        productionTag: Production tag written to
            ``qc.config.Activity.periodName``.

    Returns:
        The list of tasks (as returned by ``createTask``, with ``cmd`` set),
        finalization tasks first, followed by the post-processing tasks.
    """
    stages = []

    def add_QC_finalization(taskName, qcConfigPath, needs=None):
        """Add the 'remote-batch' part of a standard QC workflow.

        Args:
            taskName: Name of the QC workflow; it should be the same as in
                the main workflow.
            qcConfigPath: Path to the QC config file.
            needs: Tasks to be finished first. Defaults to the 'local-batch'
                parts of this QC workflow, one per timeframe.
        """
        if standalone:
            needs = []
        elif needs is None:
            needs = [f'{taskName}_local_{tf}' for tf in range(1, ntimeframes + 1)]
        task = createTask(name=QC_finalize_name(taskName), needs=needs, cwd=qcdir, lab=["QC"], cpu=1, mem='2000')
        task['cmd'] = (
            f'o2-qc --config {qcConfigPath} --remote-batch {taskName}.root'
            f' --override-values "qc.config.Activity.number={run};qc.config.Activity.periodName={productionTag}"'
            ' ' + getDPL_global_options()
        )
        stages.append(task)

    def add_QC_postprocessing(taskName, qcConfigPath, needs, runSpecific, prodSpecific):
        """Add a post-processing QC workflow.

        Args:
            taskName: Name of the QC workflow.
            qcConfigPath: Path to the QC config file.
            needs: Tasks to be finished first. Usually it should include the
                QC finalization tasks which produce the objects needed for
                the given post-processing.
            runSpecific: If true, the concrete run number is put into the QC
                config, so the post-processing should cover objects only for
                that run; otherwise 0 is used.
            prodSpecific: If true, the concrete production name is put into
                the config, so the post-processing should cover objects only
                for that production; otherwise it is left empty.
        """
        task = createTask(name=taskName, needs=needs, cwd=qcdir, lab=["QC"], cpu=1, mem='2000')
        activity_number = run if runSpecific else 0
        period_name = productionTag if prodSpecific else ''
        override_values = (
            '--override-values "'
            f'qc.config.Activity.number={activity_number};'
            f'qc.config.Activity.periodName={period_name}"'
        )
        task['cmd'] = f'o2-qc --config {qcConfigPath} ' + override_values + ' ' + getDPL_global_options()
        stages.append(task)

    # The list of remote-batch workflows (reading the merged QC tasks results,
    # applying Checks, uploading them to QCDB).

    # The MFT digits finalization waits for five local tasks per timeframe,
    # named mftDigitsQC0 ... mftDigitsQC4.
    mft_digits_needs = [
        f'mftDigitsQC{flp}_local_{tf}'
        for flp in range(5)
        for tf in range(1, ntimeframes + 1)
    ]
    add_QC_finalization('mftDigitsQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-digit-0.json', mft_digits_needs)
    add_QC_finalization('mftClustersQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-cluster.json')
    add_QC_finalization('mftAsyncQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-async.json')
    add_QC_finalization('emcCellQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/emc-cell-task.json')
    # Currently disabled:
    #add_QC_finalization('tpcTrackingQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/tpc-qc-tracking-direct.json')
    add_QC_finalization('tpcStandardQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/tpc-qc-standard-direct.json')
    add_QC_finalization('trdDigitsQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/trd-digits-task.json')
    add_QC_finalization('vertexQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/vertexing-qc-direct-mc.json')
    add_QC_finalization('ITSTPCmatchQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/ITSTPCmatchedTracks_direct_MC.json')
    add_QC_finalization('TOFMatchQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_ITSTPCTOF_TPCTOF_direct_MC.json')
    add_QC_finalization('tofDigitsQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/tofdigits.json')
    add_QC_finalization('TOFMatchWithTRDQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_AllTypes_direct_MC.json')
    add_QC_finalization('ITSTrackSimTask', 'json://${O2DPG_ROOT}/MC/config/QC/json/its-mc-tracks-qc.json')
    add_QC_finalization('tofft0PIDQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/pidft0tof.json')
    add_QC_finalization('tofPIDQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/pidtof.json')
    add_QC_finalization('RecPointsQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/ft0-reconstruction-config.json')

    # The list of QC post-processing workflows.
    add_QC_postprocessing('tofTrendingHits', 'json://${O2DPG_ROOT}/MC/config/QC/json/tof-trending-hits.json', [QC_finalize_name('tofDigitsQC')], runSpecific=False, prodSpecific=True)

    return stages
def main() -> int:
    """Parse the command line and dump the standalone QC finalization workflow.

    The workflow is built for a single timeframe in standalone mode and
    written to the file given with ``-o``. The ``QC`` working directory is
    created if it does not exist.

    Returns:
        Always 0. Missing environment variables are reported on stdout but
        do not abort the workflow creation.
    """
    # Function-scope import: makedirs is not among the file-level imports.
    from os import makedirs

    parser = argparse.ArgumentParser(description='Create the ALICE QC finalization workflow')
    # NOTE(review): --noIPC is parsed but never read in this function, so it
    # currently has no effect on the generated workflow.
    parser.add_argument('--noIPC', help='disable shared memory in DPL')
    parser.add_argument('-o', help='output workflow file', default='workflow.json')
    # type=int keeps a user-supplied run number consistent with the int default.
    parser.add_argument('-run', type=int, help="Run number for this MC", default=300000)
    parser.add_argument('-productionTag', help="Production tag for this MC", default='unknown')

    args = parser.parse_args()
    print(args)

    # Make sure O2DPG + O2 + QualityControl are loaded (report only, non-fatal).
    required_env = (
        ('O2DPG_ROOT', 'O2DPG'),
        ('O2_ROOT', 'O2'),
        ('QUALITYCONTROL_ROOT', 'QUALITYCONTROL_ROOT'),
    )
    for variable, label in required_env:
        if environ.get(variable) is None:
            print(f'Error: This needs {label} loaded')

    # exist_ok avoids the check-then-create race of isdir() followed by mkdir().
    makedirs(qcdir, exist_ok=True)

    stages = include_all_QC_finalization(ntimeframes=1, standalone=True, run=args.run, productionTag=args.productionTag)
    dump_workflow(stages, args.o)

    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)