diff --git a/.circleci/config.yml b/.circleci/config.yml index 43e6d2f36c..ef38d04b54 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -100,7 +100,6 @@ commands: type: string steps: - run: | - brew install python@3.9 curl -OL "https://github.com/bazelbuild/bazelisk/releases/download/v1.17.0/bazelisk-darwin-<>" sudo mv "bazelisk-darwin-<>" /usr/local/bin/bazel chmod a+x /usr/local/bin/bazel @@ -149,15 +148,14 @@ commands: steps: - install-brew-rosetta - run: | - /usr/local/bin/brew install python@3.9 tool/test/start-community-server.sh - /usr/local/bin/python3.9 -m pip install wheel - /usr/local/bin/python3.9 -m pip install pip==21.3.1 - /usr/local/bin/python3.9 -m pip install -r python/requirements_dev.txt - /usr/local/bin/python3.9 -m pip install --extra-index-url https://repo.typedb.com/public/public-snapshot/python/simple typedb-driver==0.0.0+$(git rev-parse HEAD) + python3 -m pip install wheel + python3 -m pip install pip==21.3.1 + python3 -m pip install -r python/requirements_dev.txt + python3 -m pip install --extra-index-url https://repo.typedb.com/public/public-snapshot/python/simple typedb-driver==0.0.0+$(git rev-parse HEAD) sleep 5 pushd python/tests/deployment/ - /usr/local/bin/python3.9 -m unittest test && export TEST_SUCCESS=0 || export TEST_SUCCESS=1 + python3 -m unittest test && export TEST_SUCCESS=0 || export TEST_SUCCESS=1 popd tool/test/stop-community-server.sh exit $TEST_SUCCESS @@ -850,32 +848,32 @@ workflows: - deploy-snapshot-linux-arm64: filters: branches: - only: [development, master, "3.0"] + only: [development, master] - deploy-snapshot-linux-x86_64: filters: branches: - only: [development, master, "3.0"] + only: [development, master] - deploy-snapshot-mac-arm64: filters: branches: - only: [development, master, "3.0"] + only: [development, master] - deploy-snapshot-mac-x86_64: filters: branches: - only: [development, master, "3.0"] + only: [development, master] - deploy-snapshot-windows-x86_64: filters: branches: - only: [development, master, "3.0"] + only: [development, master] - deploy-snapshot-any: filters: branches: - only: [development, master, "3.0"] + only: [development, master] requires: - deploy-snapshot-linux-arm64 - deploy-snapshot-linux-x86_64 diff --git a/Cargo.lock b/Cargo.lock index 4f93e243f5..7e20ee10ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,9 +276,9 @@ dependencies = [ [[package]] name = "async-std" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "730294c1c08c2e0f85759590518f6333f0d5a0a766a27d519c1b244c3dfd8a24" +checksum = "2c8e079a4ab67ae52b7403632e4618815d6db36d2a010cfe41b02c1b1578f93b" dependencies = [ "async-attributes", "async-channel 1.9.0", @@ -1354,6 +1354,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-uring" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "libc", +] + [[package]] name = "is-terminal" version = "0.4.13" @@ -1397,10 +1408,11 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "js-sys" -version = "0.3.70" +version = "0.3.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +checksum = "ec48937a97411dcb524a265206ccd4c90bb711fca92b2792c407f268825b9305" dependencies = [ + "once_cell", "wasm-bindgen", ] @@ -1421,9 +1433,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.169" +version = "0.2.176" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" [[package]] name = "linked-hash-map" @@ -1464,9 +1476,9 @@ dependencies = [ [[package]] name = "macro_rules_attribute" -version = "0.2.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a82271f7bc033d84bbca59a3ce3e4159938cb08a9c3aebbe54d215131518a13" +checksum = "65049d7923698040cd0b1ddcced9b0eb14dd22c5f86ae59c3740eab64a676520" dependencies = [ "macro_rules_attribute-proc_macro", "paste", @@ -1474,9 +1486,18 @@ dependencies = [ [[package]] name = "macro_rules_attribute-proc_macro" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" + +[[package]] +name = "matchers" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8dd856d451cc0da70e2ef2ce95a18e39a93b7558bedf10201ad28503f918568" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] [[package]] name = "matchit" @@ -1561,6 +1582,15 @@ dependencies = [ "nom", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -2207,18 +2237,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -2227,15 +2267,16 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.140" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ "indexmap 2.5.0", "itoa", "memchr", "ryu", "serde", + "serde_core", ] [[package]] @@ -2264,6 +2305,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2354,6 +2404,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "spin" version = "0.9.8" @@ -2520,22 +2580,33 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tokio" -version = "1.45.1" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.7", + "slab", + "socket2 0.6.0", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2676,10 +2747,11 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2687,9 +2759,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", @@ -2698,11 +2770,41 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" dependencies = [ + "matchers", + "nu-ansi-term", "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -2748,6 +2850,8 @@ dependencies = [ "tokio-stream", "tonic", "tonic-types", + "tracing", + "tracing-subscriber", "typedb-protocol", "uuid", ] @@ -2770,6 +2874,7 @@ dependencies = [ "env_logger", "itertools 0.10.5", "log", + "tracing", "typedb-driver", ] @@ -2820,15 +2925,23 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.16.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ "getrandom 0.3.1", + "js-sys", "rand 0.9.0", "serde", + "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "value-bag" version = "1.9.0" @@ -2883,24 +2996,25 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.93" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +checksum = "c1da10c01ae9f1ae40cbfac0bac3b1e724b320abfcf52229f80b547c0d250e2d" dependencies = [ "cfg-if", "once_cell", + "rustversion", "wasm-bindgen-macro", + "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.93" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +checksum = "671c9a5a66f49d8a47345ab942e2cb93c7d1d0339065d4f8139c486121b43b19" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", "syn 2.0.87", @@ -2921,9 +3035,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.93" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +checksum = "7ca60477e4c59f5f2986c50191cd972e3a50d8a95603bc9434501cf156a9a119" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2931,9 +3045,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.93" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7" dependencies = [ "proc-macro2", "quote", @@ -2944,9 +3058,12 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.93" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +checksum = "bad67dc8b2a1a6e5448428adec4c3e84c43e561d8c9ee8a9e5aabeb193ec41d1" +dependencies = [ + "unicode-ident", +] [[package]] name = "web-sys" diff --git a/c/BUILD b/c/BUILD index b0eb6f31fa..1f16511bdd 100644 --- a/c/BUILD +++ b/c/BUILD @@ -46,7 +46,7 @@ rust_static_library( "@crates//:chrono", "@crates//:itertools", "@crates//:env_logger", - "@crates//:log", + "@crates//:tracing", ], tags = ["crate-name=typedb_driver_clib"], ) diff --git a/c/Cargo.toml b/c/Cargo.toml index 79614fedb5..d97baa9158 100644 --- a/c/Cargo.toml +++ b/c/Cargo.toml @@ -19,6 +19,11 @@ features = {} version = "0.10.2" default-features = false + [dependencies.tracing] + features = ["attributes", "default", "log", "std", "tracing-attributes"] + version = "0.1.41" + default-features = false + [dependencies.log] features = ["kv", "kv_unstable", "std", "value-bag"] version = "0.4.27" diff --git a/c/src/answer.rs b/c/src/answer.rs index d8d32c2fb9..8eabc9ef02 100644 --- a/c/src/answer.rs +++ b/c/src/answer.rs @@ -29,7 +29,7 @@ use typedb_driver::{ use super::{ concept::ConceptIterator, iterator::CIterator, - memory::{borrow, free, release, release_optional, release_string, string_view}, + memory::{borrow, free, release, release_string, string_view}, }; use crate::{ common::StringIterator, diff --git a/c/src/concept/concept.rs b/c/src/concept/concept.rs index 115e76f2b0..ba674edd6d 100644 --- a/c/src/concept/concept.rs +++ b/c/src/concept/concept.rs @@ -24,7 +24,7 @@ use typedb_driver::{ box_stream, concept::{ value::{Decimal, Duration, TimeZone}, - Attribute, AttributeType, Concept, Entity, EntityType, Relation, RelationType, RoleType, Value, + Attribute, Concept, Entity, Relation, Value, }, }; @@ -171,7 +171,7 @@ pub extern "C" fn concept_try_get_iid(thing: *mut Concept) -> *mut c_char { /// If this is a Type, returns the label of the type. #[no_mangle] pub extern "C" fn concept_get_label(concept: *const Concept) -> *mut c_char { - release_string(borrow(concept).get_label().clone().to_owned()) + release_string(borrow(concept).get_label().to_owned()) } /// Retrieves the optional label of this Concept. @@ -180,7 +180,7 @@ pub extern "C" fn concept_get_label(concept: *const Concept) -> *mut c_char { /// If this is a Type, returns the label of the type. #[no_mangle] pub extern "C" fn concept_try_get_label(concept: *const Concept) -> *mut c_char { - release_optional_string(borrow(concept).try_get_label().map(|str| str.clone().to_owned())) + release_optional_string(borrow(concept).try_get_label().map(|str| str.to_owned())) } /// Retrieves the value type of this Concept, if it exists. @@ -190,7 +190,7 @@ pub extern "C" fn concept_try_get_label(concept: *const Concept) -> *mut c_char /// Otherwise, returns null. #[no_mangle] pub extern "C" fn concept_try_get_value_type(concept: *const Concept) -> *mut c_char { - release_optional_string(borrow(concept).try_get_value_label().map(|str| str.clone().to_owned())) + release_optional_string(borrow(concept).try_get_value_label().map(|str| str.to_owned())) } /// Retrieves the value of this Concept, if it exists. @@ -317,7 +317,7 @@ pub extern "C" fn concept_get_decimal(concept: *const Concept) -> Decimal { #[no_mangle] pub extern "C" fn concept_get_string(concept: *const Concept) -> *mut c_char { match borrow(concept).try_get_string() { - Some(value) => release_string(value.clone().to_owned()), + Some(value) => release_string(value.to_owned()), None => unreachable!("Attempting to unwrap a non-string {:?} as string", borrow(concept)), } } diff --git a/c/src/database.rs b/c/src/database.rs index dacf866e1e..8e669e778f 100644 --- a/c/src/database.rs +++ b/c/src/database.rs @@ -17,14 +17,13 @@ * under the License. */ -use std::{ffi::c_char, path::Path, ptr::addr_of_mut, sync::Arc}; +use std::{ffi::c_char, path::Path}; -use typedb_driver::{box_stream, info::ReplicaInfo, Database}; +use typedb_driver::Database; use super::{ error::{try_release_string, unwrap_void}, - iterator::{iterator_next, CIterator}, - memory::{borrow, borrow_mut, free, release, release_optional, release_string, take_ownership}, + memory::{borrow, release_string}, }; use crate::memory::{decrement_arc, string_view, take_arc}; diff --git a/c/src/error.rs b/c/src/error.rs index 57ed509aa4..a147cc4855 100644 --- a/c/src/error.rs +++ b/c/src/error.rs @@ -25,7 +25,7 @@ use std::{ }; use env_logger::Env; -use log::{trace, warn}; +use tracing::{debug, warn}; use typedb_driver::{Error, Result}; use super::memory::{free, release_arc, release_optional, release_string}; @@ -90,7 +90,7 @@ pub(super) fn unwrap_void(result: Result) { } fn record_error(err: Error) { - trace!("Encountered error {err} in typedb-driver-rust"); + debug!("Encountered error {err} in typedb-driver-rust"); LAST_ERROR.with(|prev| *prev.borrow_mut() = Some(err)); } diff --git a/c/src/memory.rs b/c/src/memory.rs index 4aa959c8c6..4fe15fefc3 100644 --- a/c/src/memory.rs +++ b/c/src/memory.rs @@ -20,11 +20,11 @@ use std::{ cell::RefCell, ffi::{c_char, CStr, CString}, - ptr::{null, null_mut}, + ptr::null_mut, sync::Arc, }; -use log::trace; +use tracing::trace; use typedb_driver::Error; thread_local! { diff --git a/c/src/transaction.rs b/c/src/transaction.rs index 8e4f31d7da..aa0c47cd6e 100644 --- a/c/src/transaction.rs +++ b/c/src/transaction.rs @@ -19,9 +19,7 @@ use std::{ffi::c_char, ptr::null_mut}; -use typedb_driver::{ - DatabaseManager, Error, QueryOptions, Transaction, TransactionOptions, TransactionType, TypeDBDriver, -}; +use typedb_driver::{Error, QueryOptions, Transaction, TransactionOptions, TransactionType, TypeDBDriver}; use super::memory::{borrow, borrow_mut, free, release, take_ownership}; use crate::{answer::QueryAnswerPromise, error::try_release, memory::string_view, promise::VoidPromise}; @@ -60,14 +58,14 @@ pub extern "C" fn transaction_query( /// Closes the transaction and frees the native rust object. #[no_mangle] -pub extern "C" fn transaction_close(txn: *mut Transaction) { +pub extern "C" fn transaction_submit_close(txn: *mut Transaction) { free(txn); } /// Forcibly closes this transaction. To be used in exceptional cases. #[no_mangle] -pub extern "C" fn transaction_force_close(txn: *mut Transaction) { - borrow_mut(txn).force_close(); +pub extern "C" fn transaction_close(txn: *mut Transaction) -> *mut VoidPromise { + release(VoidPromise(Box::new(borrow_mut(txn).close()))) } /// Commits the changes made via this transaction to the TypeDB database. @@ -100,7 +98,9 @@ pub extern "C" fn transaction_on_close( txn: *const Transaction, callback_id: usize, callback: extern "C" fn(usize, *mut Error), -) { - borrow(txn) - .on_close(move |error| callback(callback_id, error.map(|err| release(err.into())).unwrap_or(null_mut()))); +) -> *mut VoidPromise { + release(VoidPromise(Box::new( + borrow(txn) + .on_close(move |error| callback(callback_id, error.map(|err| release(err.into())).unwrap_or(null_mut()))), + ))) } diff --git a/c/src/user.rs b/c/src/user.rs index 0615fc4464..15a3e5c2d4 100644 --- a/c/src/user.rs +++ b/c/src/user.rs @@ -19,13 +19,13 @@ use std::ffi::c_char; -use typedb_driver::{Database, TypeDBDriver, User, UserManager}; +use typedb_driver::User; use super::{ error::unwrap_void, memory::{borrow, free, release_string, string_view}, }; -use crate::memory::{take_arc, take_ownership}; +use crate::memory::take_ownership; /// Frees the native rust User object. #[no_mangle] diff --git a/c/src/user_manager.rs b/c/src/user_manager.rs index fefa0f8e87..5c347506d3 100644 --- a/c/src/user_manager.rs +++ b/c/src/user_manager.rs @@ -19,14 +19,13 @@ use std::{ffi::c_char, ptr::addr_of_mut}; -use typedb_driver::{box_stream, TypeDBDriver, User, UserManager}; +use typedb_driver::{box_stream, TypeDBDriver, User}; use super::{ error::{try_release, try_release_optional, unwrap_or_default, unwrap_void}, iterator::{iterator_next, CIterator}, - memory::{borrow, free, release, string_view}, + memory::{borrow, free, string_view}, }; -use crate::{error::try_release_string, memory::release_string}; /// Iterator over a set of Users pub struct UserIterator(CIterator); diff --git a/c/typedb_driver.i b/c/typedb_driver.i index f95d874103..2cbd6ae1ee 100644 --- a/c/typedb_driver.i +++ b/c/typedb_driver.i @@ -17,6 +17,7 @@ * under the License. */ +%module(threads=1) typedb_driver %module(directors="1") typedb_driver %{ extern "C" { @@ -62,7 +63,7 @@ struct Type {}; %dropproxy(QueryOptions, query_options) #define typedb_driver_drop driver_close -#define transaction_drop transaction_close +#define transaction_drop transaction_submit_close #define database_drop database_close %dropproxy(TypeDBDriver, typedb_driver) @@ -116,12 +117,75 @@ struct TransactionCallbackDirector { %{ #include #include +#include #include -static std::unordered_map transactionOnCloseCallbacks {}; + +class ThreadSafeTransactionCallbacks { +private: + // 1. The static map to protect + static std::unordered_map s_transactionOnCloseCallbacks; + + // 2. The static mutex to manage access + static std::mutex s_mutex; + +public: + // Delete copy/move constructors and assignment operators + // to prevent accidental copying of the singleton-like structure + ThreadSafeTransactionCallbacks(const ThreadSafeTransactionCallbacks&) = delete; + ThreadSafeTransactionCallbacks& operator=(const ThreadSafeTransactionCallbacks&) = delete; + + // --- Core Operations --- + + /** + * @brief Inserts a key-value pair into the map in a thread-safe manner. + */ + static void insert(size_t key, TransactionCallbackDirector* value) { + // Lock the mutex for the duration of this scope + std::lock_guard lock(s_mutex); + + // Thread-safe insertion + s_transactionOnCloseCallbacks[key] = value; + } + + /** + * @brief Retrieves a value associated with a key in a thread-safe manner. + * @returns The value pointer, or nullptr if the key is not found. + */ + static TransactionCallbackDirector* find(size_t key) { + // Lock the mutex for the duration of this scope + std::lock_guard lock(s_mutex); + + // Thread-safe lookup + auto it = s_transactionOnCloseCallbacks.find(key); + if (it != s_transactionOnCloseCallbacks.end()) { + return it->second; + } + return nullptr; // Return nullptr if not found + } + + /** + * @brief Removes a key-value pair from the map in a thread-safe manner. + */ + static void remove(size_t key) { + // Lock the mutex for the duration of this scope + std::lock_guard lock(s_mutex); + + // Thread-safe removal + s_transactionOnCloseCallbacks.erase(key); + } + + // Add other necessary map operations (e.g., size(), contains(), clear()) here... +}; + +// Initialize the static members +std::unordered_map ThreadSafeTransactionCallbacks::s_transactionOnCloseCallbacks; +std::mutex ThreadSafeTransactionCallbacks::s_mutex; + static void transaction_callback_execute(size_t ID, Error* error) { try { - transactionOnCloseCallbacks.at(ID)->callback(error); - transactionOnCloseCallbacks.erase(ID); + auto cb = ThreadSafeTransactionCallbacks::find(ID); + cb->callback(error); + ThreadSafeTransactionCallbacks::remove(ID); } catch (std::exception const& e) { std::cerr << "[ERROR] " << e.what() << std::endl; } @@ -132,11 +196,11 @@ static void transaction_callback_execute(size_t ID, Error* error) { %ignore transaction_on_close; %inline %{ #include -void transaction_on_close_register(const Transaction* transaction, TransactionCallbackDirector* handler) { +VoidPromise* transaction_on_close_register(const Transaction* transaction, TransactionCallbackDirector* handler) { static std::atomic_size_t nextID; std::size_t ID = nextID.fetch_add(1); - transactionOnCloseCallbacks.insert({ID, handler}); - transaction_on_close(transaction, ID, &transaction_callback_execute); + ThreadSafeTransactionCallbacks::insert(ID, handler); + return transaction_on_close(transaction, ID, &transaction_callback_execute); } %} diff --git a/cpp/lib/connection/transaction.cpp b/cpp/lib/connection/transaction.cpp index e8aa3009c9..5711f8ed1d 100644 --- a/cpp/lib/connection/transaction.cpp +++ b/cpp/lib/connection/transaction.cpp @@ -77,13 +77,16 @@ bool Transaction::isOpen() const { return transactionNative != nullptr && _native::transaction_is_open(transactionNative.get()); } -void Transaction::close() { - if (transactionNative != nullptr) _native::transaction_close(transactionNative.release()); +void Transaction::submitClose() { + if (transactionNative != nullptr) _native::transaction_submit_close(transactionNative.release()); } -void Transaction::forceClose() { +void Transaction::close() { CHECK_NATIVE(transactionNative); - _native::transaction_force_close(transactionNative.release()); + VoidFuture p = _native::transaction_close(transactionNative.release()); + DriverException::check_and_throw(); + p.wait(); + DriverException::check_and_throw(); } void Transaction::commit() { diff --git a/dependencies/typedb/artifacts.bzl b/dependencies/typedb/artifacts.bzl index 1a3db21f7e..00b1c26beb 100644 --- a/dependencies/typedb/artifacts.bzl +++ b/dependencies/typedb/artifacts.bzl @@ -25,7 +25,7 @@ def typedb_artifact(): artifact_name = "typedb-all-{platform}-{version}.{ext}", tag_source = deployment["artifact"]["release"]["download"], commit_source = deployment["artifact"]["snapshot"]["download"], - tag = "3.5.0-rc0" + tag = "3.5.0" ) #def typedb_cloud_artifact(): diff --git a/docs/modules/ROOT/partials/rust/connection/TypeDBDriver.adoc b/docs/modules/ROOT/partials/rust/connection/TypeDBDriver.adoc index a97287f9bd..a4f4427d25 100644 --- a/docs/modules/ROOT/partials/rust/connection/TypeDBDriver.adoc +++ b/docs/modules/ROOT/partials/rust/connection/TypeDBDriver.adoc @@ -32,6 +32,27 @@ Result driver.force_close() ---- +[#_struct_TypeDBDriver_init_logging_] +==== init_logging + +[source,rust] +---- +pub fn init_logging() +---- + +Initialize logging configuration for the TypeDB driver. + +This function sets up tracing with the following priority: + +The logging is initialized only once using a static flag to prevent multiple initializations in applications that create multiple drivers. + +[caption=""] +.Returns +[source,rust] +---- +null +---- + [#_struct_TypeDBDriver_is_open_] ==== is_open diff --git a/docs/modules/ROOT/partials/rust/transaction/Transaction.adoc b/docs/modules/ROOT/partials/rust/transaction/Transaction.adoc index 7482e4b7fa..54262ef618 100644 --- a/docs/modules/ROOT/partials/rust/transaction/Transaction.adoc +++ b/docs/modules/ROOT/partials/rust/transaction/Transaction.adoc @@ -8,21 +8,21 @@ A transaction with a TypeDB database. // tag::methods[] -[#_struct_Transaction_commit_] -==== commit +[#_struct_Transaction_close_] +==== close [source,rust] ---- -pub fn commit(self) -> impl Promise<'static, Result> +pub fn close(&self) -> impl Promise<'_, Result<()>> ---- -Commits the changes made via this transaction to the TypeDB database. Whether or not the transaction is commited successfully, it gets closed after the commit call. +Closes the transaction and returns a resolvable promise [caption=""] .Returns [source,rust] ---- -impl Promise<'static, Result> +impl Promise<'_, Result<()>> ---- [caption=""] @@ -34,7 +34,7 @@ async:: -- [source,rust] ---- -transaction.commit().await +transaction.close().await ---- -- @@ -44,36 +44,54 @@ sync:: -- [source,rust] ---- -transaction.commit() +transaction.close().resolve() ---- -- ==== -[#_struct_Transaction_force_close_] -==== force_close +[#_struct_Transaction_commit_] +==== commit [source,rust] ---- -pub fn force_close(&self) +pub fn commit(self) -> impl Promise<'static, Result> ---- -Closes the transaction. +Commits the changes made via this transaction to the TypeDB database. Whether or not the transaction is commited successfully, it gets closed after the commit call. [caption=""] .Returns [source,rust] ---- -null +impl Promise<'static, Result> ---- [caption=""] .Code examples +[tabs] +==== +async:: ++ +-- +[source,rust] +---- +transaction.commit().await +---- + +-- + +sync:: ++ +-- [source,rust] ---- -transaction.force_close() +transaction.commit() ---- +-- +==== + [#_struct_Transaction_is_open_] ==== is_open @@ -106,10 +124,10 @@ transaction.close() pub fn on_close( &self, callback: impl FnOnce(Option) + Send + Sync + 'static, -) +) -> impl Promise<'_, Result<()>> ---- -Registers a callback function which will be executed when this transaction is closed. +Registers a callback function which will be executed when this transaction is closed returns a resolvable promise that must be awaited otherwise the callback may not be registered [caption=""] .Input parameters @@ -124,7 +142,7 @@ a| `function` a| — The callback function. a| .Returns [source,rust] ---- -null +impl Promise<'_, Result<()>> ---- [caption=""] diff --git a/java/connection/TransactionImpl.java b/java/connection/TransactionImpl.java index a18bf40b43..b673ae7e81 100644 --- a/java/connection/TransactionImpl.java +++ b/java/connection/TransactionImpl.java @@ -36,7 +36,7 @@ import static com.typedb.driver.common.exception.ErrorMessage.Driver.TRANSACTION_CLOSED; import static com.typedb.driver.jni.typedb_driver.transaction_commit; -import static com.typedb.driver.jni.typedb_driver.transaction_force_close; +import static com.typedb.driver.jni.typedb_driver.transaction_close; import static com.typedb.driver.jni.typedb_driver.transaction_is_open; import static com.typedb.driver.jni.typedb_driver.transaction_new; import static com.typedb.driver.jni.typedb_driver.transaction_on_close; @@ -102,7 +102,7 @@ public void onClose(Consumer function) throws TypeDBDriverException { try { TransactionOnClose callback = new TransactionOnClose(function); callbacks.add(callback); - transaction_on_close(nativeObject, callback.released()); + transaction_on_close(nativeObject, callback.released()).get(); } catch (com.typedb.driver.jni.Error error) { throw new TypeDBDriverException(error); } @@ -133,7 +133,7 @@ public void rollback() throws TypeDBDriverException { public void close() throws TypeDBDriverException { if (nativeObject.isOwned()) { try { - transaction_force_close(nativeObject); + transaction_close(nativeObject).get(); } catch (com.typedb.driver.jni.Error error) { throw new TypeDBDriverException(error); } finally { diff --git a/java/test/integration/BUILD b/java/test/integration/BUILD index 972f84b621..2ac781f4e7 100644 --- a/java/test/integration/BUILD +++ b/java/test/integration/BUILD @@ -31,7 +31,7 @@ typedb_java_test( "@typedb_bazel_distribution//platform:is_linux_x86_64": "@typedb_artifact_linux-x86_64//file", "@typedb_bazel_distribution//platform:is_mac_arm64": "@typedb_artifact_mac-arm64//file", "@typedb_bazel_distribution//platform:is_mac_x86_64": "@typedb_artifact_mac-x86_64//file", -# "@typedb_bazel_distribution//platform:is_windows_x86_64": "@typedb_artifact_windows-x86_64//file", + "@typedb_bazel_distribution//platform:is_windows_x86_64": "@typedb_artifact_windows-x86_64//file", }, test_class = "com.typedb.driver.test.integration.ExampleTest", deps = [ @@ -46,6 +46,29 @@ typedb_java_test( ], ) +typedb_java_test( + name = "test-driver", + srcs = ["DriverTest.java"], + server_artifacts = { + "@typedb_bazel_distribution//platform:is_linux_arm64": "@typedb_artifact_linux-arm64//file", + "@typedb_bazel_distribution//platform:is_linux_x86_64": "@typedb_artifact_linux-x86_64//file", + "@typedb_bazel_distribution//platform:is_mac_arm64": "@typedb_artifact_mac-arm64//file", + "@typedb_bazel_distribution//platform:is_mac_x86_64": "@typedb_artifact_mac-x86_64//file", +# "@typedb_bazel_distribution//platform:is_windows_x86_64": "@typedb_artifact_windows-x86_64//file", + }, + test_class = "com.typedb.driver.test.integration.DriverTest", + deps = [ + # Internal dependencies + "//java:driver-java", + "//java/api", + "//java/common", + + # External dependencies from @typedb + "@maven//:org_slf4j_slf4j_api", +# "@maven//:com_typedb_typedb_runner", + ], +) + typedb_java_test( name = "test-value", srcs = ["ValueTest.java"], diff --git a/java/test/integration/DriverTest.java b/java/test/integration/DriverTest.java new file mode 100644 index 0000000000..18014d195d --- /dev/null +++ b/java/test/integration/DriverTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.typedb.driver.test.integration; + +import com.typedb.driver.TypeDB; +import com.typedb.driver.api.Credentials; +import com.typedb.driver.api.Driver; +import com.typedb.driver.api.DriverOptions; +import com.typedb.driver.api.Transaction; +import com.typedb.driver.api.answer.ConceptRow; +import com.typedb.driver.api.answer.QueryAnswer; +import com.typedb.driver.api.concept.Concept; +import com.typedb.driver.api.concept.instance.Attribute; +import com.typedb.driver.api.concept.type.AttributeType; +import com.typedb.driver.api.concept.value.Value; +import com.typedb.driver.api.database.Database; +import com.typedb.driver.common.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +@SuppressWarnings("Duplicates") +public class DriverTest { + private static final String DB_NAME = "typedb"; + private static final String ADDRESS = "0.0.0.0:1729"; + private static Driver typedbDriver; + + @BeforeClass + public static void setUpClass() { + typedbDriver = TypeDB.driver(ADDRESS, new Credentials("admin", "password"), new DriverOptions(false, null)); + if (typedbDriver.databases().contains(DB_NAME)) typedbDriver.databases().get(DB_NAME).delete(); + typedbDriver.databases().create(DB_NAME); + } + + @AfterClass + public static void close() { + typedbDriver.close(); + } + + @Test + public void transaction_on_close() { + Database db = typedbDriver.databases().get(DB_NAME); + db.delete(); + typedbDriver.databases().create(DB_NAME); + + AtomicBoolean calledOnClose = new AtomicBoolean(false); + + localhostTypeDBTX(transaction -> { + + transaction.onClose(error -> { + calledOnClose.set(true); + }); + + transaction.close(); + assertTrue(calledOnClose.get()); + }, Transaction.Type.READ); + } + + private void localhostTypeDBTX(Consumer fn, Transaction.Type type/*, Options options*/) { + try (Transaction transaction = typedbDriver.transaction(DB_NAME, type/*, options*/)) { + fn.accept(transaction); + } + } +} diff --git a/python/tests/integration/BUILD b/python/tests/integration/BUILD index 4f70e6437a..42a6c41d98 100644 --- a/python/tests/integration/BUILD +++ b/python/tests/integration/BUILD @@ -35,6 +35,17 @@ py_test( python_version = "PY3" ) +py_test( + name = "test_driver", + srcs = ["test_driver.py"], + deps = [ + "//python:driver_python", + requirement("PyHamcrest"), + ], + data = ["//python:native-driver-binary-link", "//python:native-driver-wrapper-link"], + python_version = "PY3" +) + py_test( name = "test_values", srcs = ["test_values.py"], diff --git a/python/tests/integration/test_driver.py b/python/tests/integration/test_driver.py new file mode 100644 index 0000000000..981d30e561 --- /dev/null +++ b/python/tests/integration/test_driver.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from unittest import TestCase +import time + +from hamcrest import * +from typedb.driver import * + + +class TestDriver(TestCase): + + def setUp(self): + with TypeDB.driver(TypeDB.DEFAULT_ADDRESS, Credentials("admin", "password"), DriverOptions(is_tls_enabled=False)) as driver: + if driver.databases.contains("typedb"): + driver.databases.get("typedb").delete() + + + def test_on_close_callback(self): + with TypeDB.driver(TypeDB.DEFAULT_ADDRESS, Credentials("admin", "password"), DriverOptions(is_tls_enabled=False)) as driver: + driver.databases.create("typedb") + database = driver.databases.get("typedb") + assert_that(database.name, is_("typedb")) + + tx = driver.transaction(database.name, TransactionType.READ) + + transaction_closed = {"closed": False} + def callback(_error): + transaction_closed.update({"closed": True}) + tx.on_close(callback) + + tx.close() + + assert_that(transaction_closed["closed"], is_(True)) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/python/typedb/connection/transaction.py b/python/typedb/connection/transaction.py index 69c3723013..32d214068d 100644 --- a/python/typedb/connection/transaction.py +++ b/python/typedb/connection/transaction.py @@ -30,7 +30,7 @@ from typedb.concept.answer.query_answer_factory import wrap_query_answer from typedb.native_driver_wrapper import error_code, error_message, transaction_new, transaction_query, \ transaction_commit, \ - transaction_rollback, transaction_is_open, transaction_on_close, transaction_force_close, \ + transaction_rollback, transaction_is_open, transaction_on_close, transaction_close, \ query_answer_promise_resolve, \ Transaction as NativeTransaction, TransactionCallbackDirector, TypeDBDriverExceptionNative, void_promise_resolve @@ -78,7 +78,8 @@ def is_open(self) -> bool: return transaction_is_open(self.native_object) def on_close(self, function: callable): - transaction_on_close(self.native_object, _Transaction.TransactionOnClose(function).__disown__()) + callback = _Transaction.TransactionOnClose(function) + void_promise_resolve(transaction_on_close(self.native_object, callback.__disown__())) class TransactionOnClose(TransactionCallbackDirector): @@ -87,7 +88,16 @@ def __init__(self, function: callable): self._function = function def callback(self, error: NativeError) -> None: - self._function(TypeDBException(error_code(error), error_message(error))) + try: + if error: + self._function(TypeDBException(error_code(error), error_message(error))) + else: + self._function(None) + except Exception as e: + # WARNING: SWIG will not propagate any errors (including syntax!) to the user without more work so we can at least log + import sys + print("Error invoking onclose callback: ", e, file=sys.stderr) + raise e def commit(self): try: @@ -104,7 +114,7 @@ def rollback(self): def close(self): if self._native_object.thisown: - transaction_force_close(self._native_object) + void_promise_resolve(transaction_close(self._native_object)) def __enter__(self): return self diff --git a/rust/BUILD b/rust/BUILD index aa1d10c459..3d3786f342 100644 --- a/rust/BUILD +++ b/rust/BUILD @@ -42,6 +42,8 @@ typedb_driver_deps = [ "@crates//:tokio-stream", "@crates//:tonic", "@crates//:tonic-types", + "@crates//:tracing", + "@crates//:tracing-subscriber", "@crates//:uuid", "@typedb_protocol//grpc/rust:typedb_protocol", ] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 5503f44ee4..e36a121269 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -63,6 +63,11 @@ version = "1.47.1" default-features = false + [dependencies.tracing] + features = ["attributes", "default", "log", "std", "tracing-attributes"] + version = "0.1.41" + default-features = false + [dependencies.typedb-protocol] features = [] git = "https://github.com/typedb/typedb-protocol" @@ -79,6 +84,11 @@ version = "1.0.219" default-features = false + [dependencies.tracing-subscriber] + features = ["alloc", "ansi", "default", "env-filter", "fmt", "matchers", "nu-ansi-term", "once_cell", "regex", "registry", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log"] + version = "0.3.19" + default-features = false + [dependencies.tokio-stream] features = ["default", "net", "time"] version = "0.1.17" @@ -139,6 +149,10 @@ version = "0.8.4" default-features = false +[[test]] + path = "tests/integration/driver.rs" + name = "test_driver" + [[test]] path = "tests/integration/example.rs" name = "test_example" diff --git a/rust/README.md b/rust/README.md index da7c603273..92861209a6 100644 --- a/rust/README.md +++ b/rust/README.md @@ -322,3 +322,53 @@ fn typedb_example() { ``` + +## Logging + +The TypeDB Rust driver includes comprehensive logging functionality using the `tracing` crate. This allows you to monitor driver operations, debug issues, and understand the driver's behavior. + +### Logging Configuration + +The logging level can be controlled through environment variables with the following priority: + +1. **TYPEDB_DRIVER_LOG** - Driver-specific log level (highest priority) +2. **RUST_LOG** - General Rust logging level (fallback) +3. **Default level: INFO** - If no environment variables are set + +The logging is scoped to the `typedb_driver` package only, so it won't affect other loggers like Tonic or other dependencies. This means you can control TypeDB driver logging independently from your application's other logging. + +If you want to track the memory exchanges between Rust and the C layer, you can set `TYPEDB_DRIVER_CLIB_LOG` to TRACE. + +### Supported Log Levels + +- `error` - Only error messages +- `warn` - Warning and error messages +- `info` - Informational, warning, and error messages (default) +- `debug` - Detailed debugging information +- `trace` - Very detailed tracing information + +### Examples + +```bash +# Set driver-specific log level +TYPEDB_DRIVER_LOG=debug cargo run + +# Use RUST_LOG as fallback +RUST_LOG=info cargo run + +# Default level (INFO) if no environment variable is set +cargo run +``` + +### Manual Initialization + +If you need to initialize logging before creating a driver (for example, to see connection logs), you can call the initialization function directly: + +```rust +use typedb_driver::TypeDBDriver; + +TypeDBDriver::init_logging(); +// Logging is now initialized +``` + +For more detailed logging configuration and integration with application logging, see the [Logging Documentation](LOGGING.md). diff --git a/rust/src/common/error.rs b/rust/src/common/error.rs index 207376352b..96dad3fd50 100644 --- a/rust/src/common/error.rs +++ b/rust/src/common/error.rs @@ -21,7 +21,7 @@ use std::{collections::HashSet, error::Error as StdError, fmt}; use itertools::Itertools; use tonic::{Code, Status}; -use tonic_types::{ErrorDetails, ErrorInfo, StatusExt}; +use tonic_types::StatusExt; use super::{address::Address, RequestID}; diff --git a/rust/src/connection/network/channel.rs b/rust/src/connection/network/channel.rs index a96ec9bddd..d4d64fecd9 100644 --- a/rust/src/connection/network/channel.rs +++ b/rust/src/connection/network/channel.rs @@ -17,12 +17,14 @@ * under the License. */ -use std::sync::{Arc, RwLock}; +use std::{ + sync::{Arc, RwLock}, + time::Duration, +}; use tonic::{ body::BoxBody, client::GrpcService, - metadata::MetadataValue, service::{ interceptor::{InterceptedService, ResponseFuture as InterceptorResponseFuture}, Interceptor, @@ -58,6 +60,7 @@ pub(super) fn open_callcred_channel( driver_options.tls_config().clone().expect("TLS config object must be set when TLS is enabled"); builder = builder.tls_config(tls_config)?; } + builder = builder.keep_alive_while_idle(true).http2_keep_alive_interval(Duration::from_secs(3)); let channel = builder.connect_lazy(); let call_credentials = Arc::new(CallCredentials::new(credentials)); Ok((CallCredChannel::new(channel, CredentialInjector::new(call_credentials.clone())), call_credentials)) diff --git a/rust/src/connection/network/proto/common.rs b/rust/src/connection/network/proto/common.rs index b4b9f43483..602581a8c7 100644 --- a/rust/src/connection/network/proto/common.rs +++ b/rust/src/connection/network/proto/common.rs @@ -19,7 +19,7 @@ use typedb_protocol::{ options::{Query as QueryOptionsProto, Transaction as TransactionOptionsProto}, - transaction, Options, + transaction, }; use super::{IntoProto, TryFromProto}; diff --git a/rust/src/connection/network/stub.rs b/rust/src/connection/network/stub.rs index 9da178c79b..b13487850b 100644 --- a/rust/src/connection/network/stub.rs +++ b/rust/src/connection/network/stub.rs @@ -20,12 +20,12 @@ use std::sync::Arc; use futures::{future::BoxFuture, FutureExt, TryFutureExt}; -use log::{debug, trace, warn}; use tokio::sync::mpsc::{unbounded_channel as unbounded_async, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{Response, Status, Streaming}; +use tracing::{debug, trace}; use typedb_protocol::{ - authentication, connection, database, database_manager, migration, server_manager, transaction, + connection, database, database_manager, migration, server_manager, transaction, type_db_client::TypeDbClient as GRPC, user, user_manager, }; @@ -64,12 +64,12 @@ impl RPCStub { async fn renew_token(&mut self) -> Result { if let Some(call_credentials) = &self.call_credentials { - trace!("Renewing token..."); + debug!("Renewing token..."); call_credentials.reset_token(); let request = call_credentials.credentials().clone().try_into_proto()?; let token = self.grpc.authentication_token_create(request).await?.into_inner().token; call_credentials.set_token(token); - trace!("Token renewed"); + debug!("Token renewed"); } Ok(()) } diff --git a/rust/src/connection/network/transmitter/import.rs b/rust/src/connection/network/transmitter/import.rs index 892d2595c4..ffe062ed20 100644 --- a/rust/src/connection/network/transmitter/import.rs +++ b/rust/src/connection/network/transmitter/import.rs @@ -17,7 +17,7 @@ * under the License. */ -use std::{ops::Deref, sync::Arc, thread::sleep, time::Duration}; +use std::{sync::Arc, thread::sleep, time::Duration}; use futures::StreamExt; #[cfg(not(feature = "sync"))] diff --git a/rust/src/connection/network/transmitter/response_sink.rs b/rust/src/connection/network/transmitter/response_sink.rs index 561876e339..338672e96a 100644 --- a/rust/src/connection/network/transmitter/response_sink.rs +++ b/rust/src/connection/network/transmitter/response_sink.rs @@ -17,16 +17,19 @@ * under the License. */ -use std::{fmt, fmt::Formatter, sync::Arc}; +use std::{ + fmt, + fmt::{Debug, Formatter}, + sync::Arc, +}; use crossbeam::channel::Sender as SyncOneshotSender; -use itertools::Either; -use log::{debug, error}; use tokio::sync::{mpsc::UnboundedSender, oneshot::Sender as AsyncOneshotSender}; +use tracing::{debug, error}; use crate::{ common::{RequestID, Result}, - error::{ConnectionError, InternalError}, + error::InternalError, Error, }; @@ -59,7 +62,7 @@ pub(super) enum StreamResponse { Continue(RequestID), } -impl ResponseSink { +impl ResponseSink { pub(super) fn finish(self, response: Result) { let result = match self { Self::ImmediateOneShot(handler) => { @@ -71,7 +74,7 @@ impl ResponseSink { Self::Streamed(sink) => sink.send(StreamResponse::Result(response)).map_err(Error::from), }; match result { - Err(Error::Internal(err @ InternalError::SendError)) => debug!("{err}"), + Err(Error::Internal(err @ InternalError::SendError)) => debug!("ResponseSink::finish - {err}"), Err(err) => error!("{err}"), Ok(()) => (), } @@ -83,7 +86,7 @@ impl ResponseSink { _ => unreachable!("attempted to stream over a one-shot callback"), }; match result { - Err(Error::Internal(err @ InternalError::SendError)) => debug!("{err}"), + Err(Error::Internal(err @ InternalError::SendError)) => debug!("ResponseSink::send_result - {err}"), Err(err) => error!("{err}"), Ok(()) => (), } @@ -95,7 +98,9 @@ impl ResponseSink { _ => unreachable!("attempted to stream over a one-shot callback"), }; match result { - Err(Error::Internal(err @ InternalError::SendError)) => debug!("{err}"), + Err(Error::Internal(err @ InternalError::SendError)) => { + debug!("ResponseSink::send_continuable - {err}") + } Err(err) => error!("{err}"), Ok(()) => (), } diff --git a/rust/src/connection/network/transmitter/rpc.rs b/rust/src/connection/network/transmitter/rpc.rs index 7fb87d0cb6..92934e6565 100644 --- a/rust/src/connection/network/transmitter/rpc.rs +++ b/rust/src/connection/network/transmitter/rpc.rs @@ -25,6 +25,7 @@ use tokio::{ oneshot::channel as oneshot_async, }, }; +use tracing::trace; use typedb_protocol::{transaction, transaction::server::Server}; use super::{oneshot_blocking, response_sink::ResponseSink}; @@ -100,9 +101,15 @@ impl RPCTransmitter { request = request_source.recv() => request, _ = shutdown_signal.recv() => None, } { + trace!("RPC dispatcher loop received request {:?}", request); let rpc = rpc.clone(); tokio::spawn(async move { let response = Self::send_request(rpc, request).await; + trace!( + "RPC dispatcher loop received response, will send into response {:?} into sink {:?}", + response, + response_sink + ); response_sink.finish(response); }); } diff --git a/rust/src/connection/network/transmitter/transaction.rs b/rust/src/connection/network/transmitter/transaction.rs index 81465efadb..d063db3a12 100644 --- a/rust/src/connection/network/transmitter/transaction.rs +++ b/rust/src/connection/network/transmitter/transaction.rs @@ -32,7 +32,6 @@ use futures::StreamExt; use futures::TryStreamExt; #[cfg(feature = "sync")] use itertools::Itertools; -use log::{debug, error}; use prost::Message; #[cfg(not(feature = "sync"))] use tokio::sync::oneshot::channel as oneshot; @@ -42,15 +41,16 @@ use tokio::{ mpsc::{error::SendError, unbounded_channel as unbounded_async, UnboundedReceiver, UnboundedSender}, oneshot::{channel as oneshot_async, Sender as AsyncOneshotSender}, }, + task, time::{sleep_until, Instant}, }; use tonic::Streaming; +use tracing::{debug, error, trace}; use typedb_protocol::transaction::{self, res_part::ResPart, server::Server, stream_signal::res_part::State}; -use uuid::Uuid; #[cfg(feature = "sync")] use super::oneshot_blocking as oneshot; -use super::response_sink::{ImmediateHandler, ResponseSink, StreamResponse}; +use super::response_sink::{ResponseSink, StreamResponse}; use crate::{ common::{ box_promise, @@ -59,19 +59,18 @@ use crate::{ Callback, Promise, RequestID, Result, }, connection::{ - message::{QueryResponse, Request, Response, TransactionRequest, TransactionResponse}, + message::{QueryResponse, TransactionRequest, TransactionResponse}, network::proto::{FromProto, IntoProto, TryFromProto}, runtime::BackgroundRuntime, - server_connection::LatencyTracker, }, - Error, + resolve, Error, }; pub(in crate::connection) struct TransactionTransmitter { request_sink: UnboundedSender<(TransactionRequest, Option>)>, is_open: Arc>, error: Arc>>, - on_close_register_sink: UnboundedSender) + Send + Sync>>, + on_close_register_sink: UnboundedSender<(Box) + Send + Sync>, UnboundedSender<()>)>, shutdown_sink: UnboundedSender<()>, // runtime is alive as long as the transaction transmitter is alive: background_runtime: Arc, @@ -79,7 +78,7 @@ pub(in crate::connection) struct TransactionTransmitter { impl Drop for TransactionTransmitter { fn drop(&mut self) { - self.force_close(); + self.submit_close(); } } @@ -118,15 +117,77 @@ impl TransactionTransmitter { &self.shutdown_sink } - pub(in crate::connection) fn force_close(&self) { + #[cfg(not(feature = "sync"))] + pub(in crate::connection) fn close(&self) -> impl Promise<'_, Result<()>> { + box_promise(async move { + if self.background_runtime.is_open() && self.is_open.compare_exchange(true, false).is_ok() { + let (closed_sink, mut closed_source) = unbounded_async(); + let close_notifier_callback = Box::new(move |error| { + closed_sink.send(()).unwrap(); + }); + let on_close_submit_promise = self.on_close(close_notifier_callback); + let _ = resolve!(on_close_submit_promise); + *self.error.write().unwrap() = Some(ConnectionError::TransactionIsClosed.into()); + self.shutdown_sink.send(()).ok(); + closed_source.recv().await; + Ok(()) + } else { + Ok(()) + } + }) + } + + #[cfg(feature = "sync")] + pub(in crate::connection) fn close(&self) -> impl Promise<'_, Result<()>> { + box_promise(move || { + if self.background_runtime.is_open() && self.is_open.compare_exchange(true, false).is_ok() { + let (closed_sink, closed_source) = oneshot(); + let close_notifier_callback = Box::new(move |error| { + closed_sink.send(()).unwrap(); + }); + let _ = resolve!(self.on_close(close_notifier_callback)); + *self.error.write().unwrap() = Some(ConnectionError::TransactionIsClosed.into()); + self.shutdown_sink.send(()).ok(); + closed_source.recv().ok(); + Ok(()) + } else { + Ok(()) + } + }) + } + + // fire and forget close - here for symmetry + // callers should use 'drop()' instead + fn submit_close(&mut self) { if self.is_open.compare_exchange(true, false).is_ok() { - *self.error.write().unwrap() = Some(ConnectionError::TransactionIsClosed.into()); self.shutdown_sink.send(()).ok(); } } - pub(in crate::connection) fn on_close(&self, callback: impl FnOnce(Option) + Send + Sync + 'static) { - self.on_close_register_sink.send(Box::new(callback)).ok(); + #[cfg(not(feature = "sync"))] + pub(in crate::connection) fn on_close( + &self, + callback: impl FnOnce(Option) + Send + Sync + 'static, + ) -> impl Promise<'_, Result<()>> { + box_promise(async move { + let (sender, mut sink) = unbounded_async(); + self.on_close_register_sink.send((Box::new(callback), sender)).ok(); + sink.recv().await.expect("Did not receive on_close registration success signal"); + Ok(()) + }) + } + + #[cfg(feature = "sync")] + pub(in crate::connection) fn on_close( + &self, + callback: impl FnOnce(Option) + Send + Sync + 'static, + ) -> impl Promise<'_, Result<()>> { + box_promise(move || { + let (sender, mut sink) = unbounded_async(); + self.on_close_register_sink.send((Box::new(callback), sender)).ok(); + sink.blocking_recv().expect("Did not receive on_close registration success signal"); + Ok(()) + }) } #[cfg(not(feature = "sync"))] @@ -230,7 +291,10 @@ impl TransactionTransmitter { response_source: Streaming, is_open: Arc>, error: Arc>>, - on_close_callback_source: UnboundedReceiver) + Send + Sync>>, + on_close_callback_source: UnboundedReceiver<( + Box) + Send + Sync>, + UnboundedSender<()>, + )>, callback_handler_sink: Sender<(Callback, AsyncOneshotSender<()>)>, shutdown_sink: UnboundedSender<()>, shutdown_signal: UnboundedReceiver<()>, @@ -242,38 +306,33 @@ impl TransactionTransmitter { on_close: Default::default(), callback_handler_sink, }; - tokio::task::spawn_blocking({ + task::spawn_blocking({ let collector = collector.clone(); - move || { - Self::dispatch_loop(queue_source, request_sink, collector, on_close_callback_source, shutdown_signal) - } + move || Self::sync_dispatch_loop(queue_source, request_sink, collector, shutdown_signal) }); - tokio::spawn(Self::listen_loop(response_source, collector, shutdown_sink)); + tokio::spawn(Self::async_listen_loop(response_source, collector, on_close_callback_source, shutdown_sink)); } - fn dispatch_loop( + const DISPATCH_INTERVAL: Duration = Duration::from_micros(50); + + fn sync_dispatch_loop( mut request_source: UnboundedReceiver<(TransactionRequest, Option>)>, request_sink: UnboundedSender, mut collector: ResponseCollector, - mut on_close_callback_source: UnboundedReceiver) + Send + Sync>>, mut shutdown_signal: UnboundedReceiver<()>, ) { const MAX_GRPC_MESSAGE_LEN: usize = 1_000_000; - const DISPATCH_INTERVAL: Duration = Duration::from_micros(50); let mut request_buffer = TransactionRequestBuffer::default(); loop { if let Ok(_) = shutdown_signal.try_recv() { if !request_buffer.is_empty() { - request_sink.send(request_buffer.take()).unwrap(); + request_sink.send(request_buffer.take()).ok(); } break; } - if let Ok(callback) = on_close_callback_source.try_recv() { - collector.on_close.write().unwrap().push(callback) - } // sleep, then take all messages off the request queue and dispatch them - sleep(DISPATCH_INTERVAL); + sleep(Self::DISPATCH_INTERVAL); while let Ok(recv) = request_source.try_recv() { let (request, callback) = recv; let request = request.into_proto(); @@ -281,27 +340,41 @@ impl TransactionTransmitter { collector.register(request.req_id.clone().into(), callback); } if request_buffer.len() + request.encoded_len() > MAX_GRPC_MESSAGE_LEN { - request_sink.send(request_buffer.take()).unwrap(); + request_sink.send(request_buffer.take()).ok(); } request_buffer.push(request); } if !request_buffer.is_empty() { - request_sink.send(request_buffer.take()).unwrap(); + request_sink.send(request_buffer.take()).ok(); } } } - async fn listen_loop( + async fn async_listen_loop( mut grpc_source: Streaming, collector: ResponseCollector, + mut on_close_callback_source: UnboundedReceiver<( + Box) + Send + Sync>, + UnboundedSender<()>, + )>, shutdown_sink: UnboundedSender<()>, ) { loop { - match grpc_source.next().await { - Some(Ok(message)) => collector.collect(message).await, - Some(Err(status)) => break collector.close_with_error(status.into()).await, - None => break collector.close().await, - } + let result = tokio::select! { biased; + message = grpc_source.next() => { + match message { + Some(Ok(message)) => collector.collect(message).await, + Some(Err(status)) => break collector.close_with_error(status.into()).await, + None => break collector.close().await + } + } + callback_option = on_close_callback_source.recv() => { + if let Some((callback, recorded_signal)) = callback_option { + collector.on_close.write().unwrap().push(callback); + recorded_signal.send(()).expect("Failed to signal back that on_close callback was recorded.") + } + } + }; } shutdown_sink.send(()).ok(); } @@ -438,8 +511,8 @@ impl ResponseCollector { for (_, listener) in listeners.drain() { listener.finish(Ok(TransactionResponse::Close)); } - let callbacks = std::mem::take(&mut *self.on_close.write().unwrap()); - for callback in callbacks { + let on_close_callbacks = std::mem::take(&mut *self.on_close.write().unwrap()); + for callback in on_close_callbacks { let (response_sink, response) = oneshot_async(); self.callback_handler_sink.send((Box::new(move || callback(None)), response_sink)).unwrap(); response.await.ok(); diff --git a/rust/src/connection/runtime.rs b/rust/src/connection/runtime.rs index 23fa08b0eb..edcfba6847 100644 --- a/rust/src/connection/runtime.rs +++ b/rust/src/connection/runtime.rs @@ -23,7 +23,6 @@ use crossbeam::{ atomic::AtomicCell, channel::{bounded as bounded_blocking, unbounded, Sender}, }; -use log::error; use tokio::{ runtime, sync::{ @@ -31,6 +30,7 @@ use tokio::{ oneshot::Sender as AsyncOneshotSender, }, }; +use tracing::error; use crate::common::{Callback, Result}; diff --git a/rust/src/connection/server_connection.rs b/rust/src/connection/server_connection.rs index 8e805dd34f..f257436c6b 100644 --- a/rust/src/connection/server_connection.rs +++ b/rust/src/connection/server_connection.rs @@ -18,7 +18,6 @@ */ use std::{ - collections::{HashMap, HashSet}, fmt, sync::{ atomic::{AtomicU64, Ordering}, @@ -31,10 +30,10 @@ use tokio::{sync::mpsc::UnboundedSender, time::Instant}; use uuid::Uuid; use crate::{ - common::{address::Address, RequestID}, + common::address::Address, connection::{ database::{export_stream::DatabaseExportStream, import_stream::DatabaseImportStream}, - message::{DatabaseImportRequest, Request, Response, TransactionRequest, TransactionResponse}, + message::{DatabaseImportRequest, Request, Response, TransactionRequest}, network::transmitter::{ DatabaseExportTransmitter, DatabaseImportTransmitter, RPCTransmitter, TransactionTransmitter, }, @@ -43,7 +42,7 @@ use crate::{ }, error::{ConnectionError, InternalError}, info::{DatabaseInfo, UserInfo}, - Credentials, DriverOptions, TransactionOptions, TransactionType, User, + Credentials, DriverOptions, TransactionOptions, TransactionType, }; #[derive(Clone)] @@ -261,7 +260,6 @@ impl ServerConnection { let open_latency = Instant::now().duration_since(open_request_start).as_millis() as u64 - server_duration_millis; self.latency_tracker.update_latency(open_latency); - let transmitter = TransactionTransmitter::new(self.background_runtime.clone(), request_sink, response_source); let transmitter_shutdown_sink = transmitter.shutdown_sink().clone(); diff --git a/rust/src/connection/transaction_stream.rs b/rust/src/connection/transaction_stream.rs index 44aca8275f..9a3d05751b 100644 --- a/rust/src/connection/transaction_stream.rs +++ b/rust/src/connection/transaction_stream.rs @@ -21,6 +21,7 @@ use std::{fmt, iter, pin::Pin, sync::Arc}; #[cfg(not(feature = "sync"))] use futures::{stream, StreamExt}; +use tracing::trace; use super::network::transmitter::TransactionTransmitter; use crate::{ @@ -78,10 +79,6 @@ impl TransactionStream { self.transaction_transmitter.is_open() } - pub(crate) fn force_close(&self) { - self.transaction_transmitter.force_close(); - } - pub(crate) fn type_(&self) -> TransactionType { self.type_ } @@ -90,10 +87,17 @@ impl TransactionStream { self.options } - pub(crate) fn on_close(&self, callback: impl FnOnce(Option) + Send + Sync + 'static) { + pub(crate) fn on_close( + &self, + callback: impl FnOnce(Option) + Send + Sync + 'static, + ) -> impl Promise<'_, Result<()>> { self.transaction_transmitter.on_close(callback) } + pub(crate) fn close(&self) -> impl Promise<'_, Result<()>> { + self.transaction_transmitter.close() + } + pub(crate) fn commit(self: Pin>) -> impl Promise<'static, Result> { let promise = self.single(TransactionRequest::Commit); promisify! { @@ -104,7 +108,9 @@ impl TransactionStream { pub(crate) fn rollback(&self) -> impl Promise<'_, Result> { let promise = self.single(TransactionRequest::Rollback); - promisify! { require_transaction_response!(resolve!(promise), Rollback) } + promisify! { + require_transaction_response!(resolve!(promise), Rollback) + } } pub(crate) fn query(&self, query: &str, options: QueryOptions) -> impl Promise<'static, Result> { diff --git a/rust/src/database/database.rs b/rust/src/database/database.rs index 59a315ac24..43383613fd 100644 --- a/rust/src/database/database.rs +++ b/rust/src/database/database.rs @@ -25,17 +25,14 @@ use std::{ fs::File, io::{BufWriter, Write}, path::Path, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, RwLock, - }, + sync::{Arc, RwLock}, thread::sleep, time::Duration, }; use itertools::Itertools; -use log::{debug, error}; use prost::Message; +use tracing::{debug, error}; use crate::{ common::{ diff --git a/rust/src/database/database_manager.rs b/rust/src/database/database_manager.rs index 40ca837947..d2dff1dafa 100644 --- a/rust/src/database/database_manager.rs +++ b/rust/src/database/database_manager.rs @@ -21,12 +21,12 @@ use std::future::Future; use std::{ collections::HashMap, - io::{BufReader, BufWriter, Cursor, Read}, + io::BufReader, path::Path, sync::{Arc, RwLock}, }; -use prost::{decode_length_delimiter, Message}; +use tracing::{debug, error, info}; use typedb_protocol::migration::Item; use super::Database; diff --git a/rust/src/database/migration.rs b/rust/src/database/migration.rs index c86a1c66e6..7f8ab401fc 100644 --- a/rust/src/database/migration.rs +++ b/rust/src/database/migration.rs @@ -20,7 +20,7 @@ use std::{ cmp::max, fs::{File, OpenOptions}, - io::{BufRead, Read, Write}, + io::BufRead, marker::PhantomData, path::Path, }; diff --git a/rust/src/driver.rs b/rust/src/driver.rs index d9385a2c1e..37aaa15783 100644 --- a/rust/src/driver.rs +++ b/rust/src/driver.rs @@ -24,6 +24,8 @@ use std::{ }; use itertools::Itertools; +use tracing::{debug, error, info, trace}; +use tracing_subscriber::{fmt as tracing_fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; use crate::{ common::{ @@ -52,6 +54,45 @@ impl TypeDBDriver { pub const DEFAULT_ADDRESS: &'static str = "localhost:1729"; + /// Initialize logging configuration for the TypeDB driver. + /// + /// This function sets up tracing with the following priority: + /// 1. TYPEDB_DRIVER_LOG environment variable (if set). Use TYPEDB_DRIVER_CLIB_LOG to see memory exchanges + /// 1. environment variable (if set) + /// 2. RUST_LOG environment variable (if set) + /// 3. Default level (INFO) + /// + /// The logging is initialized only once using a static flag to prevent + /// multiple initializations in applications that create multiple drivers. + pub fn init_logging() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + let clib_level = if let Ok(typedb_driver_clib_log) = std::env::var("TYPEDB_DRIVER_CLIB_LOG") { + typedb_driver_clib_log + } else { + "info".to_owned() + }; + // Try to get log level from TYPEDB_DRIVER_LOG first + let env_filter = if let Ok(typedb_log_level) = std::env::var("TYPEDB_DRIVER_LOG") { + EnvFilter::new(&format!("typedb_driver={},typedb_driver_clib={}", typedb_log_level, clib_level)) + } else if let Ok(rust_log) = std::env::var("RUST_LOG") { + // If RUST_LOG is set, use it but scope it to typedb_driver only + EnvFilter::new(&format!("typedb_driver={},typedb_driver_clib={}", rust_log, clib_level)) + } else { + EnvFilter::new(&format!("typedb_driver=info,typedb_driver_clib={}", clib_level)) + }; + + // Initialize the tracing subscriber + if let Err(e) = + tracing_subscriber::registry().with(env_filter).with(tracing_fmt::layer().with_target(false)).try_init() + { + eprintln!("Failed to initialize logging: {}", e); + } + }); + } + /// Creates a new TypeDB Server connection. /// /// # Arguments @@ -78,6 +119,7 @@ impl TypeDBDriver { credentials: Credentials, driver_options: DriverOptions, ) -> Result { + debug!("Creating new TypeDB driver connection to {}", address.as_ref()); Self::new_with_description(address, credentials, driver_options, Self::DRIVER_LANG).await } @@ -111,10 +153,15 @@ impl TypeDBDriver { driver_options: DriverOptions, driver_lang: impl AsRef, ) -> Result { + Self::init_logging(); + + debug!("Initializing TypeDB driver with description: {}", driver_lang.as_ref()); let id = address.as_ref().to_string(); let address: Address = id.parse()?; + let background_runtime = Arc::new(BackgroundRuntime::new()?); + debug!("Establishing server connection to {}", address); let (server_connection, database_info) = ServerConnection::new( background_runtime.clone(), address.clone(), @@ -124,6 +171,7 @@ impl TypeDBDriver { Self::VERSION, ) .await?; + debug!("Successfully connected to server at {}", address); // // validate // let advertised_address = server_connection @@ -136,7 +184,9 @@ impl TypeDBDriver { let server_connections: HashMap = [(address, server_connection)].into(); let database_manager = DatabaseManager::new(server_connections.clone(), database_info)?; let user_manager = UserManager::new(server_connections.clone()); + debug!("Created database manager and user manager"); + debug!("TypeDB driver initialization completed successfully"); Ok(Self { server_connections, database_manager, user_manager, background_runtime }) } @@ -226,12 +276,17 @@ impl TypeDBDriver { options: TransactionOptions, ) -> Result { let database_name = database_name.as_ref(); + debug!("Opening transaction for database: {} with type: {:?}", database_name, transaction_type); + let database = self.database_manager.get_cached_or_fetch(database_name).await?; let transaction_stream = database .run_failsafe(|database| async move { - database.connection().open_transaction(database.name(), transaction_type, options).await + let res = database.connection().open_transaction(database.name(), transaction_type, options).await; + res }) .await?; + + debug!("Successfully opened transaction for database: {}", database_name); Ok(Transaction::new(transaction_stream)) } @@ -247,9 +302,17 @@ impl TypeDBDriver { return Ok(()); } + debug!("Closing TypeDB driver connection"); let result = self.server_connections.values().map(ServerConnection::force_close).try_collect().map_err(Into::into); - self.background_runtime.force_close().and(result) + let close_result = self.background_runtime.force_close().and(result); + + match &close_result { + Ok(_) => debug!("Successfully closed TypeDB driver connection"), + Err(e) => error!("Failed to close TypeDB driver connection: {}", e), + } + + close_result } pub(crate) fn server_count(&self) -> usize { diff --git a/rust/src/transaction.rs b/rust/src/transaction.rs index 93c9daccb5..8b9597b13f 100644 --- a/rust/src/transaction.rs +++ b/rust/src/transaction.rs @@ -19,6 +19,8 @@ use std::{fmt, pin::Pin}; +use tracing::debug; + use crate::{ answer::QueryAnswer, common::{Promise, Result, TransactionType}, @@ -76,6 +78,7 @@ impl Transaction { options: QueryOptions, ) -> impl Promise<'static, Result> { let query = query.as_ref(); + debug!("Transaction submitting query: {}", query); self.transaction_stream.query(query, options) } @@ -84,7 +87,8 @@ impl Transaction { self.type_ } - /// Registers a callback function which will be executed when this transaction is closed. + /// Registers a callback function which will be executed when this transaction is closed + /// returns a resolvable promise that must be awaited otherwise the callback may not be registered /// /// # Arguments /// @@ -95,19 +99,23 @@ impl Transaction { /// ```rust /// transaction.on_close(function) /// ``` - pub fn on_close(&self, callback: impl FnOnce(Option) + Send + Sync + 'static) { + pub fn on_close( + &self, + callback: impl FnOnce(Option) + Send + Sync + 'static, + ) -> impl Promise<'_, Result<()>> { self.transaction_stream.on_close(callback) } - /// Closes the transaction. + /// Closes the transaction and returns a resolvable promise /// /// # Examples /// /// ```rust - /// transaction.force_close() + #[cfg_attr(feature = "sync", doc = "transaction.close().resolve()")] + #[cfg_attr(not(feature = "sync"), doc = "transaction.close().await")] /// ``` - pub fn force_close(&self) { - self.transaction_stream.force_close(); + pub fn close(&self) -> impl Promise<'_, Result<()>> { + self.transaction_stream.close() } /// Commits the changes made via this transaction to the TypeDB database. Whether or not the transaction is commited successfully, it gets closed after the commit call. diff --git a/rust/tests/behaviour/steps/connection/database.rs b/rust/tests/behaviour/steps/connection/database.rs index 6a42aa20c2..9ed45176be 100644 --- a/rust/tests/behaviour/steps/connection/database.rs +++ b/rust/tests/behaviour/steps/connection/database.rs @@ -17,7 +17,7 @@ * under the License. */ -use std::{collections::HashSet, fs::File, io::Read}; +use std::io::Read; use cucumber::{gherkin::Step, given, then, when}; use futures::{ diff --git a/rust/tests/behaviour/steps/connection/mod.rs b/rust/tests/behaviour/steps/connection/mod.rs index 293a881083..67d85db5cb 100644 --- a/rust/tests/behaviour/steps/connection/mod.rs +++ b/rust/tests/behaviour/steps/connection/mod.rs @@ -19,8 +19,6 @@ use cucumber::{given, then, when}; use macro_rules_attribute::apply; -use tokio::time::sleep; -use typedb_driver::{Credentials, TypeDBDriver}; use crate::{assert_with_timeout, generic_step, params, params::check_boolean, Context}; diff --git a/rust/tests/behaviour/steps/connection/transaction.rs b/rust/tests/behaviour/steps/connection/transaction.rs index ede25f09c4..8b03a1f7d2 100644 --- a/rust/tests/behaviour/steps/connection/transaction.rs +++ b/rust/tests/behaviour/steps/connection/transaction.rs @@ -156,7 +156,7 @@ pub async fn transaction_commits(context: &mut Context, may_error: params::MayEr #[apply(generic_step)] #[step(expr = "transaction closes")] pub async fn transaction_closes(context: &mut Context) { - context.take_transaction().force_close(); + context.take_transaction().close().await.ok(); } #[apply(generic_step)] diff --git a/rust/tests/behaviour/steps/lib.rs b/rust/tests/behaviour/steps/lib.rs index a33b5412f4..ce4f444b9d 100644 --- a/rust/tests/behaviour/steps/lib.rs +++ b/rust/tests/behaviour/steps/lib.rs @@ -234,13 +234,13 @@ impl Context { pub async fn cleanup_transactions(&mut self) { while let Some(transaction) = self.try_take_transaction() { - transaction.force_close(); + transaction.close().await.ok(); } } pub async fn cleanup_background_transactions(&mut self) { while let Some(background_transaction) = self.try_take_background_transaction() { - background_transaction.force_close(); + background_transaction.close().await.ok(); } } diff --git a/rust/tests/integration/BUILD b/rust/tests/integration/BUILD index 11ba6b4760..b3af28ff89 100644 --- a/rust/tests/integration/BUILD +++ b/rust/tests/integration/BUILD @@ -43,6 +43,24 @@ rust_test( ], ) +rust_test( + name = "test_driver", + srcs = ["driver.rs"], + deps = [ + "//rust:typedb_driver", + "@crates//:async-std", + "@crates//:chrono", + "@crates//:futures", + "@crates//:itertools", + "@crates//:regex", + "@crates//:serde_json", + "@crates//:serial_test", + "@crates//:smol", + "@crates//:tokio", + "@crates//:uuid", + ], +) + checkstyle_test( name = "checkstyle", include = glob(["*"]), diff --git a/rust/tests/integration/driver.rs b/rust/tests/integration/driver.rs new file mode 100644 index 0000000000..d566140bf1 --- /dev/null +++ b/rust/tests/integration/driver.rs @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use serial_test::serial; +use typedb_driver::{Credentials, DriverOptions, TransactionType, TypeDBDriver}; + +async fn cleanup() { + let driver = TypeDBDriver::new( + TypeDBDriver::DEFAULT_ADDRESS, + Credentials::new("admin", "password"), + DriverOptions::new(false, None).unwrap(), + ) + .await + .unwrap(); + if driver.databases().contains("typedb").await.unwrap() { + driver.databases().get("typedb").await.unwrap().delete().await.unwrap(); + } +} + +#[test] +#[serial] +fn transaction_callback() { + async_std::task::block_on(async { + cleanup().await; + let driver = TypeDBDriver::new( + TypeDBDriver::DEFAULT_ADDRESS, + Credentials::new("admin", "password"), + DriverOptions::new(false, None).unwrap(), + ) + .await + .unwrap(); + + driver.databases().create("typedb").await.unwrap(); + let database = driver.databases().get("typedb").await.unwrap(); + assert_eq!(database.name(), "typedb"); + + let close_called = Arc::new(AtomicBool::new(false)); + let transaction = driver.transaction(database.name(), TransactionType::Read).await.unwrap(); + transaction + .on_close(Box::new({ + let clone = close_called.clone(); + move |error| { + clone.store(true, Ordering::SeqCst); + } + })) + .await; + + transaction.close().await; + drop(transaction); + + while !close_called.load(Ordering::Acquire) { + // Yield the current time slice to the OS scheduler. + // This prevents the loop from consuming 100% of a CPU core. + std::thread::yield_now(); + } + assert!(close_called.load(Ordering::SeqCst)) + }) +} diff --git a/rust/tests/integration/example.rs b/rust/tests/integration/example.rs index fee42911a5..b6932e8952 100644 --- a/rust/tests/integration/example.rs +++ b/rust/tests/integration/example.rs @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + // EXAMPLE START MARKER use std::time::Duration; @@ -43,7 +44,11 @@ async fn cleanup() { .await .unwrap(); if driver.databases().contains("typedb").await.unwrap() { - driver.databases().get("typedb").await.unwrap().delete().await.unwrap(); + println!("Confirmed DB contains, going to get..."); + let db = driver.databases().get("typedb").await.unwrap(); + println!("Got DB"); + db.delete().await.unwrap(); + println!("Deleted db"); } } diff --git a/rust/tests/integration/mod.rs b/rust/tests/integration/mod.rs index 22e8bfb348..f4890ab29f 100644 --- a/rust/tests/integration/mod.rs +++ b/rust/tests/integration/mod.rs @@ -18,4 +18,3 @@ */ mod cluster; -mod cluster;