diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
new file mode 100644
index 0000000..3a65397
--- /dev/null
+++ b/.github/workflows/linux.yml
@@ -0,0 +1,61 @@
+name: CMake
+
+on: [push]
+
+env:
+  # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
+  BUILD_TYPE: Release
+
+jobs:
+  build:
+    # The CMake configure and build commands are platform agnostic and should work equally
+    # well on Windows or Mac. You can convert this to a matrix build if you need
+    # cross-platform coverage.
+    # See: https://docs.github.com/en/free-pro-team@latest/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix
+    runs-on: ubuntu-20.04
+
+    steps:
+    - name: Checkout repository
+      uses: actions/checkout@master
+
+    - name: Checkout submodules
+      run: git submodule update --init --recursive
+
+    - name: Install dependencies
+      run: sudo apt install -y g++ libaio-dev uuid-dev libtbb-dev
+
+    - name: Install liburing
+      run: |
+        git clone https://git.kernel.dk/liburing
+        cd liburing
+        git checkout liburing-0.7
+        ./configure
+        sudo make install
+
+    - name: Create Build Environment
+      # Some projects don't allow in-source building, so create a separate build directory
+      # We'll use this as our working directory for all subsequent commands
+      run: cmake -E make_directory ${{runner.workspace}}/build
+
+    - name: Configure CMake
+      # Use a bash shell so we can use the same syntax for environment variable
+      # access regardless of the host operating system
+      shell: bash
+      working-directory: ${{runner.workspace}}/build
+      # Note the current convention is to use the -S and -B options here to specify source
+      # and build directories, but this is only available with CMake 3.13 and higher.
+      # The CMake binaries on the GitHub Actions machines are (as of this writing) 3.12
+      run: cmake $GITHUB_WORKSPACE -DCMAKE_BUILD_TYPE=$BUILD_TYPE
+
+    - name: Build
+      working-directory: ${{runner.workspace}}/build
+      shell: bash
+      # Execute the build. You can specify a specific target with "--target <NAME>"
+      run: cmake --build . --config $BUILD_TYPE
+
+    - name: Test
+      working-directory: ${{runner.workspace}}/build
+      shell: bash
+      # Execute tests defined by the CMake configuration.
+      # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
+      run: ctest -j 1 --interactive-debug-mode 0 --output-on-failure -R .*_test
diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml
new file mode 100644
index 0000000..20fe700
--- /dev/null
+++ b/.github/workflows/windows.yml
@@ -0,0 +1,59 @@
+name: MSBuild
+
+on: [push]
+
+env:
+  # Path to the solution file relative to the root of the project.
+  SOLUTION_FILE_PATH: .
+
+  # Configuration type to build.
+  # You can convert this to a build matrix if you need coverage of multiple configuration types.
+  # https://docs.github.com/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix
+  BUILD_CONFIGURATION: Release
+
+jobs:
+  build:
+    runs-on: windows-latest
+
+    steps:
+    - name: Checkout repository
+      uses: actions/checkout@master
+
+    - name: Checkout submodules
+      run: git submodule update --init --recursive
+
+    - name: Add msbuild to PATH
+      uses: microsoft/setup-msbuild@v1.0.2
+      #with:
+      #  vs-version: '[16.4,16.5)'
+
+    - name: Get latest CMake and ninja
+      uses: lukka/get-cmake@latest
+
+    - name: Create Build Environment
+      # Some projects don't allow in-source building, so create a separate build directory
+      # We'll use this as our working directory for all subsequent commands
+      run: cmake -E make_directory ${{runner.workspace}}/build
+
+    - name: Configure CMake
+      # Use a bash shell so we can use the same syntax for environment variable
+      # access regardless of the host operating system
+      shell: bash
+      working-directory: ${{runner.workspace}}/build
+      # Note the current convention is to use the -S and -B options here to specify source
+      # and build directories, but this is only available with CMake 3.13 and higher.
+      # The CMake binaries on the GitHub Actions machines are (as of this writing) 3.12
+      run: cmake $GITHUB_WORKSPACE -G "Visual Studio 16 2019"
+
+    - name: Build
+      working-directory: ${{runner.workspace}}/build
+      # Add additional options to the MSBuild command line here (like platform or verbosity level).
+      # See https://docs.microsoft.com/visualstudio/msbuild/msbuild-command-line-reference
+      run: msbuild /m /p:Configuration=${{env.BUILD_CONFIGURATION}} FishStore.sln
+
+    - name: Test
+      working-directory: ${{runner.workspace}}/build
+      shell: bash
+      # Execute tests defined by the CMake configuration.
+      # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
+      run: ctest -j 1 --interactive-debug-mode 0 --output-on-failure -C ${{env.BUILD_CONFIGURATION}} -R .*_test
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index d9e5bd6..e75fe11 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -33,7 +33,7 @@ jobs:
 
 - job: 'cppLinux'
   pool:
-    vmImage: ubuntu-16.04
+    vmImage: ubuntu-20.04
   displayName: 'C++ (Linux)'
 
   steps:
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 0aa6d93..04383a1 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -1,7 +1,7 @@
 if(WIN32)
   set (FISHSTORE_BENCHMARK_LINK_LIBS fishstore rpcrt4 wsock32 Ws2_32)
 else()
-  set (FISHSTORE_BENCHMARK_LINK_LIBS fishstore stdc++fs uuid tbb gcc aio m stdc++ pthread dl)
+  set (FISHSTORE_BENCHMARK_LINK_LIBS fishstore stdc++fs uuid tbb gcc aio m stdc++ pthread dl uring)
 endif()
 
 #Function to automate building benchmark binaries
diff --git a/examples/checkpoint_recovery-dir/checkpoint_recovery.cc b/examples/checkpoint_recovery-dir/checkpoint_recovery.cc
index 4c23d87..a25d37f 100644
--- a/examples/checkpoint_recovery-dir/checkpoint_recovery.cc
+++ b/examples/checkpoint_recovery-dir/checkpoint_recovery.cc
@@ -3,7 +3,7 @@
 #include
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include
 #include
@@ -209,7 +209,7 @@ int main(int argc, char* argv[]) {
     "Finish loading %u batches (%zu records) of json into the memory....\n",
     json_batch_cnt, record_cnt);
 
-  std::experimental::filesystem::create_directory(argv[5]);
+  std::filesystem::create_directory(argv[5]);
   size_t store_size = 1LL << atoi(argv[4]);
   size_t hash_table_size = 1LL << 24;
   {
diff --git a/examples/github_query-dir/github_query.cc b/examples/github_query-dir/github_query.cc
index ae8033b..8ebd86a 100644
--- a/examples/github_query-dir/github_query.cc
+++ b/examples/github_query-dir/github_query.cc
@@ -2,7 +2,7 @@
 // Licensed under the MIT license.
 
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include
 
 #include "adapters/simdjson_adapter.h"
@@ -198,7 +198,7 @@ int main(int argc, char* argv[]) {
   printf("Finish loading %u batches (%zu records) of json into the memory....\n",
          json_batch_cnt, record_cnt);
 
-  std::experimental::filesystem::create_directory(argv[5]);
+  std::filesystem::create_directory(argv[5]);
   size_t store_size = 1LL << atoi(argv[4]);
   store_t store{ (1L << 24), store_size, argv[5] };
 
@@ -213,7 +213,7 @@ int main(int argc, char* argv[]) {
   auto predicate1_id =
     store.MakeInlinePSF({ "/type", "/payload/action" }, lib_id, "opened_issue");
   auto predicate2_id =
-    store.MakeInlinePSF({ "/type", "/payload/pull_request/head.repo/language" }, lib_id, "cpp_pr");
+    store.MakeInlinePSF({ "/type", "/payload/pull_request/head/repo/language" }, lib_id, "cpp_pr");
 
   std::vector parser_actions;
   parser_actions.push_back({ REGISTER_GENERAL_PSF, id_proj });
diff --git a/examples/online_demo-dir/online_demo.cc b/examples/online_demo-dir/online_demo.cc
index a3998aa..421ea9c 100644
--- a/examples/online_demo-dir/online_demo.cc
+++ b/examples/online_demo-dir/online_demo.cc
@@ -2,7 +2,7 @@
 // Licensed under the MIT license.
 
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include
 
 #define _NULL_DISK
@@ -207,7 +207,7 @@ int main(int argc, char* argv[]) {
   printf("Finish loading %u batches (%zu records) of json into the memory....\n",
          json_batch_cnt, record_cnt);
 
-  std::experimental::filesystem::create_directory(argv[4]);
+  std::filesystem::create_directory(argv[4]);
   size_t store_size = 1LL << atoi(argv[3]);
   store_t store{ (1L << 24), store_size, argv[4] };
diff --git a/examples/online_demo_disk-dir/online_demo_disk.cc b/examples/online_demo_disk-dir/online_demo_disk.cc
index ba4f801..5e24d37 100644
--- a/examples/online_demo_disk-dir/online_demo_disk.cc
+++ b/examples/online_demo_disk-dir/online_demo_disk.cc
@@ -2,7 +2,7 @@
 // Licensed under the MIT license.
 
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include
 
 #include "adapters/simdjson_adapter.h"
@@ -204,7 +204,7 @@ int main(int argc, char* argv[]) {
   printf("Finish loading %u batches (%zu records) of json into the memory....\n",
          json_batch_cnt, record_cnt);
 
-  std::experimental::filesystem::create_directory(argv[4]);
+  std::filesystem::create_directory(argv[4]);
   size_t store_size = 1LL << atoi(argv[3]);
   store_t store{ (1L << 24), store_size, argv[4] };
diff --git a/examples/twitter_query-dir/twitter_query.cc b/examples/twitter_query-dir/twitter_query.cc
index 56f4956..3d02c6c 100644
--- a/examples/twitter_query-dir/twitter_query.cc
+++ b/examples/twitter_query-dir/twitter_query.cc
@@ -2,7 +2,7 @@
 // Licensed under the MIT license.
 
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include
 
 #include "adapters/simdjson_adapter.h"
@@ -198,7 +198,7 @@ int main(int argc, char* argv[]) {
   printf("Finish loading %u batches (%zu records) of json into the memory....\n",
          json_batch_cnt, record_cnt);
 
-  std::experimental::filesystem::create_directory(argv[5]);
+  std::filesystem::create_directory(argv[5]);
   size_t store_size = 1LL << atoi(argv[4]);
   store_t store{ (1L << 24), store_size, argv[5] };
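All of the example programs above move from std::experimental::filesystem to the C++17 std::filesystem that ships with the toolchains used by the new CI workflows. A minimal sketch of the pattern, not part of the patch (the directory name is illustrative); note that older libstdc++ releases still want the stdc++fs link library, which is why the CMake link lists in this patch keep it:

    // Sketch: C++17 <filesystem> replaces <experimental/filesystem>.
    #include <cstdio>
    #include <filesystem>
    #include <system_error>

    int main() {
      const char* dir = "example_store";       // hypothetical directory
      std::error_code ec;
      std::filesystem::remove_all(dir, ec);    // ok if the directory does not exist yet
      std::filesystem::create_directory(dir);  // same call the examples now make
      std::printf("created %s\n", dir);
      return 0;
    }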
diff --git a/src/adapters/common_utils.h b/src/adapters/common_utils.h
index 20d6799..07b6424 100644
--- a/src/adapters/common_utils.h
+++ b/src/adapters/common_utils.h
@@ -20,7 +20,7 @@ class StringRef {
     return size;
   }
 
-private:
+public:
   const char* ptr;
   size_t size;
 };
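StringRef's ptr and size become public because the rewritten simdjson adapter below now resets and refills a record's original reference in place (see SIMDJsonRecord::clear() and SIMDJsonParser::NextRecord()). A tiny sketch of what the change permits, assuming StringRef lives in fishstore::adapter alongside the adapter code:

    #include "adapters/common_utils.h"

    int main() {
      fishstore::adapter::StringRef ref("payload", 7);
      // Legal only after this change: the members are now public.
      ref.ptr = nullptr;
      ref.size = 0;
      return 0;
    }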
diff --git a/src/adapters/simdjson_adapter.h b/src/adapters/simdjson_adapter.h
index 23651c3..ede545a 100644
--- a/src/adapters/simdjson_adapter.h
+++ b/src/adapters/simdjson_adapter.h
@@ -8,77 +8,88 @@
 #include
 #include
-#ifdef _MSC_VER
-#define NOMINMAX
-#endif
-#include
-#include
-#include "adapters/common_utils.h"
+#include
+#include
-using namespace simdjson;
+const size_t DEFAULT_BATCH_SIZE = 1000000;
 
 namespace fishstore {
 namespace adapter {
+
 class SIMDJsonField {
  public:
-  SIMDJsonField(int64_t id_, const ParsedJson::Iterator& it_)
-    : field_id(id_), iter(it_) {}
+  SIMDJsonField(int64_t id_, const simdjson::dom::element e)
+    : field_id(id_), ele(e) {}
 
   inline int64_t FieldId() const {
     return field_id;
   }
 
   inline NullableBool GetAsBool() const {
-    switch (iter.get_type()) {
-    case 't':
-      return NullableBool(true);
-    case 'f':
-      return NullableBool(false);
-    default:
+    if (ele.is_bool()) {
+      return NullableBool(ele.get_bool().value());
+    }
+    else {
       return NullableBool();
     }
   }
 
   inline NullableInt GetAsInt() const {
-    if (iter.is_integer()) {
-      return NullableInt(static_cast<int32_t>(iter.get_integer()));
-    } else return NullableInt();
+    if (ele.is_int64()) {
+      return NullableInt(ele.get_int64().value());
+    }
+    else {
+      return NullableInt();
+    }
   }
 
   inline NullableLong GetAsLong() const {
-    if (iter.is_integer()) {
-      return NullableLong(iter.get_integer());
-    } else return NullableLong();
+    if (ele.is_int64()) {
+      return NullableLong(ele.get_int64().value());
+    }
+    else {
+      return NullableLong();
+    }
  }
 
   inline NullableFloat GetAsFloat() const {
-    if (iter.is_double()) {
-      return NullableFloat(static_cast<float>(iter.get_double()));
-    } else return NullableFloat();
+    if (ele.is_double()) {
+      return NullableFloat(ele.get_double().value());
+    }
+    else {
+      return NullableFloat();
+    }
   }
 
   inline NullableDouble GetAsDouble() const {
-    if (iter.is_double()) {
-      return NullableDouble(iter.get_double());
-    } else return NullableDouble();
+    if (ele.is_double()) {
+      return NullableDouble(ele.get_double().value());
+    }
+    else {
+      return NullableDouble();
+    }
   }
 
   inline NullableString GetAsString() const {
-    if (iter.is_string()) {
-      return NullableString(std::string(iter.get_string(), iter.get_string_length()));
-    } else return NullableString();
+    if (ele.is_string()) {
+      auto tmp = ele.get_string().value();
+      return NullableString(std::string(tmp.data(), tmp.size()));
+    }
+    else return NullableString();
   }
 
   inline NullableStringRef GetAsStringRef() const {
-    if (iter.is_string()) {
-      return NullableStringRef(StringRef(iter.get_string(), iter.get_string_length()));
-    } else return NullableStringRef();
+    if (ele.is_string()) {
+      auto tmp = ele.get_string().value();
+      return NullableStringRef({ tmp.data(), tmp.size() });
+    }
+    else return NullableStringRef();
   }
 
  private:
   int64_t field_id;
-  ParsedJson::Iterator iter;
+  const simdjson::dom::element ele;
 };
 
 class SIMDJsonRecord {
@@ -100,6 +111,12 @@ class SIMDJsonRecord {
     return original;
   }
 
+  inline void clear() {
+    original.ptr = NULL;
+    original.size = 0;
+    fields.clear();
+  }
+
  public:
   StringRef original;
   std::vector<SIMDJsonField> fields;
 };
 
 class SIMDJsonParser {
@@ -107,45 +124,42 @@ class SIMDJsonParser {
  public:
-  SIMDJsonParser(const std::vector<std::string>& field_names, const size_t alloc_bytes = 1LL << 25)
-    : fields(field_names) {
-    auto success = pj.allocate_capacity(alloc_bytes);
-    assert(success);
-    has_next = false;
-  }
+  SIMDJsonParser(const std::vector<std::string>& field_names)
+    : fields(field_names), parser_(), stream(), buffer_(NULL), len_(0), record_() {}
 
   inline void Load(const char* buffer, size_t length) {
-    record.original = StringRef(buffer, length);
-    record.fields.clear();
-    auto ok = json_parse(buffer, length, pj);
-    if (ok != 0 || !pj.is_valid()) {
-      printf("Parsing failed...\n");
-      has_next = false;
-    } else {
-      has_next = true;
-    }
+    //XX: buffer is not padded, may have issue
+    simdjson::padded_string ps(buffer, length);
+    buffer_ = buffer;
+    len_ = length;
+    auto res = parser_.parse_many(ps, DEFAULT_BATCH_SIZE).get(stream);
+    it = stream.begin();
   }
 
   inline bool HasNext() {
-    return has_next;
+    return it != stream.end();
   }
 
   inline const SIMDJsonRecord& NextRecord() {
-    ParsedJson::Iterator it(pj);
-    for (auto field_id = 0; field_id < fields.size(); ++field_id) {
-      if (it.move_to(fields[field_id])) {
-        record.fields.emplace_back(SIMDJsonField{field_id, it});
-      }
+    record_.clear();
+    record_.original.ptr = buffer_ + it.current_index();
+    auto last_index = it.current_index();
+    for (auto& field : fields) {
+      record_.fields.emplace_back(SIMDJsonField(record_.fields.size(), (*it).at_pointer(field).value()));
     }
-    has_next = false;
-    return record;
+    ++it;
+    record_.original.size = it != stream.end() ? it.current_index() - last_index : len_ - last_index;
+    return record_;
   }
 
  private:
+  const char* buffer_;
+  size_t len_;
   std::vector<std::string> fields;
-  ParsedJson pj;
-  SIMDJsonRecord record;
-  bool has_next;
+  simdjson::dom::parser parser_;
+  simdjson::dom::document_stream stream;
+  simdjson::dom::document_stream::iterator it;
+  SIMDJsonRecord record_;
 };
 
 class SIMDJsonAdapter {
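The adapter is rewritten from simdjson's old ParsedJson::Iterator interface to the DOM API: Load() copies the batch into a padded_string, runs parse_many() with DEFAULT_BATCH_SIZE, and then each NextRecord() exposes one document through JSON-pointer lookups, so a single Load() can yield many records. A minimal sketch of driving the patched class (the JSON batch and field list are illustrative only):

    #include <cstdio>
    #include <string>
    #include "adapters/simdjson_adapter.h"

    int main() {
      using fishstore::adapter::SIMDJsonParser;
      // Two newline-delimited documents in one batch, queried by JSON pointer.
      std::string batch = "{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}";
      SIMDJsonParser parser({ "/id", "/name" });
      parser.Load(batch.data(), batch.size());
      while (parser.HasNext()) {
        const auto& rec = parser.NextRecord();
        // rec.original spans this document's raw bytes; rec.fields holds one
        // SIMDJsonField per requested pointer, in registration order.
        std::printf("record of %zu bytes, %zu fields\n", rec.original.size, rec.fields.size());
      }
      return 0;
    }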
diff --git a/src/core/fishstore.h b/src/core/fishstore.h
index 5984c80..40cd81a 100644
--- a/src/core/fishstore.h
+++ b/src/core/fishstore.h
@@ -13,7 +13,7 @@
 #include
 #include
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include
 #include
@@ -130,7 +130,7 @@ struct ParserContext {
 };
 
 struct LibraryHandle {
-  std::experimental::filesystem::path path;
+  std::filesystem::path path;
 #ifdef _WIN32
   HMODULE handle;
 #else
@@ -3570,7 +3570,7 @@ Status FishStore::Recover(const Guid& index_token, const Guid& hybrid_log_
     std::string path;
     naming_file >> path;
     LibraryHandle lib;
-    lib.path = std::experimental::filesystem::absolute(path);
+    lib.path = std::filesystem::absolute(path);
 #ifdef _WIN32
     lib.handle = LoadLibrary(lib.path.string().c_str());
 #else
@@ -3763,7 +3763,7 @@ template
 size_t FishStore::LoadPSFLibrary(const std::string& lib_path) {
   std::lock_guard lk(mutex);
   LibraryHandle lib;
-  lib.path = std::experimental::filesystem::absolute(lib_path);
+  lib.path = std::filesystem::absolute(lib_path);
 #ifdef _WIN32
   lib.handle = LoadLibrary(lib.path.string().c_str());
 #else
diff --git a/src/device/file_system_disk.h b/src/device/file_system_disk.h
index 1f8f6b1..d817c1d 100644
--- a/src/device/file_system_disk.h
+++ b/src/device/file_system_disk.h
@@ -4,7 +4,7 @@
 #pragma once
 
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include
 #include
@@ -491,24 +491,24 @@ class FileSystemDisk {
 
   void CreateIndexCheckpointDirectory(const Guid& token) {
     std::string index_dir = index_checkpoint_path(token);
-    std::experimental::filesystem::path path{ index_dir };
+    std::filesystem::path path{ index_dir };
     try {
-      std::experimental::filesystem::remove_all(path);
-    } catch(std::experimental::filesystem::filesystem_error&) {
+      std::filesystem::remove_all(path);
+    } catch(std::filesystem::filesystem_error&) {
       // Ignore; throws when path doesn't exist yet.
     }
-    std::experimental::filesystem::create_directories(path);
+    std::filesystem::create_directories(path);
   }
 
   void CreateCprCheckpointDirectory(const Guid& token) {
     std::string cpr_dir = cpr_checkpoint_path(token);
-    std::experimental::filesystem::path path{ cpr_dir };
+    std::filesystem::path path{ cpr_dir };
     try {
-      std::experimental::filesystem::remove_all(path);
-    } catch(std::experimental::filesystem::filesystem_error&) {
+      std::filesystem::remove_all(path);
+    } catch(std::filesystem::filesystem_error&) {
       // Ignore; throws when path doesn't exist yet.
     }
-    std::experimental::filesystem::create_directories(path);
+    std::filesystem::create_directories(path);
   }
 
   file_t NewFile(const std::string& relative_path) {
diff --git a/src/environment/file_linux.cc b/src/environment/file_linux.cc
index edb32b3..b59f846 100644
--- a/src/environment/file_linux.cc
+++ b/src/environment/file_linux.cc
@@ -9,6 +9,7 @@
 #include
 #include
 #include
+#include <liburing.h>
 
 #include "file_linux.h"
 
 namespace fishstore {
@@ -194,6 +195,117 @@ Status QueueFile::ScheduleOperation(FileOperationType operationType, uint8_t* bu
   return Status::Ok;
 }
 
+bool UringIoHandler::TryComplete() {
+  struct io_uring_cqe* cqe = nullptr;
+  cq_lock_.Acquire();
+  int res = io_uring_peek_cqe(ring_, &cqe);
+  if(res == 0 && cqe) {
+    int io_res = cqe->res;
+    auto *context = reinterpret_cast<IoCallbackContext*>(io_uring_cqe_get_data(cqe));
+    io_uring_cqe_seen(ring_, cqe);
+    cq_lock_.Release();
+    Status return_status;
+    size_t byte_transferred;
+    if (io_res < 0) {
+      // Retry if it is failed.....
+      sq_lock_.Acquire();
+      struct io_uring_sqe *sqe = io_uring_get_sqe(ring_);
+      assert(sqe != 0);
+      if (context->is_read_) {
+        io_uring_prep_readv(sqe, context->fd_, &context->vec_, 1, context->offset_);
+      } else {
+        io_uring_prep_writev(sqe, context->fd_, &context->vec_, 1, context->offset_);
+      }
+      io_uring_sqe_set_data(sqe, context);
+      int retry_res = io_uring_submit(ring_);
+      assert(retry_res == 1);
+      sq_lock_.Release();
+      return false;
+    } else {
+      return_status = Status::Ok;
+      byte_transferred = io_res;
+    }
+    context->callback(context->caller_context, return_status, byte_transferred);
+    lss_allocator.Free(context);
+    return true;
+  } else {
+    cq_lock_.Release();
+    return false;
+  }
+}
+
+Status UringFile::Open(FileCreateDisposition create_disposition, const FileOptions& options,
+                       UringIoHandler* handler, bool* exists) {
+  int flags = 0;
+  if(options.unbuffered) {
+    flags |= O_DIRECT;
+  }
+  RETURN_NOT_OK(File::Open(flags, create_disposition, exists));
+  if(exists && !*exists) {
+    return Status::Ok;
+  }
+
+  ring_ = handler->io_uring();
+  sq_lock_ = handler->sq_lock();
+  return Status::Ok;
+}
+
+Status UringFile::Read(size_t offset, uint32_t length, uint8_t* buffer,
+                       IAsyncContext& context, AsyncIOCallback callback) const {
+  DCHECK_ALIGNMENT(offset, length, buffer);
+#ifdef IO_STATISTICS
+  ++read_count_;
+  bytes_read_ += length;
+#endif
+  return const_cast<UringFile*>(this)->ScheduleOperation(FileOperationType::Read, buffer,
+         offset, length, context, callback);
+}
+
+Status UringFile::Write(size_t offset, uint32_t length, const uint8_t* buffer,
+                        IAsyncContext& context, AsyncIOCallback callback) {
+  DCHECK_ALIGNMENT(offset, length, buffer);
+#ifdef IO_STATISTICS
+  bytes_written_ += length;
+#endif
+  return ScheduleOperation(FileOperationType::Write, const_cast<uint8_t*>(buffer), offset, length,
+                           context, callback);
+}
+
+Status UringFile::ScheduleOperation(FileOperationType operationType, uint8_t* buffer,
+                                    size_t offset, uint32_t length, IAsyncContext& context,
+                                    AsyncIOCallback callback) {
+  auto io_context = alloc_context<UringIoHandler::IoCallbackContext>(sizeof(UringIoHandler::IoCallbackContext));
+  if (!io_context.get()) return Status::OutOfMemory;
+
+  IAsyncContext* caller_context_copy;
+  RETURN_NOT_OK(context.DeepCopy(caller_context_copy));
+
+  bool is_read = operationType == FileOperationType::Read;
+  new(io_context.get()) UringIoHandler::IoCallbackContext(is_read, fd_, buffer, length, offset, caller_context_copy, callback);
+
+  sq_lock_->Acquire();
+  struct io_uring_sqe *sqe = io_uring_get_sqe(ring_);
+  assert(sqe != 0);
+
+  if (is_read) {
+    io_uring_prep_readv(sqe, fd_, &io_context->vec_, 1, offset);
+    //io_uring_prep_read(sqe, fd_, buffer, length, offset);
+  } else {
+    io_uring_prep_writev(sqe, fd_, &io_context->vec_, 1, offset);
+    //io_uring_prep_write(sqe, fd_, buffer, length, offset);
+  }
+  io_uring_sqe_set_data(sqe, io_context.get());
+
+  int res = io_uring_submit(ring_);
+  sq_lock_->Release();
+  if (res != 1) {
+    return Status::IOError;
+  }
+
+  io_context.release();
+  return Status::Ok;
+}
+
 #undef DCHECK_ALIGNMENT
 
 }
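UringIoHandler::TryComplete() above reaps at most one completion per call, resubmits a request whose cqe->res is negative instead of surfacing the error, and otherwise runs the caller's callback. I/O issued through UringFile therefore only makes progress while some thread keeps polling the handler. A minimal sketch of such a polling loop, assuming the include path; the function name and stop flag are illustrative:

    #include <atomic>
    #include <thread>
    #include "environment/file_linux.h"   // assumed include path

    void PollCompletions(fishstore::environment::UringIoHandler& handler,
                         std::atomic<bool>& stop) {
      // Each TryComplete() pops at most one CQE and invokes its callback;
      // yield briefly whenever the completion queue is empty.
      while (!stop.load(std::memory_order_relaxed)) {
        if (!handler.TryComplete()) {
          std::this_thread::yield();
        }
      }
    }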
diff --git a/src/environment/file_linux.h b/src/environment/file_linux.h
index ffb6656..4869f8c 100644
--- a/src/environment/file_linux.h
+++ b/src/environment/file_linux.h
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include <liburing.h>
 #include
 #include
 #include
@@ -18,6 +19,29 @@
 namespace fishstore {
 namespace environment {
 
+class alignas(64) SpinLock {
+public:
+  SpinLock(): locked_(false) {}
+
+  void Acquire() noexcept {
+    for (;;) {
+      if (!locked_.exchange(true, std::memory_order_acquire)) {
+        return;
+      }
+
+      while (locked_.load(std::memory_order_relaxed)) {
+        __builtin_ia32_pause();
+      }
+    }
+  }
+
+  void Release() noexcept {
+    locked_.store(false, std::memory_order_release);
+  }
+private:
+  std::atomic_bool locked_;
+};
+
 constexpr const char* kPathSeparator = "/";
 
 /// The File class encapsulates the OS file handle.
@@ -250,5 +274,128 @@ class QueueFile : public File {
   io_context_t io_object_;
 };
 
+
+class UringFile;
+
+/// The UringIoHandler class encapsulates completions for async file I/O, where the completions
+/// are retrieved from the io_uring completion queue.
+class UringIoHandler {
+ public:
+  typedef UringFile async_file_t;
+
+ private:
+  constexpr static int kMaxEvents = 128;
+
+ public:
+  UringIoHandler() {
+    ring_ = new struct io_uring();
+    int ret = io_uring_queue_init(kMaxEvents, ring_, 0);
+    assert(ret == 0);
+  }
+
+  UringIoHandler(size_t max_threads) {
+    ring_ = new struct io_uring();
+    int ret = io_uring_queue_init(kMaxEvents, ring_, 0);
+    assert(ret == 0);
+  }
+
+  /// Move constructor
+  UringIoHandler(UringIoHandler&& other) {
+    ring_ = other.ring_;
+    other.ring_ = 0;
+  }
+
+  ~UringIoHandler() {
+    if (ring_ != 0) {
+      io_uring_queue_exit(ring_);
+      delete ring_;
+    }
+  }
+
+  /*
+  /// Invoked whenever a Linux AIO completes.
+  static void IoCompletionCallback(io_context_t ctx, struct iocb* iocb, long res, long res2);
+  */
+
+  struct IoCallbackContext {
+    IoCallbackContext(bool is_read, int fd, uint8_t* buffer, size_t length, size_t offset, IAsyncContext* context_, AsyncIOCallback callback_)
+      : is_read_(is_read)
+      , fd_(fd)
+      , vec_{buffer, length}
+      , offset_(offset)
+      , caller_context{ context_ }
+      , callback{ callback_ } {}
+
+    bool is_read_;
+
+    int fd_;
+    struct iovec vec_;
+    size_t offset_;
+
+    /// Caller callback context.
+    IAsyncContext* caller_context;
+
+    /// The caller's asynchronous callback function
+    AsyncIOCallback callback;
+  };
+
+  inline struct io_uring* io_uring() const {
+    return ring_;
+  }
+
+  inline SpinLock* sq_lock() {
+    return &sq_lock_;
+  }
+
+  /// Try to execute the next IO completion on the queue, if any.
+  bool TryComplete();
+
+ private:
+  /// The io_uring for all the I/Os
+  struct io_uring* ring_;
+  SpinLock sq_lock_, cq_lock_;
+};
+
+/// The UringFile class encapsulates asynchronous reads and writes, using the specified
+/// io_uring
+class UringFile : public File {
+ public:
+  UringFile()
+    : File()
+    , ring_{ nullptr } {
+  }
+  UringFile(const std::string& filename)
+    : File(filename)
+    , ring_{ nullptr } {
+  }
+  /// Move constructor
+  UringFile(UringFile&& other)
+    : File(std::move(other))
+    , ring_{ other.ring_ }
+    , sq_lock_{ other.sq_lock_ } {
+  }
+  /// Move assignment operator.
+  UringFile& operator=(UringFile&& other) {
+    File::operator=(std::move(other));
+    ring_ = other.ring_;
+    sq_lock_ = other.sq_lock_;
+    return *this;
+  }
+
+  Status Open(FileCreateDisposition create_disposition, const FileOptions& options,
+              UringIoHandler* handler, bool* exists = nullptr);
+
+  Status Read(size_t offset, uint32_t length, uint8_t* buffer,
+              IAsyncContext& context, AsyncIOCallback callback) const;
+  Status Write(size_t offset, uint32_t length, const uint8_t* buffer,
+               IAsyncContext& context, AsyncIOCallback callback);
+
+ private:
+  Status ScheduleOperation(FileOperationType operationType, uint8_t* buffer, size_t offset,
+                           uint32_t length, IAsyncContext& context, AsyncIOCallback callback);
+
+  struct io_uring* ring_;
+  SpinLock* sq_lock_;
+};
+
 }
 } // namespace FASTER::environment
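liburing does not synchronize concurrent submitters, so the handler guards the ring's submission and completion queues with the SpinLock added above, a cache-line-aligned test-and-test-and-set lock (note that __builtin_ia32_pause() ties it to GCC/Clang on x86, which is acceptable for this Linux-only header). A minimal sketch of the Acquire/Release discipline, assuming the include path; the shared counter stands in for the short submit/peek critical sections:

    #include <cstdio>
    #include <thread>
    #include <vector>
    #include "environment/file_linux.h"   // assumed include path

    int main() {
      fishstore::environment::SpinLock lock;
      long counter = 0;
      std::vector<std::thread> threads;
      for (int t = 0; t < 4; ++t) {
        threads.emplace_back([&] {
          for (int i = 0; i < 100000; ++i) {
            lock.Acquire();   // keep the critical section short, as the handler does
            ++counter;
            lock.Release();
          }
        });
      }
      for (auto& th : threads) {
        th.join();
      }
      std::printf("%ld\n", counter);   // expect 400000
      return 0;
    }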
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 03ffcfb..4e70b60 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,7 +1,7 @@
 if(WIN32)
   set (FISHSTORE_TEST_LINK_LIBS fishstore rpcrt4 gtest)
 else()
-  set (FISHSTORE_TEST_LINK_LIBS fishstore stdc++fs uuid tbb gcc aio m stdc++ pthread dl gtest)
+  set (FISHSTORE_TEST_LINK_LIBS fishstore stdc++fs uuid tbb gcc aio m stdc++ pthread dl gtest uring)
 endif()
 
 if(MSVC)
@@ -22,11 +22,17 @@ ENDFUNCTION()
 
 ADD_FISHSTORE_TEST(in_memory_test "")
 ADD_FISHSTORE_TEST(ingest_queue_test "ingest_test.h")
+if (NOT MSVC)
+ADD_FISHSTORE_TEST(ingest_uring_test "ingest_test.h")
+endif()
 if (MSVC)
 ADD_FISHSTORE_TEST(ingest_threadpool_test "ingest_test.h")
 endif()
 ADD_FISHSTORE_TEST(register_test "")
 ADD_FISHSTORE_TEST(checkpoint_queue_test "checkpoint_test.h")
+if (NOT MSVC)
+ADD_FISHSTORE_TEST(checkpoint_uring_test "checkpoint_test.h")
+endif()
 if (MSVC)
 ADD_FISHSTORE_TEST(checkpoint_threadpool_test "checkpoint_test.h")
 endif()
diff --git a/test/checkpoint_queue_test.cc b/test/checkpoint_queue_test.cc
index b102d99..1ed88fa 100644
--- a/test/checkpoint_queue_test.cc
+++ b/test/checkpoint_queue_test.cc
@@ -4,7 +4,7 @@
 #include
 #include
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include "gtest/gtest.h"
 
 #include "adapters/simdjson_adapter.h"
diff --git a/test/checkpoint_test.h b/test/checkpoint_test.h
index cfe9384..02bfcbb 100644
--- a/test/checkpoint_test.h
+++ b/test/checkpoint_test.h
@@ -8,7 +8,7 @@
 using adapter_t = fishstore::adapter::SIMDJsonAdapter;
 using disk_t = fishstore::device::FileSystemDisk;
 using store_t = FishStore;
 
-const size_t n_records = 1500000;
+const size_t n_records = 3000000;
 const size_t n_threads = 4;
 
 const char* pattern =
   "{\"id\": \"%zu\", \"name\": \"name%zu\", \"gender\": \"%s\", \"school\": {\"id\": \"%zu\", \"name\": \"school%zu\"}}";
@@ -158,8 +158,8 @@ class JsonFullScanContext : public IAsyncContext {
 };
 
 TEST(CLASS, Checkpoint_Concurrent) {
-  std::experimental::filesystem::remove_all("test");
-  std::experimental::filesystem::create_directories("test");
+  std::filesystem::remove_all("test");
+  std::filesystem::create_directories("test");
   std::vector guids(n_threads);
 
   {
@@ -213,6 +213,9 @@ TEST(CLASS, Checkpoint_Concurrent) {
     std::this_thread::sleep_for(std::chrono::milliseconds(500));
     store.CheckpointHybridLog(hybrid_log_persistence_callback, log_token);
     store.CompleteAction(true);
+    for (auto& guid: guids) {
+      printf("%s\n", guid.ToString().c_str());
+    }
 
     for (auto& thd : thds) {
       thd.join();
@@ -237,6 +240,9 @@ TEST(CLASS, Checkpoint_Concurrent) {
   uint32_t version;
   std::vector recovered_session_ids;
   new_store.Recover(index_token, log_token, version, recovered_session_ids);
+  for (auto& guid: recovered_session_ids) {
+    printf("%s\n", guid.ToString().c_str());
+  }
   new_store.StartSession();
 
   std::vector> sessions(n_threads);
diff --git a/test/checkpoint_threadpool_test.cc b/test/checkpoint_threadpool_test.cc
index 413f55a..7d6af4c 100644
--- a/test/checkpoint_threadpool_test.cc
+++ b/test/checkpoint_threadpool_test.cc
@@ -4,7 +4,7 @@
 #include
 #include
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include "gtest/gtest.h"
 
 #include "adapters/simdjson_adapter.h"
diff --git a/test/checkpoint_uring_test.cc b/test/checkpoint_uring_test.cc
new file mode 100644
index 0000000..034b1e9
--- /dev/null
+++ b/test/checkpoint_uring_test.cc
@@ -0,0 +1,22 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+#include
+#include
+#include
+#include
+#include "gtest/gtest.h"
+
+#include "adapters/simdjson_adapter.h"
+#include
+
+using handler_t = fishstore::environment::UringIoHandler;
+
+#define CLASS CheckpointTest_Uring
+#include "checkpoint_test.h"
+#undef CLASS
+
+int main(int argc, char* argv[]) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/test/ingest_queue_test.cc b/test/ingest_queue_test.cc
index d4b0a5e..9f8ed4b 100644
--- a/test/ingest_queue_test.cc
+++ b/test/ingest_queue_test.cc
@@ -4,7 +4,7 @@
 #include
 #include
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include "gtest/gtest.h"
 
 #include "adapters/simdjson_adapter.h"
diff --git a/test/ingest_test.h b/test/ingest_test.h
index 55d8a3d..902fc27 100644
--- a/test/ingest_test.h
+++ b/test/ingest_test.h
@@ -157,8 +157,8 @@ class JsonFullScanContext : public IAsyncContext {
 };
 
 TEST(CLASS, Ingest_Serial) {
-  std::experimental::filesystem::remove_all("test");
-  std::experimental::filesystem::create_directories("test");
+  std::filesystem::remove_all("test");
+  std::filesystem::create_directories("test");
   store_t store{ 8192, 201326592, "test" };
   store.StartSession();
   auto id_proj = store.MakeProjection("/id");
@@ -212,8 +212,8 @@ TEST(CLASS, Ingest_Serial) {
 }
 
 TEST(CLASS, Ingest_Concurrent) {
-  std::experimental::filesystem::remove_all("test");
-  std::experimental::filesystem::create_directories("test");
+  std::filesystem::remove_all("test");
+  std::filesystem::create_directories("test");
   {
     store_t store{ 8192, 201326592, "test" };
     store.StartSession();
@@ -282,8 +282,8 @@ TEST(CLASS, Ingest_Concurrent) {
 }
 
 TEST(CLASS, FullScan) {
-  std::experimental::filesystem::remove_all("test");
-  std::experimental::filesystem::create_directories("test");
+  std::filesystem::remove_all("test");
+  std::filesystem::create_directories("test");
   store_t store{ 8192, 201326592, "test" };
   store.StartSession();
   auto id_proj = store.MakeProjection("/id");
diff --git a/test/ingest_threadpool_test.cc b/test/ingest_threadpool_test.cc
index 427a8a2..9fc26dd 100644
--- a/test/ingest_threadpool_test.cc
+++ b/test/ingest_threadpool_test.cc
@@ -4,7 +4,7 @@
 #include
 #include
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include "gtest/gtest.h"
 
 #include "adapters/simdjson_adapter.h"
diff --git a/test/ingest_uring_test.cc b/test/ingest_uring_test.cc
new file mode 100644
index 0000000..dfc9842
--- /dev/null
+++ b/test/ingest_uring_test.cc
@@ -0,0 +1,22 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+#include
+#include
+#include
+#include
+#include "gtest/gtest.h"
+
+#include "adapters/simdjson_adapter.h"
+#include
+
+using handler_t = fishstore::environment::UringIoHandler;
+
+#define CLASS IngestTest_Uring
+#include "ingest_test.h"
+#undef CLASS
+
+int main(int argc, char* argv[]) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/test/register_test.cc b/test/register_test.cc
index 3e8af94..f81bb10 100644
--- a/test/register_test.cc
+++ b/test/register_test.cc
@@ -4,7 +4,7 @@
 #include
 #include
 #include
-#include <experimental/filesystem>
+#include <filesystem>
 #include "gtest/gtest.h"
 
 #include "adapters/simdjson_adapter.h"
@@ -171,8 +171,8 @@ class JsonFullScanContext : public IAsyncContext {
 };
 
 TEST(Registration, Register_Concurrent) {
-  std::experimental::filesystem::remove_all("test");
-  std::experimental::filesystem::create_directories("test");
+  std::filesystem::remove_all("test");
+  std::filesystem::create_directories("test");
   store_t store{ 8192, 201326592, "test" };
   store.StartSession();
   auto school_id_proj = store.MakeProjection("/school/id");
@@ -239,8 +239,8 @@ TEST(Registration, Register_Concurrent) {
 }
 
 TEST(Registration, Deregister_Concurrent) {
-  std::experimental::filesystem::remove_all("test");
-  std::experimental::filesystem::create_directories("test");
+  std::filesystem::remove_all("test");
+  std::filesystem::create_directories("test");
   store_t store{ 8192, 201326592, "test" };
   store.StartSession();
   auto school_id_proj = store.MakeProjection("/school/id");
diff --git a/third_party/hopscotch-map b/third_party/hopscotch-map
index 7ef9cc4..8483747 160000
--- a/third_party/hopscotch-map
+++ b/third_party/hopscotch-map
@@ -1 +1 @@
-Subproject commit 7ef9cc4aca326cbe725553ca330d74076f936ff2
+Subproject commit 848374746a50b3ebebe656611d554cb134e9aeef
diff --git a/third_party/simdjson b/third_party/simdjson
index ee66fb1..8a3b2f2 160000
--- a/third_party/simdjson
+++ b/third_party/simdjson
@@ -1 +1 @@
-Subproject commit ee66fb1c602e17563606c6f6eecc225dac5455cc
+Subproject commit 8a3b2f20e47b2eb28b7085d388422de94bdae634