Skip to content

Commit 3c28a13

Browse files
authored
Merge pull request #280 from muzarski/metrics
implement `cass_session_get_metrics` and enable some integration tests
2 parents 293db72 + 0f5ea42 commit 3c28a13

File tree

15 files changed

+222
-12
lines changed

15 files changed

+222
-12
lines changed

Makefile

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
2525
:LoggingTests.*\
2626
:PreparedMetadataTests.*\
2727
:UseKeyspaceCaseSensitiveTests.*\
28+
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
29+
:MetricsTests.Integration_Cassandra_Requests\
2830
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
2931
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\
3032
:ControlConnectionTests.Integration_Cassandra_TopologyChange\
@@ -36,6 +38,11 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
3638
:UseKeyspaceCaseSensitiveTests.Integration_Cassandra_ConnectWithKeyspace)
3739
endif
3840

41+
ifndef SCYLLA_NO_VALGRIND_TEST_FILTER
42+
SCYLLA_NO_VALGRIND_TEST_FILTER := $(subst ${SPACE},${EMPTY},AsyncTests.Integration_Cassandra_Simple\
43+
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed)
44+
endif
45+
3946
ifndef CASSANDRA_TEST_FILTER
4047
CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
4148
:BasicsTests.*\
@@ -59,6 +66,8 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
5966
:LoggingTests.*\
6067
:PreparedMetadataTests.*\
6168
:UseKeyspaceCaseSensitiveTests.*\
69+
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
70+
:MetricsTests.Integration_Cassandra_Requests\
6271
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
6372
:PreparedTests.Integration_Cassandra_FailFastWhenPreparedIDChangesDuringReprepare\
6473
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\
@@ -72,6 +81,11 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
7281
:UseKeyspaceCaseSensitiveTests.Integration_Cassandra_ConnectWithKeyspace)
7382
endif
7483

84+
ifndef CASSANDRA_NO_VALGRIND_TEST_FILTER
85+
CASSANDRA_NO_VALGRIND_TEST_FILTER := $(subst ${SPACE},${EMPTY},AsyncTests.Integration_Cassandra_Simple\
86+
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed)
87+
endif
88+
7589
ifndef CCM_COMMIT_ID
7690
# TODO: change it back to master/next when https://github.com/scylladb/scylla-ccm/issues/646 is fixed.
7791
export CCM_COMMIT_ID := 5392dd68
@@ -226,7 +240,7 @@ endif
226240
@echo "Running integration tests on scylla ${SCYLLA_VERSION}"
227241
valgrind --error-exitcode=123 --leak-check=full --errors-for-leak-kinds=definite build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${SCYLLA_TEST_FILTER}"
228242
@echo "Running timeout sensitive tests on scylla ${SCYLLA_VERSION}"
229-
build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="AsyncTests.Integration_Cassandra_Simple"
243+
build/cassandra-integration-tests --scylla --version=${SCYLLA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${SCYLLA_NO_VALGRIND_TEST_FILTER}"
230244

231245
run-test-integration-cassandra: prepare-integration-test download-ccm-cassandra-image install-java8-if-missing
232246
ifdef DONT_REBUILD_INTEGRATION_BIN
@@ -237,7 +251,7 @@ endif
237251
@echo "Running integration tests on cassandra ${CASSANDRA_VERSION}"
238252
valgrind --error-exitcode=123 --leak-check=full --errors-for-leak-kinds=definite build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${CASSANDRA_TEST_FILTER}"
239253
@echo "Running timeout sensitive tests on cassandra ${CASSANDRA_VERSION}"
240-
build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="AsyncTests.Integration_Cassandra_Simple"
254+
build/cassandra-integration-tests --version=${CASSANDRA_VERSION} --category=CASSANDRA --verbose=ccm --gtest_filter="${CASSANDRA_NO_VALGRIND_TEST_FILTER}"
241255

242256
run-test-unit: install-cargo-if-missing _update-rust-tooling
243257
@cd ${CURRENT_DIR}/scylla-rust-wrapper; cargo test

scylla-rust-wrapper/Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scylla-rust-wrapper/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ license = "MIT OR Apache-2.0"
1212
[dependencies]
1313
scylla = { git = "https://github.com/scylladb/scylla-rust-driver.git", rev = "v1.1.0", features = [
1414
"openssl-010",
15+
"metrics",
1516
] }
1617
tokio = { version = "1.27.0", features = ["full"] }
1718
uuid = "1.1.2"

scylla-rust-wrapper/build.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,4 +153,9 @@ fn main() {
153153
&["CassIteratorType_", "CassIteratorType"],
154154
&out_path,
155155
);
156+
prepare_cppdriver_data(
157+
"cppdriver_metrics_types.rs",
158+
&["CassMetrics_", "CassMetrics"],
159+
&out_path,
160+
);
156161
}

scylla-rust-wrapper/src/integration_testing.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
use std::ffi::{CString, c_char};
2+
use std::net::SocketAddr;
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
use scylla::errors::{RequestAttemptError, RequestError};
7+
use scylla::observability::history::{AttemptId, HistoryListener, RequestId, SpeculativeId};
8+
use scylla::policies::retry::RetryDecision;
29

