Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CCDB/include/CCDB/CCDBDownloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace o2::ccdb
#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
struct HeaderObjectPair_t {
std::multimap<std::string, std::string> header;
std::pmr::vector<char>* object = nullptr;
o2::pmr::vector<char>* object = nullptr;
int counter = 0;
};

Expand Down
18 changes: 9 additions & 9 deletions CCDB/include/CCDB/CcdbApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ class CcdbApi //: public DatabaseInterface

#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
typedef struct RequestContext {
std::pmr::vector<char>& dest;
o2::pmr::vector<char>& dest;
std::string path;
std::map<std::string, std::string> const& metadata;
long timestamp;
Expand All @@ -365,7 +365,7 @@ class CcdbApi //: public DatabaseInterface
std::string createdNotBefore;
bool considerSnapshot;

RequestContext(std::pmr::vector<char>& d,
RequestContext(o2::pmr::vector<char>& d,
std::map<std::string, std::string> const& m,
std::map<std::string, std::string>& h)
: dest(d), metadata(m), headers(h) {}
Expand All @@ -379,7 +379,7 @@ class CcdbApi //: public DatabaseInterface

void getFromSnapshot(bool createSnapshot, std::string const& path,
long timestamp, std::map<std::string, std::string>& headers,
std::string& snapshotpath, std::pmr::vector<char>& dest, int& fromSnapshot, std::string const& etag) const;
std::string& snapshotpath, o2::pmr::vector<char>& dest, int& fromSnapshot, std::string const& etag) const;
void releaseNamedSemaphore(boost::interprocess::named_semaphore* sem, std::string const& path) const;
boost::interprocess::named_semaphore* createNamedSemaphore(std::string const& path) const;
static std::string determineSemaphoreName(std::string const& basedir, std::string const& objectpath);
Expand All @@ -388,22 +388,22 @@ class CcdbApi //: public DatabaseInterface
static bool removeSemaphore(std::string const& name, bool remove = false);
static void removeLeakingSemaphores(std::string const& basedir, bool remove = false);

void loadFileToMemory(std::pmr::vector<char>& dest, const std::string& path, std::map<std::string, std::string>* localHeaders = nullptr, bool fetchLocalMetaData = true) const;
void loadFileToMemory(std::pmr::vector<char>& dest, std::string const& path,
void loadFileToMemory(o2::pmr::vector<char>& dest, const std::string& path, std::map<std::string, std::string>* localHeaders = nullptr, bool fetchLocalMetaData = true) const;
void loadFileToMemory(o2::pmr::vector<char>& dest, std::string const& path,
std::map<std::string, std::string> const& metadata, long timestamp,
std::map<std::string, std::string>* headers, std::string const& etag,
const std::string& createdNotAfter, const std::string& createdNotBefore, bool considerSnapshot = true) const;

// Loads files from alien and cvmfs into given destination.
bool loadLocalContentToMemory(std::pmr::vector<char>& dest, std::string& url) const;
bool loadLocalContentToMemory(o2::pmr::vector<char>& dest, std::string& url) const;

// add annotated flattened headers in the end of the blob
static void appendFlatHeader(std::pmr::vector<char>& dest, const std::map<std::string, std::string>& headers);
static void appendFlatHeader(o2::pmr::vector<char>& dest, const std::map<std::string, std::string>& headers);

// the failure to load the file to memory is signaled by 0 size and non-0 capacity
static bool isMemoryFileInvalid(const std::pmr::vector<char>& v) { return v.size() == 0 && v.capacity() > 0; }
static bool isMemoryFileInvalid(const o2::pmr::vector<char>& v) { return v.size() == 0 && v.capacity() > 0; }
template <typename T>
static T* extractFromMemoryBlob(std::pmr::vector<char>& blob)
static T* extractFromMemoryBlob(o2::pmr::vector<char>& blob)
{
auto obj = static_cast<T*>(interpretAsTMemFileAndExtract(blob.data(), blob.size(), typeid(T)));
if constexpr (std::is_base_of<o2::conf::ConfigurableParam, T>::value) {
Expand Down
14 changes: 7 additions & 7 deletions CCDB/src/CcdbApi.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ bool CcdbApi::retrieveBlob(std::string const& path, std::string const& targetdir
return false;
}

std::pmr::vector<char> buff;
o2::pmr::vector<char> buff;
std::map<std::string, std::string> headers;
// avoid creating snapshot via loadFileToMemory itself
loadFileToMemory(buff, path, metadata, timestamp, &headers, "", createdNotAfter, createdNotBefore, false);
Expand Down Expand Up @@ -1830,7 +1830,7 @@ void CcdbApi::removeLeakingSemaphores(std::string const& snapshotdir, bool remov

void CcdbApi::getFromSnapshot(bool createSnapshot, std::string const& path,
long timestamp, std::map<std::string, std::string>& headers,
std::string& snapshotpath, std::pmr::vector<char>& dest, int& fromSnapshot, std::string const& etag) const
std::string& snapshotpath, o2::pmr::vector<char>& dest, int& fromSnapshot, std::string const& etag) const
{
if (createSnapshot) { // create named semaphore
std::string logfile = mSnapshotCachePath + "/log";
Expand Down Expand Up @@ -1884,7 +1884,7 @@ void CcdbApi::loadFileToMemory(std::vector<char>& dest, std::string const& path,
std::map<std::string, std::string>* headers, std::string const& etag,
const std::string& createdNotAfter, const std::string& createdNotBefore, bool considerSnapshot) const
{
std::pmr::vector<char> destP;
o2::pmr::vector<char> destP;
destP.reserve(dest.size());
loadFileToMemory(destP, path, metadata, timestamp, headers, etag, createdNotAfter, createdNotBefore, considerSnapshot);
dest.clear();
Expand All @@ -1894,7 +1894,7 @@ void CcdbApi::loadFileToMemory(std::vector<char>& dest, std::string const& path,
}
}

void CcdbApi::loadFileToMemory(std::pmr::vector<char>& dest, std::string const& path,
void CcdbApi::loadFileToMemory(o2::pmr::vector<char>& dest, std::string const& path,
std::map<std::string, std::string> const& metadata, long timestamp,
std::map<std::string, std::string>* headers, std::string const& etag,
const std::string& createdNotAfter, const std::string& createdNotBefore, bool considerSnapshot) const
Expand All @@ -1912,7 +1912,7 @@ void CcdbApi::loadFileToMemory(std::pmr::vector<char>& dest, std::string const&
vectoredLoadFileToMemory(contexts);
}

void CcdbApi::appendFlatHeader(std::pmr::vector<char>& dest, const std::map<std::string, std::string>& headers)
void CcdbApi::appendFlatHeader(o2::pmr::vector<char>& dest, const std::map<std::string, std::string>& headers)
{
size_t hsize = getFlatHeaderSize(headers), cnt = dest.size();
dest.resize(cnt + hsize);
Expand Down Expand Up @@ -1977,7 +1977,7 @@ void CcdbApi::vectoredLoadFileToMemory(std::vector<RequestContext>& requestConte
}
}

bool CcdbApi::loadLocalContentToMemory(std::pmr::vector<char>& dest, std::string& url) const
bool CcdbApi::loadLocalContentToMemory(o2::pmr::vector<char>& dest, std::string& url) const
{
if (url.find("alien:/", 0) != std::string::npos) {
std::map<std::string, std::string> localHeaders;
Expand Down Expand Up @@ -2005,7 +2005,7 @@ bool CcdbApi::loadLocalContentToMemory(std::pmr::vector<char>& dest, std::string
return false;
}

void CcdbApi::loadFileToMemory(std::pmr::vector<char>& dest, const std::string& path, std::map<std::string, std::string>* localHeaders, bool fetchLocalMetaData) const
void CcdbApi::loadFileToMemory(o2::pmr::vector<char>& dest, const std::string& path, std::map<std::string, std::string>* localHeaders, bool fetchLocalMetaData) const
{
// Read file to memory as vector. For special case of the locally cached file retriev metadata stored directly in the file
constexpr size_t MaxCopySize = 0x1L << 25;
Expand Down
4 changes: 2 additions & 2 deletions CCDB/test/testCcdbApi.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ BOOST_AUTO_TEST_CASE(multi_host_test)
api.init("http://bogus-host.cern.ch,http://ccdb-test.cern.ch:8080");
std::map<std::string, std::string> metadata;
std::map<std::string, std::string> headers;
std::pmr::vector<char> dst;
o2::pmr::vector<char> dst;
std::string url = "Analysis/ALICE3/Centrality";
api.loadFileToMemory(dst, url, metadata, 1645780010602, &headers, "", "", "", true);
BOOST_CHECK(dst.size() != 0);
Expand All @@ -572,7 +572,7 @@ BOOST_AUTO_TEST_CASE(vectored)
api.init("http://ccdb-test.cern.ch:8080");

int TEST_SAMPLE_SIZE = 5;
std::vector<std::pmr::vector<char>> dests(TEST_SAMPLE_SIZE);
std::vector<o2::pmr::vector<char>> dests(TEST_SAMPLE_SIZE);
std::vector<std::map<std::string, std::string>> metadatas(TEST_SAMPLE_SIZE);
std::vector<std::map<std::string, std::string>> headers(TEST_SAMPLE_SIZE);

Expand Down
6 changes: 3 additions & 3 deletions CCDB/test/testCcdbApiDownloader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ size_t writeCallbackNoLambda(void* contents, size_t size, size_t nmemb, void* ch
return realsize;
}

std::vector<CURL*> prepareAsyncHandles(size_t num, std::vector<std::pmr::vector<char>*>& dests)
std::vector<CURL*> prepareAsyncHandles(size_t num, std::vector<o2::pmr::vector<char>*>& dests)
{
std::vector<CURL*> handles;

for (int i = 0; i < num; i++) {
auto dest = new std::pmr::vector<char>();
auto dest = new o2::pmr::vector<char>();
dests.push_back(dest);
CURL* curl_handle = curl_easy_init();
handles.push_back(curl_handle);
Expand Down Expand Up @@ -154,7 +154,7 @@ BOOST_AUTO_TEST_CASE(asynch_schedule_test)
}

CCDBDownloader downloader;
std::vector<std::pmr::vector<char>*> dests;
std::vector<o2::pmr::vector<char>*> dests;
auto handles = prepareAsyncHandles(TRANSFERS, dests);
size_t transfersLeft = 0;

Expand Down
22 changes: 10 additions & 12 deletions DataFormats/Headers/include/Headers/Stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
#ifndef O2_HEADERS_STACK_H
#define O2_HEADERS_STACK_H

#include "MemoryResources/MemoryResources.h"
#include "Headers/DataHeader.h"

#include <memory_resource>

namespace o2::header
{
//__________________________________________________________________________________________________
Expand All @@ -33,18 +32,17 @@ namespace o2::header
/// - returns a Stack ready to be shipped.
struct Stack {

using memory_resource = std::pmr::memory_resource;
using memory_resource = o2::pmr::memory_resource;

private:
struct freeobj {
freeobj(memory_resource* mr, size_t s) : resource(mr), size(s) {}
freeobj(memory_resource* mr) : resource(mr) {}
memory_resource* resource{nullptr};
size_t size{0};
void operator()(std::byte* ptr) { resource->deallocate(ptr, size, alignof(std::max_align_t)); }
void operator()(std::byte* ptr) { resource->deallocate(ptr, 0, 0); }
};

public:
using allocator_type = std::pmr::polymorphic_allocator<std::byte>;
using allocator_type = fair::mq::pmr::polymorphic_allocator<std::byte>;
using value_type = std::byte;
using BufferType = std::unique_ptr<value_type[], freeobj>; // this gives us proper default move semantics for free

Expand Down Expand Up @@ -89,9 +87,9 @@ struct Stack {
/// all headers must derive from BaseHeader, in addition also other stacks can be passed to ctor.
template <typename FirstArgType, typename... Headers,
typename std::enable_if_t<
!std::is_convertible<FirstArgType, std::pmr::polymorphic_allocator<std::byte>>::value, int> = 0>
!std::is_convertible<FirstArgType, fair::mq::pmr::polymorphic_allocator<std::byte>>::value, int> = 0>
Stack(FirstArgType&& firstHeader, Headers&&... headers)
: Stack(std::pmr::new_delete_resource(), std::forward<FirstArgType>(firstHeader),
: Stack(fair::mq::pmr::new_delete_resource(), std::forward<FirstArgType>(firstHeader),
std::forward<Headers>(headers)...)
{
}
Expand All @@ -101,7 +99,7 @@ struct Stack {
Stack(const allocator_type allocatorArg, Headers&&... headers)
: allocator{allocatorArg},
bufferSize{calculateSize(std::forward<Headers>(headers)...)},
buffer{static_cast<std::byte*>(allocator.resource()->allocate(bufferSize, alignof(std::max_align_t))), freeobj{allocator.resource(), bufferSize}}
buffer{static_cast<std::byte*>(allocator.resource()->allocate(bufferSize, alignof(std::max_align_t))), freeobj{allocator.resource()}}
{
if constexpr (sizeof...(headers) > 1) {
injectAll(buffer.get(), std::forward<Headers>(headers)...);
Expand Down Expand Up @@ -142,9 +140,9 @@ struct Stack {
constexpr static size_t calculateSize() { return 0; }

private:
allocator_type allocator{std::pmr::new_delete_resource()};
allocator_type allocator{fair::mq::pmr::new_delete_resource()};
size_t bufferSize{0};
BufferType buffer{nullptr, freeobj{allocator.resource(), 0}};
BufferType buffer{nullptr, freeobj{allocator.resource()}};

//______________________________________________________________________________________________
template <typename T>
Expand Down
5 changes: 3 additions & 2 deletions DataFormats/Headers/test/testDataHeader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ BOOST_AUTO_TEST_CASE(headerStack_test)
BOOST_CHECK(h3->secret == 42);

// test constructing from a buffer and an additional header
Stack s5(std::pmr::new_delete_resource(), s1.data(), Stack{}, meta);
using namespace fair::mq::pmr;
Stack s5(new_delete_resource(), s1.data(), Stack{}, meta);
BOOST_CHECK(s5.size() == s1.size() + sizeof(meta));
// check if we can find the header even though there was an empty stack in the middle
h3 = get<test::MetaHeader*>(s5.data());
Expand All @@ -328,7 +329,7 @@ BOOST_AUTO_TEST_CASE(headerStack_test)
BOOST_CHECK(h4 == h3);

// let's assume we have some stack that is missing the required DataHeader at the beginning:
Stack s6{std::pmr::new_delete_resource(), DataHeader{}, s1.data()};
Stack s6{new_delete_resource(), DataHeader{}, s1.data()};
BOOST_CHECK(s6.size() == sizeof(DataHeader) + s1.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#ifndef ALICEO2_MEMORY_RESOURCES_
#define ALICEO2_MEMORY_RESOURCES_

#include <boost/container/pmr/memory_resource.hpp>
#include <boost/container/pmr/monotonic_buffer_resource.hpp>
#include <boost/container/pmr/polymorphic_allocator.hpp>
#include <cstring>
#include <string>
#include <type_traits>
Expand All @@ -42,8 +45,12 @@
namespace o2::pmr
{

using FairMQMemoryResource = fair::mq::MemoryResource;
using ChannelResource = fair::mq::ChannelResource;
using namespace fair::mq::pmr;

template <typename ContainerT>
fair::mq::MessagePtr getMessage(ContainerT&& container, fair::mq::MemoryResource* targetResource = nullptr)
fair::mq::MessagePtr getMessage(ContainerT&& container, FairMQMemoryResource* targetResource = nullptr)
{
return fair::mq::getMessage(std::forward<ContainerT>(container), targetResource);
}
Expand All @@ -53,7 +60,7 @@ fair::mq::MessagePtr getMessage(ContainerT&& container, fair::mq::MemoryResource
/// Ownership of hte message is taken. Meant to be used for transparent data adoption in containers.
/// In combination with the SpectatorAllocator this is an alternative to using span, as raw memory
/// (e.g. an existing buffer message) will be accessible with appropriate container.
class MessageResource : public fair::mq::MemoryResource
class MessageResource : public FairMQMemoryResource
{

public:
Expand All @@ -75,7 +82,7 @@ class MessageResource : public fair::mq::MemoryResource
size_t getNumberOfMessages() const noexcept override { return mMessageData ? 1 : 0; }

protected:
fair::mq::MemoryResource* mUpstream{nullptr};
FairMQMemoryResource* mUpstream{nullptr};
size_t mMessageSize{0};
void* mMessageData{nullptr};
bool initialImport{true};
Expand Down Expand Up @@ -106,14 +113,14 @@ class MessageResource : public fair::mq::MemoryResource

// The NoConstructAllocator behaves like the normal pmr vector but does not call constructors / destructors
template <typename T>
class NoConstructAllocator : public std::pmr::polymorphic_allocator<T>
class NoConstructAllocator : public fair::mq::pmr::polymorphic_allocator<T>
{
public:
using std::pmr::polymorphic_allocator<T>::polymorphic_allocator;
using fair::mq::pmr::polymorphic_allocator<T>::polymorphic_allocator;
using propagate_on_container_move_assignment = std::true_type;

template <typename... Args>
NoConstructAllocator(Args&&... args) : std::pmr::polymorphic_allocator<T>(std::forward<Args>(args)...)
NoConstructAllocator(Args&&... args) : fair::mq::pmr::polymorphic_allocator<T>(std::forward<Args>(args)...)
{
}

Expand All @@ -138,13 +145,13 @@ class NoConstructAllocator : public std::pmr::polymorphic_allocator<T>
//__________________________________________________________________________________________________
//__________________________________________________________________________________________________

using BytePmrAllocator = std::pmr::polymorphic_allocator<std::byte>;
using BytePmrAllocator = fair::mq::pmr::polymorphic_allocator<std::byte>;
template <class T>
using vector = std::vector<T, std::pmr::polymorphic_allocator<T>>;
using vector = std::vector<T, fair::mq::pmr::polymorphic_allocator<T>>;

//__________________________________________________________________________________________________
/// Get the allocator associated to a transport factory
inline static fair::mq::MemoryResource* getTransportAllocator(fair::mq::TransportFactory* factory)
inline static FairMQMemoryResource* getTransportAllocator(fair::mq::TransportFactory* factory)
{
return *factory;
}
Expand Down
10 changes: 6 additions & 4 deletions DataFormats/MemoryResources/test/testMemoryResources.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <fairmq/ProgOptions.h>
#include <vector>
#include <cstring>
#include <memory_resource>

namespace o2::pmr
{
Expand Down Expand Up @@ -61,6 +60,8 @@ BOOST_AUTO_TEST_CASE(transportallocatormap_test)
BOOST_CHECK(_tmp == allocZMQ);
}

using namespace fair::mq::pmr;

BOOST_AUTO_TEST_CASE(allocator_test)
{
size_t session{(size_t)getpid() * 1000 + 1};
Expand All @@ -75,7 +76,7 @@ BOOST_AUTO_TEST_CASE(allocator_test)
testData::nconstructions = 0;

{
std::vector<testData, std::pmr::polymorphic_allocator<testData>> v(std::pmr::polymorphic_allocator<testData>{allocZMQ});
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ});
v.reserve(3);
BOOST_CHECK(v.capacity() == 3);
BOOST_CHECK(allocZMQ->getNumberOfMessages() == 1);
Expand Down Expand Up @@ -109,7 +110,7 @@ BOOST_AUTO_TEST_CASE(getMessage_test)

// test message creation on the same channel it was allocated with
{
std::vector<testData, std::pmr::polymorphic_allocator<testData>> v(std::pmr::polymorphic_allocator<testData>{allocZMQ});
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ});
v.emplace_back(1);
v.emplace_back(2);
v.emplace_back(3);
Expand All @@ -124,7 +125,7 @@ BOOST_AUTO_TEST_CASE(getMessage_test)

// test message creation on a different channel than it was allocated with
{
std::vector<testData, std::pmr::polymorphic_allocator<testData>> v(std::pmr::polymorphic_allocator<testData>{allocZMQ});
std::vector<testData, polymorphic_allocator<testData>> v(polymorphic_allocator<testData>{allocZMQ});
v.emplace_back(4);
v.emplace_back(5);
v.emplace_back(6);
Expand All @@ -136,6 +137,7 @@ BOOST_AUTO_TEST_CASE(getMessage_test)
BOOST_CHECK(message->GetSize() == 3 * sizeof(testData));
messageArray = static_cast<int*>(message->GetData());
BOOST_CHECK(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6);

}

}; // namespace o2::pmr
Loading
Loading