@@ -337,7 +337,7 @@ framework::WorkflowSpec InfrastructureGenerator::generateLocalBatchInfrastructur
337337 auto taskConfig = TaskRunnerFactory::extractConfig (infrastructureSpec.common , taskSpec, 0 , 1 );
338338 workflow.emplace_back (TaskRunnerFactory::create (taskConfig));
339339
340- fileSinkInputs.emplace_back (taskSpec. taskName , TaskRunner::createTaskDataOrigin ( taskSpec.detectorName ), TaskRunner::createTaskDataDescription ( taskSpec.taskName ), Lifetime::Sporadic );
340+ fileSinkInputs.emplace_back (createUserInputSpec (DataSourceType::Task, taskSpec.detectorName , taskSpec.taskName ));
341341 }
342342
343343 if (!fileSinkInputs.empty ()) {
@@ -377,9 +377,8 @@ framework::WorkflowSpec InfrastructureGenerator::generateRemoteBatchInfrastructu
377377 // We create an OutputSpec for moving windows for this task only if they are expected.
378378 if (!taskConfig.movingWindows .empty ()) {
379379 fileSourceOutputs.push_back (
380- { RootFileSource::outputBinding (taskSpec.detectorName , taskSpec.taskName , true ),
381- TaskRunner::createTaskDataOrigin (taskSpec.detectorName , true ),
382- TaskRunner::createTaskDataDescription (taskSpec.taskName ), 0 , Lifetime::Sporadic });
380+ createUserOutputSpec (DataSourceType::TaskMovingWindow, taskSpec.detectorName , taskSpec.taskName , 0 ,
381+ RootFileSource::outputBinding (taskSpec.detectorName , taskSpec.taskName , true )));
383382 }
384383 }
385384 if (!fileSourceOutputs.empty ()) {
@@ -530,7 +529,7 @@ void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpe
530529 std::string remotePort = std::to_string (taskSpec.remotePort );
531530 std::string proxyName = taskSpec.detectorName + " -" + taskName + " -proxy" ;
532531 std::string channelName = taskSpec.detectorName + " -" + taskName + " -proxy" ;
533- InputSpec proxyInput{ channelName, TaskRunner::createTaskDataOrigin ( taskSpec.detectorName , false ), TaskRunner::createTaskDataDescription ( taskName) , static_cast <SubSpec>(id), Lifetime::Sporadic } ;
532+ InputSpec proxyInput = createUserInputSpec (DataSourceType::Task, taskSpec.detectorName , taskName, static_cast <SubSpec>(id), channelName) ;
534533 std::string channelConfig = " name=" + channelName + " ,type=pub,method=connect,address=tcp://" +
535534 taskSpec.remoteMachine + " :" + remotePort + " ,rateLogging=60,transport=zeromq,sndBufSize=4" ;
536535
@@ -554,7 +553,7 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
554553 Outputs proxyOutputs;
555554 for (size_t id = 1 ; id <= numberOfLocalMachines; id++) {
556555 proxyOutputs.emplace_back (
557- OutputSpec{ { channelName }, TaskRunner::createTaskDataOrigin ( taskSpec.detectorName , false ), TaskRunner::createTaskDataDescription ( taskName) , static_cast <SubSpec>(id), Lifetime::Sporadic } );
556+ createUserOutputSpec (DataSourceType::Task, taskSpec.detectorName , taskName, static_cast <SubSpec>(id), { channelName }) );
558557 }
559558
560559 std::string channelConfig = " name=" + channelName + " ,type=sub,method=bind,address=tcp://*:" + remotePort +
@@ -586,20 +585,14 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow,
586585 Inputs mergerInputs;
587586 for (size_t id = 1 ; id <= numberOfLocalMachines; id++) {
588587 mergerInputs.emplace_back (
589- InputSpec{ { taskName + std::to_string (id) },
590- TaskRunner::createTaskDataOrigin (detectorName, false ),
591- TaskRunner::createTaskDataDescription (taskName),
592- static_cast <SubSpec>(id),
593- Lifetime::Sporadic });
588+ createUserInputSpec (DataSourceType::Task, detectorName, taskName, static_cast <SubSpec>(id), taskName + std::to_string (id)));
594589 }
595590
596591 MergerInfrastructureBuilder mergersBuilder;
597592 mergersBuilder.setInfrastructureName (taskName);
598593 mergersBuilder.setInputSpecs (mergerInputs);
599- mergersBuilder.setOutputSpec (
600- { { " main" }, TaskRunner::createTaskDataOrigin (detectorName, false ), TaskRunner::createTaskDataDescription (taskName), 0 , Lifetime::Sporadic });
601- mergersBuilder.setOutputSpecMovingWindow (
602- { { " main_mw" }, TaskRunner::createTaskDataOrigin (detectorName, true ), TaskRunner::createTaskDataDescription (taskName), 0 , Lifetime::Sporadic });
594+ mergersBuilder.setOutputSpec (createUserOutputSpec (DataSourceType::Task, detectorName, taskName, 0 , { " main" }));
595+ mergersBuilder.setOutputSpecMovingWindow (createUserOutputSpec (DataSourceType::TaskMovingWindow, detectorName, taskName, 0 , { " main_mw" }));
603596 MergerConfig mergerConfig;
604597 // if we are to change the mode to Full, disable reseting tasks after each cycle.
605598 mergerConfig.inputObjectTimespan = { (mergingMode.empty () || mergingMode == " delta" ) ? InputObjectsTimespan::LastDifference : InputObjectsTimespan::FullHistory };
@@ -630,23 +623,20 @@ void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& work
630623
631624 // todo: avoid code repetition
632625 for (const auto & taskSpec : infrastructureSpec.tasks | std::views::filter (&TaskSpec::active)) {
633- InputSpec taskOutput{ taskSpec. taskName , TaskRunner::createTaskDataOrigin ( taskSpec.detectorName ), TaskRunner::createTaskDataDescription ( taskSpec.taskName ), Lifetime::Sporadic };
626+ InputSpec taskOutput{ createUserInputSpec (DataSourceType::Task, taskSpec.detectorName , taskSpec.taskName ) };
634627 tasksOutputMap.insert ({ DataSpecUtils::label (taskOutput), taskOutput });
635628
636629 bool movingWindowsEnabled = !taskSpec.movingWindows .empty ();
637630 bool synchronousRemote = taskSpec.location == TaskLocationSpec::Local && (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain);
638631 bool asynchronousRemote = infrastructureSpec.workflowType == WorkflowType::RemoteBatch;
639632 if (movingWindowsEnabled && (synchronousRemote || asynchronousRemote)) {
640- InputSpec taskMovingWindowOutput{ taskSpec. taskName , TaskRunner::createTaskDataOrigin ( taskSpec.detectorName , true ), TaskRunner::createTaskDataDescription ( taskSpec.taskName ), Lifetime::Sporadic };
633+ InputSpec taskMovingWindowOutput{ createUserInputSpec (DataSourceType::TaskMovingWindow, taskSpec.detectorName , taskSpec.taskName ) };
641634 tasksOutputMap.insert ({ DataSpecUtils::label (taskMovingWindowOutput), taskMovingWindowOutput });
642635 }
643636 }
644637
645638 for (const auto & ppTaskSpec : infrastructureSpec.postProcessingTasks | std::views::filter (&PostProcessingTaskSpec::active)) {
646- InputSpec ppTaskOutput{ ppTaskSpec.taskName ,
647- PostProcessingDevice::createPostProcessingDataOrigin (ppTaskSpec.detectorName ),
648- PostProcessingDevice::createPostProcessingDataDescription (ppTaskSpec.taskName ),
649- Lifetime::Sporadic };
639+ InputSpec ppTaskOutput{ createUserInputSpec (DataSourceType::PostProcessingTask, ppTaskSpec.detectorName , ppTaskSpec.taskName ) };
650640 tasksOutputMap.insert ({ DataSpecUtils::label (ppTaskOutput), ppTaskOutput });
651641 }
652642
@@ -779,27 +769,18 @@ void InfrastructureGenerator::generatePostProcessing(WorkflowSpec& workflow, con
779769 }
780770}
781771
782- template <typename Type>
783- auto createSinkInput (const std::string& detectorName, const std::string& name) -> framework::InputSpec
784- {
785- const auto outputSpec = Type::createOutputSpec (detectorName, name);
786- auto input = DataSpecUtils::matchingInput (outputSpec);
787- input.binding = name;
788- return input;
789- }
790-
791772void InfrastructureGenerator::generateBookkeepingQualitySink (WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec)
792773{
793774 framework::Inputs sinkInputs{};
794775
795776 for (const auto & checkSpec : infrastructureSpec.checks | std::views::filter (&CheckSpec::active) | std::views::filter (&CheckSpec::exportToBookkeeping)) {
796777 ILOG (Debug, Support) << " Adding input to BookkeepingSink from check " << checkSpec.checkName << " and detector: " << checkSpec.detectorName << ENDM;
797- sinkInputs.emplace_back (createSinkInput<Check>( checkSpec.detectorName , checkSpec.checkName ));
778+ sinkInputs.emplace_back (createUserInputSpec (DataSourceType::Check, checkSpec.detectorName , checkSpec.checkName ));
798779 }
799780
800781 for (const auto & aggregatorSpec : infrastructureSpec.aggregators | std::views::filter (&AggregatorSpec::active) | std::views::filter (&AggregatorSpec::exportToBookkeeping)) {
801782 ILOG (Debug, Support) << " Adding input to BookkeepingSink from aggregator " << aggregatorSpec.aggregatorName << " and detector: " << aggregatorSpec.detectorName << ENDM;
802- sinkInputs.emplace_back (createSinkInput<Aggregator>( aggregatorSpec.detectorName , aggregatorSpec.aggregatorName ));
783+ sinkInputs.emplace_back (createUserInputSpec (DataSourceType::Aggregator, aggregatorSpec.detectorName , aggregatorSpec.aggregatorName ));
803784 }
804785
805786 if (sinkInputs.empty ()) {
0 commit comments