Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions Framework/Core/include/Framework/ASoA.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include <concepts>
#include <cstring>
#include <gsl/span> // IWYU pragma: export
#include <limits>

namespace o2::framework
{
Expand All @@ -53,6 +52,12 @@ void dereferenceWithWrongType(const char* getter, const char* target);
void missingFilterDeclaration(int hash, int ai);
void notBoundTable(const char* tableName);
void* extractCCDBPayload(char* payload, size_t size, TClass const* cl, const char* what);

template <typename... C>
auto createFieldsFromColumns(framework::pack<C...>)
{
return std::vector<std::shared_ptr<arrow::Field>>{C::asArrowField()...};
}
} // namespace o2::soa

namespace o2::soa
Expand Down Expand Up @@ -248,6 +253,11 @@ struct TableMetadata {
return -1;
}
}

static std::shared_ptr<arrow::Schema> getSchema()
{
return std::make_shared<arrow::Schema>([]<typename... C>(framework::pack<C...>&& p) { return o2::soa::createFieldsFromColumns(p); }(persistent_columns_t{}));
}
};

template <typename D>
Expand Down Expand Up @@ -406,12 +416,6 @@ struct Binding {
}
};

template <typename... C>
auto createFieldsFromColumns(framework::pack<C...>)
{
return std::vector<std::shared_ptr<arrow::Field>>{C::asArrowField()...};
}

using SelectionVector = std::vector<int64_t>;

template <typename T>
Expand Down Expand Up @@ -686,7 +690,7 @@ struct Column {

static auto asArrowField()
{
return std::make_shared<arrow::Field>(inherited_t::mLabel, framework::expressions::concreteArrowType(framework::expressions::selectArrowType<type>()));
return std::make_shared<arrow::Field>(inherited_t::mLabel, soa::asArrowDataType<type>());
}

/// FIXME: rather than keeping this public we should have a protected
Expand Down
38 changes: 33 additions & 5 deletions Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
namespace o2::framework
{
std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors);
std::string serializeSchema(std::shared_ptr<arrow::Schema>& schema);
std::string serializeSchema(std::shared_ptr<arrow::Schema> schema);
} // namespace o2::framework

namespace o2::soa
Expand All @@ -44,6 +44,16 @@ constexpr auto tableRef2ConfigParamSpec()
{"\"\""}};
}

template <TableRef R>
constexpr auto tableRef2Schema()
{
return o2::framework::ConfigParamSpec{
std::string{"input-schema:"} + o2::aod::label<R>(),
framework::VariantType::String,
framework::serializeSchema(o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata::getSchema()),
{"\"\""}};
}

namespace
{
template <soa::with_sources T>
Expand All @@ -56,6 +66,16 @@ inline constexpr auto getSources()
}.template operator()<T::sources.size(), T::sources>();
}

template <soa::with_sources T>
inline constexpr auto getSourceSchemas()
{
return []<size_t N, std::array<soa::TableRef, N> refs>() {
return []<size_t... Is>(std::index_sequence<Is...>) {
return std::vector{soa::tableRef2Schema<refs[Is]>()...};
}(std::make_index_sequence<N>());
}.template operator()<T::sources.size(), T::sources>();
}

template <soa::with_ccdb_urls T>
inline constexpr auto getCCDBUrls()
{
Expand All @@ -73,11 +93,19 @@ template <soa::with_sources T>
constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
{
std::vector<framework::ConfigParamSpec> inputMetadata;

auto inputSources = getSources<T>();
std::sort(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
auto last = std::unique(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
inputSources.erase(last, inputSources.end());
inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end());

auto inputSchemas = getSourceSchemas<T>();
std::sort(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
last = std::unique(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
inputSchemas.erase(last, inputSchemas.end());
inputMetadata.insert(inputMetadata.end(), inputSchemas.begin(), inputSchemas.end());

return inputMetadata;
}

Expand Down Expand Up @@ -115,11 +143,8 @@ constexpr auto getExpressionMetadata() -> std::vector<framework::ConfigParamSpec
return result;
}(expression_pack_t{});

auto schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(expression_pack_t{}));

auto json = framework::serializeProjectors(projectors);
return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}},
framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(schema), {"\"\""}}};
return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}};
}

template <typename T>
Expand All @@ -141,6 +166,9 @@ constexpr auto tableRef2InputSpec()
metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end());
auto p = getExpressionMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), p.begin(), p.end());
if constexpr (!soa::with_ccdb_urls<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>) {
metadata.emplace_back(framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata::getSchema()), {"\"\""}});
}

