Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:LoggingTests.*\
:PreparedMetadataTests.*\
:UseKeyspaceCaseSensitiveTests.*\
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
:MetricsTests.Integration_Cassandra_Requests\
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\
:ControlConnectionTests.Integration_Cassandra_TopologyChange\
Expand All @@ -36,6 +38,11 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:UseKeyspaceCaseSensitiveTests.Integration_Cassandra_ConnectWithKeyspace)
endif

ifndef SCYLLA_NO_VALGRIND_TEST_FILTER
SCYLLA_NO_VALGRIND_TEST_FILTER := $(subst ${SPACE},${EMPTY},AsyncTests.Integration_Cassandra_Simple\
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed)
endif

ifndef CASSANDRA_TEST_FILTER
CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:BasicsTests.*\
Expand All @@ -59,6 +66,8 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:LoggingTests.*\
:PreparedMetadataTests.*\
:UseKeyspaceCaseSensitiveTests.*\
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
:MetricsTests.Integration_Cassandra_Requests\
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
:PreparedTests.Integration_Cassandra_FailFastWhenPreparedIDChangesDuringReprepare\
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\
Expand All @@ -72,6 +81,11 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
:UseKeyspaceCaseSensitiveTests.Integration_Cassandra_ConnectWithKeyspace)
endif

ifndef CASSANDRA_NO_VALGRIND_TEST_FILTER
CASSANDRA_NO_VALGRIND_TEST_FILTER := $(subst ${SPACE},${EMPTY},AsyncTests.Integration_Cassandra_Simple\
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed)
endif

ifndef CCM_COMMIT_ID
# TODO: change it back to master/next when https://github.com/scylladb/scylla-ccm/issues/646 is fixed.
export CCM_COMMIT_ID := 5392dd68
Expand Down Expand Up @@ -226,7 +240,7 @@ endif
@echo "Running integration tests on scylla ${SCYLLA_VERSION}"
valgrind --error-exitcode=123 --leak-check=full --errors-for-leak-kinds=definite build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${SCYLLA_TEST_FILTER}"
@echo "Running timeout sensitive tests on scylla ${SCYLLA_VERSION}"
build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="AsyncTests.Integration_Cassandra_Simple"
build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${SCYLLA_NO_VALGRIND_TEST_FILTER}"

run-test-integration-cassandra: prepare-integration-test download-ccm-cassandra-image install-java8-if-missing
ifdef DONT_REBUILD_INTEGRATION_BIN
Expand All @@ -237,7 +251,7 @@ endif
@echo "Running integration tests on cassandra ${CASSANDRA_VERSION}"
valgrind --error-exitcode=123 --leak-check=full --errors-for-leak-kinds=definite build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${CASSANDRA_TEST_FILTER}"
@echo "Running timeout sensitive tests on cassandra ${CASSANDRA_VERSION}"
build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="AsyncTests.Integration_Cassandra_Simple"
build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${CASSANDRA_NO_VALGRIND_TEST_FILTER}"

run-test-unit: install-cargo-if-missing _update-rust-tooling
@cd ${CURRENT_DIR}/scylla-rust-wrapper; cargo test
10 changes: 10 additions & 0 deletions scylla-rust-wrapper/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scylla-rust-wrapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
scylla = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "v1.1.0", features = [
"openssl-010",
"metrics",
] }
tokio = { version = "1.27.0", features = ["full"] }
uuid = "1.1.2"
Expand Down
5 changes: 5 additions & 0 deletions scylla-rust-wrapper/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,9 @@ fn main() {
&["CassIteratorType_", "CassIteratorType"],
&out_path,
);
prepare_cppdriver_data(
"cppdriver_metrics_types.rs",
&["CassMetrics_", "CassMetrics"],
&out_path,
);
}
64 changes: 63 additions & 1 deletion scylla-rust-wrapper/src/integration_testing.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
use std::ffi::{CString, c_char};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use scylla::errors::{RequestAttemptError, RequestError};
use scylla::observability::history::{AttemptId, HistoryListener, RequestId, SpeculativeId};
use scylla::policies::retry::RetryDecision;

