Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/svs/index/flat/flat.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ template <
typename Ownership = OwnsMembers>
class FlatIndex {
public:
static constexpr bool supports_insertions = false;
static constexpr bool supports_deletions = false;
static constexpr bool supports_saving = true;
static constexpr bool needs_id_translation = false;

using const_value_type = data::const_value_type_t<Data>;

/// The type of the distance functor.
Expand Down
230 changes: 230 additions & 0 deletions include/svs/lib/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

// svs
#include "svs/lib/exception.h"
#include "svs/lib/uuid.h"

// stl
#include <filesystem>
#include <fstream>
#include <span>

namespace svs::lib {

Expand Down Expand Up @@ -110,4 +112,232 @@ inline std::ifstream open_read(
return std::ifstream(path, mode);
}

inline std::filesystem::path unique_temp_directory_path(const std::string& prefix) {
namespace fs = std::filesystem;
auto temp_dir = fs::temp_directory_path();
// Try up to 10 times to create a unique directory.
for (int i = 0; i < 10; ++i) {
auto dir = temp_dir / (prefix + "-" + svs::lib::UUID().str());
if (!fs::exists(dir)) {
return dir;
}
return dir;
}
throw ANNEXCEPTION("Could not create a unique temporary directory!");
}

// RAII helper to create and delete a temporary directory.
struct UniqueTempDirectory {
std::filesystem::path path;

UniqueTempDirectory(const std::string& prefix)
: path{unique_temp_directory_path(prefix)} {
std::filesystem::create_directories(path);
}

~UniqueTempDirectory() {
try {
std::filesystem::remove_all(path);
} catch (...) {
// Ignore errors.
}
}

std::filesystem::path get() const { return path; }
operator const std::filesystem::path&() const { return path; }
};

// Simple directory archiver to pack/unpack a directory to/from a stream.
// Uses a simple custom binary format.
// Not meant to be super efficient, just a simple way to serialize a directory
// structure to a stream.
struct DirectoryArchiver {
using size_type = uint64_t;

// TODO: Define CACHELINE_BYTES in a common place
// rather than duplicating it here and in prefetch.h
static constexpr auto CACHELINE_BYTES = 64;
static constexpr size_type magic_number = 0x5e2d58d9f3b4a6c1;

static size_type write_size(std::ostream& os, size_type size) {
os.write(reinterpret_cast<const char*>(&size), sizeof(size));
if (!os) {
throw ANNEXCEPTION("Error writing to stream!");
}
return sizeof(size);
}

static size_type read_size(std::istream& is, size_type& size) {
is.read(reinterpret_cast<char*>(&size), sizeof(size));
if (!is) {
throw ANNEXCEPTION("Error reading from stream!");
}
return sizeof(size);
}

static size_type write_name(std::ostream& os, const std::string& name) {
auto bytes = write_size(os, name.size());
os.write(name.data(), name.size());
if (!os) {
throw ANNEXCEPTION("Error writing to stream!");
}
return bytes + name.size();
}

static size_type read_name(std::istream& is, std::string& name) {
size_type size = 0;
auto bytes = read_size(is, size);
name.resize(size);
is.read(name.data(), size);
if (!is) {
throw ANNEXCEPTION("Error reading from stream!");
}
return bytes + size;
}

static size_type write_file(
std::ostream& stream,
const std::filesystem::path& path,
const std::filesystem::path& root
) {
namespace fs = std::filesystem;
check_file(path, std::ios_base::in | std::ios_base::binary);

// Write the filename as a string.
std::string filename = fs::relative(path, root).string();
auto header_bytes = write_name(stream, filename);
if (!stream) {
throw ANNEXCEPTION("Error writing to stream!");
}

// Write the size of the file.
size_type filesize = fs::file_size(path);
header_bytes += write_size(stream, filesize);
if (!stream) {
throw ANNEXCEPTION("Error writing to stream!");
}

// Now write the actual file contents.
std::ifstream in(path, std::ios_base::in | std::ios_base::binary);
if (!in) {
throw ANNEXCEPTION("Error opening file {} for reading!", path);
}
stream << in.rdbuf();
if (!stream) {
throw ANNEXCEPTION("Error writing to stream!");
}

return header_bytes + filesize;
}

static size_type read_file(std::istream& stream, const std::filesystem::path& root) {
namespace fs = std::filesystem;

// Read the filename as a string.
std::string filename;
auto header_bytes = read_name(stream, filename);
if (!stream) {
throw ANNEXCEPTION("Error reading from stream!");
}

auto path = root / filename;
auto parent_dir = path.parent_path();
if (!fs::exists(parent_dir)) {
fs::create_directories(parent_dir);
} else if (!fs::is_directory(parent_dir)) {
throw ANNEXCEPTION("Path {} exists and is not a directory!", root);
}
check_file(path, std::ios_base::out | std::ios_base::binary);

// Read the size of the file.
std::uint64_t filesize = 0;
header_bytes += read_size(stream, filesize);
if (!stream) {
throw ANNEXCEPTION("Error reading from stream!");
}

// Now write the actual file contents.
std::ofstream out(path, std::ios_base::out | std::ios_base::binary);
if (!out) {
throw ANNEXCEPTION("Error opening file {} for writing!", path);
}

// Copy the data in chunks.
constexpr size_t buffer_size = 1 << 13; // 8KB buffer
alignas(CACHELINE_BYTES) char buffer[buffer_size];

size_t bytes_remaining = filesize;
while (bytes_remaining > 0) {
size_t to_read = std::min(buffer_size, bytes_remaining);
stream.read(buffer, to_read);
if (!stream) {
throw ANNEXCEPTION("Error reading from stream!");
}
out.write(buffer, to_read);
if (!out) {
throw ANNEXCEPTION("Error writing to file {}!", path);
}
bytes_remaining -= to_read;
}

return header_bytes + filesize;
}

static size_t pack(const std::filesystem::path& dir, std::ostream& stream) {
namespace fs = std::filesystem;
if (!fs::is_directory(dir)) {
throw ANNEXCEPTION("Path {} is not a directory!", dir);
}

auto total_bytes = write_size(stream, magic_number);

// Calculate the number of files in the directory.
uint64_t filesnum = std::count_if(
fs::recursive_directory_iterator{dir},
fs::recursive_directory_iterator{},
[&](const auto& entry) { return entry.is_regular_file(); }
);
total_bytes += write_size(stream, filesnum);

// Now serialize each file in the directory recursively.
for (const auto& entry : fs::recursive_directory_iterator{dir}) {
if (entry.is_regular_file()) {
total_bytes += write_file(stream, entry.path(), dir);
}
// Ignore other types of entries.
}

return total_bytes;
}

static size_t unpack(std::istream& stream, const std::filesystem::path& root) {
namespace fs = std::filesystem;

// Read and verify the magic number.
size_type magic = 0;
auto total_bytes = read_size(stream, magic);
if (magic != magic_number) {
throw ANNEXCEPTION("Invalid magic number in directory unpacking!");
}

size_type num_files = 0;
total_bytes += read_size(stream, num_files);
if (!stream) {
throw ANNEXCEPTION("Error reading from stream!");
}

if (!fs::exists(root)) {
fs::create_directories(root);
} else if (!fs::is_directory(root)) {
throw ANNEXCEPTION("Path {} exists and is not a directory!", root);
}

// Now deserialize each file in the directory.
for (size_type i = 0; i < num_files; ++i) {
total_bytes += read_file(stream, root);
}

return total_bytes;
}
};
} // namespace svs::lib
54 changes: 54 additions & 0 deletions include/svs/orchestrators/dynamic_flat.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class DynamicFlatInterface {
const std::filesystem::path& config_directory,
const std::filesystem::path& data_directory
) = 0;
virtual void save(std::ostream& stream) = 0;
};

