Skip to content

Commit 627b198

Browse files
authored
Support parallel graph scans in HNSW - MOD-4318 (#311) (#321)
* Add support for the visited nodes handler pool as default. * forcing batch iterator to be destructed *after* the index itself in python bindings by using a shared pointer of the index. * use vector instead of deque for the visited nodes handler pool * making allocator bookkeeping atomic
1 parent 8e6b203 commit 627b198

File tree

12 files changed

+359
-138
lines changed

12 files changed

+359
-138
lines changed

src/VecSim/algorithms/hnsw/hnsw.h

Lines changed: 49 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,11 @@ class HNSWIndex : public VecSimIndexAbstract<DistType>,
9191
char *data_level0_memory_;
9292
char **linkLists_;
9393
vecsim_stl::vector<size_t> element_levels_;
94-
std::shared_ptr<VisitedNodesHandler> visited_nodes_handler;
9594

96-
// used for synchronization only when parallel indexing / searching is enabled.
95+
// Used for marking the visited nodes in graph scans (the pool supports parallel graph scans).
96+
// This is mutable since the object changes upon search operations as well (which are const).
97+
mutable VisitedNodesHandlerPool visited_nodes_handler_pool;
9798
#ifdef ENABLE_PARALLELIZATION
98-
std::unique_ptr<VisitedNodesHandlerPool> visited_nodes_handler_pool;
99-
size_t pool_initial_size;
10099
std::mutex global;
101100
std::mutex cur_element_count_guard_;
102101
std::vector<std::mutex> link_list_locks_;
@@ -126,12 +125,13 @@ class HNSWIndex : public VecSimIndexAbstract<DistType>,
126125
template <bool has_marked_deleted, typename Identifier> // Either idType or labelType
127126
inline DistType
128127
processCandidate(idType curNodeId, const void *data_point, size_t layer, size_t ef,
129-
tag_t visited_tag,
128+
tag_t visited_tag, tag_t *elements_tags,
130129
vecsim_stl::abstract_priority_queue<DistType, Identifier> &top_candidates,
131130
candidatesMaxHeap<DistType> &candidates_set, DistType lowerBound) const;
132131
template <bool has_marked_deleted>
133132
inline void processCandidate_RangeSearch(
134133
idType curNodeId, const void *data_point, size_t layer, double epsilon, tag_t visited_tag,
134+
tag_t *elements_tags,
135135
std::unique_ptr<vecsim_stl::abstract_results_container> &top_candidates,
136136
candidatesMaxHeap<DistType> &candidate_set, DistType lowerBound, double radius) const;
137137
template <bool has_marked_deleted>
@@ -189,6 +189,7 @@ class HNSWIndex : public VecSimIndexAbstract<DistType>,
189189
inline labelType getEntryPointLabel() const;
190190
inline labelType getExternalLabel(idType internal_id) const;
191191
inline VisitedNodesHandler *getVisitedList() const;
192+
inline void returnVisitedList(VisitedNodesHandler *visited_nodes_handler) const;
192193
VecSimIndexInfo info() const override;
193194
VecSimInfoIterator *infoIterator() const override;
194195
bool preferAdHocSearch(size_t subsetSize, size_t k, bool initial_check) override;
@@ -379,7 +380,13 @@ idType HNSWIndex<DataType, DistType>::getEntryPointId() const {
379380

380381
template <typename DataType, typename DistType>
381382
VisitedNodesHandler *HNSWIndex<DataType, DistType>::getVisitedList() const {
382-
return visited_nodes_handler.get();
383+
return visited_nodes_handler_pool.getAvailableVisitedNodesHandler();
384+
}
385+
386+
template <typename DataType, typename DistType>
387+
void HNSWIndex<DataType, DistType>::returnVisitedList(
388+
VisitedNodesHandler *visited_nodes_handler) const {
389+
visited_nodes_handler_pool.returnVisitedNodesHandlerToPool(visited_nodes_handler);
383390
}
384391

385392
template <typename DataType, typename DistType>
@@ -451,7 +458,7 @@ template <typename DataType, typename DistType>
451458
template <bool has_marked_deleted, typename Identifier>
452459
DistType HNSWIndex<DataType, DistType>::processCandidate(
453460
idType curNodeId, const void *data_point, size_t layer, size_t ef, tag_t visited_tag,
454-
vecsim_stl::abstract_priority_queue<DistType, Identifier> &top_candidates,
461+
tag_t *elements_tags, vecsim_stl::abstract_priority_queue<DistType, Identifier> &top_candidates,
455462
candidatesMaxHeap<DistType> &candidate_set, DistType lowerBound) const {
456463

457464
#ifdef ENABLE_PARALLELIZATION
@@ -460,22 +467,21 @@ DistType HNSWIndex<DataType, DistType>::processCandidate(
460467
idType *node_links = get_linklist_at_level(curNodeId, layer);
461468
linkListSize links_num = getListCount(node_links);
462469

463-
__builtin_prefetch(visited_nodes_handler->getElementsTags() + *node_links);
470+
__builtin_prefetch(elements_tags + *node_links);
464471
__builtin_prefetch(getDataByInternalId(*node_links));
465472

466473
for (size_t j = 0; j < links_num; j++) {
467474
idType *candidate_pos = node_links + j;
468475
idType candidate_id = *candidate_pos;
469-
470-
// Pre-fetch the next candidate data into memory cache, to improve performance.
471476
idType *next_candidate_pos = node_links + j + 1;
472-
__builtin_prefetch(visited_nodes_handler->getElementsTags() + *next_candidate_pos);
477+
478+
__builtin_prefetch(elements_tags + *next_candidate_pos);
473479
__builtin_prefetch(getDataByInternalId(*next_candidate_pos));
474480

475-
if (this->visited_nodes_handler->getNodeTag(candidate_id) == visited_tag)
481+
if (elements_tags[candidate_id] == visited_tag)
476482
continue;
477483

478-
this->visited_nodes_handler->tagNode(candidate_id, visited_tag);
484+
elements_tags[candidate_id] = visited_tag;
479485
char *currObj1 = (getDataByInternalId(candidate_id));
480486

481487
DistType dist1 = this->dist_func(data_point, currObj1, this->dim);
@@ -506,7 +512,7 @@ template <typename DataType, typename DistType>
506512
template <bool has_marked_deleted>
507513
void HNSWIndex<DataType, DistType>::processCandidate_RangeSearch(
508514
idType curNodeId, const void *query_data, size_t layer, double epsilon, tag_t visited_tag,
509-
std::unique_ptr<vecsim_stl::abstract_results_container> &results,
515+
tag_t *elements_tags, std::unique_ptr<vecsim_stl::abstract_results_container> &results,
510516
candidatesMaxHeap<DistType> &candidate_set, DistType dyn_range, double radius) const {
511517

512518
#ifdef ENABLE_PARALLELIZATION
@@ -515,7 +521,7 @@ void HNSWIndex<DataType, DistType>::processCandidate_RangeSearch(
515521
idType *node_links = get_linklist_at_level(curNodeId, layer);
516522
linkListSize links_num = getListCount(node_links);
517523

518-
__builtin_prefetch(visited_nodes_handler->getElementsTags() + *node_links);
524+
__builtin_prefetch(elements_tags + *node_links);
519525
__builtin_prefetch(getDataByInternalId(*node_links));
520526

521527
// Cast radius once instead of each time we check that candidate_dist <= radius_
@@ -526,12 +532,12 @@ void HNSWIndex<DataType, DistType>::processCandidate_RangeSearch(
526532

527533
// Pre-fetch the next candidate data into memory cache, to improve performance.
528534
idType *next_candidate_pos = node_links + j + 1;
529-
__builtin_prefetch(visited_nodes_handler->getElementsTags() + *next_candidate_pos);
535+
__builtin_prefetch(elements_tags + *next_candidate_pos);
530536
__builtin_prefetch(getDataByInternalId(*next_candidate_pos));
531537

532-
if (this->visited_nodes_handler->getNodeTag(candidate_id) == visited_tag)
538+
if (elements_tags[candidate_id] == visited_tag)
533539
continue;
534-
this->visited_nodes_handler->tagNode(candidate_id, visited_tag);
540+
elements_tags[candidate_id] = visited_tag;
535541
char *candidate_data = getDataByInternalId(candidate_id);
536542

537543
DistType candidate_dist = this->dist_func(query_data, candidate_data, this->dim);
@@ -556,12 +562,8 @@ candidatesMaxHeap<DistType>
556562
HNSWIndex<DataType, DistType>::searchLayer(idType ep_id, const void *data_point, size_t layer,
557563
size_t ef) const {
558564

559-
#ifdef ENABLE_PARALLELIZATION
560-
this->visited_nodes_handler =
561-
this->visited_nodes_handler_pool->getAvailableVisitedNodesHandler();
562-
#endif
563-
564-
tag_t visited_tag = this->visited_nodes_handler->getFreshTag();
565+
auto *visited_nodes_handler = getVisitedList();
566+
tag_t visited_tag = visited_nodes_handler->getFreshTag();
565567

566568
candidatesMaxHeap<DistType> top_candidates(this->allocator);
567569
candidatesMaxHeap<DistType> candidate_set(this->allocator);
@@ -577,7 +579,7 @@ HNSWIndex<DataType, DistType>::searchLayer(idType ep_id, const void *data_point,
577579
candidate_set.emplace(-lowerBound, ep_id);
578580
}
579581

580-
this->visited_nodes_handler->tagNode(ep_id, visited_tag);
582+
visited_nodes_handler->tagNode(ep_id, visited_tag);
581583

582584
while (!candidate_set.empty()) {
583585
pair<DistType, idType> curr_el_pair = candidate_set.top();
@@ -586,14 +588,12 @@ HNSWIndex<DataType, DistType>::searchLayer(idType ep_id, const void *data_point,
586588
}
587589
candidate_set.pop();
588590

589-
lowerBound = processCandidate<has_marked_deleted>(curr_el_pair.second, data_point, layer,
590-
ef, visited_tag, top_candidates,
591-
candidate_set, lowerBound);
591+
lowerBound = processCandidate<has_marked_deleted>(
592+
curr_el_pair.second, data_point, layer, ef, visited_tag,
593+
visited_nodes_handler->getElementsTags(), top_candidates, candidate_set, lowerBound);
592594
}
595+
returnVisitedList(visited_nodes_handler);
593596

594-
#ifdef ENABLE_PARALLELIZATION
595-
visited_nodes_handler_pool->returnVisitedNodesHandlerToPool(this->visited_nodes_handler);
596-
#endif
597597
return top_candidates;
598598
}
599599

@@ -1001,14 +1001,9 @@ void HNSWIndex<DataType, DistType>::resizeIndexInternal(size_t new_max_elements)
10011001
element_levels_.resize(new_max_elements);
10021002
element_levels_.shrink_to_fit();
10031003
resizeLabelLookup(new_max_elements);
1004+
visited_nodes_handler_pool.resize(new_max_elements);
10041005
#ifdef ENABLE_PARALLELIZATION
1005-
visited_nodes_handler_pool = std::unique_ptr<VisitedNodesHandlerPool>(
1006-
new (this->allocator)
1007-
VisitedNodesHandlerPool(this->pool_initial_size, new_max_elements, this->allocator));
10081006
std::vector<std::mutex>(new_max_elements).swap(link_list_locks_);
1009-
#else
1010-
visited_nodes_handler = std::shared_ptr<VisitedNodesHandler>(
1011-
new (this->allocator) VisitedNodesHandler(new_max_elements, this->allocator));
10121007
#endif
10131008
// Reallocate base layer
10141009
char *data_level0_memory_new = (char *)this->allocator->reallocate(
@@ -1049,7 +1044,8 @@ HNSWIndex<DataType, DistType>::HNSWIndex(const HNSWParams *params,
10491044
params->blockSize, params->multi),
10501045
VecSimIndexTombstone(), max_elements_(params->initialCapacity),
10511046
data_size_(VecSimType_sizeof(params->type) * this->dim),
1052-
element_levels_(max_elements_, allocator)
1047+
element_levels_(max_elements_, allocator),
1048+
visited_nodes_handler_pool(pool_initial_size, max_elements_, this->allocator)
10531049

10541050
#ifdef ENABLE_PARALLELIZATION
10551051
,
@@ -1070,15 +1066,6 @@ HNSWIndex<DataType, DistType>::HNSWIndex(const HNSWParams *params,
10701066

10711067
cur_element_count = 0;
10721068
num_marked_deleted = 0;
1073-
#ifdef ENABLE_PARALLELIZATION
1074-
pool_initial_size = pool_initial_size;
1075-
visited_nodes_handler_pool = std::unique_ptr<VisitedNodesHandlerPool>(
1076-
new (this->allocator)
1077-
VisitedNodesHandlerPool(pool_initial_size, max_elements_, this->allocator));
1078-
#else
1079-
visited_nodes_handler = std::shared_ptr<VisitedNodesHandler>(
1080-
new (this->allocator) VisitedNodesHandler(max_elements_, this->allocator));
1081-
#endif
10821069

10831070
// initializations for special treatment of the first node
10841071
entrypoint_node_ = HNSW_INVALID_ID;
@@ -1384,12 +1371,8 @@ HNSWIndex<DataType, DistType>::searchBottomLayer_WithTimeout(idType ep_id, const
13841371
size_t ef, size_t k, void *timeoutCtx,
13851372
VecSimQueryResult_Code *rc) const {
13861373

1387-
#ifdef ENABLE_PARALLELIZATION
1388-
this->visited_nodes_handler =
1389-
this->visited_nodes_handler_pool->getAvailableVisitedNodesHandler();
1390-
#endif
1391-
1392-
tag_t visited_tag = this->visited_nodes_handler->getFreshTag();
1374+
auto *visited_nodes_handler = getVisitedList();
1375+
tag_t visited_tag = visited_nodes_handler->getFreshTag();
13931376

13941377
candidatesLabelsMaxHeap<DistType> *top_candidates = getNewMaxPriorityQueue();
13951378
candidatesMaxHeap<DistType> candidate_set(this->allocator);
@@ -1409,26 +1392,25 @@ HNSWIndex<DataType, DistType>::searchBottomLayer_WithTimeout(idType ep_id, const
14091392
candidate_set.emplace(-lowerBound, ep_id);
14101393
}
14111394

1412-
this->visited_nodes_handler->tagNode(ep_id, visited_tag);
1395+
visited_nodes_handler->tagNode(ep_id, visited_tag);
14131396

14141397
while (!candidate_set.empty()) {
14151398
pair<DistType, idType> curr_el_pair = candidate_set.top();
14161399
if ((-curr_el_pair.first) > lowerBound && top_candidates->size() >= ef) {
14171400
break;
14181401
}
14191402
if (VECSIM_TIMEOUT(timeoutCtx)) {
1403+
returnVisitedList(visited_nodes_handler);
14201404
*rc = VecSim_QueryResult_TimedOut;
14211405
return top_candidates;
14221406
}
14231407
candidate_set.pop();
14241408

1425-
lowerBound = processCandidate<has_marked_deleted>(curr_el_pair.second, data_point, 0, ef,
1426-
visited_tag, *top_candidates,
1427-
candidate_set, lowerBound);
1409+
lowerBound = processCandidate<has_marked_deleted>(
1410+
curr_el_pair.second, data_point, 0, ef, visited_tag,
1411+
visited_nodes_handler->getElementsTags(), *top_candidates, candidate_set, lowerBound);
14281412
}
1429-
#ifdef ENABLE_PARALLELIZATION
1430-
visited_nodes_handler_pool->returnVisitedNodesHandlerToPool(this->visited_nodes_handler);
1431-
#endif
1413+
returnVisitedList(visited_nodes_handler);
14321414
while (top_candidates->size() > k) {
14331415
top_candidates->pop();
14341416
}
@@ -1503,12 +1485,9 @@ VecSimQueryResult *HNSWIndex<DataType, DistType>::searchRangeBottomLayer_WithTim
15031485
*rc = VecSim_QueryResult_OK;
15041486
auto res_container = getNewResultsContainer(10); // arbitrary initial cap.
15051487

1506-
#ifdef ENABLE_PARALLELIZATION
1507-
this->visited_nodes_handler =
1508-
this->visited_nodes_handler_pool->getAvailableVisitedNodesHandler();
1509-
#endif
1488+
auto *visited_nodes_handler = getVisitedList();
1489+
tag_t visited_tag = visited_nodes_handler->getFreshTag();
15101490

1511-
tag_t visited_tag = this->visited_nodes_handler->getFreshTag();
15121491
candidatesMaxHeap<DistType> candidate_set(this->allocator);
15131492

15141493
// Set the initial effective-range to be at least the distance from the entry-point.
@@ -1530,7 +1509,7 @@ VecSimQueryResult *HNSWIndex<DataType, DistType>::searchRangeBottomLayer_WithTim
15301509
}
15311510

15321511
candidate_set.emplace(-ep_dist, ep_id);
1533-
this->visited_nodes_handler->tagNode(ep_id, visited_tag);
1512+
visited_nodes_handler->tagNode(ep_id, visited_tag);
15341513

15351514
// Cast radius once instead of each time we check that -curr_el_pair.first >= radius_.
15361515
DistType radius_ = DistType(radius);
@@ -1558,13 +1537,11 @@ VecSimQueryResult *HNSWIndex<DataType, DistType>::searchRangeBottomLayer_WithTim
15581537
// requested radius.
15591538
// Here we send the radius as double to match the function arguments type.
15601539
processCandidate_RangeSearch<has_marked_deleted>(
1561-
curr_el_pair.second, data_point, 0, epsilon, visited_tag, res_container, candidate_set,
1540+
curr_el_pair.second, data_point, 0, epsilon, visited_tag,
1541+
visited_nodes_handler->getElementsTags(), res_container, candidate_set,
15621542
dynamic_range_search_boundaries, radius);
15631543
}
1564-
1565-
#ifdef ENABLE_PARALLELIZATION
1566-
visited_nodes_handler_pool->returnVisitedNodesHandlerToPool(this->visited_nodes_handler);
1567-
#endif
1544+
returnVisitedList(visited_nodes_handler);
15681545
return res_container->get_results();
15691546
}
15701547

@@ -1635,6 +1612,7 @@ VecSimIndexInfo HNSWIndex<DataType, DistType>::info() const {
16351612
info.hnswInfo.entrypoint = this->getEntryPointLabel();
16361613
info.hnswInfo.memory = this->getAllocationSize();
16371614
info.hnswInfo.last_mode = this->last_mode;
1615+
info.hnswInfo.visitedNodesPoolSize = this->visited_nodes_handler_pool.getPoolSize();
16381616
return info;
16391617
}
16401618

src/VecSim/algorithms/hnsw/hnsw_batch_iterator.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class HNSW_BatchIterator : public VecSimBatchIterator {
6262

6363
void reset() override;
6464

65-
~HNSW_BatchIterator() override {}
65+
virtual ~HNSW_BatchIterator() { index->returnVisitedList(this->visited_list); }
6666
};
6767

6868
/******************** Ctor / Dtor **************/
@@ -242,6 +242,8 @@ template <typename DataType, typename DistType>
242242
void HNSW_BatchIterator<DataType, DistType>::reset() {
243243
this->resetResultsCount();
244244
this->depleted = false;
245+
246+
// Reset the visited nodes handler.
245247
this->visited_tag = this->visited_list->getFreshTag();
246248
this->lower_bound = std::numeric_limits<DistType>::infinity();
247249
// Clear the queues.

src/VecSim/algorithms/hnsw/hnsw_factory.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,11 @@ size_t EstimateInitialSize(const HNSWParams *params) {
5252
est += EstimateInitialSize_ChooseMultiOrSingle<double>(params->multi);
5353
}
5454

55-
#ifdef ENABLE_PARALLELIZATION
56-
// Used for synchronization only when parallel indexing / searching is enabled.
57-
est += sizeof(VisitedNodesHandlerPool) + sizeof(size_t);
58-
#else
55+
// Account for the visited nodes pool (assume that it holds one pointer to a handler).
5956
est += sizeof(VisitedNodesHandler) + sizeof(size_t);
60-
est += sizeof(tag_t) * params->initialCapacity + sizeof(size_t); // visited nodes
61-
#endif
57+
// The visited nodes pool inner vector buffer (contains one pointer).
58+
est += sizeof(void *) + sizeof(size_t);
59+
est += sizeof(tag_t) * params->initialCapacity + sizeof(size_t); // visited nodes array
6260

6361
// Implicit allocation calls - allocates memory + a header only with positive capacity.
6462
if (params->initialCapacity) {

src/VecSim/algorithms/hnsw/hnsw_serializer.h

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ HNSWIndex<DataType, DistType>::HNSWIndex(std::ifstream &input, const HNSWParams
77
: VecSimIndexAbstract<DistType>(allocator, params->dim, params->type, params->metric,
88
params->blockSize, params->multi),
99
Serializer(version), max_elements_(params->initialCapacity), epsilon_(params->epsilon),
10-
element_levels_(max_elements_, allocator) {
10+
element_levels_(max_elements_, allocator),
11+
visited_nodes_handler_pool(1, max_elements_, allocator) {
1112

1213
this->restoreIndexFields(input);
1314
this->fieldsValidation();
@@ -17,16 +18,6 @@ HNSWIndex<DataType, DistType>::HNSWIndex(std::ifstream &input, const HNSWParams
1718
// levels value than the loaded index.
1819
level_generator_.seed(200);
1920

20-
#ifdef ENABLE_PARALLELIZATION
21-
this->pool_initial_size = 1;
22-
this->visited_nodes_handler_pool = std::unique_ptr<VisitedNodesHandlerPool>(
23-
new (this->allocator)
24-
VisitedNodesHandlerPool(this->pool_initial_size, max_elements_, this->allocator));
25-
#else
26-
this->visited_nodes_handler = std::unique_ptr<VisitedNodesHandler>(
27-
new (this->allocator) VisitedNodesHandler(max_elements_, this->allocator));
28-
#endif
29-
3021
data_level0_memory_ =
3122
(char *)this->allocator->callocate(max_elements_ * size_data_per_element_);
3223
if (data_level0_memory_ == nullptr)

0 commit comments

Comments
 (0)