diff --git a/Makefile b/Makefile index 6076ed9c..fe8127ac 100644 --- a/Makefile +++ b/Makefile @@ -25,6 +25,8 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :LoggingTests.*\ :PreparedMetadataTests.*\ :UseKeyspaceCaseSensitiveTests.*\ +:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\ +:MetricsTests.Integration_Cassandra_Requests\ :-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\ :HeartbeatTests.Integration_Cassandra_HeartbeatFailed\ :ControlConnectionTests.Integration_Cassandra_TopologyChange\ @@ -36,6 +38,11 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :UseKeyspaceCaseSensitiveTests.Integration_Cassandra_ConnectWithKeyspace) endif +ifndef SCYLLA_NO_VALGRIND_TEST_FILTER +SCYLLA_NO_VALGRIND_TEST_FILTER := $(subst ${SPACE},${EMPTY},AsyncTests.Integration_Cassandra_Simple\ +:HeartbeatTests.Integration_Cassandra_HeartbeatFailed) +endif + ifndef CASSANDRA_TEST_FILTER CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :BasicsTests.*\ @@ -59,6 +66,8 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :LoggingTests.*\ :PreparedMetadataTests.*\ :UseKeyspaceCaseSensitiveTests.*\ +:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\ +:MetricsTests.Integration_Cassandra_Requests\ :-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\ :PreparedTests.Integration_Cassandra_FailFastWhenPreparedIDChangesDuringReprepare\ :HeartbeatTests.Integration_Cassandra_HeartbeatFailed\ @@ -72,6 +81,11 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :UseKeyspaceCaseSensitiveTests.Integration_Cassandra_ConnectWithKeyspace) endif +ifndef CASSANDRA_NO_VALGRIND_TEST_FILTER +CASSANDRA_NO_VALGRIND_TEST_FILTER := $(subst ${SPACE},${EMPTY},AsyncTests.Integration_Cassandra_Simple\ +:HeartbeatTests.Integration_Cassandra_HeartbeatFailed) +endif + ifndef CCM_COMMIT_ID # TODO: change it back to master/next when https://github.com/scylladb/scylla-ccm/issues/646 is fixed. export CCM_COMMIT_ID := 5392dd68 @@ -226,7 +240,7 @@ endif @echo "Running integration tests on scylla ${SCYLLA_VERSION}" valgrind --error-exitcode=123 --leak-check=full --errors-for-leak-kinds=definite build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${SCYLLA_TEST_FILTER}" @echo "Running timeout sensitive tests on scylla ${SCYLLA_VERSION}" - build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="AsyncTests.Integration_Cassandra_Simple" + build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${SCYLLA_NO_VALGRIND_TEST_FILTER}" run-test-integration-cassandra: prepare-integration-test download-ccm-cassandra-image install-java8-if-missing ifdef DONT_REBUILD_INTEGRATION_BIN @@ -237,7 +251,7 @@ endif @echo "Running integration tests on cassandra ${CASSANDRA_VERSION}" valgrind --error-exitcode=123 --leak-check=full --errors-for-leak-kinds=definite build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${CASSANDRA_TEST_FILTER}" @echo "Running timeout sensitive tests on cassandra ${CASSANDRA_VERSION}" - build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="AsyncTests.Integration_Cassandra_Simple" + build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${CASSANDRA_NO_VALGRIND_TEST_FILTER}" run-test-unit: install-cargo-if-missing _update-rust-tooling @cd ${CURRENT_DIR}/scylla-rust-wrapper; cargo test diff --git a/scylla-rust-wrapper/Cargo.lock b/scylla-rust-wrapper/Cargo.lock index a7040eaa..278b6ef9 100644 --- a/scylla-rust-wrapper/Cargo.lock +++ b/scylla-rust-wrapper/Cargo.lock @@ -461,6 +461,15 @@ version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +[[package]] +name = "histogram" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95aebe0dec9a429e3207e5e34d97f2a7d1064d5ee6d8ed13ce0a26456de000ae" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "home" version = "0.5.11" @@ -1114,6 +1123,7 @@ dependencies = [ "dashmap", "futures", "hashbrown 0.14.5", + "histogram", "itertools", "lazy_static", "lz4_flex", diff --git a/scylla-rust-wrapper/Cargo.toml b/scylla-rust-wrapper/Cargo.toml index 0815b4c2..14082ad8 100644 --- a/scylla-rust-wrapper/Cargo.toml +++ b/scylla-rust-wrapper/Cargo.toml @@ -12,6 +12,7 @@ license = "MIT OR Apache-2.0" [dependencies] scylla = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "v1.1.0", features = [ "openssl-010", + "metrics", ] } tokio = { version = "1.27.0", features = ["full"] } uuid = "1.1.2" diff --git a/scylla-rust-wrapper/build.rs b/scylla-rust-wrapper/build.rs index 4046079d..6ea13656 100644 --- a/scylla-rust-wrapper/build.rs +++ b/scylla-rust-wrapper/build.rs @@ -153,4 +153,9 @@ fn main() { &["CassIteratorType_", "CassIteratorType"], &out_path, ); + prepare_cppdriver_data( + "cppdriver_metrics_types.rs", + &["CassMetrics_", "CassMetrics"], + &out_path, + ); } diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/integration_testing.rs index b8302225..96157df2 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/integration_testing.rs @@ -1,8 +1,16 @@ use std::ffi::{CString, c_char}; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use scylla::errors::{RequestAttemptError, RequestError}; +use scylla::observability::history::{AttemptId, HistoryListener, RequestId, SpeculativeId}; +use scylla::policies::retry::RetryDecision; use crate::argconv::{BoxFFI, CMut, CassBorrowedExclusivePtr}; use crate::cluster::CassCluster; -use crate::types::{cass_int32_t, cass_uint16_t, size_t}; +use crate::statement::{BoundStatement, CassStatement}; +use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t}; #[unsafe(no_mangle)] pub unsafe extern "C" fn testing_cluster_get_connect_timeout( @@ -54,3 +62,57 @@ pub unsafe extern "C" fn testing_cluster_get_contact_points( pub unsafe extern "C" fn testing_free_contact_points(contact_points: *mut c_char) { let _ = unsafe { CString::from_raw(contact_points) }; } + +#[derive(Debug)] +struct SleepingHistoryListener(Duration); + +impl HistoryListener for SleepingHistoryListener { + fn log_request_start(&self) -> RequestId { + RequestId(0) + } + + fn log_request_success(&self, _request_id: RequestId) {} + + fn log_request_error(&self, _request_id: RequestId, _error: &RequestError) {} + + fn log_new_speculative_fiber(&self, _request_id: RequestId) -> SpeculativeId { + SpeculativeId(0) + } + + fn log_attempt_start( + &self, + _request_id: RequestId, + _speculative_id: Option, + _node_addr: SocketAddr, + ) -> AttemptId { + // Sleep to simulate a delay in the request + std::thread::sleep(self.0); + AttemptId(0) + } + + fn log_attempt_success(&self, _attempt_id: AttemptId) {} + + fn log_attempt_error( + &self, + _attempt_id: AttemptId, + _error: &RequestAttemptError, + _retry_decision: &RetryDecision, + ) { + } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn testing_statement_set_sleeping_history_listener( + statement_raw: CassBorrowedExclusivePtr, + sleep_time_ms: cass_uint64_t, +) { + let sleep_time = Duration::from_millis(sleep_time_ms); + let history_listener = Arc::new(SleepingHistoryListener(sleep_time)); + + match &mut BoxFFI::as_mut_ref(statement_raw).unwrap().statement { + BoundStatement::Simple(inner) => inner.query.set_history_listener(history_listener), + BoundStatement::Prepared(inner) => Arc::make_mut(&mut inner.statement) + .statement + .set_history_listener(history_listener), + } +} diff --git a/scylla-rust-wrapper/src/lib.rs b/scylla-rust-wrapper/src/lib.rs index f40838be..15afa911 100644 --- a/scylla-rust-wrapper/src/lib.rs +++ b/scylla-rust-wrapper/src/lib.rs @@ -119,6 +119,13 @@ pub mod cass_iterator_types { include_bindgen_generated!("cppdriver_iterator_types.rs"); } +/// CassMetrics +pub mod cass_metrics_types { + #![allow(non_camel_case_types, non_snake_case)] + + include_bindgen_generated!("cppdriver_metrics_types.rs"); +} + pub static RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().unwrap()); pub static LOGGER: LazyLock> = LazyLock::new(|| { RwLock::new(Logger { diff --git a/scylla-rust-wrapper/src/session.rs b/scylla-rust-wrapper/src/session.rs index 4d0393be..91a75cdc 100644 --- a/scylla-rust-wrapper/src/session.rs +++ b/scylla-rust-wrapper/src/session.rs @@ -1,6 +1,7 @@ use crate::argconv::*; use crate::batch::CassBatch; use crate::cass_error::*; +use crate::cass_metrics_types::CassMetrics; use crate::cass_types::get_column_type; use crate::cluster::CassCluster; use crate::cluster::build_session_builder; @@ -19,6 +20,7 @@ use scylla::client::session_builder::SessionBuilder; use scylla::cluster::metadata::ColumnType; use scylla::errors::ExecutionError; use scylla::frame::types::Consistency; +use scylla::observability::metrics::MetricsError; use scylla::response::PagingStateResponse; use scylla::response::query_result::QueryResult; use scylla::statement::unprepared::Statement; @@ -571,6 +573,91 @@ pub unsafe extern "C" fn cass_session_get_schema_meta( BoxFFI::into_ptr(Box::new(CassSchemaMeta { keyspaces })) } +#[unsafe(no_mangle)] +pub unsafe extern "C" fn cass_session_get_metrics( + session_raw: CassBorrowedSharedPtr, + metrics: *mut CassMetrics, +) { + let Some(maybe_session_lock) = ArcFFI::as_ref(session_raw) else { + tracing::error!("Provided null session pointer to cass_session_get_metrics!"); + return; + }; + if metrics.is_null() { + tracing::error!("Provided null metrics pointer to cass_session_get_metrics!"); + return; + } + + let maybe_session_guard = maybe_session_lock.blocking_read(); + let maybe_session = maybe_session_guard.as_ref(); + let Some(session) = maybe_session else { + tracing::warn!("Attempted to get metrics before connecting session object"); + return; + }; + + let rust_metrics = session.session.get_metrics(); + // TODO (rust-driver): Add Snapshot::default() or Snapshot::empty() with 0-initialized snapshot. + let ( + min, + max, + mean, + stddev, + median, + percentile_75, + percentile_95, + percentile_98, + percentile_99, + percentile_99_9, + ) = match rust_metrics.get_snapshot() { + Ok(snapshot) => ( + snapshot.min, + snapshot.max, + snapshot.mean, + snapshot.stddev, + snapshot.median, + snapshot.percentile_75, + snapshot.percentile_95, + snapshot.percentile_98, + snapshot.percentile_99, + snapshot.percentile_99_9, + ), + // Histogram is empty, but we don't want to return because there + // are other metrics that don't depend on histogram. + Err(MetricsError::Empty) => (0, 0, 0, 0, 0, 0, 0, 0, 0, 0), + Err(e) => { + tracing::error!("Failed to get metrics snapshot: {}", e); + return; + } + }; + + const MILLIS_TO_MICROS: u64 = 1000; + // SAFETY: We assume that user provided valid CassMetrics pointer. + unsafe { + (*metrics).requests.min = min * MILLIS_TO_MICROS; + (*metrics).requests.max = max * MILLIS_TO_MICROS; + (*metrics).requests.mean = mean * MILLIS_TO_MICROS; + (*metrics).requests.stddev = stddev * MILLIS_TO_MICROS; + (*metrics).requests.median = median * MILLIS_TO_MICROS; + (*metrics).requests.percentile_75th = percentile_75 * MILLIS_TO_MICROS; + (*metrics).requests.percentile_95th = percentile_95 * MILLIS_TO_MICROS; + (*metrics).requests.percentile_98th = percentile_98 * MILLIS_TO_MICROS; + (*metrics).requests.percentile_99th = percentile_99 * MILLIS_TO_MICROS; + (*metrics).requests.percentile_999th = percentile_99_9 * MILLIS_TO_MICROS; + (*metrics).requests.mean_rate = rust_metrics.get_mean_rate(); + (*metrics).requests.one_minute_rate = rust_metrics.get_one_minute_rate(); + (*metrics).requests.five_minute_rate = rust_metrics.get_five_minute_rate(); + (*metrics).requests.fifteen_minute_rate = rust_metrics.get_fifteen_minute_rate(); + + (*metrics).stats.total_connections = rust_metrics.get_total_connections(); + (*metrics).stats.available_connections = 0; // Deprecated + (*metrics).stats.exceeded_pending_requests_water_mark = 0; // Deprecated + (*metrics).stats.exceeded_write_bytes_water_mark = 0; // Deprecated + + (*metrics).errors.connection_timeouts = rust_metrics.get_connection_timeouts(); + (*metrics).errors.pending_request_timeouts = 0; // Deprecated + (*metrics).errors.request_timeouts = rust_metrics.get_request_timeouts(); + } +} + #[cfg(test)] mod tests { use rusty_fork::rusty_fork_test; diff --git a/src/testing.cpp b/src/testing.cpp index 313dae02..b120e49a 100644 --- a/src/testing.cpp +++ b/src/testing.cpp @@ -98,4 +98,8 @@ void set_record_attempted_hosts(CassStatement* statement, bool enable) { throw std::runtime_error("Unimplemented 'set_record_attempted_hosts'!"); } +void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms) { + testing_statement_set_sleeping_history_listener(statement, sleep_time_ms); +} + }}} // namespace datastax::internal::testing diff --git a/src/testing.hpp b/src/testing.hpp index 9f1f9123..197c7228 100644 --- a/src/testing.hpp +++ b/src/testing.hpp @@ -50,6 +50,8 @@ CASS_EXPORT String get_server_name(CassFuture* future); CASS_EXPORT void set_record_attempted_hosts(CassStatement* statement, bool enable); +CASS_EXPORT void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms); + }}} // namespace datastax::internal::testing #endif diff --git a/src/testing_rust_impls.h b/src/testing_rust_impls.h index 5c010f67..cda25cff 100644 --- a/src/testing_rust_impls.h +++ b/src/testing_rust_impls.h @@ -21,6 +21,11 @@ CASS_EXPORT void testing_cluster_get_contact_points(CassCluster* cluster, char** size_t* contact_points_length); CASS_EXPORT void testing_free_contact_points(char* contact_points); + +// Sets a sleeping history listener on the statement. +// This can be used to enforce a sleep time during statement execution, which increases the latency. +CASS_EXPORT void testing_statement_set_sleeping_history_listener(CassStatement *statement, + cass_uint64_t sleep_time_ms); } #endif diff --git a/src/testing_unimplemented.cpp b/src/testing_unimplemented.cpp index 31f188d0..7be97ead 100644 --- a/src/testing_unimplemented.cpp +++ b/src/testing_unimplemented.cpp @@ -205,9 +205,6 @@ CASS_EXPORT CassRetryPolicy* cass_retry_policy_logging_new(CassRetryPolicy* chil CASS_EXPORT CassVersion cass_schema_meta_version(const CassSchemaMeta* schema_meta) { throw std::runtime_error("UNIMPLEMENTED cass_schema_meta_version\n"); } -CASS_EXPORT void cass_session_get_metrics(const CassSession* session, CassMetrics* output) { - throw std::runtime_error("UNIMPLEMENTED cass_session_get_metrics\n"); -} CASS_EXPORT void cass_session_get_speculative_execution_metrics(const CassSession* session, CassSpeculativeExecutionMetrics* output) { diff --git a/tests/src/integration/objects/session.hpp b/tests/src/integration/objects/session.hpp index d8efa261..df118dae 100644 --- a/tests/src/integration/objects/session.hpp +++ b/tests/src/integration/objects/session.hpp @@ -111,7 +111,7 @@ class Session : public Object { * @return Driver metrics */ CassMetrics metrics() const { - CassMetrics metrics; + CassMetrics metrics = {}; cass_session_get_metrics(get(), &metrics); return metrics; } diff --git a/tests/src/integration/objects/statement.hpp b/tests/src/integration/objects/statement.hpp index 4a7512d6..7080effe 100644 --- a/tests/src/integration/objects/statement.hpp +++ b/tests/src/integration/objects/statement.hpp @@ -165,6 +165,16 @@ class Statement : public Object { ASSERT_EQ(CASS_OK, cass_statement_set_execution_profile(get(), name.c_str())); } + /** + * Set a sleeping history listener on the statement. + * This can be used to enforce a sleep time during statement execution, which increases the latency. + * + * @param sleep_time_ms Sleep time in milliseconds + */ + void set_sleep_time(uint64_t sleep_time_ms) { + datastax::internal::testing::set_sleeping_history_listener_on_statement(get(), sleep_time_ms); + } + /** * Enable/Disable whether the statement is idempotent. Idempotent statements * are able to be automatically retried after timeouts/errors and can be diff --git a/tests/src/integration/tests/test_heartbeat.cpp b/tests/src/integration/tests/test_heartbeat.cpp index 4479bd7b..6cdc59ee 100644 --- a/tests/src/integration/tests/test_heartbeat.cpp +++ b/tests/src/integration/tests/test_heartbeat.cpp @@ -83,7 +83,7 @@ CASSANDRA_INTEGRATION_TEST_F(HeartbeatTests, HeartbeatDisabled) { CASSANDRA_INTEGRATION_TEST_F(HeartbeatTests, HeartbeatFailed) { CHECK_FAILURE; - logger_.add_critera("Failed to send a heartbeat within connection idle interval."); + logger_.add_critera("Timed out while waiting for response to keepalive request"); Cluster cluster = default_cluster().with_connection_heartbeat_interval(1).with_connection_idle_timeout(5); connect(cluster); @@ -91,11 +91,14 @@ CASSANDRA_INTEGRATION_TEST_F(HeartbeatTests, HeartbeatFailed) { cass_uint64_t initial_connections = session_.metrics().stats.total_connections; pause_node(2); start_timer(); - while (session_.metrics().stats.total_connections >= initial_connections && + + CassMetrics metrics = session_.metrics(); + while (metrics.stats.total_connections >= initial_connections && elapsed_time() < 60000) { session_.execute_async(SELECT_ALL_SYSTEM_LOCAL_CQL); // Simply execute statements ignore any // error that can occur from paused node + metrics = session_.metrics(); } - EXPECT_LT(session_.metrics().stats.total_connections, initial_connections); + EXPECT_LT(metrics.stats.total_connections, initial_connections); EXPECT_GE(logger_.count(), 1u); } diff --git a/tests/src/integration/tests/test_metrics.cpp b/tests/src/integration/tests/test_metrics.cpp index 53208263..be517261 100644 --- a/tests/src/integration/tests/test_metrics.cpp +++ b/tests/src/integration/tests/test_metrics.cpp @@ -114,9 +114,12 @@ CASSANDRA_INTEGRATION_TEST_F(MetricsTests, ErrorsRequestTimeouts) { CASSANDRA_INTEGRATION_TEST_F(MetricsTests, Requests) { CHECK_FAILURE; + Statement statement(SELECT_ALL_SYSTEM_LOCAL_CQL); + statement.set_sleep_time(1); // Simulate >=1ms latency. + CassMetrics metrics = session_.metrics(); for (int i = 0; i < 600 && metrics.requests.fifteen_minute_rate == 0; ++i) { - session_.execute_async(SELECT_ALL_SYSTEM_LOCAL_CQL); + session_.execute_async(statement); metrics = session_.metrics(); msleep(100); } @@ -124,7 +127,7 @@ CASSANDRA_INTEGRATION_TEST_F(MetricsTests, Requests) { EXPECT_LT(metrics.requests.min, CASS_UINT64_MAX); EXPECT_GT(metrics.requests.max, 0u); EXPECT_GT(metrics.requests.mean, 0u); - EXPECT_GT(metrics.requests.stddev, 0u); + EXPECT_GE(metrics.requests.stddev, 0u); EXPECT_GT(metrics.requests.median, 0u); EXPECT_GT(metrics.requests.percentile_75th, 0u); EXPECT_GT(metrics.requests.percentile_95th, 0u);