use crate::argconv::{BoxFFI, CMut, CassBorrowedExclusivePtr};
use crate::cluster::CassCluster;
use crate::types::{cass_int32_t, cass_uint16_t, size_t};
use crate::statement::{BoundStatement, CassStatement};
use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t};

#[unsafe(no_mangle)]
pub unsafe extern "C" fn testing_cluster_get_connect_timeout(
Expand Down Expand Up @@ -54,3 +62,57 @@ pub unsafe extern "C" fn testing_cluster_get_contact_points(
pub unsafe extern "C" fn testing_free_contact_points(contact_points: *mut c_char) {
let _ = unsafe { CString::from_raw(contact_points) };
}

#[derive(Debug)]
struct SleepingHistoryListener(Duration);

impl HistoryListener for SleepingHistoryListener {
fn log_request_start(&self) -> RequestId {
RequestId(0)
}

fn log_request_success(&self, _request_id: RequestId) {}

fn log_request_error(&self, _request_id: RequestId, _error: &RequestError) {}

fn log_new_speculative_fiber(&self, _request_id: RequestId) -> SpeculativeId {
SpeculativeId(0)
}

fn log_attempt_start(
&self,
_request_id: RequestId,
_speculative_id: Option<SpeculativeId>,
_node_addr: SocketAddr,
) -> AttemptId {
// Sleep to simulate a delay in the request
std::thread::sleep(self.0);
AttemptId(0)
}

fn log_attempt_success(&self, _attempt_id: AttemptId) {}

fn log_attempt_error(
&self,
_attempt_id: AttemptId,
_error: &RequestAttemptError,
_retry_decision: &RetryDecision,
) {
}
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn testing_statement_set_sleeping_history_listener(
statement_raw: CassBorrowedExclusivePtr<CassStatement, CMut>,
sleep_time_ms: cass_uint64_t,
) {
let sleep_time = Duration::from_millis(sleep_time_ms);
let history_listener = Arc::new(SleepingHistoryListener(sleep_time));

match &mut BoxFFI::as_mut_ref(statement_raw).unwrap().statement {
BoundStatement::Simple(inner) => inner.query.set_history_listener(history_listener),
BoundStatement::Prepared(inner) => Arc::make_mut(&mut inner.statement)
.statement
.set_history_listener(history_listener),
}
}
7 changes: 7 additions & 0 deletions scylla-rust-wrapper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ pub mod cass_iterator_types {
include_bindgen_generated!("cppdriver_iterator_types.rs");
}

/// CassMetrics
pub mod cass_metrics_types {
#![allow(non_camel_case_types, non_snake_case)]

include_bindgen_generated!("cppdriver_metrics_types.rs");
}

pub static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
pub static LOGGER: LazyLock<RwLock<Logger>> = LazyLock::new(|| {
RwLock::new(Logger {
Expand Down
87 changes: 87 additions & 0 deletions scylla-rust-wrapper/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::argconv::*;
use crate::batch::CassBatch;
use crate::cass_error::*;
use crate::cass_metrics_types::CassMetrics;
use crate::cass_types::get_column_type;
use crate::cluster::CassCluster;
use crate::cluster::build_session_builder;
Expand All @@ -19,6 +20,7 @@ use scylla::client::session_builder::SessionBuilder;
use scylla::cluster::metadata::ColumnType;
use scylla::errors::ExecutionError;
use scylla::frame::types::Consistency;
use scylla::observability::metrics::MetricsError;
use scylla::response::PagingStateResponse;
use scylla::response::query_result::QueryResult;
use scylla::statement::unprepared::Statement;
Expand Down Expand Up @@ -571,6 +573,91 @@ pub unsafe extern "C" fn cass_session_get_schema_meta(
BoxFFI::into_ptr(Box::new(CassSchemaMeta { keyspaces }))
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_session_get_metrics(
session_raw: CassBorrowedSharedPtr<CassSession, CConst>,
metrics: *mut CassMetrics,
) {
let Some(maybe_session_lock) = ArcFFI::as_ref(session_raw) else {
tracing::error!("Provided null session pointer to cass_session_get_metrics!");
return;
};
if metrics.is_null() {
tracing::error!("Provided null metrics pointer to cass_session_get_metrics!");
return;
}

let maybe_session_guard = maybe_session_lock.blocking_read();
let maybe_session = maybe_session_guard.as_ref();
let Some(session) = maybe_session else {
tracing::warn!("Attempted to get metrics before connecting session object");
return;
};

let rust_metrics = session.session.get_metrics();
// TODO (rust-driver): Add Snapshot::default() or Snapshot::empty() with 0-initialized snapshot.
let (
min,
max,
mean,
stddev,
median,
percentile_75,
percentile_95,
percentile_98,
percentile_99,
percentile_99_9,
) = match rust_metrics.get_snapshot() {
Ok(snapshot) => (
snapshot.min,
snapshot.max,
snapshot.mean,
snapshot.stddev,
snapshot.median,
snapshot.percentile_75,
snapshot.percentile_95,
snapshot.percentile_98,
snapshot.percentile_99,
snapshot.percentile_99_9,
),
// Histogram is empty, but we don't want to return because there
// are other metrics that don't depend on histogram.
Err(MetricsError::Empty) => (0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
Err(e) => {
tracing::error!("Failed to get metrics snapshot: {}", e);
return;
}
};

const MILLIS_TO_MICROS: u64 = 1000;
// SAFETY: We assume that user provided valid CassMetrics pointer.
unsafe {
(*metrics).requests.min = min * MILLIS_TO_MICROS;
(*metrics).requests.max = max * MILLIS_TO_MICROS;
(*metrics).requests.mean = mean * MILLIS_TO_MICROS;
(*metrics).requests.stddev = stddev * MILLIS_TO_MICROS;
(*metrics).requests.median = median * MILLIS_TO_MICROS;
(*metrics).requests.percentile_75th = percentile_75 * MILLIS_TO_MICROS;
(*metrics).requests.percentile_95th = percentile_95 * MILLIS_TO_MICROS;
(*metrics).requests.percentile_98th = percentile_98 * MILLIS_TO_MICROS;
(*metrics).requests.percentile_99th = percentile_99 * MILLIS_TO_MICROS;
(*metrics).requests.percentile_999th = percentile_99_9 * MILLIS_TO_MICROS;
(*metrics).requests.mean_rate = rust_metrics.get_mean_rate();
(*metrics).requests.one_minute_rate = rust_metrics.get_one_minute_rate();
(*metrics).requests.five_minute_rate = rust_metrics.get_five_minute_rate();
(*metrics).requests.fifteen_minute_rate = rust_metrics.get_fifteen_minute_rate();

(*metrics).stats.total_connections = rust_metrics.get_total_connections();
(*metrics).stats.available_connections = 0; // Deprecated
(*metrics).stats.exceeded_pending_requests_water_mark = 0; // Deprecated
(*metrics).stats.exceeded_write_bytes_water_mark = 0; // Deprecated

(*metrics).errors.connection_timeouts = rust_metrics.get_connection_timeouts();
(*metrics).errors.pending_request_timeouts = 0; // Deprecated
(*metrics).errors.request_timeouts = rust_metrics.get_request_timeouts();
}
}

#[cfg(test)]
mod tests {
use rusty_fork::rusty_fork_test;
Expand Down
4 changes: 4 additions & 0 deletions src/testing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,8 @@ void set_record_attempted_hosts(CassStatement* statement, bool enable) {
throw std::runtime_error("Unimplemented 'set_record_attempted_hosts'!");
}

void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms) {
testing_statement_set_sleeping_history_listener(statement, sleep_time_ms);
}

}}} // namespace datastax::internal::testing
2 changes: 2 additions & 0 deletions src/testing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ CASS_EXPORT String get_server_name(CassFuture* future);

CASS_EXPORT void set_record_attempted_hosts(CassStatement* statement, bool enable);

CASS_EXPORT void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms);

}}} // namespace datastax::internal::testing

#endif
5 changes: 5 additions & 0 deletions src/testing_rust_impls.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ CASS_EXPORT void testing_cluster_get_contact_points(CassCluster* cluster, char**
size_t* contact_points_length);

CASS_EXPORT void testing_free_contact_points(char* contact_points);

// Sets a sleeping history listener on the statement.
// This can be used to enforce a sleep time during statement execution, which increases the latency.
CASS_EXPORT void testing_statement_set_sleeping_history_listener(CassStatement *statement,
cass_uint64_t sleep_time_ms);
}

#endif
3 changes: 0 additions & 3 deletions src/testing_unimplemented.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,6 @@ CASS_EXPORT CassRetryPolicy* cass_retry_policy_logging_new(CassRetryPolicy* chil
CASS_EXPORT CassVersion cass_schema_meta_version(const CassSchemaMeta* schema_meta) {
throw std::runtime_error("UNIMPLEMENTED cass_schema_meta_version\n");
}
CASS_EXPORT void cass_session_get_metrics(const CassSession* session, CassMetrics* output) {
throw std::runtime_error("UNIMPLEMENTED cass_session_get_metrics\n");
}
CASS_EXPORT void
cass_session_get_speculative_execution_metrics(const CassSession* session,
CassSpeculativeExecutionMetrics* output) {
Expand Down
2 changes: 1 addition & 1 deletion tests/src/integration/objects/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class Session : public Object<CassSession, cass_session_free> {
* @return Driver metrics
*/
CassMetrics metrics() const {
CassMetrics metrics;
CassMetrics metrics = {};
cass_session_get_metrics(get(), &metrics);
return metrics;
}
Expand Down
10 changes: 10 additions & 0 deletions tests/src/integration/objects/statement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ class Statement : public Object<CassStatement, cass_statement_free> {
ASSERT_EQ(CASS_OK, cass_statement_set_execution_profile(get(), name.c_str()));
}

/**
* Set a sleeping history listener on the statement.
* This can be used to enforce a sleep time during statement execution, which increases the latency.
*
* @param sleep_time_ms Sleep time in milliseconds
*/
void set_sleep_time(uint64_t sleep_time_ms) {
datastax::internal::testing::set_sleeping_history_listener_on_statement(get(), sleep_time_ms);
}

/**
* Enable/Disable whether the statement is idempotent. Idempotent statements
* are able to be automatically retried after timeouts/errors and can be
Expand Down
9 changes: 6 additions & 3 deletions tests/src/integration/tests/test_heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,22 @@ CASSANDRA_INTEGRATION_TEST_F(HeartbeatTests, HeartbeatDisabled) {
CASSANDRA_INTEGRATION_TEST_F(HeartbeatTests, HeartbeatFailed) {
CHECK_FAILURE;

logger_.add_critera("Failed to send a heartbeat within connection idle interval.");
logger_.add_critera("Timed out while waiting for response to keepalive request");
Cluster cluster =
default_cluster().with_connection_heartbeat_interval(1).with_connection_idle_timeout(5);
connect(cluster);

cass_uint64_t initial_connections = session_.metrics().stats.total_connections;
pause_node(2);
start_timer();
while (session_.metrics().stats.total_connections >= initial_connections &&

CassMetrics metrics = session_.metrics();
while (metrics.stats.total_connections >= initial_connections &&
elapsed_time() < 60000) {
session_.execute_async(SELECT_ALL_SYSTEM_LOCAL_CQL); // Simply execute statements ignore any
// error that can occur from paused node
metrics = session_.metrics();
}
EXPECT_LT(session_.metrics().stats.total_connections, initial_connections);
EXPECT_LT(metrics.stats.total_connections, initial_connections);
EXPECT_GE(logger_.count(), 1u);
}
Loading