diff --git a/Framework/TestWorkflows/CMakeLists.txt b/Framework/TestWorkflows/CMakeLists.txt index 8548d6570e4a4..b147a4871bf26 100644 --- a/Framework/TestWorkflows/CMakeLists.txt +++ b/Framework/TestWorkflows/CMakeLists.txt @@ -119,6 +119,10 @@ o2_add_dpl_workflow(simple-sink SOURCES src/o2SimpleSink.cxx COMPONENT_NAME TestWorkflows) +o2_add_dpl_workflow(simple-processor + SOURCES src/o2SimpleProcessor.cxx + COMPONENT_NAME TestWorkflows) + o2_add_dpl_workflow(analysis-workflow SOURCES src/o2AnalysisWorkflow.cxx COMPONENT_NAME TestWorkflows) diff --git a/Framework/TestWorkflows/scripts/mock-calibration.sh b/Framework/TestWorkflows/scripts/mock-calibration.sh new file mode 100755 index 0000000000000..a56fcdf45561c --- /dev/null +++ b/Framework/TestWorkflows/scripts/mock-calibration.sh @@ -0,0 +1,7 @@ +#/bin/sh -ex +export DPL_SIGNPOSTS="calibration" +stage/bin/o2-dpl-raw-proxy --exit-transition-timeout 20 --data-processing-timeout 10 --dataspec "tst:TST/A/0" --channel-config "readout-proxy:address=tcp://0.0.0.0:4200,method=connect,type=pair" | \ + stage/bin/o2-testworkflows-simple-processor --exit-transition-timeout 20 --data-processing-timeout 10 --name reconstruction --processing-delay 5000 --eos-dataspec tst3:TST/C/0 --in-dataspec "tst2:TST/A/0" --out-dataspec "tst:TST/B/0" | \ + stage/bin/o2-testworkflows-simple-processor --exit-transition-timeout 20 --data-processing-timeout 10 --name calibration --processing-delay 1000 --in-dataspec "tst2:TST/C/0?lifetime=sporadic" --out-dataspec "tst:TCL/C/0?lifetime=sporadic" | \ + stage/bin/o2-testworkflows-simple-sink --exit-transition-timeout 20 --data-processing-timeout 10 --name calibration-publisher --dataspec "tst2:TCL/C/0?lifetime=sporadic" | \ + stage/bin/o2-testworkflows-simple-sink --exit-transition-timeout 20 --data-processing-timeout 10 --dataspec "tst:TST/B/0" diff --git a/Framework/TestWorkflows/scripts/mock-flp.sh b/Framework/TestWorkflows/scripts/mock-flp.sh new file mode 100755 index 0000000000000..c1ad7c2f0dbaf --- /dev/null +++ b/Framework/TestWorkflows/scripts/mock-flp.sh @@ -0,0 +1,3 @@ +#/bin/sh -ex +stage/bin/o2-testworkflows-simple-source --dataspec tst:TST/A/0 --delay 1000 | \ + stage/bin/o2-dpl-output-proxy --dataspec "tst:TST/A/0" --channel-config "downstream:address=tcp://0.0.0.0:4200,method=bind,type=pair" diff --git a/Framework/TestWorkflows/src/o2SimpleProcessor.cxx b/Framework/TestWorkflows/src/o2SimpleProcessor.cxx new file mode 100644 index 0000000000000..078500a886ada --- /dev/null +++ b/Framework/TestWorkflows/src/o2SimpleProcessor.cxx @@ -0,0 +1,99 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/ConfigParamSpec.h" +#include "Framework/RawDeviceService.h" + +#include +#include +#include +#include + +using namespace o2::framework; + +void customize(std::vector& workflowOptions) +{ + workflowOptions.emplace_back( + ConfigParamSpec{"in-dataspec", VariantType::String, "", {"DataSpec for the outputs"}}); + workflowOptions.emplace_back( + ConfigParamSpec{"out-dataspec", VariantType::String, "", {"DataSpec for the outputs"}}); + workflowOptions.emplace_back( + ConfigParamSpec{"eos-dataspec", VariantType::String, "", {"DataSpec for the outputs during EoS"}}); + workflowOptions.emplace_back( + ConfigParamSpec{"processing-delay", VariantType::Int, 0, {"How long the processing takes"}}); + workflowOptions.emplace_back( + ConfigParamSpec{"eos-delay", VariantType::Int, 0, {"How long the takes to do eos"}}); + workflowOptions.emplace_back( + ConfigParamSpec{"name", VariantType::String, "test-processor", {"Name of the processor"}}); +} +#include "Framework/runDataProcessing.h" + +// This is how you can define your processing in a declarative way +WorkflowSpec defineDataProcessing(ConfigContext const& ctx) +{ + // Get the dataspec option and creates OutputSpecs from it + auto inDataspec = ctx.options().get("in-dataspec"); + auto outDataspec = ctx.options().get("out-dataspec"); + // For data created at the End-Of-Stream + auto eosDataspec = ctx.options().get("eos-dataspec"); + + auto processingDelay = ctx.options().get("processing-delay"); + auto eosDelay = ctx.options().get("eos-delay"); + + std::vector inputs = select(inDataspec.c_str()); + + for (auto& input : inputs) { + LOGP(info, "{} : lifetime {}", DataSpecUtils::describe(input), (int)input.lifetime); + } + + std::vector matchers = select(outDataspec.c_str()); + std::vector outputRefs; + std::vector outputs; + + for (auto const& matcher : matchers) { + outputRefs.emplace_back(matcher.binding); + outputs.emplace_back(DataSpecUtils::asOutputSpec(matcher)); + } + + std::vector eosMatchers = select(eosDataspec.c_str()); + std::vector eosRefs; + std::vector eosOutputs; + + for (auto const& matcher : eosMatchers) { + eosRefs.emplace_back(matcher.binding); + auto eosOut = DataSpecUtils::asOutputSpec(matcher); + eosOut.lifetime = Lifetime::Sporadic; + outputs.emplace_back(eosOut); + } + + AlgorithmSpec algo = adaptStateful([outputRefs, eosRefs, processingDelay, eosDelay](CallbackService& service) { + service.set([eosRefs, eosDelay](EndOfStreamContext&) { + LOG(info) << "Creating objects on end of stream reception."; + std::this_thread::sleep_for(std::chrono::seconds(eosDelay)); + }); + + return adaptStateless( + [outputRefs, processingDelay](InputRecord& inputs, DataAllocator& outputs) { + LOG(info) << "Received " << inputs.size() << " messages. Converting."; + auto i = 0; + std::this_thread::sleep_for(std::chrono::milliseconds(processingDelay)); + for (auto& ref : outputRefs) { + LOGP(info, "Creating {}.", ref); + outputs.make(ref, ++i); + } + }); + }); + + return WorkflowSpec{ + {.name = ctx.options().get("name"), + .inputs = inputs, + .outputs = outputs, + .algorithm = algo}}; +} diff --git a/Framework/TestWorkflows/src/o2SimpleSource.cxx b/Framework/TestWorkflows/src/o2SimpleSource.cxx index d095b16065ebe..5f9193465834b 100644 --- a/Framework/TestWorkflows/src/o2SimpleSource.cxx +++ b/Framework/TestWorkflows/src/o2SimpleSource.cxx @@ -29,6 +29,8 @@ void customize(std::vector& workflowOptions) ConfigParamSpec{"name", VariantType::String, "test-source", {"Name of the source"}}); workflowOptions.emplace_back( ConfigParamSpec{"timer", VariantType::String, "", {"What to use as timer intervals. Format is :[, ...]"}}); + workflowOptions.emplace_back( + ConfigParamSpec{"delay", VariantType::Int, 0, {"How long it takes to do the processing (in ms)"}}); } #include "Framework/runDataProcessing.h" @@ -39,6 +41,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& ctx) // Get the dataspec option and creates OutputSpecs from it auto dataspec = ctx.options().get("dataspec"); auto timer = ctx.options().get("timer"); + auto delay = ctx.options().get("delay"); + std::vector inputs; std::vector timers; if (timer.empty() == false) { @@ -74,13 +78,14 @@ WorkflowSpec defineDataProcessing(ConfigContext const& ctx) .inputs = inputs, .outputs = outputSpecs, .algorithm = AlgorithmSpec{adaptStateful( - [outputSpecs](ConfigParamRegistry const& options) { + [outputSpecs, delay](ConfigParamRegistry const& options) { // the size of the messages is also a workflow option auto dataSize = options.get("data-size"); return adaptStateless( - [outputSpecs, dataSize](DataAllocator& outputs, ProcessingContext& ctx) { + [outputSpecs, dataSize, delay](DataAllocator& outputs, ProcessingContext& ctx) { for (auto const& output : outputSpecs) { auto concrete = DataSpecUtils::asConcreteDataMatcher(output); + std::this_thread::sleep_for(std::chrono::milliseconds(delay)); outputs.make(Output{concrete.origin, concrete.description, concrete.subSpec}, dataSize); } }); diff --git a/Framework/Utils/src/raw-proxy.cxx b/Framework/Utils/src/raw-proxy.cxx index fe33b4b4c8ab8..76fb10aec963d 100644 --- a/Framework/Utils/src/raw-proxy.cxx +++ b/Framework/Utils/src/raw-proxy.cxx @@ -29,7 +29,7 @@ void customize(std::vector& workflowOptions) workflowOptions.push_back( ConfigParamSpec{ - "dataspec", VariantType::String, "A:FLP/RAWDATA;B:FLP/DISTSUBTIMEFRAME/0", {"selection string for the data to be proxied"}}); + "dataspec", VariantType::String, "tst:TST/A", {"selection string for the data to be proxied"}}); workflowOptions.push_back( ConfigParamSpec{