Skip to content

Commit 33c43ad

Browse files
committed
Refactor Late Tasks by adopting Actor
1 parent 0fdf3d0 commit 33c43ad

9 files changed

Lines changed: 117 additions & 157 deletions

Framework/include/QualityControl/LateTaskRunnerConfig.h renamed to Framework/include/QualityControl/LateTaskConfig.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@
1313
#define QC_CORE_LATETASKRUNNERCONFIG_H
1414

1515
#include "QualityControl/UserCodeConfig.h"
16+
#include "QualityControl/DataSourceSpec.h"
1617

1718
#include <Framework/DataProcessorSpec.h>
1819

20+
1921
namespace o2::quality_control::core {
2022

2123
// fixme: should it be LateTaskConfig instead?
22-
struct LateTaskRunnerConfig : public UserCodeConfig {
23-
std::string taskName;
24-
std::string deviceName;
25-
framework::Inputs inputSpecs;
26-
framework::OutputSpec moSpec;
27-
framework::Options options;
24+
struct LateTaskConfig : public UserCodeConfig {
25+
26+
// fixme: we should think what should happen when a user wants a critical late task relying on a non-critical QC task.
27+
// should it be marked as resilient?
2828
bool critical = true;
2929
};
3030

Framework/include/QualityControl/LateTaskFactory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class LateTaskFactory
3030

3131
/// \brief Create a new instance of a LateTaskInterface.
3232
/// The LateTaskInterface actual class is decided based on the parameters passed.
33-
static LateTaskInterface* create(const LateTaskRunnerConfig& taskConfig, std::shared_ptr<ObjectsManager> objectsManager)
33+
static LateTaskInterface* create(const LateTaskConfig& taskConfig, std::shared_ptr<ObjectsManager> objectsManager)
3434
{
3535
auto* result = root_class_factory::create<LateTaskInterface>(taskConfig.moduleName, taskConfig.className);
3636
result->setName(taskConfig.name);

Framework/include/QualityControl/LateTaskRunner.h

Lines changed: 17 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,83 +17,47 @@
1717
/// \author Piotr Konopka
1818
///
1919

20-
#include <Framework/Task.h>
21-
#include <Framework/DataProcessorSpec.h>
22-
#include <Framework/CompletionPolicy.h>
20+
#include "QualityControl/LateTaskConfig.h"
21+
#include "QualityControl/LateTaskRunnerTraits.h"
2322

24-
#include "QualityControl/LateTaskRunnerConfig.h"
23+
#include "QualityControl/ActorTraits.h"
24+
#include "QualityControl/Actor.h"
25+
#include <string_view>
26+
#include <memory>
2527

2628
namespace o2::quality_control::core {
2729

2830
class LateTaskInterface;
2931
class ObjectsManager;
3032

31-
class LateTaskRunner : public framework::Task
33+
class LateTaskRunner : public Actor<LateTaskRunner>
3234
{
33-
/// \brief Number of bytes in data description used for hashing of Task names. See HashDataDescription.h for details
34-
static constexpr size_t taskDescriptionHashLength = 4;
35-
static_assert(taskDescriptionHashLength <= o2::header::DataDescription::size);
36-
3735
public:
38-
explicit LateTaskRunner(const LateTaskRunnerConfig& config);
39-
~LateTaskRunner() override = default;
40-
41-
/// \brief LateTaskRunner's init callback
42-
void init(framework::InitContext& iCtx) override;
43-
/// \brief LateTaskRunner's process callback
44-
void run(framework::ProcessingContext& pCtx) override;
45-
/// \brief LateTaskRunner's finaliseCCDB callback
46-
// void finaliseCCDB(framework::ConcreteDataMatcher& matcher, void* obj) override;
47-
48-
std::string getDeviceName() const { return mTaskConfig.deviceName; };
49-
const framework::Inputs& getInputsSpecs() const { return mTaskConfig.inputSpecs; };
50-
const framework::OutputSpec& getOutputSpec() const { return mTaskConfig.moSpec; };
51-
const framework::Options& getOptions() const { return mTaskConfig.options; };
36+
LateTaskRunner(const ServicesConfig& servicesConfig, const LateTaskConfig& config);
37+
~LateTaskRunner() = default;
5238

53-
/// \brief Data Processor Label to identify all Late Task Runners
54-
static framework::DataProcessorLabel getLabel() { return { "qc-late-task" }; }
55-
/// \brief ID string for all LateTaskRunner devices
56-
static std::string createIdString();
57-
/// \brief Unified DataOrigin for Quality Control tasks
58-
static header::DataOrigin createDataOrigin(const std::string& detectorCode);
59-
/// \brief Unified DataDescription naming scheme for all tasks
60-
static header::DataDescription createDataDescription(const std::string& taskName);
39+
void onStart(framework::ServiceRegistryRef services);
40+
void onInit(framework::InitContext& iCtx);
41+
void onProcess(framework::ProcessingContext& pCtx);
6142

62-
/// \brief Callback for CallbackService::Id::EndOfStream
63-
// void endOfStream(framework::EndOfStreamContext& eosContext) override;
43+
std::string_view getDetectorName() const { return mTaskConfig.detectorName; }
44+
std::string_view getUserCodeName() const { return mTaskConfig.name; }
45+
bool isCritical() const { return mTaskConfig.critical; }
6446

6547
private:
66-
/// \brief Callback for CallbackService::Id::Start (DPL) a.k.a. RUN transition (FairMQ)
67-
// void start(framework::ServiceRegistryRef services);
68-
/// \brief Callback for CallbackService::Id::Stop (DPL) a.k.a. STOP transition (FairMQ)
69-
// void stop(framework::ServiceRegistryRef services);
70-
/// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ)
71-
// void reset();
7248

7349
void startOfActivity();
7450
void endOfActivity();
7551
int publish(framework::DataAllocator& outputs);
76-
// void publishCycleStats();
77-
// void updateMonitoringStats(framework::ProcessingContext& pCtx);
78-
// void registerToBookkeeping();
7952

8053
private:
81-
LateTaskRunnerConfig mTaskConfig;
82-
// std::shared_ptr<monitoring::Monitoring> mCollector;
54+
LateTaskConfig mTaskConfig;
8355
std::shared_ptr<LateTaskInterface> mTask;
8456
std::shared_ptr<ObjectsManager> mObjectsManager;
85-
// std::shared_ptr<Timekeeper> mTimekeeper;
86-
Activity mActivity;
8757
ValidityInterval mValidity;
88-
89-
// stats
90-
// int mNumberMessagesReceivedInCycle = 0;
91-
// int mNumberObjectsPublishedInCycle = 0;
92-
// int mTotalNumberObjectsPublished = 0; // over a run
93-
// double mLastPublicationDuration = 0;
94-
// uint64_t mDataReceivedInCycle = 0;
9558
};
9659

9760

61+
9862
}
9963
#endif //QC_CORE_LATETASKRUNNER_H

Framework/include/QualityControl/LateTaskRunnerFactory.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include <vector>
2121
#include <optional>
2222

23-
#include "QualityControl/LateTaskRunnerConfig.h"
23+
#include "QualityControl/LateTaskConfig.h"
2424
#include <Framework/DataProcessorSpec.h>
2525

2626
namespace o2::framework
@@ -32,8 +32,9 @@ namespace o2::quality_control::core
3232
{
3333

3434
struct LateTaskSpec;
35-
struct LateTaskRunnerConfig;
35+
struct LateTaskConfig;
3636
struct CommonSpec;
37+
struct ServicesConfig;
3738

3839
/// \brief Factory in charge of creating DataProcessorSpec of LateTaskRunner
3940
class LateTaskRunnerFactory
@@ -45,10 +46,10 @@ class LateTaskRunnerFactory
4546
/// \brief Creates LateTaskRunner
4647
///
4748
/// \param taskConfig
48-
static o2::framework::DataProcessorSpec create(const LateTaskRunnerConfig&);
49+
static o2::framework::DataProcessorSpec create(const ServicesConfig& ServicesConfig, const LateTaskConfig& taskConfig);
4950

5051
/// \brief Knows how to create LateTaskConfig from Specs
51-
static LateTaskRunnerConfig extractConfig(const CommonSpec&, const LateTaskSpec&);
52+
static LateTaskConfig extractConfig(const CommonSpec&, const LateTaskSpec&);
5253

5354
/// \brief Provides necessary customization of the LateTaskRunners.
5455
///
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef QUALITYCONTROL_LATETASKRUNNERTRAITS_H
13+
#define QUALITYCONTROL_LATETASKRUNNERTRAITS_H
14+
15+
#include "QualityControl/ActorTraits.h"
16+
17+
#include <string_view>
18+
#include <array>
19+
#include "QualityControl/DataSourceSpec.h"
20+
21+
namespace o2::quality_control::core {
22+
23+
class LateTaskRunner;
24+
25+
template <>
26+
struct ActorTraits<LateTaskRunner>
27+
{
28+
constexpr static std::string_view sActorTypeShort{"late"};
29+
constexpr static std::string_view sActorTypeKebabCase{"qc-late-task"};
30+
constexpr static std::string_view sActorTypeUpperCamelCase{"LateTaskRunner"};
31+
32+
constexpr static size_t sDataDescriptionHashLength{4};
33+
34+
constexpr static std::array<DataSourceType, 5> sConsumedDataSources{
35+
DataSourceType::Task, DataSourceType::TaskMovingWindow, DataSourceType::Check, DataSourceType::Aggregator, DataSourceType::LateTask
36+
};
37+
constexpr static std::array<DataSourceType, 1> sPublishedDataSources{DataSourceType::LateTask};
38+
39+
constexpr static std::array<Service, 2> sRequiredServices{Service::InfoLogger, Service::Monitoring};
40+
constexpr static bkp::DplProcessType sDplProcessType{bkp::DplProcessType::QC_POSTPROCESSING}; // todo at some point we should add a new type in BKP
41+
42+
constexpr static UserCodeInstanceCardinality sUserCodeInstanceCardinality{UserCodeInstanceCardinality::One};
43+
constexpr static bool sDetectorSpecific{true};
44+
constexpr static Criticality sCriticality{Criticality::UserDefined};
45+
};
46+
47+
static_assert(ValidActorTraits<ActorTraits<LateTaskRunner>>);
48+
49+
}
50+
51+
#endif //QUALITYCONTROL_LATETASKRUNNERTRAITS_H

Framework/src/InfrastructureGenerator.cxx

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@
5151
#include <vector>
5252
#include <ranges>
5353

54+
#include "QualityControl/ActorHelpers.h"
55+
#include "QualityControl/InputUtils.h"
56+
5457
using namespace o2::framework;
5558
using namespace o2::configuration;
5659
using namespace o2::mergers;
@@ -654,10 +657,7 @@ void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& work
654657
}
655658

656659
for (const auto& lateTaskSpec : infrastructureSpec.lateTasks | std::views::filter(&LateTaskSpec::active)) {
657-
InputSpec lateTaskOutput{ lateTaskSpec.taskName,
658-
LateTaskRunner::createDataOrigin(lateTaskSpec.detectorName),
659-
LateTaskRunner::createDataDescription(lateTaskSpec.taskName),
660-
Lifetime::Sporadic };
660+
InputSpec lateTaskOutput = createUserInputSpec<LateTaskRunner, DataSourceType::LateTask>(lateTaskSpec.detectorName, lateTaskSpec.taskName);
661661
tasksOutputMap.insert({ DataSpecUtils::label(lateTaskOutput), lateTaskOutput });
662662
}
663663

@@ -838,7 +838,10 @@ void InfrastructureGenerator::generateLateTasks(framework::WorkflowSpec& workflo
838838
}
839839

840840
for (const auto& lateTaskSpec : infrastructureSpec.lateTasks | std::views::filter(&LateTaskSpec::active)) {
841-
DataProcessorSpec spec = LateTaskRunnerFactory::create(LateTaskRunnerFactory::extractConfig(infrastructureSpec.common, lateTaskSpec));
841+
auto servicesConfig = actor_helpers::extractConfig(infrastructureSpec.common);
842+
auto lateTaskConfig = LateTaskRunnerFactory::extractConfig(infrastructureSpec.common, lateTaskSpec);
843+
844+
DataProcessorSpec spec = LateTaskRunnerFactory::create(servicesConfig, lateTaskConfig);
842845
workflow.emplace_back(std::move(spec));
843846
}
844847
}

Framework/src/InfrastructureSpecReader.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515

1616
#include "QualityControl/InfrastructureSpecReader.h"
1717
#include "QualityControl/QcInfoLogger.h"
18+
#include "QualityControl/LateTaskRunnerTraits.h"
1819
#include "QualityControl/TaskRunner.h"
1920
#include "QualityControl/PostProcessingDevice.h"
2021
#include "QualityControl/Check.h"
2122
#include "QualityControl/AggregatorRunner.h"
2223
#include "QualityControl/LateTaskRunner.h"
24+
#include "QualityControl/InputUtils.h"
2325

2426
#include <DataSampling/DataSampling.h>
2527
#include <Framework/DataDescriptorQueryBuilder.h>
@@ -301,7 +303,7 @@ DataSourceSpec InfrastructureSpecReader::readSpecEntry<DataSourceSpec>(const std
301303
dss.name = wholeTree.get<std::string>("qc.lateTasks." + dss.id + ".taskName", dss.id);
302304
auto detectorName = wholeTree.get<std::string>("qc.lateTasks." + dss.id + ".detectorName");
303305

304-
dss.inputs = { { dss.name, LateTaskRunner::createDataOrigin(detectorName), LateTaskRunner::createDataDescription(dss.name), 0, Lifetime::Sporadic } };
306+
dss.inputs = { createUserInputSpec<LateTaskRunner, DataSourceType::LateTask>(detectorName, dss.name) };
305307
if (dataSourceTree.count("MOs") > 0) {
306308
for (const auto& moName : dataSourceTree.get_child("MOs")) {
307309
dss.subInputs.push_back(moName.second.get_value<std::string>());

Framework/src/LateTaskRunner.cxx

Lines changed: 11 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,19 @@ using namespace o2::monitoring;
5656
namespace o2::quality_control::core
5757
{
5858

59-
LateTaskRunner::LateTaskRunner(const LateTaskRunnerConfig& config)
60-
: mTaskConfig(config)
59+
LateTaskRunner::LateTaskRunner(const ServicesConfig& servicesConfig, const LateTaskConfig& config)
60+
: Actor<LateTaskRunner>(servicesConfig), mTaskConfig(config)
6161
{
62+
// fixme: this should be moved to Actor
6263
o2::ccdb::BasicCCDBManager::instance().setFatalWhenNull(false);
6364
}
6465

65-
void LateTaskRunner::init(InitContext& iCtx)
66+
void LateTaskRunner::onInit(InitContext& iCtx)
6667
{
68+
// fixme: move exception handling to Actor
6769
try {
6870
// setup publisher
69-
mObjectsManager = std::make_shared<ObjectsManager>(mTaskConfig.taskName, mTaskConfig.className, mTaskConfig.detectorName, 0);
71+
mObjectsManager = std::make_shared<ObjectsManager>(mTaskConfig.name, mTaskConfig.className, mTaskConfig.detectorName, 0);
7072

7173
// setup user's task
7274
mTask.reset(LateTaskFactory::create(mTaskConfig, mObjectsManager));
@@ -78,14 +80,15 @@ void LateTaskRunner::init(InitContext& iCtx)
7880
mValidity = gInvalidValidityInterval;
7981

8082
// todo move to start
81-
mTask->startOfActivity({});
83+
mTask->startOfActivity(getActivity());
84+
mObjectsManager->setActivity(getActivity());
8285
} catch (boost::exception& e) {
8386
ILOG(Info, Devel) << "exception during init " << diagnostic_information(e) << ENDM;
8487
throw;
8588
}
8689
}
8790

88-
void LateTaskRunner::run(ProcessingContext& pCtx)
91+
void LateTaskRunner::onProcess(ProcessingContext& pCtx)
8992
{
9093
// todo: derive from received objects
9194
mValidity.update(getCurrentTimestamp());
@@ -101,7 +104,7 @@ void LateTaskRunner::run(ProcessingContext& pCtx)
101104

102105
// fixme: come up with an elegant way for this. LateTask should be probably aware of the requested user inputs aside from just InputSpecs
103106
// also, we should filter only expect objects from the received collections
104-
if (dataOrigin.str[0] == 'Q' || dataOrigin.str[0] == 'W') { // main MOs and moving windows from QC tasks
107+
if (dataOrigin.str[0] == 'Q' || dataOrigin.str[0] == 'W' || dataOrigin.str[0] == 'L') { // main MOs and moving windows from QC tasks, and outputs of late tasks
105108
auto moc = DataRefUtils::as<MonitorObjectCollection>(ref);
106109
moc->postDeserialization();
107110

@@ -124,51 +127,15 @@ void LateTaskRunner::run(ProcessingContext& pCtx)
124127
}
125128
}
126129

127-
128-
129130
// run the task
130131
mTask->process(taskInputs);
131132

132133
// publish objects
133134
mObjectsManager->setValidity(mValidity);
134135
std::unique_ptr<MonitorObjectCollection> array(mObjectsManager->getNonOwningArray());
135-
auto concreteOutput = framework::DataSpecUtils::asConcreteDataMatcher(mTaskConfig.moSpec);
136-
pCtx.outputs().snapshot(Output{ concreteOutput.origin, concreteOutput.description, concreteOutput.subSpec }, *array);
136+
pCtx.outputs().snapshot(mTaskConfig.name, *array);
137137
mObjectsManager->stopPublishing(PublicationPolicy::Once);
138138
}
139139

140-
/// \brief ID string for all LateTaskRunner devices
141-
std::string LateTaskRunner::createIdString()
142-
{
143-
return { "qc-late-task" };
144-
}
145-
/// \brief Unified DataOrigin for Quality Control tasks
146-
header::DataOrigin LateTaskRunner::createDataOrigin(const std::string& detectorCode)
147-
{
148-
std::string originStr = "L";
149-
if (detectorCode.empty()) {
150-
ILOG(Warning, Support) << "empty detector code for a task data origin, trying to survive with: DET" << ENDM;
151-
originStr += "DET";
152-
} else if (detectorCode.size() > 3) {
153-
ILOG(Warning, Support) << "too long detector code for a task data origin: " + detectorCode + ", trying to survive with: " + detectorCode.substr(0, 3) << ENDM;
154-
originStr += detectorCode.substr(0, 3);
155-
} else {
156-
originStr += detectorCode;
157-
}
158-
o2::header::DataOrigin origin;
159-
origin.runtimeInit(originStr.c_str());
160-
return origin;
161-
}
162-
163-
/// \brief Unified DataDescription naming scheme for all tasks
164-
header::DataDescription LateTaskRunner::createDataDescription(const std::string& lateTaskName)
165-
{
166-
if (lateTaskName.empty()) {
167-
BOOST_THROW_EXCEPTION(FatalException() << errinfo_details("Empty lateTaskName for task's data description"));
168-
}
169-
170-
return quality_control::core::createDataDescription(lateTaskName, LateTaskRunner::taskDescriptionHashLength);
171-
}
172-
173140

174141
} // namespace o2::quality_control::core

0 commit comments

Comments
 (0)