template <lib::TypeList QueryTypes, typename Impl>
Expand Down Expand Up @@ -118,6 +119,21 @@ class DynamicFlatImpl
) override {
impl().save(config_directory, data_directory);
}

// Stream-based save implementation
void save(std::ostream& stream) override {
if constexpr (Impl::supports_saving) {
lib::UniqueTempDirectory tempdir{"svs_dynflat_save"};
const auto config_dir = tempdir.get() / "config";
const auto data_dir = tempdir.get() / "data";
std::filesystem::create_directories(config_dir);
std::filesystem::create_directories(data_dir);
save(config_dir, data_dir);
lib::DirectoryArchiver::pack(tempdir, stream);
} else {
throw ANNEXCEPTION("The current DynamicFlat backend doesn't support saving!");
}
}
};

// Forward Declarations.
Expand Down Expand Up @@ -253,6 +269,44 @@ class DynamicFlat : public manager::IndexManager<DynamicFlatInterface> {
);
}

// Assembly from stream
template <
manager::QueryTypeDefinition QueryTypes,
typename Data,
typename Distance,
typename ThreadPoolProto,
typename... DataLoaderArgs>
static DynamicFlat assemble(
std::istream& stream,
const Distance& distance,
ThreadPoolProto threadpool_proto,
DataLoaderArgs&&... data_args
) {
namespace fs = std::filesystem;
lib::UniqueTempDirectory tempdir{"svs_dynflat_load"};
lib::DirectoryArchiver::unpack(stream, tempdir);

const auto config_path = tempdir.get() / "config";
if (!fs::is_directory(config_path)) {
throw ANNEXCEPTION(
"Invalid Dynamic Flat index archive: missing config directory!"
);
}

const auto data_path = tempdir.get() / "data";
if (!fs::is_directory(data_path)) {
throw ANNEXCEPTION("Invalid Dynamic Flat index archive: missing data directory!"
);
}

return assemble<QueryTypes>(
config_path,
lib::load_from_disk<Data>(data_path, SVS_FWD(data_args)...),
distance,
threads::as_threadpool(std::move(threadpool_proto))
);
}

