Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Framework/Core/include/Framework/ArrowTableSlicingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace o2::framework
using ListVector = std::vector<std::vector<int64_t>>;

struct SliceInfoPtr {
gsl::span<int const> values;
gsl::span<int64_t const> counts;
gsl::span<int64_t const> offsets;
gsl::span<int64_t const> sizes;

std::pair<int64_t, int64_t> getSliceFor(int value) const;
};
Expand Down Expand Up @@ -66,6 +66,8 @@ struct ArrowTableSlicingCache {
Cache bindingsKeys;
std::vector<std::shared_ptr<arrow::NumericArray<arrow::Int32Type>>> values;
std::vector<std::shared_ptr<arrow::NumericArray<arrow::Int64Type>>> counts;
std::vector<std::vector<int64_t>> offsets;
std::vector<std::vector<int64_t>> sizes;

Cache bindingsKeysUnsorted;
std::vector<std::vector<int>> valuesUnsorted;
Expand Down
4 changes: 1 addition & 3 deletions Framework/Core/include/Framework/GroupSlicer.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,7 @@ struct GroupSlicer {
pos = position;
}
// optimized split
auto oc = sliceInfos[index].getSliceFor(pos);
uint64_t offset = oc.first;
auto count = oc.second;
auto [offset, count] = sliceInfos[index].getSliceFor(pos);
auto groupedElementsTable = originalTable.rawSlice(offset, offset + count - 1);
groupedElementsTable.bindInternalIndicesTo(&originalTable);
return groupedElementsTable;
Expand Down
65 changes: 42 additions & 23 deletions Framework/Core/src/ArrowTableSlicingCache.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,14 @@ void updatePairList(Cache& list, std::string const& binding, std::string const&
std::pair<int64_t, int64_t> SliceInfoPtr::getSliceFor(int value) const
{
int64_t offset = 0;
if (values.empty()) {
if (offsets.empty()) {
return {offset, 0};
}
int64_t p = static_cast<int64_t>(values.size()) - 1;
while (values[p] < 0) {
--p;
if (p < 0) {
return {offset, 0};
}
}

if (value > values[p]) {
if ((size_t)value >= offsets.size()) {
return {offset, 0};
}

for (auto i = 0U; i < values.size(); ++i) {
if (values[i] == value) {
return {offset, counts[i]};
}
offset += counts[i];
}
return {offset, 0};
return {offsets[value], sizes[value]};
}

gsl::span<const int64_t> SliceInfoUnsortedPtr::getSliceFor(int value) const
Expand Down Expand Up @@ -84,6 +70,8 @@ ArrowTableSlicingCache::ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorte
{
values.resize(bindingsKeys.size());
counts.resize(bindingsKeys.size());
offsets.resize(bindingsKeys.size());
sizes.resize(bindingsKeys.size());

valuesUnsorted.resize(bindingsKeysUnsorted.size());
groups.resize(bindingsKeysUnsorted.size());
Expand All @@ -97,6 +85,10 @@ void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted)
values.resize(bindingsKeys.size());
counts.clear();
counts.resize(bindingsKeys.size());
offsets.clear();
offsets.resize(bindingsKeys.size());
sizes.clear();
sizes.resize(bindingsKeys.size());
valuesUnsorted.clear();
valuesUnsorted.resize(bindingsKeysUnsorted.size());
groups.clear();
Expand All @@ -105,9 +97,11 @@ void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted)

arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<arrow::Table> const& table)
{
values[pos].reset();
counts[pos].reset();
offsets[pos].clear();
sizes[pos].clear();
if (table->num_rows() == 0) {
values[pos].reset();
counts[pos].reset();
return arrow::Status::OK();
}
auto& [b, k, e] = bindingsKeys[pos];
Expand All @@ -125,6 +119,31 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<
counts[pos].reset();
values[pos] = std::make_shared<arrow::NumericArray<arrow::Int32Type>>(pair.field(0)->data());
counts[pos] = std::make_shared<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());

int maxValue = -1;
for (auto i = values[pos]->length() - 1; i >= 0; --i) {
if (values[pos]->Value(i) < 0) {
continue;
} else {
maxValue = values[pos]->Value(i);
break;
}
}

offsets[pos].resize(maxValue + 1);
sizes[pos].resize(maxValue + 1);
std::fill(offsets[pos].begin(), offsets[pos].end(), 0);
std::fill(sizes[pos].begin(), sizes[pos].end(), 0);
int64_t offset = 0;
for (auto i = 0U; i < values[pos]->length(); ++i) {
auto value = values[pos]->Value(i);
auto count = counts[pos]->Value(i);
if (value >= 0) {
offsets[pos][value] = offset;
sizes[pos][value] = count;
}
offset += count;
}
return arrow::Status::OK();
}

Expand Down Expand Up @@ -221,14 +240,14 @@ SliceInfoPtr ArrowTableSlicingCache::getCacheForPos(int pos) const
{
if (values[pos] == nullptr && counts[pos] == nullptr) {
return {
{},
{} //
{}, //
{} //
};
}

return {
{reinterpret_cast<int const*>(values[pos]->values()->data()), static_cast<size_t>(values[pos]->length())},
{reinterpret_cast<int64_t const*>(counts[pos]->values()->data()), static_cast<size_t>(counts[pos]->length())} //
gsl::span{offsets[pos].data(), offsets[pos].size()}, //
gsl::span(sizes[pos].data(), sizes[pos].size()) //
};
}

Expand Down
20 changes: 8 additions & 12 deletions Framework/Core/test/test_GroupSlicer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ TEST_CASE("GroupSlicerMismatchedGroups")
if (i == 3 || i == 10 || i == 12 || i == 16 || i == 19) {
continue;
}
for (auto j = 0.f; j < 5; j += 0.5f) {
trksWriter(0, i, 0.5f * j);
for (auto j = 0; j < 10; ++j) {
trksWriter(0, i, 0.5f * (j / 2.));
}
}
auto trkTable = builderT.finalize();
Expand All @@ -260,21 +260,19 @@ TEST_CASE("GroupSlicerMismatchedGroups")
auto s = slices.updateCacheEntry(0, trkTable);
o2::framework::GroupSlicer g(e, tt, slices);

auto count = 0;
for (auto& slice : g) {
auto as = slice.associatedTables();
auto gg = slice.groupingElement();
REQUIRE(gg.globalIndex() == count);
REQUIRE(gg.globalIndex() == (int64_t)slice.position);
auto trks = std::get<aod::TrksX>(as);
if (count == 3 || count == 10 || count == 12 || count == 16 || count == 19) {
if (slice.position == 3 || slice.position == 10 || slice.position == 12 || slice.position == 16 || slice.position == 19) {
REQUIRE(trks.size() == 0);
} else {
REQUIRE(trks.size() == 10);
}
for (auto& trk : trks) {
REQUIRE(trk.eventId() == count);
REQUIRE(trk.eventId() == (int64_t)slice.position);
}
++count;
}
}

Expand All @@ -299,8 +297,8 @@ TEST_CASE("GroupSlicerMismatchedUnassignedGroups")
++skip;
continue;
}
for (auto j = 0.f; j < 5; j += 0.5f) {
trksWriter(0, i, 0.5f * j);
for (auto j = 0; j < 10; ++j) {
trksWriter(0, i, 0.5f * (j / 2.));
}
}
for (auto i = 0; i < 5; ++i) {
Expand Down Expand Up @@ -510,7 +508,7 @@ TEST_CASE("GroupSlicerMismatchedUnsortedFilteredGroupsWithSelfIndex")
{
TableBuilder builderE;
auto evtsWriter = builderE.cursor<aod::Events>();
for (auto i = 0; i < 20; ++i) {
for (auto i = 0; i < 10; ++i) {
evtsWriter(0, i, 0.5f * i, 2.f * i, 3.f * i);
}
auto evtTable = builderE.finalize();
Expand All @@ -523,7 +521,6 @@ TEST_CASE("GroupSlicerMismatchedUnsortedFilteredGroupsWithSelfIndex")
std::uniform_int_distribution<> distrib(0, 99);

for (auto i = 0; i < 100; ++i) {

filler[0] = distrib(gen);
filler[1] = distrib(gen);
if (filler[0] > filler[1]) {
Expand All @@ -541,7 +538,6 @@ TEST_CASE("GroupSlicerMismatchedUnsortedFilteredGroupsWithSelfIndex")
auto thingsTable = builderT.finalize();

aod::Events e{evtTable};
// aod::Parts p{partsTable};
aod::Things t{thingsTable};
using FilteredParts = soa::Filtered<aod::Parts>;
auto size = distrib(gen);
Expand Down