-
Notifications
You must be signed in to change notification settings - Fork 135
feat: add tracing spans for queries #323
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,7 @@ mod response; | |
mod row; | ||
mod row_metadata; | ||
mod rowbinary; | ||
mod summary_header; | ||
#[cfg(feature = "inserter")] | ||
mod ticks; | ||
|
||
|
@@ -385,6 +386,12 @@ impl Client { | |
query::Query::new(self, query) | ||
} | ||
|
||
/// Starts a new SELECT/DDL query, with the `wait_end_of_query` setting enabled | ||
/// to buffer the full query results on the server | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It also basically denies the possibility of streaming the data as soon as the first block is processed, cause everything will be written as the temp files first, and it can actively hinder overall performance... |
||
pub fn query_buffered(&self, query: &str) -> query::Query { | ||
query::Query::new_buffered(self, query) | ||
} | ||
|
||
/// Enables or disables [`Row`] data types validation against the database schema | ||
/// at the cost of performance. Validation is enabled by default, and in this mode, | ||
/// the client will use `RowBinaryWithNamesAndTypes` format. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,13 +24,23 @@ use crate::headers::with_authentication; | |
pub struct Query { | ||
client: Client, | ||
sql: SqlBuilder, | ||
wait_end_of_query: bool, | ||
} | ||
|
||
impl Query { | ||
pub(crate) fn new(client: &Client, template: &str) -> Self { | ||
Self { | ||
client: client.clone(), | ||
sql: SqlBuilder::new(template), | ||
wait_end_of_query: false, | ||
} | ||
} | ||
|
||
pub(crate) fn new_buffered(client: &Client, template: &str) -> Self { | ||
Self { | ||
client: client.clone(), | ||
sql: SqlBuilder::new(template), | ||
wait_end_of_query: true, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be set via There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moreover, it's not really relevant to the purpose of this PR. Was this an API you were experimenting with and didn't intend to commit? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. without |
||
} | ||
} | ||
|
||
|
@@ -154,8 +164,25 @@ impl Query { | |
read_only: bool, | ||
default_format: Option<&str>, | ||
) -> Result<Response> { | ||
let query_formatted = format!("{}", self.sql_display()); | ||
let query = self.sql.finish()?; | ||
|
||
let execution_span = tracing::info_span!( | ||
"clickhouse.query", | ||
status = tracing::field::Empty, | ||
otel.status_code = tracing::field::Empty, | ||
otel.kind = "CLIENT", | ||
db.system.name = "clickhouse", | ||
db.query.text = query_formatted, | ||
db.response.returned_rows = tracing::field::Empty, | ||
db.response.read_bytes = tracing::field::Empty, | ||
db.response.read_rows = tracing::field::Empty, | ||
db.response.written_bytes = tracing::field::Empty, | ||
db.response.written_rows = tracing::field::Empty, | ||
clickhouse.wait_end_of_query = self.wait_end_of_query, | ||
) | ||
.entered(); | ||
|
||
let mut url = | ||
Url::parse(&self.client.url).map_err(|err| Error::InvalidParams(Box::new(err)))?; | ||
let mut pairs = url.query_pairs_mut(); | ||
|
@@ -186,6 +213,10 @@ impl Query { | |
pairs.append_pair("compress", "1"); | ||
} | ||
|
||
if self.wait_end_of_query { | ||
pairs.append_pair("wait_end_of_query", "1"); | ||
} | ||
|
||
for (name, value) in &self.client.options { | ||
pairs.append_pair(name, value); | ||
} | ||
|
@@ -206,7 +237,11 @@ impl Query { | |
.map_err(|err| Error::InvalidParams(Box::new(err)))?; | ||
|
||
let future = self.client.http.request(request); | ||
Ok(Response::new(future, self.client.compression)) | ||
Ok(Response::new( | ||
future, | ||
self.client.compression, | ||
execution_span.exit(), | ||
)) | ||
} | ||
|
||
/// Similar to [`Client::with_option`], but for this particular query only. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,63 +19,111 @@ use crate::compression::lz4::Lz4Decoder; | |
use crate::{ | ||
compression::Compression, | ||
error::{Error, Result}, | ||
summary_header::Summary, | ||
}; | ||
|
||
use tracing::{Instrument, Span}; | ||
|
||
// === Response === | ||
|
||
pub(crate) enum Response { | ||
// Headers haven't been received yet. | ||
// `Box<_>` improves performance by reducing the size of the whole future. | ||
Waiting(ResponseFuture), | ||
Waiting(ResponseFuture, Span), | ||
// Headers have been received, streaming the body. | ||
Loading(Chunks), | ||
Loading(Chunks, Span), | ||
} | ||
|
||
pub(crate) type ResponseFuture = Pin<Box<dyn Future<Output = Result<Chunks>> + Send>>; | ||
|
||
impl Response { | ||
pub(crate) fn new(response: HyperResponseFuture, compression: Compression) -> Self { | ||
Self::Waiting(Box::pin(async move { | ||
let response = response.await?; | ||
|
||
let status = response.status(); | ||
let exception_code = response.headers().get("X-ClickHouse-Exception-Code"); | ||
|
||
if status == StatusCode::OK && exception_code.is_none() { | ||
// More likely to be successful, start streaming. | ||
// It still can fail, but we'll handle it in `DetectDbException`. | ||
Ok(Chunks::new(response.into_body(), compression)) | ||
} else { | ||
// An instantly failed request. | ||
Err(collect_bad_response( | ||
status, | ||
exception_code | ||
.and_then(|value| value.to_str().ok()) | ||
.map(|code| format!("Code: {code}")), | ||
response.into_body(), | ||
compression, | ||
) | ||
.await) | ||
pub(crate) fn new(response: HyperResponseFuture, compression: Compression, span: Span) -> Self { | ||
let inner_span = span.clone(); | ||
Self::Waiting( | ||
Box::pin(async move { | ||
let response = response.await?; | ||
if let Some(summary_header) = response.headers().get("x-clickhouse-summary") { | ||
match serde_json::from_slice::<Summary>(summary_header.as_bytes()) { | ||
Ok(summary_header) => { | ||
if let Some(rows) = summary_header.result_rows { | ||
inner_span.record("db.response.returned_rows", rows); | ||
} | ||
if let Some(rows) = summary_header.read_rows { | ||
inner_span.record("db.response.read_rows", rows); | ||
} | ||
if let Some(rows) = summary_header.written_rows { | ||
inner_span.record("db.response.written_rows", rows); | ||
} | ||
if let Some(bytes) = summary_header.read_bytes { | ||
inner_span.record("db.response.read_bytes", bytes); | ||
} | ||
if let Some(bytes) = summary_header.written_bytes { | ||
inner_span.record("db.response.written_bytes", bytes); | ||
} | ||
tracing::debug!( | ||
read_rows = summary_header.read_rows, | ||
read_bytes = summary_header.read_bytes, | ||
written_rows = summary_header.written_bytes, | ||
written_bytes = summary_header.written_rows, | ||
total_rows_to_read = summary_header.total_rows_to_read, | ||
result_rows = summary_header.result_rows, | ||
result_bytes = summary_header.result_bytes, | ||
elapsed_ns = summary_header.elapsed_ns, | ||
"finished processing query" | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about relying on the summary header, as it will be off in quite a few situations. To get it right, you will have to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. obviously it depends on the use-case; many of our queries are aggregates that scan a lot of rows but return relatively few, so it's fine to turn of for those |
||
} | ||
Err(e) => { | ||
tracing::warn!( | ||
error = &e as &dyn std::error::Error, | ||
?summary_header, | ||
"invalid x-clickhouse-summary header returned", | ||
); | ||
} | ||
} | ||
} | ||
|
||
let status = response.status(); | ||
let exception_code = response.headers().get("X-ClickHouse-Exception-Code"); | ||
|
||
if status == StatusCode::OK && exception_code.is_none() { | ||
inner_span.record("otel.status_code", "OK"); | ||
// More likely to be successful, start streaming. | ||
// It still can fail, but we'll handle it in `DetectDbException`. | ||
Ok(Chunks::new(response.into_body(), compression, inner_span)) | ||
} else { | ||
inner_span.record("otel.status_code", "ERROR"); | ||
// An instantly failed request. | ||
Err(collect_bad_response( | ||
status, | ||
exception_code | ||
.and_then(|value| value.to_str().ok()) | ||
.map(|code| format!("Code: {code}")), | ||
response.into_body(), | ||
compression, | ||
) | ||
.await) | ||
} | ||
})) | ||
}), span) | ||
} | ||
|
||
pub(crate) fn into_future(self) -> ResponseFuture { | ||
match self { | ||
Self::Waiting(future) => future, | ||
Self::Loading(_) => panic!("response is already streaming"), | ||
Self::Waiting(future, span) => Box::pin(future.instrument(span)), | ||
Self::Loading(_, _) => panic!("response is already streaming"), | ||
} | ||
} | ||
|
||
pub(crate) async fn finish(&mut self) -> Result<()> { | ||
let chunks = loop { | ||
let (chunks, span) = loop { | ||
match self { | ||
Self::Waiting(future) => *self = Self::Loading(future.await?), | ||
Self::Loading(chunks) => break chunks, | ||
Self::Waiting(future, span) => { | ||
*self = Self::Loading(future.instrument(span.clone()).await?, span.clone()) | ||
} | ||
Self::Loading(chunks, span) => break (chunks, span), | ||
} | ||
}; | ||
|
||
while chunks.try_next().await?.is_some() {} | ||
while chunks.try_next().instrument(span.clone()).await?.is_some() {} | ||
Ok(()) | ||
} | ||
} | ||
|
@@ -153,40 +201,52 @@ pub(crate) struct Chunk { | |
|
||
// * Uses `Option<_>` to make this stream fused. | ||
// * Uses `Box<_>` in order to reduce the size of cursors. | ||
pub(crate) struct Chunks(Option<Box<DetectDbException<Decompress<IncomingStream>>>>); | ||
pub(crate) struct Chunks { | ||
stream: Option<Box<DetectDbException<Decompress<IncomingStream>>>>, | ||
span: Option<Span>, | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder what the impact on performance is here, cause the overall width of the structure on the hottest path in the client increases significantly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's 32 extra bytes added to this structure on 64-bit, not including the discriminator for In fact, |
||
|
||
impl Chunks { | ||
fn new(stream: Incoming, compression: Compression) -> Self { | ||
fn new(stream: Incoming, compression: Compression, span: Span) -> Self { | ||
let stream = IncomingStream(stream); | ||
let stream = Decompress::new(stream, compression); | ||
let stream = DetectDbException(stream); | ||
Self(Some(Box::new(stream))) | ||
Self { | ||
stream: Some(Box::new(stream)), | ||
span: Some(span), | ||
} | ||
} | ||
|
||
pub(crate) fn empty() -> Self { | ||
Self(None) | ||
Self { | ||
stream: None, | ||
span: None, | ||
} | ||
} | ||
|
||
#[cfg(feature = "futures03")] | ||
pub(crate) fn is_terminated(&self) -> bool { | ||
self.0.is_none() | ||
self.stream.is_none() | ||
} | ||
} | ||
|
||
impl Stream for Chunks { | ||
type Item = Result<Chunk>; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let guard = self.span.take().map(|s| s.entered()); | ||
// We use `take()` to make the stream fused, including the case of panics. | ||
if let Some(mut stream) = self.0.take() { | ||
if let Some(mut stream) = self.stream.take() { | ||
let res = Pin::new(&mut stream).poll_next(cx); | ||
|
||
if matches!(res, Poll::Pending | Poll::Ready(Some(Ok(_)))) { | ||
self.0 = Some(stream); | ||
self.stream = Some(stream); | ||
self.span = guard.map(|g| g.exit()); | ||
} | ||
|
||
res | ||
} else { | ||
self.span = guard.map(|g| g.exit()); | ||
Poll::Ready(None) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the very least, this should go under a feature flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can do that, but didn't in this PR since it adds a lot of noise to
#[cfg]
out all the functionality