Skip to content

Commit 1caa792

Browse files
[MOD-11765] streaming merge bench (#838)
1 parent dd4a4bc commit 1caa792

File tree

18 files changed

+780
-500
lines changed

18 files changed

+780
-500
lines changed

src/VecSim/algorithms/hnsw/hnsw_disk.h

Lines changed: 328 additions & 410 deletions
Large diffs are not rendered by default.

src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ HNSWDiskIndex<DataType, DistType>::HNSWDiskIndex(
5656
: VecSimIndexAbstract<DataType, DistType>(abstractInitParams, components),
5757
idToMetaData(this->allocator), labelToIdMap(this->allocator), db(db), cf(cf), dbPath(""),
5858
indexDataGuard(), visitedNodesHandlerPool(INITIAL_CAPACITY, this->allocator),
59-
delta_list(), new_elements_meta_data(this->allocator), batchThreshold(0), // Will be restored from file
59+
new_elements_meta_data(this->allocator), batchThreshold(0), // Will be restored from file
6060
pendingVectorIds(this->allocator), pendingMetadata(this->allocator),
6161
pendingVectorCount(0), pendingDeleteIds(this->allocator),
6262
stagedInsertUpdates(this->allocator),
@@ -214,14 +214,14 @@ template <typename DataType, typename DistType>
214214
void HNSWDiskIndex<DataType, DistType>::restoreVectors(std::ifstream &input, EncodingVersion version) {
215215
// #ifdef HNSW_DISK_SERIALIZE_VECTORS_TO_FILE
216216
// NEW METHOD: Load vectors from metadata file
217-
this->log(VecSimCommonStrings::LOG_VERBOSE_STRING,
218-
"Loading vectors from metadata file (HNSW_DISK_SERIALIZE_VECTORS_TO_FILE enabled)");
219-
restoreVectorsFromFile(input, version);
217+
// this->log(VecSimCommonStrings::LOG_VERBOSE_STRING,
218+
// "Loading vectors from metadata file (HNSW_DISK_SERIALIZE_VECTORS_TO_FILE enabled)");
219+
// restoreVectorsFromFile(input, version);
220220
// #else
221-
// // CURRENT METHOD: Load vectors from RocksDB (default for backward compatibility)
222-
// this->log(VecSimCommonStrings::LOG_VERBOSE_STRING,
223-
// "Loading vectors from RocksDB checkpoint (default method)");
224-
// restoreVectorsFromRocksDB(version);
221+
// CURRENT METHOD: Load vectors from RocksDB (default for backward compatibility)
222+
this->log(VecSimCommonStrings::LOG_VERBOSE_STRING,
223+
"Loading vectors from RocksDB checkpoint (default method)");
224+
restoreVectorsFromRocksDB(version);
225225
// #endif
226226
}
227227

@@ -287,11 +287,7 @@ void HNSWDiskIndex<DataType, DistType>::saveIndexIMP(std::ofstream &output) {
287287
if (pendingDeleteIds.size() != 0) {
288288
throw std::runtime_error("Serialization error: pendingDeleteIds not empty after flush");
289289
}
290-
// Note: delta_list and new_elements_meta_data are currently unused legacy variables
291-
// but we verify them for future-proofing
292-
if (!delta_list.empty()) {
293-
throw std::runtime_error("Serialization error: delta_list not empty after flush");
294-
}
290+
295291
if (!new_elements_meta_data.empty()) {
296292
throw std::runtime_error("Serialization error: new_elements_meta_data not empty after flush");
297293
}

src/VecSim/index_factories/components/components_factory.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,10 @@ CreateQuantizedIndexComponents(std::shared_ptr<VecSimAllocator> allocator, VecSi
5959
// Use INT8 distance function for quantized vectors
6060
spaces::dist_func_t<DistType> distFunc =
6161
spaces::GetDistFunc<int8_t, DistType>(distance_metric, dim, &alignment);
62+
spaces::dist_func_t<DistType> rawDistFunc =
63+
spaces::GetDistFunc<DataType, DistType>(distance_metric, dim, &alignment);
6264

63-
auto indexCalculator = new (allocator) DistanceCalculatorCommon<DistType>(allocator, distFunc);
65+
auto indexCalculator = new (allocator) DistanceCalculatorQuantized<DistType>(allocator, distFunc, rawDistFunc);
6466

6567
// Create preprocessor container with space for 2 preprocessors (normalization + quantization)
6668
auto preprocessors =

src/VecSim/index_factories/hnsw_disk_factory.cpp

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,12 @@ class ManagedRocksDB {
191191
std::string(e.what()));
192192
}
193193

194-
// Open RocksDB from the temp checkpoint copy
195-
// All writes (WAL, SST, MANIFEST, etc.) will go to the temp location
196-
rocksdb::Options options;
197-
options.create_if_missing = false; // Checkpoint copy should exist
198-
options.error_if_exists = false;
199-
options.statistics = rocksdb::CreateDBStatistics();
194+
// Open RocksDB from the temp checkpoint copy
195+
// All writes (WAL, SST, MANIFEST, etc.) will go to the temp location
196+
rocksdb::Options options;
197+
options.create_if_missing = false; // Checkpoint copy should exist
198+
options.error_if_exists = false;
199+
options.statistics = rocksdb::CreateDBStatistics();
200200

201201
rocksdb::DB *db_ptr = nullptr;
202202
rocksdb::Status status = rocksdb::DB::Open(options, temp_checkpoint, &db_ptr);
@@ -223,10 +223,10 @@ class ManagedRocksDB {
223223
return instance;
224224
}
225225

226-
// Destructor: closes DB and optionally cleans up temp directory
226+
// Destructor: closes DB and optionally cleans up temp directory
227227
~ManagedRocksDB() {
228-
// Close DB first (unique_ptr handles this automatically)
229-
db.reset();
228+
// Close DB first (unique_ptr handles this automatically)
229+
db.reset();
230230

231231
// Delete temp directory only if it's actually temporary
232232
if (cleanup_temp_dir && !temp_dir.empty() && std::filesystem::exists(temp_dir)) {
@@ -257,8 +257,7 @@ static std::unique_ptr<ManagedRocksDB> managed_rocksdb;
257257
static AbstractIndexInitParams NewAbstractInitParams(const VecSimParams *params) {
258258
const HNSWParams *hnswParams = &params->algoParams.hnswParams;
259259

260-
size_t dataSize =
261-
VecSimParams_GetDataSize(hnswParams->type, hnswParams->dim, hnswParams->metric);
260+
size_t dataSize = hnswParams->dim * sizeof(int8_t); // Quantized storage
262261
AbstractIndexInitParams abstractInitParams = {.allocator =
263262
VecSimAllocator::newVecsimAllocator(),
264263
.dim = hnswParams->dim,

src/VecSim/spaces/computer/calculator.h

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class IndexCalculatorInterface : public VecsimBaseObject {
2323
virtual ~IndexCalculatorInterface() = default;
2424

2525
virtual DistType calcDistance(const void *v1, const void *v2, size_t dim) const = 0;
26+
virtual DistType calcDistanceRaw(const void *v1, const void *v2, size_t dim) const = 0;
2627
};
2728

2829
/**
@@ -39,7 +40,7 @@ class DistanceCalculatorInterface : public IndexCalculatorInterface<DistType> {
3940
DistanceCalculatorInterface(std::shared_ptr<VecSimAllocator> allocator, DistFuncType dist_func)
4041
: IndexCalculatorInterface<DistType>(allocator), dist_func(dist_func) {}
4142
virtual DistType calcDistance(const void *v1, const void *v2, size_t dim) const = 0;
42-
43+
virtual DistType calcDistanceRaw(const void *v1, const void *v2, size_t dim) const = 0;
4344
protected:
4445
DistFuncType dist_func;
4546
};
@@ -56,4 +57,28 @@ class DistanceCalculatorCommon
5657
DistType calcDistance(const void *v1, const void *v2, size_t dim) const override {
5758
return this->dist_func(v1, v2, dim);
5859
}
60+
DistType calcDistanceRaw(const void *v1, const void *v2, size_t dim) const override {
61+
return this->dist_func(v1, v2, dim);
62+
}
63+
};
64+
65+
template <typename DistType>
66+
class DistanceCalculatorQuantized
67+
: public DistanceCalculatorInterface<DistType, spaces::dist_func_t<DistType>> {
68+
protected:
69+
spaces::dist_func_t<DistType> raw_dist_func;
70+
71+
public:
72+
DistanceCalculatorQuantized(std::shared_ptr<VecSimAllocator> allocator,
73+
spaces::dist_func_t<DistType> quant_dist_func, spaces::dist_func_t<DistType> raw_dist_func)
74+
: DistanceCalculatorInterface<DistType, spaces::dist_func_t<DistType>>(allocator,
75+
quant_dist_func),
76+
raw_dist_func(raw_dist_func) {}
77+
78+
DistType calcDistance(const void *v1, const void *v2, size_t dim) const override {
79+
return this->dist_func(v1, v2, dim);
80+
}
81+
DistType calcDistanceRaw(const void *v1, const void *v2, size_t dim) const {
82+
return this->raw_dist_func(v1, v2, dim) * 16129; // multiply by 127^2
83+
}
5984
};

src/VecSim/spaces/computer/preprocessors.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <memory>
1414
#include <cassert>
1515
#include <cmath>
16+
#include <iostream>
1617

1718
#include "VecSim/memory/vecsim_base.h"
1819
#include "VecSim/spaces/spaces.h"
@@ -216,5 +217,10 @@ class ScalarQuantizationPreprocessor : public PreprocessorInterface {
216217
output_vec[i] = static_cast<int8_t>(std::round(scaled));
217218
}
218219
}
220+
221+
// std::cout << "quantized_0: " << static_cast<int>(output_vec[0]) << std::endl;
222+
// std::cout << "original_0: " << input_vec[0] << std::endl;
223+
// std::cout << "quantized_n: " << static_cast<int>(output_vec[dim - 1]) << std::endl;
224+
// std::cout << "original_n: " << input_vec[dim - 1] << std::endl;
219225
}
220226
};

src/VecSim/utils/updatable_heap.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ class updatable_max_heap : public abstract_priority_queue<Priority, Value> {
4141
inline void emplace(Priority p, Value v) override;
4242
inline bool empty() const override;
4343
inline void pop() override;
44+
inline bool exists(Value v) const;
4445
inline const std::pair<Priority, Value> top() const override;
4546
inline size_t size() const override;
46-
47+
// Random order iteration
48+
const auto begin() const { return this->priorityToValue.begin(); }
49+
const auto end() const { return this->priorityToValue.end(); }
4750
private:
4851
inline auto top_ptr() const;
4952
};
@@ -110,4 +113,8 @@ void updatable_max_heap<Priority, Value>::emplace(Priority p, Value v) {
110113
}
111114
}
112115

116+
template <typename Priority, typename Value>
117+
bool updatable_max_heap<Priority, Value>::exists(Value v) const {
118+
return valueToNode.find(v) != valueToNode.end();
119+
}
113120
} // namespace vecsim_stl

src/VecSim/vec_sim_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ typedef struct {
328328
*/
329329
typedef struct {
330330
size_t memory;
331+
size_t vectors_memory;
331332
size_t db_memory;
332333
size_t db_disk;
333334
size_t numberOfMarkedDeleted; // The number of vectors that are marked as deleted (HNSW/tiered

src/VecSim/vec_sim_index.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
136136
DistType calcDistance(const void *vector_data1, const void *vector_data2) const {
137137
return indexCalculator->calcDistance(vector_data1, vector_data2, this->dim);
138138
}
139+
140+
DistType calcDistanceRaw(const void *vector_data1, const void *vector_data2) const {
141+
return indexCalculator->calcDistanceRaw(vector_data1, vector_data2, this->dim);
142+
}
139143

140144
/**
141145
* @brief Preprocess a blob for both storage and query.
@@ -181,6 +185,7 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
181185
virtual inline VecSimIndexStatsInfo statisticInfo() const override {
182186
return VecSimIndexStatsInfo{
183187
.memory = this->getAllocationSize(),
188+
.vectors_memory = 0,
184189
.db_memory = 0,
185190
.db_disk = 0,
186191
.numberOfMarkedDeleted = 0,

src/VecSim/vec_sim_tiered_index.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ template <typename DataType, typename DistType>
308308
VecSimIndexStatsInfo VecSimTieredIndex<DataType, DistType>::statisticInfo() const {
309309
auto stats = VecSimIndexStatsInfo{
310310
.memory = this->getAllocationSize(),
311+
.vectors_memory = 0,
311312
.db_memory = 0,
312313
.db_disk = 0,
313314
.numberOfMarkedDeleted = 0, // Default value if cast fails

0 commit comments

Comments
 (0)