///// Distance
/// @brief Get the distance between a vector in the index and a query vector
/// @tparam Query The query vector type
Expand Down
44 changes: 44 additions & 0 deletions include/svs/orchestrators/dynamic_vamana.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ class DynamicVamana : public manager::IndexManager<DynamicVamanaInterface> {
impl_->save(config_dir, graph_dir, data_dir);
}

void save(std::ostream& stream) { impl_->save(stream); }

/// Reconstruction
void reconstruct_at(data::SimpleDataView<float> data, std::span<const uint64_t> ids) {
impl_->reconstruct_at(data, ids);
Expand Down Expand Up @@ -340,6 +342,48 @@ class DynamicVamana : public manager::IndexManager<DynamicVamanaInterface> {
);
}

// Assembly from stream
template <
manager::QueryTypeDefinition QueryTypes,
typename Data,
typename Distance,
typename ThreadPoolProto,
typename... DataLoaderArgs>
static DynamicVamana assemble(
std::istream& stream,
const Distance& distance,
ThreadPoolProto threadpool_proto,
DataLoaderArgs&&... data_args
) {
namespace fs = std::filesystem;
lib::UniqueTempDirectory tempdir{"svs_vamana_load"};
lib::DirectoryArchiver::unpack(stream, tempdir);

const auto config_path = tempdir.get() / "config";
if (!fs::is_directory(config_path)) {
throw ANNEXCEPTION("Invalid Vamana index archive: missing config directory!");
}

const auto graph_path = tempdir.get() / "graph";
if (!fs::is_directory(graph_path)) {
throw ANNEXCEPTION("Invalid Vamana index archive: missing graph directory!");
}

const auto data_path = tempdir.get() / "data";
if (!fs::is_directory(data_path)) {
throw ANNEXCEPTION("Invalid Vamana index archive: missing data directory!");
}

return assemble<QueryTypes>(
config_path,
svs::GraphLoader{graph_path},
lib::load_from_disk<Data>(data_path, SVS_FWD(data_args)...),
distance,
threads::as_threadpool(std::move(threadpool_proto)),
false
);
}

/// @copydoc svs::Vamana::batch_iterator
template <typename QueryType, size_t N>
svs::VamanaIterator batch_iterator(
Expand Down
Loading
Loading