Skip to content

Commit cf23602

Browse files
authored
Support parallel graph scans in HNSW - MOD-4318 (#311)
* Add support for the visited nodes handler pool as default. * Force the batch iterator to be destructed *before* the index itself in the Python bindings, by using a shared pointer to the index (the iterator's destructor returns its visited nodes handler to the index's pool, so the index must outlive it). * Use a vector instead of a deque for the visited nodes handler pool. * Make the allocator bookkeeping atomic.
1 parent e299b58 commit cf23602

File tree

12 files changed

+359
-138
lines changed

12 files changed

+359
-138
lines changed

src/VecSim/algorithms/hnsw/hnsw.h

Lines changed: 49 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,11 @@ class HNSWIndex : public VecSimIndexAbstract<DistType>,
9191
char *data_level0_memory_;
9292
char **linkLists_;
9393
vecsim_stl::vector<size_t> element_levels_;
94-
std::shared_ptr<VisitedNodesHandler> visited_nodes_handler;
9594

96-
// used for synchronization only when parallel indexing / searching is enabled.
95+
// Used for marking the visited nodes in graph scans (the pool supports parallel graph scans).
96+
// This is mutable since the object changes upon search operations as well (which are const).
97+
mutable VisitedNodesHandlerPool visited_nodes_handler_pool;
9798
#ifdef ENABLE_PARALLELIZATION
98-
std::unique_ptr<VisitedNodesHandlerPool> visited_nodes_handler_pool;
99-
size_t pool_initial_size;
10099
std::mutex global;
101100
std::mutex cur_element_count_guard_;
102101
std::vector<std::mutex> link_list_locks_;
@@ -126,12 +125,13 @@ class HNSWIndex : public VecSimIndexAbstract<DistType>,
126125
template <bool has_marked_deleted, typename Identifier> // Either idType or labelType
127126
inline DistType
128127
processCandidate(idType curNodeId, const void *data_point, size_t layer, size_t ef,
129-
tag_t visited_tag,
128+
tag_t visited_tag, tag_t *elements_tags,
130129
vecsim_stl::abstract_priority_queue<DistType, Identifier> &top_candidates,
131130
candidatesMaxHeap<DistType> &candidates_set, DistType lowerBound) const;
132131
template <bool has_marked_deleted>
133132
inline void processCandidate_RangeSearch(
134133
idType curNodeId, const void *data_point, size_t layer, double epsilon, tag_t visited_tag,
134+
tag_t *elements_tags,
135135
std::unique_ptr<vecsim_stl::abstract_results_container> &top_candidates,
136136
candidatesMaxHeap<DistType> &candidate_set, DistType lowerBound, double radius) const;
137137
template <bool has_marked_deleted>
@@ -189,6 +189,7 @@ class HNSWIndex : public VecSimIndexAbstract<DistType>,
189189
inline labelType getEntryPointLabel() const;
190190
inline labelType getExternalLabel(idType internal_id) const;
191191
inline VisitedNodesHandler *getVisitedList() const;
192+
inline void returnVisitedList(VisitedNodesHandler *visited_nodes_handler) const;
192193
VecSimIndexInfo info() const override;
193194
VecSimInfoIterator *infoIterator() const override;
194195
bool preferAdHocSearch(size_t subsetSize, size_t k, bool initial_check) override;
@@ -379,7 +380,13 @@ idType HNSWIndex<DataType, DistType>::getEntryPointId() const {
379380

380381
template <typename DataType, typename DistType>
381382
VisitedNodesHandler *HNSWIndex<DataType, DistType>::getVisitedList() const {
382-
return visited_nodes_handler.get();
383+
return visited_nodes_handler_pool.getAvailableVisitedNodesHandler();
384+
}
385+
386+
template <typename DataType, typename DistType>
387+
void HNSWIndex<DataType, DistType>::returnVisitedList(
388+
VisitedNodesHandler *visited_nodes_handler) const {
389+
visited_nodes_handler_pool.returnVisitedNodesHandlerToPool(visited_nodes_handler);
383390
}
384391

385392
template <typename DataType, typename DistType>
@@ -461,7 +468,7 @@ template <typename DataType, typename DistType>
461468
template <bool has_marked_deleted, typename Identifier>
462469
DistType HNSWIndex<DataType, DistType>::processCandidate(
463470
idType curNodeId, const void *data_point, size_t layer, size_t ef, tag_t visited_tag,
464-
vecsim_stl::abstract_priority_queue<DistType, Identifier> &top_candidates,
471+
tag_t *elements_tags, vecsim_stl::abstract_priority_queue<DistType, Identifier> &top_candidates,
465472
candidatesMaxHeap<DistType> &candidate_set, DistType lowerBound) const {
466473

467474
#ifdef ENABLE_PARALLELIZATION
@@ -470,22 +477,21 @@ DistType HNSWIndex<DataType, DistType>::processCandidate(
470477
idType *node_links = get_linklist_at_level(curNodeId, layer);
471478
linkListSize links_num = getListCount(node_links);
472479

473-
__builtin_prefetch(visited_nodes_handler->getElementsTags() + *node_links);
480+
__builtin_prefetch(elements_tags + *node_links);
474481
__builtin_prefetch(getDataByInternalId(*node_links));
475482

476483
for (size_t j = 0; j < links_num; j++) {
477484
idType *candidate_pos = node_links + j;
478485
idType candidate_id = *candidate_pos;
479-
480-
// Pre-fetch the next candidate data into memory cache, to improve performance.
481486
idType *next_candidate_pos = node_links + j + 1;
482-
__builtin_prefetch(visited_nodes_handler->getElementsTags() + *next_candidate_pos);
487+
488+
__builtin_prefetch(elements_tags + *next_candidate_pos);
483489
__builtin_prefetch(getDataByInternalId(*next_candidate_pos));
484490

485-
if (this->visited_nodes_handler->getNodeTag(candidate_id) == visited_tag)
491+
if (elements_tags[candidate_id] == visited_tag)
486492
continue;
487493

488-
this->visited_nodes_handler->tagNode(candidate_id, visited_tag);
494+
elements_tags[candidate_id] = visited_tag;
489495
char *currObj1 = (getDataByInternalId(candidate_id));
490496

491497
DistType dist1 = this->dist_func(data_point, currObj1, this->dim);
@@ -516,7 +522,7 @@ template <typename DataType, typename DistType>
516522
template <bool has_marked_deleted>
517523
void HNSWIndex<DataType, DistType>::processCandidate_RangeSearch(
518524
idType curNodeId, const void *query_data, size_t layer, double epsilon, tag_t visited_tag,
519-
std::unique_ptr<vecsim_stl::abstract_results_container> &results,
525+
tag_t *elements_tags, std::unique_ptr<vecsim_stl::abstract_results_container> &results,
520526
candidatesMaxHeap<DistType> &candidate_set, DistType dyn_range, double radius) const {
521527

522528
#ifdef ENABLE_PARALLELIZATION
@@ -525,7 +531,7 @@ void HNSWIndex<DataType, DistType>::processCandidate_RangeSearch(
525531
idType *node_links = get_linklist_at_level(curNodeId, layer);
526532
linkListSize links_num = getListCount(node_links);
527533

528-
__builtin_prefetch(visited_nodes_handler->getElementsTags() + *node_links);
534+
__builtin_prefetch(elements_tags + *node_links);
529535
__builtin_prefetch(getDataByInternalId(*node_links));
530536

531537
// Cast radius once instead of each time we check that candidate_dist <= radius_
@@ -536,12 +542,12 @@ void HNSWIndex<DataType, DistType>::processCandidate_RangeSearch(
536542

537543
// Pre-fetch the next candidate data into memory cache, to improve performance.
538544
idType *next_candidate_pos = node_links + j + 1;
539-
__builtin_prefetch(visited_nodes_handler->getElementsTags() + *next_candidate_pos);
545+
__builtin_prefetch(elements_tags + *next_candidate_pos);
540546
__builtin_prefetch(getDataByInternalId(*next_candidate_pos));
541547

542-
if (this->visited_nodes_handler->getNodeTag(candidate_id) == visited_tag)
548+
if (elements_tags[candidate_id] == visited_tag)
543549
continue;
544-
this->visited_nodes_handler->tagNode(candidate_id, visited_tag);
550+
elements_tags[candidate_id] = visited_tag;
545551
char *candidate_data = getDataByInternalId(candidate_id);
546552

547553
DistType candidate_dist = this->dist_func(query_data, candidate_data, this->dim);
@@ -566,12 +572,8 @@ candidatesMaxHeap<DistType>
566572
HNSWIndex<DataType, DistType>::searchLayer(idType ep_id, const void *data_point, size_t layer,
567573
size_t ef) const {
568574

569-
#ifdef ENABLE_PARALLELIZATION
570-
this->visited_nodes_handler =
571-
this->visited_nodes_handler_pool->getAvailableVisitedNodesHandler();
572-
#endif
573-
574-
tag_t visited_tag = this->visited_nodes_handler->getFreshTag();
575+
auto *visited_nodes_handler = getVisitedList();
576+
tag_t visited_tag = visited_nodes_handler->getFreshTag();
575577

576578
candidatesMaxHeap<DistType> top_candidates(this->allocator);
577579
candidatesMaxHeap<DistType> candidate_set(this->allocator);
@@ -587,7 +589,7 @@ HNSWIndex<DataType, DistType>::searchLayer(idType ep_id, const void *data_point,
587589
candidate_set.emplace(-lowerBound, ep_id);
588590
}
589591

590-
this->visited_nodes_handler->tagNode(ep_id, visited_tag);
592+
visited_nodes_handler->tagNode(ep_id, visited_tag);
591593

592594
while (!candidate_set.empty()) {
593595
pair<DistType, idType> curr_el_pair = candidate_set.top();
@@ -596,14 +598,12 @@ HNSWIndex<DataType, DistType>::searchLayer(idType ep_id, const void *data_point,
596598
}
597599
candidate_set.pop();
598600

599-
lowerBound = processCandidate<has_marked_deleted>(curr_el_pair.second, data_point, layer,
600-
ef, visited_tag, top_candidates,
601-
candidate_set, lowerBound);
601+
lowerBound = processCandidate<has_marked_deleted>(
602+
curr_el_pair.second, data_point, layer, ef, visited_tag,
603+
visited_nodes_handler->getElementsTags(), top_candidates, candidate_set, lowerBound);
602604
}
605+
returnVisitedList(visited_nodes_handler);
603606

604-
#ifdef ENABLE_PARALLELIZATION
605-
visited_nodes_handler_pool->returnVisitedNodesHandlerToPool(this->visited_nodes_handler);
606-
#endif
607607
return top_candidates;
608608
}
609609

@@ -1025,7 +1025,8 @@ HNSWIndex<DataType, DistType>::HNSWIndex(const HNSWParams *params,
10251025
params->blockSize, params->multi),
10261026
VecSimIndexTombstone(), max_elements_(params->initialCapacity),
10271027
data_size_(VecSimType_sizeof(params->type) * this->dim),
1028-
element_levels_(max_elements_, allocator)
1028+
element_levels_(max_elements_, allocator),
1029+
visited_nodes_handler_pool(pool_initial_size, max_elements_, this->allocator)
10291030

10301031
#ifdef ENABLE_PARALLELIZATION
10311032
,
@@ -1046,15 +1047,6 @@ HNSWIndex<DataType, DistType>::HNSWIndex(const HNSWParams *params,
10461047

10471048
cur_element_count = 0;
10481049
num_marked_deleted = 0;
1049-
#ifdef ENABLE_PARALLELIZATION
1050-
pool_initial_size = pool_initial_size;
1051-
visited_nodes_handler_pool = std::unique_ptr<VisitedNodesHandlerPool>(
1052-
new (this->allocator)
1053-
VisitedNodesHandlerPool(pool_initial_size, max_elements_, this->allocator));
1054-
#else
1055-
visited_nodes_handler = std::shared_ptr<VisitedNodesHandler>(
1056-
new (this->allocator) VisitedNodesHandler(max_elements_, this->allocator));
1057-
#endif
10581050

10591051
// initializations for special treatment of the first node
10601052
entrypoint_node_ = HNSW_INVALID_ID;
@@ -1121,14 +1113,9 @@ void HNSWIndex<DataType, DistType>::resizeIndex(size_t new_max_elements) {
11211113
element_levels_.resize(new_max_elements);
11221114
element_levels_.shrink_to_fit();
11231115
resizeLabelLookup(new_max_elements);
1116+
visited_nodes_handler_pool.resize(new_max_elements);
11241117
#ifdef ENABLE_PARALLELIZATION
1125-
visited_nodes_handler_pool = std::unique_ptr<VisitedNodesHandlerPool>(
1126-
new (this->allocator)
1127-
VisitedNodesHandlerPool(this->pool_initial_size, new_max_elements, this->allocator));
11281118
std::vector<std::mutex>(new_max_elements).swap(link_list_locks_);
1129-
#else
1130-
visited_nodes_handler = std::shared_ptr<VisitedNodesHandler>(
1131-
new (this->allocator) VisitedNodesHandler(new_max_elements, this->allocator));
11321119
#endif
11331120
// Reallocate base layer
11341121
char *data_level0_memory_new = (char *)this->allocator->reallocate(
@@ -1391,12 +1378,8 @@ HNSWIndex<DataType, DistType>::searchBottomLayer_WithTimeout(idType ep_id, const
13911378
size_t ef, size_t k, void *timeoutCtx,
13921379
VecSimQueryResult_Code *rc) const {
13931380

1394-
#ifdef ENABLE_PARALLELIZATION
1395-
this->visited_nodes_handler =
1396-
this->visited_nodes_handler_pool->getAvailableVisitedNodesHandler();
1397-
#endif
1398-
1399-
tag_t visited_tag = this->visited_nodes_handler->getFreshTag();
1381+
auto *visited_nodes_handler = getVisitedList();
1382+
tag_t visited_tag = visited_nodes_handler->getFreshTag();
14001383

14011384
candidatesLabelsMaxHeap<DistType> *top_candidates = getNewMaxPriorityQueue();
14021385
candidatesMaxHeap<DistType> candidate_set(this->allocator);
@@ -1416,26 +1399,25 @@ HNSWIndex<DataType, DistType>::searchBottomLayer_WithTimeout(idType ep_id, const
14161399
candidate_set.emplace(-lowerBound, ep_id);
14171400
}
14181401

1419-
this->visited_nodes_handler->tagNode(ep_id, visited_tag);
1402+
visited_nodes_handler->tagNode(ep_id, visited_tag);
14201403

14211404
while (!candidate_set.empty()) {
14221405
pair<DistType, idType> curr_el_pair = candidate_set.top();
14231406
if ((-curr_el_pair.first) > lowerBound && top_candidates->size() >= ef) {
14241407
break;
14251408
}
14261409
if (VECSIM_TIMEOUT(timeoutCtx)) {
1410+
returnVisitedList(visited_nodes_handler);
14271411
*rc = VecSim_QueryResult_TimedOut;
14281412
return top_candidates;
14291413
}
14301414
candidate_set.pop();
14311415

1432-
lowerBound = processCandidate<has_marked_deleted>(curr_el_pair.second, data_point, 0, ef,
1433-
visited_tag, *top_candidates,
1434-
candidate_set, lowerBound);
1416+
lowerBound = processCandidate<has_marked_deleted>(
1417+
curr_el_pair.second, data_point, 0, ef, visited_tag,
1418+
visited_nodes_handler->getElementsTags(), *top_candidates, candidate_set, lowerBound);
14351419
}
1436-
#ifdef ENABLE_PARALLELIZATION
1437-
visited_nodes_handler_pool->returnVisitedNodesHandlerToPool(this->visited_nodes_handler);
1438-
#endif
1420+
returnVisitedList(visited_nodes_handler);
14391421
while (top_candidates->size() > k) {
14401422
top_candidates->pop();
14411423
}
@@ -1510,12 +1492,9 @@ VecSimQueryResult *HNSWIndex<DataType, DistType>::searchRangeBottomLayer_WithTim
15101492
*rc = VecSim_QueryResult_OK;
15111493
auto res_container = getNewResultsContainer(10); // arbitrary initial cap.
15121494

1513-
#ifdef ENABLE_PARALLELIZATION
1514-
this->visited_nodes_handler =
1515-
this->visited_nodes_handler_pool->getAvailableVisitedNodesHandler();
1516-
#endif
1495+
auto *visited_nodes_handler = getVisitedList();
1496+
tag_t visited_tag = visited_nodes_handler->getFreshTag();
15171497

1518-
tag_t visited_tag = this->visited_nodes_handler->getFreshTag();
15191498
candidatesMaxHeap<DistType> candidate_set(this->allocator);
15201499

15211500
// Set the initial effective-range to be at least the distance from the entry-point.
@@ -1537,7 +1516,7 @@ VecSimQueryResult *HNSWIndex<DataType, DistType>::searchRangeBottomLayer_WithTim
15371516
}
15381517

15391518
candidate_set.emplace(-ep_dist, ep_id);
1540-
this->visited_nodes_handler->tagNode(ep_id, visited_tag);
1519+
visited_nodes_handler->tagNode(ep_id, visited_tag);
15411520

15421521
// Cast radius once instead of each time we check that -curr_el_pair.first >= radius_.
15431522
DistType radius_ = DistType(radius);
@@ -1565,13 +1544,11 @@ VecSimQueryResult *HNSWIndex<DataType, DistType>::searchRangeBottomLayer_WithTim
15651544
// requested radius.
15661545
// Here we send the radius as double to match the function arguments type.
15671546
processCandidate_RangeSearch<has_marked_deleted>(
1568-
curr_el_pair.second, data_point, 0, epsilon, visited_tag, res_container, candidate_set,
1547+
curr_el_pair.second, data_point, 0, epsilon, visited_tag,
1548+
visited_nodes_handler->getElementsTags(), res_container, candidate_set,
15691549
dynamic_range_search_boundaries, radius);
15701550
}
1571-
1572-
#ifdef ENABLE_PARALLELIZATION
1573-
visited_nodes_handler_pool->returnVisitedNodesHandlerToPool(this->visited_nodes_handler);
1574-
#endif
1551+
returnVisitedList(visited_nodes_handler);
15751552
return res_container->get_results();
15761553
}
15771554

@@ -1642,6 +1619,7 @@ VecSimIndexInfo HNSWIndex<DataType, DistType>::info() const {
16421619
info.hnswInfo.entrypoint = this->getEntryPointLabel();
16431620
info.hnswInfo.memory = this->getAllocationSize();
16441621
info.hnswInfo.last_mode = this->last_mode;
1622+
info.hnswInfo.visitedNodesPoolSize = this->visited_nodes_handler_pool.getPoolSize();
16451623
return info;
16461624
}
16471625

src/VecSim/algorithms/hnsw/hnsw_batch_iterator.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class HNSW_BatchIterator : public VecSimBatchIterator {
6262

6363
void reset() override;
6464

65-
~HNSW_BatchIterator() override {}
65+
virtual ~HNSW_BatchIterator() { index->returnVisitedList(this->visited_list); }
6666
};
6767

6868
/******************** Ctor / Dtor **************/
@@ -242,6 +242,8 @@ template <typename DataType, typename DistType>
242242
void HNSW_BatchIterator<DataType, DistType>::reset() {
243243
this->resetResultsCount();
244244
this->depleted = false;
245+
246+
// Reset the visited-nodes state by taking a fresh tag from the visited nodes handler.
245247
this->visited_tag = this->visited_list->getFreshTag();
246248
this->lower_bound = std::numeric_limits<DistType>::infinity();
247249
// Clear the queues.

src/VecSim/algorithms/hnsw/hnsw_factory.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,11 @@ size_t EstimateInitialSize(const HNSWParams *params) {
5252
est += EstimateInitialSize_ChooseMultiOrSingle<double>(params->multi);
5353
}
5454

55-
#ifdef ENABLE_PARALLELIZATION
56-
// Used for synchronization only when parallel indexing / searching is enabled.
57-
est += sizeof(VisitedNodesHandlerPool) + sizeof(size_t);
58-
#else
55+
// Account for the visited nodes pool (assume that it holds one pointer to a handler).
5956
est += sizeof(VisitedNodesHandler) + sizeof(size_t);
60-
est += sizeof(tag_t) * params->initialCapacity + sizeof(size_t); // visited nodes
61-
#endif
57+
// The visited nodes pool inner vector buffer (contains one pointer).
58+
est += sizeof(void *) + sizeof(size_t);
59+
est += sizeof(tag_t) * params->initialCapacity + sizeof(size_t); // visited nodes array
6260

6361
// Implicit allocation calls - allocates memory + a header only with positive capacity.
6462
if (params->initialCapacity) {

src/VecSim/algorithms/hnsw/hnsw_serializer.h

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ HNSWIndex<DataType, DistType>::HNSWIndex(std::ifstream &input, const HNSWParams
77
: VecSimIndexAbstract<DistType>(allocator, params->dim, params->type, params->metric,
88
params->blockSize, params->multi),
99
Serializer(version), max_elements_(params->initialCapacity), epsilon_(params->epsilon),
10-
element_levels_(max_elements_, allocator) {
10+
element_levels_(max_elements_, allocator),
11+
visited_nodes_handler_pool(1, max_elements_, allocator) {
1112

1213
this->restoreIndexFields(input);
1314
this->fieldsValidation();
@@ -17,16 +18,6 @@ HNSWIndex<DataType, DistType>::HNSWIndex(std::ifstream &input, const HNSWParams
1718
// levels value than the loaded index.
1819
level_generator_.seed(200);
1920

20-
#ifdef ENABLE_PARALLELIZATION
21-
this->pool_initial_size = 1;
22-
this->visited_nodes_handler_pool = std::unique_ptr<VisitedNodesHandlerPool>(
23-
new (this->allocator)
24-
VisitedNodesHandlerPool(this->pool_initial_size, max_elements_, this->allocator));
25-
#else
26-
this->visited_nodes_handler = std::unique_ptr<VisitedNodesHandler>(
27-
new (this->allocator) VisitedNodesHandler(max_elements_, this->allocator));
28-
#endif
29-
3021
data_level0_memory_ =
3122
(char *)this->allocator->callocate(max_elements_ * size_data_per_element_);
3223
if (data_level0_memory_ == nullptr)

0 commit comments

Comments
 (0)