Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ clickhouse-macros = { version = "0.3.0", path = "macros" }
clickhouse-types = { version = "0.1.0", path = "types" }

thiserror = "2.0"
serde = "1.0.106"
serde = { version = "1.0.106", features = ["derive"] }
serde_json = "1"
bytes = "1.5.0"
tokio = { version = "1.0.1", features = ["rt", "macros"] }
http-body-util = "0.1.2"
Expand All @@ -153,18 +154,17 @@ chrono = { version = "0.4", optional = true, features = ["serde"] }
bstr = { version = "1.11.0", default-features = false }
quanta = { version = "0.12", optional = true }
replace_with = { version = "0.1.7" }
tracing = "0.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least, this should go under a feature flag.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do that, but didn't in this PR since it adds a lot of noise to #[cfg] out all the functionality


[dev-dependencies]
clickhouse-macros = { version = "0.3.0", path = "macros" }
criterion = "0.6"
serde = { version = "1.0.106", features = ["derive"] }
tokio = { version = "1.0.1", features = ["full", "test-util"] }
hyper = { version = "1.1", features = ["server"] }
indexmap = { version = "2.10.0", features = ["serde"] }
linked-hash-map = { version = "0.5.6", features = ["serde_impl"] }
fxhash = { version = "0.2.1" }
serde_bytes = "0.11.4"
serde_json = "1"
serde_repr = "0.1.7"
uuid = { version = "1", features = ["v4", "serde"] }
time = { version = "0.3.17", features = ["macros", "rand", "parsing"] }
Expand Down
23 changes: 21 additions & 2 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,21 @@ impl<T> Insert<T> {
debug_assert!(matches!(self.state, InsertState::NotStarted { .. }));
let (client, sql) = self.state.client_with_sql().unwrap(); // checked above

let span = tracing::info_span!(
"clickhouse.insert",
status = tracing::field::Empty,
otel.status_code = tracing::field::Empty,
otel.kind = "CLIENT",
db.system.name = "clickhouse",
db.query.text = sql,
db.response.returned_rows = tracing::field::Empty,
db.response.read_bytes = tracing::field::Empty,
db.response.read_rows = tracing::field::Empty,
db.response.written_bytes = tracing::field::Empty,
db.response.written_rows = tracing::field::Empty,
)
.entered();

let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
let mut pairs = url.query_pairs_mut();
pairs.clear();
Expand Down Expand Up @@ -385,9 +400,13 @@ impl<T> Insert<T> {
.map_err(|err| Error::InvalidParams(Box::new(err)))?;

let future = client.http.request(request);
let span = span.exit();
// TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
let handle =
tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
let handle = tokio::spawn(async move {
Response::new(future, Compression::None, span)
.finish()
.await
});

match self.row_metadata {
None => (), // RowBinary is used, no header is required.
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod response;
mod row;
mod row_metadata;
mod rowbinary;
mod summary_header;
#[cfg(feature = "inserter")]
mod ticks;

Expand Down Expand Up @@ -385,6 +386,12 @@ impl Client {
query::Query::new(self, query)
}

/// Starts a new SELECT/DDL query, with the `wait_end_of_query` setting enabled
/// to buffer the full query results on the server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also basically denies the possibility of streaming the data as soon as the first block is processed, cause everything will be written as the temp files first, and it can actively hinder overall performance...

pub fn query_buffered(&self, query: &str) -> query::Query {
query::Query::new_buffered(self, query)
}

/// Enables or disables [`Row`] data types validation against the database schema
/// at the cost of performance. Validation is enabled by default, and in this mode,
/// the client will use `RowBinaryWithNamesAndTypes` format.
Expand Down
37 changes: 36 additions & 1 deletion src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,23 @@ use crate::headers::with_authentication;
pub struct Query {
client: Client,
sql: SqlBuilder,
wait_end_of_query: bool,
}

impl Query {
pub(crate) fn new(client: &Client, template: &str) -> Self {
Self {
client: client.clone(),
sql: SqlBuilder::new(template),
wait_end_of_query: false,
}
}

pub(crate) fn new_buffered(client: &Client, template: &str) -> Self {
Self {
client: client.clone(),
sql: SqlBuilder::new(template),
wait_end_of_query: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be set via Query::with_option("wait_end_of_query", "1"), and there is no need to add an explicit constructor for that. It simply does not scale for every case and uses different terminology.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moreover, it's not really relevant to the purpose of this PR. Was this an API you were experimenting with and didn't intend to commit?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without wait_end_of_query, the metrics that come out of the X-Clickhouse-Summary header are not meaningful

}
}

Expand Down Expand Up @@ -154,8 +164,25 @@ impl Query {
read_only: bool,
default_format: Option<&str>,
) -> Result<Response> {
let query_formatted = format!("{}", self.sql_display());
let query = self.sql.finish()?;

let execution_span = tracing::info_span!(
"clickhouse.query",
status = tracing::field::Empty,
otel.status_code = tracing::field::Empty,
otel.kind = "CLIENT",
db.system.name = "clickhouse",
db.query.text = query_formatted,
db.response.returned_rows = tracing::field::Empty,
db.response.read_bytes = tracing::field::Empty,
db.response.read_rows = tracing::field::Empty,
db.response.written_bytes = tracing::field::Empty,
db.response.written_rows = tracing::field::Empty,
clickhouse.wait_end_of_query = self.wait_end_of_query,
)
.entered();

let mut url =
Url::parse(&self.client.url).map_err(|err| Error::InvalidParams(Box::new(err)))?;
let mut pairs = url.query_pairs_mut();
Expand Down Expand Up @@ -186,6 +213,10 @@ impl Query {
pairs.append_pair("compress", "1");
}

if self.wait_end_of_query {
pairs.append_pair("wait_end_of_query", "1");
}

for (name, value) in &self.client.options {
pairs.append_pair(name, value);
}
Expand All @@ -206,7 +237,11 @@ impl Query {
.map_err(|err| Error::InvalidParams(Box::new(err)))?;

let future = self.client.http.request(request);
Ok(Response::new(future, self.client.compression))
Ok(Response::new(
future,
self.client.compression,
execution_span.exit(),
))
}

/// Similar to [`Client::with_option`], but for this particular query only.
Expand Down
136 changes: 98 additions & 38 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,63 +19,111 @@ use crate::compression::lz4::Lz4Decoder;
use crate::{
compression::Compression,
error::{Error, Result},
summary_header::Summary,
};

use tracing::{Instrument, Span};

// === Response ===

pub(crate) enum Response {
// Headers haven't been received yet.
// `Box<_>` improves performance by reducing the size of the whole future.
Waiting(ResponseFuture),
Waiting(ResponseFuture, Span),
// Headers have been received, streaming the body.
Loading(Chunks),
Loading(Chunks, Span),
}

pub(crate) type ResponseFuture = Pin<Box<dyn Future<Output = Result<Chunks>> + Send>>;

impl Response {
pub(crate) fn new(response: HyperResponseFuture, compression: Compression) -> Self {
Self::Waiting(Box::pin(async move {
let response = response.await?;

let status = response.status();
let exception_code = response.headers().get("X-ClickHouse-Exception-Code");

if status == StatusCode::OK && exception_code.is_none() {
// More likely to be successful, start streaming.
// It still can fail, but we'll handle it in `DetectDbException`.
Ok(Chunks::new(response.into_body(), compression))
} else {
// An instantly failed request.
Err(collect_bad_response(
status,
exception_code
.and_then(|value| value.to_str().ok())
.map(|code| format!("Code: {code}")),
response.into_body(),
compression,
)
.await)
pub(crate) fn new(response: HyperResponseFuture, compression: Compression, span: Span) -> Self {
let inner_span = span.clone();
Self::Waiting(
Box::pin(async move {
let response = response.await?;
if let Some(summary_header) = response.headers().get("x-clickhouse-summary") {
match serde_json::from_slice::<Summary>(summary_header.as_bytes()) {
Ok(summary_header) => {
if let Some(rows) = summary_header.result_rows {
inner_span.record("db.response.returned_rows", rows);
}
if let Some(rows) = summary_header.read_rows {
inner_span.record("db.response.read_rows", rows);
}
if let Some(rows) = summary_header.written_rows {
inner_span.record("db.response.written_rows", rows);
}
if let Some(bytes) = summary_header.read_bytes {
inner_span.record("db.response.read_bytes", bytes);
}
if let Some(bytes) = summary_header.written_bytes {
inner_span.record("db.response.written_bytes", bytes);
}
tracing::debug!(
read_rows = summary_header.read_rows,
read_bytes = summary_header.read_bytes,
written_rows = summary_header.written_bytes,
written_bytes = summary_header.written_rows,
total_rows_to_read = summary_header.total_rows_to_read,
result_rows = summary_header.result_rows,
result_bytes = summary_header.result_bytes,
elapsed_ns = summary_header.elapsed_ns,
"finished processing query"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about relying on the summary header, as it will be off in quite a few situations. To get it right, you will have to use wait_end_of_query indeed, but it is too big a trade-off IMO.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

obviously it depends on the use-case; many of our queries are aggregates that scan a lot of rows but return relatively few, so it's fine to turn of for those

}
Err(e) => {
tracing::warn!(
error = &e as &dyn std::error::Error,
?summary_header,
"invalid x-clickhouse-summary header returned",
);
}
}
}

let status = response.status();
let exception_code = response.headers().get("X-ClickHouse-Exception-Code");

if status == StatusCode::OK && exception_code.is_none() {
inner_span.record("otel.status_code", "OK");
// More likely to be successful, start streaming.
// It still can fail, but we'll handle it in `DetectDbException`.
Ok(Chunks::new(response.into_body(), compression, inner_span))
} else {
inner_span.record("otel.status_code", "ERROR");
// An instantly failed request.
Err(collect_bad_response(
status,
exception_code
.and_then(|value| value.to_str().ok())
.map(|code| format!("Code: {code}")),
response.into_body(),
compression,
)
.await)
}
}))
}), span)
}

pub(crate) fn into_future(self) -> ResponseFuture {
match self {
Self::Waiting(future) => future,
Self::Loading(_) => panic!("response is already streaming"),
Self::Waiting(future, span) => Box::pin(future.instrument(span)),
Self::Loading(_, _) => panic!("response is already streaming"),
}
}

pub(crate) async fn finish(&mut self) -> Result<()> {
let chunks = loop {
let (chunks, span) = loop {
match self {
Self::Waiting(future) => *self = Self::Loading(future.await?),
Self::Loading(chunks) => break chunks,
Self::Waiting(future, span) => {
*self = Self::Loading(future.instrument(span.clone()).await?, span.clone())
}
Self::Loading(chunks, span) => break (chunks, span),
}
};

while chunks.try_next().await?.is_some() {}
while chunks.try_next().instrument(span.clone()).await?.is_some() {}
Ok(())
}
}
Expand Down Expand Up @@ -153,40 +201,52 @@ pub(crate) struct Chunk {

// * Uses `Option<_>` to make this stream fused.
// * Uses `Box<_>` in order to reduce the size of cursors.
pub(crate) struct Chunks(Option<Box<DetectDbException<Decompress<IncomingStream>>>>);
pub(crate) struct Chunks {
stream: Option<Box<DetectDbException<Decompress<IncomingStream>>>>,
span: Option<Span>,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what the impact on performance is here, cause the overall width of the structure on the hottest path in the client increases significantly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Span is not exactly tiny, either. I estimate it's at least 4 words wide: https://docs.rs/tracing/latest/src/tracing/span.rs.html#348

That's 32 extra bytes added to this structure on 64-bit, not including the discriminator for Option<Span> which is going to add another 8 bytes since Span provides nothing internally for null pointer optimization.

In fact, Span itself has a no-op constructor that's recommended to be used in place of Option<Span> for this reason: https://docs.rs/tracing/latest/tracing/struct.Span.html#method.none


impl Chunks {
fn new(stream: Incoming, compression: Compression) -> Self {
fn new(stream: Incoming, compression: Compression, span: Span) -> Self {
let stream = IncomingStream(stream);
let stream = Decompress::new(stream, compression);
let stream = DetectDbException(stream);
Self(Some(Box::new(stream)))
Self {
stream: Some(Box::new(stream)),
span: Some(span),
}
}

pub(crate) fn empty() -> Self {
Self(None)
Self {
stream: None,
span: None,
}
}

#[cfg(feature = "futures03")]
pub(crate) fn is_terminated(&self) -> bool {
self.0.is_none()
self.stream.is_none()
}
}

impl Stream for Chunks {
type Item = Result<Chunk>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let guard = self.span.take().map(|s| s.entered());
// We use `take()` to make the stream fused, including the case of panics.
if let Some(mut stream) = self.0.take() {
if let Some(mut stream) = self.stream.take() {
let res = Pin::new(&mut stream).poll_next(cx);

if matches!(res, Poll::Pending | Poll::Ready(Some(Ok(_)))) {
self.0 = Some(stream);
self.stream = Some(stream);
self.span = guard.map(|g| g.exit());
}

res
} else {
self.span = guard.map(|g| g.exit());
Poll::Ready(None)
}
}
Expand Down
Loading