return framework::InputSpec{
o2::aod::label<R>(),
Expand Down
50 changes: 50 additions & 0 deletions Framework/Core/include/Framework/ArrowTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#ifndef O2_FRAMEWORK_ARROWTYPES_H
#define O2_FRAMEWORK_ARROWTYPES_H
#include "Framework/Traits.h"
#include "arrow/type_fwd.h"
#include <span>

Expand Down Expand Up @@ -117,5 +118,54 @@ template <typename T>
using arrow_array_for_t = typename arrow_array_for<T>::type;
template <typename T>
using value_for_t = typename arrow_array_for<T>::value_type;

template <class Array>
using array_element_t = std::decay_t<decltype(std::declval<Array>()[0])>;

template <typename T>
std::shared_ptr<arrow::DataType> asArrowDataType(int list_size = 1)
{
auto typeGenerator = [](std::shared_ptr<arrow::DataType> const& type, int list_size) -> std::shared_ptr<arrow::DataType> {
switch (list_size) {
case -1:
return arrow::list(type);
case 1:
return std::move(type);
default:
return arrow::fixed_size_list(type, list_size);
}
};

if constexpr (std::is_arithmetic_v<T>) {
if constexpr (std::same_as<T, bool>) {
return typeGenerator(arrow::boolean(), list_size);
} else if constexpr (std::same_as<T, uint8_t>) {
return typeGenerator(arrow::uint8(), list_size);
} else if constexpr (std::same_as<T, uint16_t>) {
return typeGenerator(arrow::uint16(), list_size);
} else if constexpr (std::same_as<T, uint32_t>) {
return typeGenerator(arrow::uint32(), list_size);
} else if constexpr (std::same_as<T, uint64_t>) {
return typeGenerator(arrow::uint64(), list_size);
} else if constexpr (std::same_as<T, int8_t>) {
return typeGenerator(arrow::int8(), list_size);
} else if constexpr (std::same_as<T, int16_t>) {
return typeGenerator(arrow::int16(), list_size);
} else if constexpr (std::same_as<T, int32_t>) {
return typeGenerator(arrow::int32(), list_size);
} else if constexpr (std::same_as<T, int64_t>) {
return typeGenerator(arrow::int64(), list_size);
} else if constexpr (std::same_as<T, float>) {
return typeGenerator(arrow::float32(), list_size);
} else if constexpr (std::same_as<T, double>) {
return typeGenerator(arrow::float64(), list_size);
}
} else if constexpr (std::is_bounded_array_v<T>) {
return asArrowDataType<array_element_t<T>>(std::extent_v<T>);
} else if constexpr (o2::framework::is_specialization_v<T, std::vector>) {
return asArrowDataType<typename T::value_type>(-1);
}
return nullptr;
}
} // namespace o2::soa
#endif // O2_FRAMEWORK_ARROWTYPES_H
55 changes: 26 additions & 29 deletions Framework/Core/src/AODReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,14 @@ struct Maker {
std::vector<std::string> labels;
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
std::shared_ptr<gandiva::Projector> projector = nullptr;
std::shared_ptr<arrow::Schema> schema;
std::shared_ptr<arrow::Schema> schema = nullptr;
std::shared_ptr<arrow::Schema> inputSchema = nullptr;

header::DataOrigin origin;
header::DataDescription description;
header::DataHeader::SubSpecificationType version;

std::shared_ptr<arrow::Table> make(ProcessingContext& pc)
std::shared_ptr<arrow::Table> make(ProcessingContext& pc) const
{
std::vector<std::shared_ptr<arrow::Table>> originals;
for (auto const& label : labels) {
Expand All @@ -159,15 +160,6 @@ struct Maker {
if (fullTable->num_rows() == 0) {
return arrow::Table::MakeEmpty(schema).ValueOrDie();
}
if (projector == nullptr) {
auto s = gandiva::Projector::Make(
fullTable->schema(),
expressions,
&projector);
if (!s.ok()) {
throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str());
}
}

return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector);
}
Expand Down Expand Up @@ -200,24 +192,21 @@ struct Spawnable {
iws.clear();
iws.str(loc->defaultValue.get<std::string>());
outputSchema = ArrowJSONHelpers::read(iws);
o2::framework::addLabelToSchema(outputSchema, binding.c_str());

std::vector<std::shared_ptr<arrow::Schema>> schemas;
for (auto& i : spec.metadata) {
if (i.name.starts_with("input:")) {
labels.emplace_back(i.name.substr(6));
if (i.name.starts_with("input-schema:")) {
labels.emplace_back(i.name.substr(13));
iws.clear();
auto json = i.defaultValue.get<std::string>();
iws.str(json);
schemas.emplace_back(ArrowJSONHelpers::read(iws));
}
}

std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto& p : projectors) {
expressions::walk(p.node.get(),
[&fields](expressions::Node* n) mutable {
if (n->self.index() == 1) {
auto& b = std::get<expressions::BindingNode>(n->self);
if (std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr<arrow::Field> const& field) { return field->name() == b.name; }) == fields.end()) {
fields.emplace_back(std::make_shared<arrow::Field>(b.name, expressions::concreteArrowType(b.type)));
}
}
});
for (auto& s : schemas) {
std::copy(s->fields().begin(), s->fields().end(), std::back_inserter(fields));
}
inputSchema = std::make_shared<arrow::Schema>(fields);

Expand All @@ -233,20 +222,28 @@ struct Spawnable {
}
}

std::shared_ptr<gandiva::Projector> makeProjector()
std::shared_ptr<gandiva::Projector> makeProjector() const
{
return expressions::createProjectorHelper(projectors.size(), projectors.data(), inputSchema, outputSchema->fields());
std::shared_ptr<gandiva::Projector> p = nullptr;
auto s = gandiva::Projector::Make(
inputSchema,
expressions,
&p);
if (!s.ok()) {
throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str());
}
return p;
}

Maker createMaker()
Maker createMaker() const
{
o2::framework::addLabelToSchema(outputSchema, binding.c_str());
return {
binding,
labels,
expressions,
nullptr,
makeProjector(),
outputSchema,
inputSchema,
origin,
description,
version};
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/AnalysisHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ std::string serializeProjectors(std::vector<framework::expressions::Projector>&
return osm.str();
}

std::string serializeSchema(std::shared_ptr<arrow::Schema>& schema)
std::string serializeSchema(std::shared_ptr<arrow::Schema> schema)
{
std::stringstream osm;
ArrowJSONHelpers::write(osm, schema);
Expand Down
1 change: 0 additions & 1 deletion Framework/Core/src/AnalysisSupportHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(
// FIXME: good enough for now...
for (auto& i : input.metadata) {
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
auto value = i.defaultValue.get<std::string>();
auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
if (j == publisher.inputs.end()) {
Expand Down
Loading