310
use crate::argconv::{BoxFFI, CMut, CassBorrowedExclusivePtr};
411
use crate::cluster::CassCluster;
5-
use crate::types::{cass_int32_t, cass_uint16_t, size_t};
12+
use crate::statement::{BoundStatement, CassStatement};
13+
use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t};
614

715
#[unsafe(no_mangle)]
816
pub unsafe extern "C" fn testing_cluster_get_connect_timeout(
@@ -54,3 +62,57 @@ pub unsafe extern "C" fn testing_cluster_get_contact_points(
5462
pub unsafe extern "C" fn testing_free_contact_points(contact_points: *mut c_char) {
5563
let _ = unsafe { CString::from_raw(contact_points) };
5664
}
65+
66+
#[derive(Debug)]
67+
struct SleepingHistoryListener(Duration);
68+
69+
impl HistoryListener for SleepingHistoryListener {
70+
fn log_request_start(&self) -> RequestId {
71+
RequestId(0)
72+
}
73+
74+
fn log_request_success(&self, _request_id: RequestId) {}
75+
76+
fn log_request_error(&self, _request_id: RequestId, _error: &RequestError) {}
77+
78+
fn log_new_speculative_fiber(&self, _request_id: RequestId) -> SpeculativeId {
79+
SpeculativeId(0)
80+
}
81+
82+
fn log_attempt_start(
83+
&self,
84+
_request_id: RequestId,
85+
_speculative_id: Option<SpeculativeId>,
86+
_node_addr: SocketAddr,
87+
) -> AttemptId {
88+
// Sleep to simulate a delay in the request
89+
std::thread::sleep(self.0);
90+
AttemptId(0)
91+
}
92+
93+
fn log_attempt_success(&self, _attempt_id: AttemptId) {}
94+
95+
fn log_attempt_error(
96+
&self,
97+
_attempt_id: AttemptId,
98+
_error: &RequestAttemptError,
99+
_retry_decision: &RetryDecision,
100+
) {
101+
}
102+
}
103+
104+
#[unsafe(no_mangle)]
105+
pub unsafe extern "C" fn testing_statement_set_sleeping_history_listener(
106+
statement_raw: CassBorrowedExclusivePtr<CassStatement, CMut>,
107+
sleep_time_ms: cass_uint64_t,
108+
) {
109+
let sleep_time = Duration::from_millis(sleep_time_ms);
110+
let history_listener = Arc::new(SleepingHistoryListener(sleep_time));
111+
112+
match &mut BoxFFI::as_mut_ref(statement_raw).unwrap().statement {
113+
BoundStatement::Simple(inner) => inner.query.set_history_listener(history_listener),
114+
BoundStatement::Prepared(inner) => Arc::make_mut(&mut inner.statement)
115+
.statement
116+
.set_history_listener(history_listener),
117+
}
118+
}

scylla-rust-wrapper/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,13 @@ pub mod cass_iterator_types {
119119
include_bindgen_generated!("cppdriver_iterator_types.rs");
120120
}
121121

122+
/// CassMetrics
123+
pub mod cass_metrics_types {
124+
#![allow(non_camel_case_types, non_snake_case)]
125+
126+
include_bindgen_generated!("cppdriver_metrics_types.rs");
127+
}
128+
122129
pub static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
123130
pub static LOGGER: LazyLock<RwLock<Logger>> = LazyLock::new(|| {
124131
RwLock::new(Logger {

scylla-rust-wrapper/src/session.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::argconv::*;
22
use crate::batch::CassBatch;
33
use crate::cass_error::*;
4+
use crate::cass_metrics_types::CassMetrics;
45
use crate::cass_types::get_column_type;
56
use crate::cluster::CassCluster;
67
use crate::cluster::build_session_builder;
@@ -19,6 +20,7 @@ use scylla::client::session_builder::SessionBuilder;
1920
use scylla::cluster::metadata::ColumnType;
2021
use scylla::errors::ExecutionError;
2122
use scylla::frame::types::Consistency;
23+
use scylla::observability::metrics::MetricsError;
2224
use scylla::response::PagingStateResponse;
2325
use scylla::response::query_result::QueryResult;
2426
use scylla::statement::unprepared::Statement;
@@ -571,6 +573,91 @@ pub unsafe extern "C" fn cass_session_get_schema_meta(
571573
BoxFFI::into_ptr(Box::new(CassSchemaMeta { keyspaces }))
572574
}
573575

576+
#[unsafe(no_mangle)]
577+
pub unsafe extern "C" fn cass_session_get_metrics(
578+
session_raw: CassBorrowedSharedPtr<CassSession, CConst>,
579+
metrics: *mut CassMetrics,
580+
) {
581+
let Some(maybe_session_lock) = ArcFFI::as_ref(session_raw) else {
582+
tracing::error!("Provided null session pointer to cass_session_get_metrics!");
583+
return;
584+
};
585+
if metrics.is_null() {
586+
tracing::error!("Provided null metrics pointer to cass_session_get_metrics!");
587+
return;
588+
}
589+
590+
let maybe_session_guard = maybe_session_lock.blocking_read();
591+
let maybe_session = maybe_session_guard.as_ref();
592+
let Some(session) = maybe_session else {
593+
tracing::warn!("Attempted to get metrics before connecting session object");
594+
return;
595+
};
596+
597+
let rust_metrics = session.session.get_metrics();
598+
// TODO (rust-driver): Add Snapshot::default() or Snapshot::empty() with 0-initialized snapshot.
599+
let (
600+
min,
601+
max,
602+
mean,
603+
stddev,
604+
median,
605+
percentile_75,
606+
percentile_95,
607+
percentile_98,
608+
percentile_99,
609+
percentile_99_9,
610+
) = match rust_metrics.get_snapshot() {
611+
Ok(snapshot) => (
612+
snapshot.min,
613+
snapshot.max,
614+
snapshot.mean,
615+
snapshot.stddev,
616+
snapshot.median,
617+
snapshot.percentile_75,
618+
snapshot.percentile_95,
619+
snapshot.percentile_98,
620+
snapshot.percentile_99,
621+
snapshot.percentile_99_9,
622+
),
623+
// Histogram is empty, but we don't want to return because there
624+
// are other metrics that don't depend on histogram.
625+
Err(MetricsError::Empty) => (0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
626+
Err(e) => {
627+
tracing::error!("Failed to get metrics snapshot: {}", e);
628+
return;
629+
}
630+
};
631+
632+
const MILLIS_TO_MICROS: u64 = 1000;
633+
// SAFETY: We assume that user provided valid CassMetrics pointer.
634+
unsafe {
635+
(*metrics).requests.min = min * MILLIS_TO_MICROS;
636+
(*metrics).requests.max = max * MILLIS_TO_MICROS;
637+
(*metrics).requests.mean = mean * MILLIS_TO_MICROS;
638+
(*metrics).requests.stddev = stddev * MILLIS_TO_MICROS;
639+
(*metrics).requests.median = median * MILLIS_TO_MICROS;
640+
(*metrics).requests.percentile_75th = percentile_75 * MILLIS_TO_MICROS;
641+
(*metrics).requests.percentile_95th = percentile_95 * MILLIS_TO_MICROS;
642+
(*metrics).requests.percentile_98th = percentile_98 * MILLIS_TO_MICROS;
643+
(*metrics).requests.percentile_99th = percentile_99 * MILLIS_TO_MICROS;
644+
(*metrics).requests.percentile_999th = percentile_99_9 * MILLIS_TO_MICROS;
645+
(*metrics).requests.mean_rate = rust_metrics.get_mean_rate();
646+
(*metrics).requests.one_minute_rate = rust_metrics.get_one_minute_rate();
647+
(*metrics).requests.five_minute_rate = rust_metrics.get_five_minute_rate();
648+
(*metrics).requests.fifteen_minute_rate = rust_metrics.get_fifteen_minute_rate();
649+
650+
(*metrics).stats.total_connections = rust_metrics.get_total_connections();
651+
(*metrics).stats.available_connections = 0; // Deprecated
652+
(*metrics).stats.exceeded_pending_requests_water_mark = 0; // Deprecated
653+
(*metrics).stats.exceeded_write_bytes_water_mark = 0; // Deprecated
654+
655+
(*metrics).errors.connection_timeouts = rust_metrics.get_connection_timeouts();
656+
(*metrics).errors.pending_request_timeouts = 0; // Deprecated
657+
(*metrics).errors.request_timeouts = rust_metrics.get_request_timeouts();
658+
}
659+
}
660+
574661
#[cfg(test)]
575662
mod tests {
576663
use rusty_fork::rusty_fork_test;

src/testing.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,8 @@ void set_record_attempted_hosts(CassStatement* statement, bool enable) {
9898
throw std::runtime_error("Unimplemented 'set_record_attempted_hosts'!");
9999
}
100100

101+
void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms) {
102+
testing_statement_set_sleeping_history_listener(statement, sleep_time_ms);
103+
}
104+
101105
}}} // namespace datastax::internal::testing

src/testing.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ CASS_EXPORT String get_server_name(CassFuture* future);
5050

5151
CASS_EXPORT void set_record_attempted_hosts(CassStatement* statement, bool enable);
5252

53+
CASS_EXPORT void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms);
54+
5355
}}} // namespace datastax::internal::testing
5456

5557
#endif

src/testing_rust_impls.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ CASS_EXPORT void testing_cluster_get_contact_points(CassCluster* cluster, char**
2121
size_t* contact_points_length);
2222

2323
CASS_EXPORT void testing_free_contact_points(char* contact_points);
24+
25+
// Sets a sleeping history listener on the statement.
26+
// This can be used to enforce a sleep time during statement execution, which increases the latency.
27+
CASS_EXPORT void testing_statement_set_sleeping_history_listener(CassStatement *statement,
28+
cass_uint64_t sleep_time_ms);
2429
}
2530

2631
#endif

0 commit comments

Comments
 (0)