From b07fb4111c9f8afd3c5ad962aac633f9ff0ad9fb Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Fri, 30 May 2025 00:50:30 +0800 Subject: [PATCH 1/6] chore: add new API to chdb.h --- programs/local/LocalChdb.h | 18 +++++ programs/local/LocalServer.cpp | 2 + programs/local/chdb.h | 121 ++++++++++++++++++++++++++++----- src/Client/ClientBase.h | 3 + src/Client/LocalConnection.cpp | 13 ++++ src/Client/LocalConnection.h | 5 ++ 6 files changed, 145 insertions(+), 17 deletions(-) diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index 980965ff5b8..72f0d54e8dd 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -125,6 +125,22 @@ class local_result_wrapper } return result->bytes_read; } + size_t storage_rows_read() + { + if (result == nullptr) + { + return 0; + } + return result->storage_rows_read; + } + size_t storage_bytes_read() + { + if (result == nullptr) + { + return 0; + } + return result->storage_bytes_read; + } double elapsed() { if (result == nullptr) @@ -166,6 +182,8 @@ class query_result size_t size() { return result_wrapper->size(); } size_t rows_read() { return result_wrapper->rows_read(); } size_t bytes_read() { return result_wrapper->bytes_read(); } + size_t storgae_rows_read() { return result_wrapper->storage_rows_read(); } + size_t storage_bytes_read() { return result_wrapper->storage_bytes_read(); } double elapsed() { return result_wrapper->elapsed(); } bool has_error() { return result_wrapper->has_error(); } py::str error_message() { return result_wrapper->error_message(); } diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 4bf7b875b31..e4e85088a7a 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1289,6 +1289,8 @@ static local_result_v2 * createMaterializedLocalQueryResult(DB::LocalServer * se } result->rows_read = server->getProcessedRows(); result->bytes_read = server->getProcessedBytes(); + result->storage_rows_read = server->getStorgaeRowsRead(); + result->storage_bytes_read = server->getStorageBytesRead(); result->elapsed = server->getElapsedTime(); } } diff --git a/programs/local/chdb.h b/programs/local/chdb.h index 1367d9c7b79..c1e7a8775a0 100644 --- a/programs/local/chdb.h +++ b/programs/local/chdb.h @@ -1,16 +1,19 @@ #pragma once #ifdef __cplusplus -# include -# include +#include +#include extern "C" { #else -# include -# include -# include +#include +#include +#include #endif #define CHDB_EXPORT __attribute__((visibility("default"))) + +#ifndef CHDB_NO_DEPRECATED +// WARNING: The following structs are deprecated and will be removed in a future version. struct local_result { char * buf; @@ -45,12 +48,6 @@ struct local_result_v2 }; #endif -CHDB_EXPORT struct local_result * query_stable(int argc, char ** argv); -CHDB_EXPORT void free_result(struct local_result * result); - -CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv); -CHDB_EXPORT void free_result_v2(struct local_result_v2 * result); - /** * Connection structure for chDB * Contains server instance, connection state, and query processing queue @@ -62,10 +59,37 @@ struct chdb_conn void * queue; /* Query processing queue */ }; +#endif + +// Opaque handle for query results. +// Internal data structure managed by chDB implementation. +// Users should only interact through API functions. +typedef struct { + void * internal_data; +} chdb_result; + +// Connection handle wrapping database session state. +// Internal data structure managed by chDB implementation. +// Users should only interact through API functions. +typedef struct _chdb_connection { + void * internal_data; +} * chdb_connection; + +// Opaque handle for streaming query results. +// Internal data structure managed by chDB implementation. +// Users should only interact through API functions. typedef struct { void * internal_data; } chdb_streaming_result; +#ifndef CHDB_NO_DEPRECATED +// WARNING: The following interfaces are deprecated and will be removed in a future version. +CHDB_EXPORT struct local_result * query_stable(int argc, char ** argv); +CHDB_EXPORT void free_result(struct local_result * result); + +CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv); +CHDB_EXPORT void free_result_v2(struct local_result_v2 * result); + /** * Creates a new chDB connection. * Only one active connection is allowed per process. @@ -110,12 +134,67 @@ CHDB_EXPORT struct local_result_v2 * query_conn(struct chdb_conn * conn, const c CHDB_EXPORT chdb_streaming_result * query_conn_streaming(struct chdb_conn * conn, const char * query, const char * format); /** - * Retrieves error message from streaming result. - * @brief Gets error message associated with streaming query execution + * Fetches next chunk of streaming results. + * @brief Iterates through streaming query results + * @param conn Active connection handle * @param result Streaming result handle from query_conn_streaming() - * @return Null-terminated error message string, or NULL if no error occurred + * @return Materialized result chunk with data + * @note Returns empty result when stream ends */ -CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result); +CHDB_EXPORT struct local_result_v2 * chdb_streaming_fetch_result(struct chdb_conn * conn, chdb_streaming_result * result); + +/** + * Cancels ongoing streaming query. + * @brief Aborts streaming query execution and cleans up resources + * @param conn Active connection handle + * @param result Streaming result handle to cancel + */ +CHDB_EXPORT void chdb_streaming_cancel_query(struct chdb_conn * conn, chdb_streaming_result * result); + +#endif + +/** + * Creates a new chDB connection. + * Only one active connection is allowed per process. + * Creating a new connection with different path requires closing existing connection. + * + * @param argc Number of command-line arguments + * @param argv Command-line arguments array (--path= to specify database location) + * @return Pointer to connection pointer, or NULL on failure + * @note Default path is ":memory:" if not specified + */ +CHDB_EXPORT chdb_connection * chdb_connect(int argc, char ** argv); + +/** + * Closes an existing chDB connection and cleans up resources. + * Thread-safe function that handles connection shutdown and cleanup. + * + * @param conn Pointer to connection pointer to close + */ + CHDB_EXPORT void chdb_close_conn(chdb_connection * conn); + +/** + * Executes a query on the given connection. + * Thread-safe function that handles query execution in a separate thread. + * + * @param conn Connection to execute query on + * @param query SQL query string to execute + * @param format Output format string (e.g., "CSV", default format) + * @return Query result structure containing output or error message + * @note Returns error result if connection is invalid or closed + */ +CHDB_EXPORT chdb_result * chdb_query(chdb_connection conn, const char * query, const char * format); + +/** + * Executes a streaming query on the given connection. + * @brief Initializes streaming query execution and returns result handle + * @param conn Connection to execute query on + * @param query SQL query string to execute + * @param format Output format string (e.g. "CSV", default format) + * @return Streaming result handle containing query state or error message + * @note Returns error result if connection is invalid or closed + */ +CHDB_EXPORT chdb_streaming_result * chdb_stream_query(chdb_connection conn, const char * query, const char * format); /** * Fetches next chunk of streaming results. @@ -125,7 +204,7 @@ CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * res * @return Materialized result chunk with data * @note Returns empty result when stream ends */ -CHDB_EXPORT struct local_result_v2 * chdb_streaming_fetch_result(struct chdb_conn * conn, chdb_streaming_result * result); +CHDB_EXPORT chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_streaming_result * result); /** * Cancels ongoing streaming query. @@ -133,7 +212,15 @@ CHDB_EXPORT struct local_result_v2 * chdb_streaming_fetch_result(struct chdb_con * @param conn Active connection handle * @param result Streaming result handle to cancel */ -CHDB_EXPORT void chdb_streaming_cancel_query(struct chdb_conn * conn, chdb_streaming_result * result); +CHDB_EXPORT void chdb_stream_cancel_query(chdb_connection conn, chdb_streaming_result * result); + +/** + * Retrieves error message from streaming result. + * @brief Gets error message associated with streaming query execution + * @param result Streaming result handle from query_conn_streaming() + * @return Null-terminated error message string, or NULL if no error occurred + */ +CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result); /** * Releases resources associated with streaming result. diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 783f8467aa8..f82c3a275dc 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -127,6 +127,9 @@ class ClientBase return result; } + size_t getStorgaeRowsRead() const { return connection->getCHDBProgress().read_rows; } + size_t getStorageBytesRead() const { return connection->getCHDBProgress().read_bytes; } + size_t getProcessedRows() const { return processed_rows; } size_t getProcessedBytes() const { return processed_bytes; } double getElapsedTime() const { return progress_indication.elapsedSeconds(); } diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index e9206a4a2f1..245b9496467 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -43,6 +43,8 @@ LocalConnection::LocalConnection(ContextPtr context_, ReadBuffer * in_, bool sen /// Authenticate and create a context to execute queries. session.authenticate("default", "", Poco::Net::SocketAddress{}); session.makeSessionContext(); + + send_progress = false; } LocalConnection::~LocalConnection() @@ -77,6 +79,11 @@ void LocalConnection::updateProgress(const Progress & value) state->progress.incrementPiecewiseAtomically(value); } +void LocalConnection::updateCHDBProgress(const Progress & value) +{ + chdb_progress.incrementPiecewiseAtomically(value); +} + void LocalConnection::sendProfileEvents() { Block profile_block; @@ -113,6 +120,11 @@ void LocalConnection::sendQuery( query_context->setProgressCallback([this] (const Progress & value) { this->updateProgress(value); }); query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); }); } + else + { + query_context->setProgressCallback([this] (const Progress & value) { this->updateCHDBProgress(value); }); + query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateCHDBProgress(Progress(value)); }); + } /// Switch the database to the desired one (set by the USE query) /// but don't attempt to do it if we are already in that database. @@ -125,6 +137,7 @@ void LocalConnection::sendQuery( state.reset(); state.emplace(); + chdb_progress.reset(); state->query_id = query_id; state->query = query; diff --git a/src/Client/LocalConnection.h b/src/Client/LocalConnection.h index cce5b3e231b..6cdc892e1f9 100644 --- a/src/Client/LocalConnection.h +++ b/src/Client/LocalConnection.h @@ -142,6 +142,7 @@ class LocalConnection : public IServerConnection, WithContext void setThrottler(const ThrottlerPtr &) override {} + const Progress & getCHDBProgress() const { return chdb_progress; } #if USE_PYTHON void resetQueryContext(); #endif @@ -153,6 +154,8 @@ class LocalConnection : public IServerConnection, WithContext void updateProgress(const Progress & value); + void updateCHDBProgress(const Progress & value); + void sendProfileEvents(); /// Returns true on executor timeout, meaning a retryable error. @@ -170,6 +173,8 @@ class LocalConnection : public IServerConnection, WithContext std::optional state; + Progress chdb_progress; + /// Last "server" packet. std::optional next_packet_type; From 774af991a0df0ab4807cca0469ca9a57321ca5b5 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Wed, 4 Jun 2025 03:48:28 +0800 Subject: [PATCH 2/6] chore: add chdb.cpp --- programs/local/CMakeLists.txt | 6 +- programs/local/LocalChdb.cpp | 3 +- programs/local/LocalChdb.h | 4 +- programs/local/LocalServer.cpp | 816 +-------------------------- programs/local/LocalServer.h | 11 +- programs/local/QueryResult.h | 82 +++ programs/local/chdb-internal.h | 69 +-- programs/local/chdb.cpp | 973 +++++++++++++++++++++++++++++++++ programs/local/chdb.h | 106 +++- src/Client/ClientBase.h | 12 - 10 files changed, 1186 insertions(+), 896 deletions(-) create mode 100644 programs/local/QueryResult.h create mode 100644 programs/local/chdb.cpp diff --git a/programs/local/CMakeLists.txt b/programs/local/CMakeLists.txt index b0aff79b11c..46bb6dacc09 100644 --- a/programs/local/CMakeLists.txt +++ b/programs/local/CMakeLists.txt @@ -1,7 +1,11 @@ -set (CLICKHOUSE_LOCAL_SOURCES LocalServer.cpp) +set (CLICKHOUSE_LOCAL_SOURCES + chdb.cpp + LocalServer.cpp +) if (USE_PYTHON) set (CHDB_SOURCES + chdb.cpp FormatHelper.cpp ListScan.cpp LocalChdb.cpp diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index eaf52793113..324a5a32373 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -1,6 +1,7 @@ #include "LocalChdb.h" #include "LocalServer.h" #include "chdb.h" +#include "chdb-internal.h" #include "PythonImporter.h" #include "PythonTableCache.h" #include "TableFunctionPython.h" @@ -492,7 +493,7 @@ PYBIND11_MODULE(_chdb, m) auto destroy_import_cache = []() { - DB::LocalServer::cleanupConnection(); + CHDB::chdbCleanupConnection(); CHDB::PythonTableCache::clear(); CHDB::PythonImporter::destroy(); }; diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index 72f0d54e8dd..00254dcfcea 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -131,7 +131,7 @@ class local_result_wrapper { return 0; } - return result->storage_rows_read; + return 0; } size_t storage_bytes_read() { @@ -139,7 +139,7 @@ class local_result_wrapper { return 0; } - return result->storage_bytes_read; + return 0; } double elapsed() { diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index e4e85088a7a..be96e2d17f8 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1,11 +1,8 @@ #include "LocalServer.h" -#include "chdb.h" #include "chdb-internal.h" #if USE_PYTHON -#include "FormatHelper.h" #include "TableFunctionPython.h" -#include "PythonTableCache.h" #include #include #endif @@ -74,12 +71,7 @@ # include #endif -static local_result_v2 * createStreamingIterateQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req); - namespace fs = std::filesystem; -static std::shared_mutex global_connection_mutex; -chdb_conn * global_conn_ptr = nullptr; -std::string global_db_path; namespace CurrentMetrics { extern const Metric MemoryTracking; @@ -480,17 +472,6 @@ void LocalServer::connect() connection_parameters, client_context, in, need_render_progress, need_render_profile_events, server_display_name); } -void LocalServer::cleanupConnection() -{ - try - { - close_conn(&global_conn_ptr); - } - catch (...) - { - } -} - int LocalServer::main(const std::vector & /*args*/) try @@ -1031,14 +1012,8 @@ void LocalServer::readArguments(int argc, char ** argv, Arguments & common_argum void LocalServer::cleanStreamingQuery() { if (streaming_query_context && streaming_query_context->streaming_result) - { - auto streaming_iter_req = std::make_unique(); - streaming_iter_req->streaming_result = reinterpret_cast(streaming_query_context->streaming_result); - streaming_iter_req->is_canceled = true; + CHDB::cancelStreamQuery(this, streaming_query_context->streaming_result); - auto * local_result = createStreamingIterateQueryResult(this, *streaming_iter_req); - free_result_v2(local_result); - } streaming_query_context.reset(); } @@ -1074,796 +1049,15 @@ void LocalServer::cleanStreamingQuery() // } // } -class query_result_ -{ -public: - explicit query_result_(std::vector* buf, uint64_t rows, - uint64_t bytes, double elapsed): - rows_(rows), bytes_(bytes), elapsed_(elapsed), - buf_(buf) { } - - explicit query_result_(std::string&& error_msg): error_msg_(error_msg) { } - - std::string string() - { - return std::string(buf_->begin(), buf_->end()); - } - - uint64_t rows_; - uint64_t bytes_; - double elapsed_; - std::vector * buf_; - std::string error_msg_; -}; - - -// global mutex for all local servers -static std::mutex CHDB_MUTEX; - -std::unique_ptr pyEntryClickHouseLocal(int argc, char ** argv) -{ - try - { - std::lock_guard lock(CHDB_MUTEX); - DB::LocalServer app; - app.init(argc, argv); - int ret = app.run(); - if (ret == 0) - { - return std::make_unique( - app.getQueryOutputVector(), - app.getProcessedRows(), - app.getProcessedBytes(), - app.getElapsedTime()); - } else { - return std::make_unique(app.getErrorMsg()); - } - } - catch (const DB::Exception & e) - { - // wrap the error message into a new std::exception - throw std::domain_error(DB::getExceptionMessage(e, false)); - } - catch (const boost::program_options::error & e) - { - throw std::invalid_argument("Bad arguments: " + std::string(e.what())); - } - catch (...) - { - throw std::domain_error(DB::getCurrentExceptionMessage(true)); - } -} - -DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) -{ - DB::LocalServer * app = nullptr; - try - { - app = new DB::LocalServer(); - app->setBackground(true); - app->init(argc, argv); - int ret = app->run(); - if (ret != 0) - { - auto err_msg = app->getErrorMsg(); - LOG_ERROR(&app->logger(), "Error running bgClickHouseLocal: {}", err_msg); - delete app; - app = nullptr; - throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Error running bgClickHouseLocal: {}", err_msg); - } - return app; - } - catch (const DB::Exception & e) - { - delete app; - throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "bgClickHouseLocal {}", DB::getExceptionMessage(e, false)); - } - catch (const Poco::Exception & e) - { - delete app; - throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "bgClickHouseLocal {}", e.displayText()); - } - catch (const std::exception & e) - { - delete app; - throw std::domain_error(e.what()); - } - catch (...) - { - delete app; - throw std::domain_error(DB::getCurrentExceptionMessage(true)); - } -} - -// todo fix the memory leak and unnecessary copy -local_result * query_stable(int argc, char ** argv) -{ - auto result = pyEntryClickHouseLocal(argc, argv); - if (!result->error_msg_.empty() || result->buf_ == nullptr) - { - return nullptr; - } - local_result * res = new local_result; - res->len = result->buf_->size(); - res->buf = result->buf_->data(); - res->_vec = result->buf_; - res->rows_read = result->rows_; - res->bytes_read = result->bytes_; - res->elapsed = result->elapsed_; - return res; -} - -void free_result(local_result * result) -{ - if (!result) - { - return; - } - if (result->_vec) - { - std::vector * vec = reinterpret_cast *>(result->_vec); - delete vec; - result->_vec = nullptr; - } - delete result; -} - -local_result_v2 * query_stable_v2(int argc, char ** argv) -{ - auto * res = new local_result_v2{}; - // pyEntryClickHouseLocal may throw some serious exceptions, although it's not likely - // to happen in the context of clickhouse-local. we catch them here and return an error - try - { - auto result = pyEntryClickHouseLocal(argc, argv); - if (!result->error_msg_.empty()) - { - // Handle scenario with an error message - res->error_message = new char[result->error_msg_.size() + 1]; - std::memcpy(res->error_message, result->error_msg_.c_str(), result->error_msg_.size() + 1); - } - else if (result->buf_ == nullptr) - { - // Handle scenario where result is empty and there's no error - res->rows_read = result->rows_; - res->bytes_read = result->bytes_; - res->elapsed = result->elapsed_; - } - else - { - // Handle successful data retrieval scenario - res->_vec = result->buf_; - res->len = result->buf_->size(); - res->buf = result->buf_->data(); - res->rows_read = result->rows_; - res->bytes_read = result->bytes_; - res->elapsed = result->elapsed_; - } - } - catch (const std::exception & e) - { - res->error_message = new char[strlen(e.what()) + 1]; - std::strcpy(res->error_message, e.what()); - } - catch (...) - { - const char * unknown_exception_msg = "Unknown exception"; - size_t len = std::strlen(unknown_exception_msg) + 1; - res->error_message = new char[len]; - std::strcpy(res->error_message, unknown_exception_msg); - } - return res; -} - -void free_result_v2(local_result_v2 * result) -{ - if (!result) - return; - - delete reinterpret_cast *>(result->_vec); - delete[] result->error_message; - delete result; -} - -static local_result_v2 * createMaterializedLocalQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req) -{ - auto * result = new local_result_v2(); - const auto & materialized_request = dynamic_cast(req); - - try - { - if (!server->parseQueryTextWithOutputFormat(materialized_request.query, materialized_request.format)) - { - std::string error = server->getErrorMsg(); - result->error_message = new char[error.length() + 1]; - std::strcpy(result->error_message, error.c_str()); - } - else - { - auto * query_output_vec = server->stealQueryOutputVector(); - if (query_output_vec) - { - result->_vec = query_output_vec; - result->len = query_output_vec->size(); - result->buf = query_output_vec->data(); - } - result->rows_read = server->getProcessedRows(); - result->bytes_read = server->getProcessedBytes(); - result->storage_rows_read = server->getStorgaeRowsRead(); - result->storage_bytes_read = server->getStorageBytesRead(); - result->elapsed = server->getElapsedTime(); - } - } - catch (const DB::Exception & e) - { - std::string error = DB::getExceptionMessage(e, false); - result->error_message = new char[error.length() + 1]; - std::strcpy(result->error_message, error.c_str()); - } - catch (...) - { - const char * unknown_error = "Unknown error occurred"; - result->error_message = new char[strlen(unknown_error) + 1]; - std::strcpy(result->error_message, unknown_error); - } - return result; -} - -static chdb_streaming_result * createStreamingQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req) -{ - auto * result = new chdb_streaming_result(); - auto * streaming_result = new CHDB::StreamingResultData(); - result->internal_data = streaming_result; - const auto & streaming_init_request = dynamic_cast(req); - - try - { - if (!server->parseQueryTextWithOutputFormat(streaming_init_request.query, streaming_init_request.format)) - streaming_result->error_message = server->getErrorMsg(); - } - catch (const DB::Exception& e) - { - std::string error = DB::getExceptionMessage(e, false); - streaming_result->error_message = error; - } - catch (...) - { - const char * unknown_error = "Unknown error occurred"; - streaming_result->error_message = unknown_error; - } - return result; -} - -static local_result_v2 * createStreamingIterateQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req) -{ - auto * result = new local_result_v2(); - const auto & streaming_iter_request = dynamic_cast(req); - const auto old_processed_rows = server->getProcessedRows(); - const auto old_processed_bytes = server->getProcessedBytes(); - const auto old_elapsed_time = server->getElapsedTime(); - - try - { - if (!server->processStreamingQuery(streaming_iter_request.streaming_result, streaming_iter_request.is_canceled)) - { - std::string error = server->getErrorMsg(); - result->error_message = new char[error.length() + 1]; - std::strcpy(result->error_message, error.c_str()); - } - else - { - const auto processed_rows = server->getProcessedRows(); - const auto processed_bytes = server->getProcessedBytes(); - const auto elapsed_time = server->getElapsedTime(); - if (processed_rows <= old_processed_rows) - return result; - - auto * query_output_vec = server->stealQueryOutputVector(); - if (query_output_vec) - { - result->_vec = query_output_vec; - result->len = query_output_vec->size(); - result->buf = query_output_vec->data(); - } - result->rows_read = processed_rows - old_processed_rows; - result->bytes_read = processed_bytes - old_processed_bytes; - result->elapsed = elapsed_time - old_elapsed_time; - } - } - catch (const DB::Exception& e) - { - std::string error = DB::getExceptionMessage(e, false); - result->error_message = new char[error.length() + 1]; - std::strcpy(result->error_message, error.c_str()); - } - catch (...) - { - const char * unknown_error = "Unknown error occurred"; - result->error_message = new char[strlen(unknown_error) + 1]; - std::strcpy(result->error_message, unknown_error); - } - return result; -} - -static CHDB::ResultData createQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req) -{ - CHDB::ResultData result_data; - - if (!req.isStreaming()) - { - result_data.result_type = CHDB::QueryResultType::RESULT_TYPE_MATERIALIZED; - result_data.materialized_result = createMaterializedLocalQueryResult(server, req); - result_data.is_end = true; - } - else if (!req.isIteration()) - { - server->streaming_query_context = std::make_shared(); - result_data.result_type = CHDB::QueryResultType::RESULT_TYPE_STREAMING; - result_data.streaming_result = createStreamingQueryResult(server, req); - result_data.is_end = chdb_streaming_result_error(result_data.streaming_result); - - if (!result_data.is_end) - server->streaming_query_context->streaming_result = result_data.streaming_result; - else - server->streaming_query_context.reset(); - } - else - { - result_data.result_type = CHDB::QueryResultType::RESULT_TYPE_MATERIALIZED; - result_data.materialized_result = createStreamingIterateQueryResult(server, req); - const auto & streaming_iter_request = dynamic_cast(req); - result_data.is_end = result_data.materialized_result->error_message - || result_data.materialized_result->rows_read == 0 - || streaming_iter_request.is_canceled; - } - - if (result_data.is_end) - { - server->streaming_query_context.reset(); -#if USE_PYTHON - if (auto * local_connection = static_cast(server->connection.get())) - { - /// Must clean up Context objects whether the query succeeds or fails. - /// During process exit, if LocalServer destructor triggers while cached PythonStorage - /// objects still exist in Context, their destruction will attempt to acquire GIL. - /// Acquiring GIL during process termination leads to immediate thread termination. - local_connection->resetQueryContext(); - } - - CHDB::PythonTableCache::clear(); -#endif - } - - return result_data; -} - -chdb_conn ** connect_chdb(int argc, char ** argv) -{ - std::lock_guard global_lock(global_connection_mutex); - - std::string path = ":memory:"; // Default path - for (int i = 1; i < argc; i++) - { - if (strncmp(argv[i], "--path=", 7) == 0) - { - path = argv[i] + 7; - break; - } - } - - if (global_conn_ptr != nullptr) - { - if (path == global_db_path) - return &global_conn_ptr; - - throw DB::Exception( - DB::ErrorCodes::BAD_ARGUMENTS, - "Another connection is already active with different path. Old path = {}, new path = {}, " - "please close the existing connection first.", - global_db_path, - path); - } - - auto * conn = new chdb_conn(); - auto * q_queue = new CHDB::QueryQueue(); - conn->queue = q_queue; - - std::mutex init_mutex; - std::condition_variable init_cv; - bool init_done = false; - bool init_success = false; - std::exception_ptr init_exception; - - // Start query processing thread - std::thread( - [&]() - { - auto * queue = static_cast(conn->queue); - try - { - DB::LocalServer * server = bgClickHouseLocal(argc, argv); - conn->server = server; - conn->connected = true; - - global_conn_ptr = conn; - global_db_path = path; - - // Signal successful initialization - { - std::lock_guard init_lock(init_mutex); - init_success = true; - init_done = true; - } - init_cv.notify_one(); - - while (true) - { - { - std::unique_lock lock(queue->mutex); - queue->query_cv.wait(lock, [queue]() { return queue->has_query || queue->shutdown; }); - - if (queue->shutdown) - { - try - { - server->cleanup(); - delete server; - } - catch (...) - { - // Log error but continue shutdown - LOG_ERROR(&Poco::Logger::get("LocalServer"), "Error during server cleanup"); - } - queue->cleanup_done = true; - queue->query_cv.notify_all(); - break; - } - - } - - CHDB::QueryRequestBase & req = *(queue->current_query); - auto result = createQueryResult(server, req); - - { - std::lock_guard lock(queue->mutex); - if (req.isStreaming() && !req.isIteration() && !result.is_end) - queue->has_streaming_query = true; - - if (req.isStreaming() && req.isIteration() && result.is_end) - queue->has_streaming_query = false; - - queue->current_result = result; - queue->has_result = true; - queue->has_query = false; - } - queue->result_cv.notify_all(); - } - } - catch (const DB::Exception & e) - { - // Log the error - LOG_ERROR(&Poco::Logger::get("LocalServer"), "Query thread terminated with error: {}", e.what()); - - // Signal thread termination - { - std::lock_guard init_lock(init_mutex); - init_exception = std::current_exception(); - init_done = true; - std::lock_guard lock(queue->mutex); - queue->shutdown = true; - queue->cleanup_done = true; - } - init_cv.notify_one(); - queue->query_cv.notify_all(); - queue->result_cv.notify_all(); - } - catch (...) - { - LOG_ERROR(&Poco::Logger::get("LocalServer"), "Query thread terminated with unknown error"); - - { - std::lock_guard init_lock(init_mutex); - init_exception = std::current_exception(); - init_done = true; - std::lock_guard lock(queue->mutex); - queue->shutdown = true; - queue->cleanup_done = true; - } - init_cv.notify_one(); - queue->query_cv.notify_all(); - queue->result_cv.notify_all(); - } - }) - .detach(); - - // Wait for initialization to complete - { - std::unique_lock init_lock(init_mutex); - init_cv.wait(init_lock, [&init_done]() { return init_done; }); - - if (!init_success) - { - delete q_queue; - delete conn; - if (init_exception) - std::rethrow_exception(init_exception); - throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to create connection"); - } - } - - return &global_conn_ptr; -} - -void close_conn(chdb_conn ** conn) -{ - std::lock_guard global_lock(global_connection_mutex); - - if (!conn || !*conn) - return; - - if ((*conn)->connected) - { - if ((*conn)->queue) - { - auto * queue = static_cast((*conn)->queue); - - { - std::unique_lock queue_lock(queue->mutex); - queue->shutdown = true; - queue->query_cv.notify_all(); // Wake up query processing thread - queue->result_cv.notify_all(); // Wake up any waiting result threads - - // Wait for server cleanup - queue->query_cv.wait(queue_lock, [queue] { return queue->cleanup_done; }); - - // Clean up current result if any - queue->current_result.reset(); - queue->has_result = false; - } - - delete queue; - (*conn)->queue = nullptr; - } - - // Mark as disconnected BEFORE deleting queue and nulling global pointer - (*conn)->connected = false; - } - // Clear global pointer under lock before queue deletion - if (*conn != global_conn_ptr) - { - LOG_ERROR(&Poco::Logger::get("LocalServer"), "Connection mismatch during close_conn"); - } - - delete *conn; - *conn = nullptr; -} - -static bool checkConnectionValidity(chdb_conn * conn) -{ - return conn && conn->connected && conn->queue; -} - -template -T* createErrorResultImpl(const char * error_msg); - -template <> -local_result_v2 * createErrorResultImpl(const char * error_msg) -{ - auto * result = new local_result_v2{}; - result->error_message = new char[strlen(error_msg) + 1]; - std::strcpy(result->error_message, error_msg); - return result; -} - -template <> -chdb_streaming_result * createErrorResultImpl(const char * error_msg) -{ - auto * stream_result = new chdb_streaming_result(); - auto * stream_result_data = new CHDB::StreamingResultData(); - stream_result_data->error_message = error_msg; - stream_result->internal_data = stream_result_data; - return stream_result; -} - -template -T * createErrorResult(const char * error_msg) -{ - if constexpr (std::is_same_v) { - return createErrorResultImpl(error_msg); - } else { - return createErrorResultImpl(error_msg); - } -} - -static CHDB::ResultData executeQueryRequest( - CHDB::QueryQueue * queue, - const char * query, - const char * format, - CHDB::QueryType query_type, - chdb_streaming_result * streaming_result_ = nullptr, - bool is_canceled = false) -{ - CHDB::ResultData result; - try - { - { - std::unique_lock lock(queue->mutex); - // Wait until any ongoing query completes - if (query_type == CHDB::QueryType::TYPE_STREAMING_ITER) - queue->result_cv.wait(lock, [queue]() { return (!queue->has_query && !queue->has_result) || queue->shutdown; }); - else - queue->result_cv.wait(lock, [queue]() { return (!queue->has_query && !queue->has_result && !queue->has_streaming_query) || queue->shutdown; }); - - if (queue->shutdown) - { - if (query_type == CHDB::QueryType::TYPE_STREAMING_INIT) - result.streaming_result = createErrorResult("Connection is shutting down"); - else - result.materialized_result = createErrorResult("Connection is shutting down"); - return result; - } - - if (query_type == CHDB::QueryType::TYPE_STREAMING_INIT) - { - auto streaming_req = std::make_unique(); - streaming_req->query = query; - streaming_req->format = format; - queue->current_query = std::move(streaming_req); -#if USE_PYTHON - CHDB::SetCurrentFormat(format); -#endif - } - else if (query_type == CHDB::QueryType::TYPE_MATERIALIZED) - { - auto materialized_req = std::make_unique(); - materialized_req->query = query; - materialized_req->format = format; - queue->current_query = std::move(materialized_req); -#if USE_PYTHON - CHDB::SetCurrentFormat(format); -#endif - } - else - { - auto streaming_iter_req = std::make_unique(); - streaming_iter_req->streaming_result = streaming_result_; - streaming_iter_req->is_canceled = is_canceled; - queue->current_query = std::move(streaming_iter_req); - } - - queue->has_query = true; - queue->current_result.clear(); - queue->has_result = false; - } - queue->query_cv.notify_one(); - - { - std::unique_lock lock(queue->mutex); - queue->result_cv.wait(lock, [queue]() { return queue->has_result || queue->shutdown; }); - - if (!queue->shutdown && queue->has_result) - { - result = queue->current_result; - queue->current_result.clear(); - queue->has_result = false; - queue->has_query = false; - } - } - queue->result_cv.notify_all(); - } - catch (...) - { - // Handle any exceptions during query processing - if (query_type == CHDB::QueryType::TYPE_STREAMING_INIT) - result.streaming_result = createErrorResult("Error occurred while processing query"); - else - result.materialized_result = createErrorResult("Error occurred while processing query"); - } - - return result; -} - -struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format) -{ - // Add connection validity check under global lock - std::shared_lock global_lock(global_connection_mutex); - - if (!checkConnectionValidity(conn)) - return createErrorResult("Invalid or closed connection"); - auto * queue = static_cast(conn->queue); - CHDB::ResultData result = executeQueryRequest(queue, query, format, CHDB::QueryType::TYPE_MATERIALIZED); - - auto * local_result = result.materialized_result; - if (!local_result) - local_result = createErrorResult("Query processing failed"); - - return local_result; -} - -chdb_streaming_result * query_conn_streaming(chdb_conn * conn, const char * query, const char * format) -{ - // Add connection validity check under global lock - std::shared_lock global_lock(global_connection_mutex); - - if (!checkConnectionValidity(conn)) - return createErrorResult("Invalid or closed connection"); - - auto * queue = static_cast(conn->queue); - CHDB::ResultData result = executeQueryRequest(queue, query, format, CHDB::QueryType::TYPE_STREAMING_INIT); - - auto * streaming_result = result.streaming_result; - if (!streaming_result) - streaming_result = createErrorResult("Query processing failed"); - - return streaming_result; -} - -const char * chdb_streaming_result_error(chdb_streaming_result * result) -{ - if (!result || !result->internal_data) - return nullptr; - - auto & result_data = *(reinterpret_cast(result->internal_data)); - if (result_data.error_message.empty()) - return nullptr; - - return result_data.error_message.c_str(); -} - -local_result_v2 * chdb_streaming_fetch_result(chdb_conn * conn, chdb_streaming_result * result) -{ - // Add connection validity check under global lock - std::shared_lock global_lock(global_connection_mutex); - - if (!checkConnectionValidity(conn)) - return createErrorResult("Invalid or closed connection"); - - auto * queue = static_cast(conn->queue); - CHDB::ResultData result_data = executeQueryRequest(queue, nullptr, nullptr, CHDB::QueryType::TYPE_STREAMING_ITER, result); - - auto * local_result = result_data.materialized_result; - if (!local_result) - local_result = createErrorResult("Query processing failed"); - - return local_result; -} - -void chdb_streaming_cancel_query(chdb_conn * conn, chdb_streaming_result * result) -{ - // Add connection validity check under global lock - std::shared_lock global_lock(global_connection_mutex); - - if (!checkConnectionValidity(conn)) - return; - - auto * queue = static_cast(conn->queue); - CHDB::ResultData result_data = executeQueryRequest(queue, nullptr, nullptr, CHDB::QueryType::TYPE_STREAMING_ITER, result, true); - auto * local_result = result_data.materialized_result; - free_result_v2(local_result); -} - -void chdb_destroy_result(chdb_streaming_result * result) -{ - if (!result) - return; - - if (result->internal_data) - { - auto * result_data = reinterpret_cast(result->internal_data); - delete result_data; - } - - delete result; -} /** * The dummy_calls function is used to prevent certain functions from being optimized out by the compiler. - * It includes calls to 'query_stable' and 'free_result' within a condition that is always false. - * This approach ensures these functions are recognized as used by the compiler, particularly under high + * It includes calls to 'query_stable' and 'free_result' within a condition that is always false. + * This approach ensures these functions are recognized as used by the compiler, particularly under high * optimization levels like -O3, where unused functions might otherwise be discarded. - * + * * Without this the Github runner macOS 12 builder will fail to find query_stable and free_result. * It is strange because the same code works fine on my own macOS 12 x86_64 and arm64 machines. * @@ -1892,7 +1086,7 @@ void dummy_calls_v2() int mainEntryClickHouseLocal(int argc, char ** argv) { dummy_calls(); - auto result = pyEntryClickHouseLocal(argc, argv); + auto result = CHDB::pyEntryClickHouseLocal(argc, argv); if (result) { std::cout << result->string() << std::endl; diff --git a/programs/local/LocalServer.h b/programs/local/LocalServer.h index 2db9fe5e2af..7c3d3b6be68 100644 --- a/programs/local/LocalServer.h +++ b/programs/local/LocalServer.h @@ -34,7 +34,16 @@ class LocalServer : public ClientApplicationBase, public Loggers void connect() override; - static void cleanupConnection(); + size_t getStorgaeRowsRead() const + { + auto * local_connection = static_cast(connection.get()); + return local_connection->getCHDBProgress().read_rows; + } + size_t getStorageBytesRead() const + { + auto * local_connection = static_cast(connection.get()); + return local_connection->getCHDBProgress().read_bytes; + } protected: Poco::Util::LayeredConfiguration & getClientConfiguration() override; diff --git a/programs/local/QueryResult.h b/programs/local/QueryResult.h new file mode 100644 index 00000000000..d0f384a4d4a --- /dev/null +++ b/programs/local/QueryResult.h @@ -0,0 +1,82 @@ +#pragma once + +#include "chdb-internal.h" + +#include +#include + +namespace CHDB { + +enum class QueryResultType : uint8_t +{ + RESULT_TYPE_MATERIALIZED = 0, + RESULT_TYPE_STREAMING = 1, + RESULT_TYPE_NONE = 2 +}; + +class QueryResult { +public: + explicit QueryResult(QueryResultType type, String error_message_ = "") + : result_type(type), error_message(std::move(error_message_)) + {} + + virtual ~QueryResult() = default; + + QueryResultType getType() const { return result_type; } + const String & getError() const { return error_message; } + +protected: + QueryResultType result_type; + String error_message; +}; + +class StreamQueryResult : public QueryResult { +public: + explicit StreamQueryResult(String error_message_ = "") + : QueryResult(QueryResultType::RESULT_TYPE_STREAMING, std::move(error_message_)) + {} +}; + +using ResultBuffer = std::unique_ptr>; + +class MaterializedQueryResult : public QueryResult { +public: + explicit MaterializedQueryResult( + ResultBuffer result_buffer_, + double elapsed_, + uint64_t rows_read_, + uint64_t bytes_read_, + uint64_t storage_rows_read_, + uint64_t storage_bytes_read_) + : QueryResult(QueryResultType::RESULT_TYPE_MATERIALIZED), + result_buffer(std::move(result_buffer_)), + elapsed(elapsed_), + rows_read(rows_read_), + bytes_read(bytes_read_), + storage_rows_read(storage_rows_read_), + storage_bytes_read(storage_bytes_read_) + {} + + explicit MaterializedQueryResult(String error_message_) + : QueryResult(QueryResultType::RESULT_TYPE_MATERIALIZED, std::move(error_message_)) + {} + + String string() + { + return String(result_buffer->begin(), result_buffer->end()); + } + +public: + ResultBuffer result_buffer; + double elapsed; + uint64_t rows_read; + uint64_t bytes_read; + uint64_t storage_rows_read; + uint64_t storage_bytes_read; +}; + +using QueryResultPtr = std::unique_ptr; +using MaterializedQueryResultPtr = std::unique_ptr; +using StreamQueryResultPtr = std::unique_ptr; + +} // namespace CHDB diff --git a/programs/local/chdb-internal.h b/programs/local/chdb-internal.h index 9f813a600ea..b655660bb75 100644 --- a/programs/local/chdb-internal.h +++ b/programs/local/chdb-internal.h @@ -1,37 +1,48 @@ #pragma once #include "chdb.h" +#include "QueryResult.h" + #include #include #include #include #include +namespace DB +{ + class LocalServer; +} + namespace CHDB { -struct QueryRequestBase { +struct QueryRequestBase +{ virtual ~QueryRequestBase() = default; virtual bool isStreaming() const = 0; virtual bool isIteration() const { return false; } }; -struct MaterializedQueryRequest : QueryRequestBase { +struct MaterializedQueryRequest : QueryRequestBase +{ std::string query; std::string format; bool isStreaming() const override { return false; } }; -struct StreamingInitRequest : QueryRequestBase { +struct StreamingInitRequest : QueryRequestBase +{ std::string query; std::string format; bool isStreaming() const override { return true; } }; -struct StreamingIterateRequest : QueryRequestBase { - chdb_streaming_result * streaming_result = nullptr; +struct StreamingIterateRequest : QueryRequestBase +{ + void * streaming_result = nullptr; bool is_canceled = false; bool isStreaming() const override { return true; } @@ -45,53 +56,13 @@ enum class QueryType : uint8_t TYPE_STREAMING_ITER = 2 }; -enum class QueryResultType : uint8_t -{ - RESULT_TYPE_MATERIALIZED = 0, - RESULT_TYPE_STREAMING = 1, - RESULT_TYPE_NONE = 2 -}; - -struct StreamingResultData -{ - std::string error_message; -}; - -struct ResultData -{ - bool is_end = false; - QueryResultType result_type = QueryResultType::RESULT_TYPE_NONE; - union - { - local_result_v2 * materialized_result = nullptr; - chdb_streaming_result * streaming_result; - }; - - void clear() - { - result_type = QueryResultType::RESULT_TYPE_MATERIALIZED; - materialized_result = nullptr; - is_end = false; - } - - void reset() - { - if (result_type == QueryResultType::RESULT_TYPE_MATERIALIZED && materialized_result) - free_result_v2(materialized_result); - else if (result_type == QueryResultType::RESULT_TYPE_STREAMING && streaming_result) - chdb_destroy_result(streaming_result); - - clear(); - } -}; - struct QueryQueue { std::mutex mutex; std::condition_variable query_cv; // For query submission std::condition_variable result_cv; // For query result retrieval std::unique_ptr current_query; - ResultData current_result; + QueryResultPtr current_result; bool has_result = false; bool has_query = false; bool has_streaming_query = false; @@ -99,4 +70,10 @@ struct QueryQueue bool cleanup_done = false; }; +std::unique_ptr pyEntryClickHouseLocal(int argc, char ** argv); + +void chdbCleanupConnection(); + +void cancelStreamQuery(DB::LocalServer * server, void * stream_result); + } diff --git a/programs/local/chdb.cpp b/programs/local/chdb.cpp new file mode 100644 index 00000000000..a59dea5880e --- /dev/null +++ b/programs/local/chdb.cpp @@ -0,0 +1,973 @@ +#include "chdb.h" +#include "chdb-internal.h" +#include "LocalServer.h" +#include "QueryResult.h" + +#if USE_PYTHON +#include "FormatHelper.h" +#include "PythonTableCache.h" +#endif + +namespace CHDB +{ + +static std::shared_mutex global_connection_mutex; +static std::mutex CHDB_MUTEX; +chdb_conn * global_conn_ptr = nullptr; +std::string global_db_path; + +static DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) +{ + DB::LocalServer * app = nullptr; + try + { + app = new DB::LocalServer(); + app->setBackground(true); + app->init(argc, argv); + int ret = app->run(); + if (ret != 0) + { + auto err_msg = app->getErrorMsg(); + LOG_ERROR(&app->logger(), "Error running bgClickHouseLocal: {}", err_msg); + delete app; + app = nullptr; + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Error running bgClickHouseLocal: {}", err_msg); + } + return app; + } + catch (const DB::Exception & e) + { + delete app; + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "bgClickHouseLocal {}", DB::getExceptionMessage(e, false)); + } + catch (const Poco::Exception & e) + { + delete app; + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "bgClickHouseLocal {}", e.displayText()); + } + catch (const std::exception & e) + { + delete app; + throw std::domain_error(e.what()); + } + catch (...) + { + delete app; + throw std::domain_error(DB::getCurrentExceptionMessage(true)); + } +} + +static local_result_v2 * convert2LocalResultV2(QueryResult * query_result) +{ + auto local_result = new local_result_v2(); + auto * materialized_query_result = static_cast(query_result); + + if (!materialized_query_result) + { + String error = "Query processing failed"; + local_result->error_message = new char[error.size() + 1]; + std::memcpy(local_result->error_message, error.c_str(), error.size() + 1); + } + else if (!materialized_query_result->getError().empty()) + { + const String & error = materialized_query_result->getError(); + local_result->error_message = new char[error.size() + 1]; + std::memcpy(local_result->error_message, error.c_str(), error.size() + 1); + } + else if (!materialized_query_result->result_buffer) + { + local_result->rows_read = materialized_query_result->rows_read; + local_result->bytes_read = materialized_query_result->bytes_read; + local_result->elapsed = materialized_query_result->elapsed; + } + else + { + local_result->len = materialized_query_result->result_buffer->size(); + local_result->buf = materialized_query_result->result_buffer->data(); + local_result->_vec = materialized_query_result->result_buffer.release(); + local_result->rows_read = materialized_query_result->rows_read; + local_result->bytes_read = materialized_query_result->bytes_read; + local_result->elapsed = materialized_query_result->elapsed; + } + + return local_result; +} + +static local_result_v2 * createErrorLocalResultV2(const String & error) +{ + auto local_result = new local_result_v2(); + local_result->error_message = new char[error.size() + 1]; + std::memcpy(local_result->error_message, error.c_str(), error.size() + 1); + return local_result; +} + +static QueryResultPtr createMaterializedLocalQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req) +{ + QueryResultPtr query_result; + const auto & materialized_request = static_cast(req); + + try + { + if (!server->parseQueryTextWithOutputFormat(materialized_request.query, materialized_request.format)) + { + query_result = std::make_unique(server->getErrorMsg()); + } + else + { + query_result = std::make_unique( + ResultBuffer(server->stealQueryOutputVector()), + server->getElapsedTime(), + server->getProcessedRows(), + server->getProcessedBytes(), + server->getStorgaeRowsRead(), + server->getStorageBytesRead()); + } + } + catch (const DB::Exception & e) + { + query_result = std::make_unique(DB::getExceptionMessage(e, false)); + } + catch (...) + { + String error_message = "Unknown error occurred"; + query_result = std::make_unique(error_message); + } + + return query_result; +} + +static QueryResultPtr createStreamingQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req) +{ + QueryResultPtr query_result; + const auto & streaming_init_request = static_cast(req); + + try + { + if (!server->parseQueryTextWithOutputFormat(streaming_init_request.query, streaming_init_request.format)) + query_result = std::make_unique(server->getErrorMsg()); + else + query_result = std::make_unique(); + } + catch (const DB::Exception& e) + { + query_result = std::make_unique(DB::getExceptionMessage(e, false)); + } + catch (...) + { + String error_message = "Unknown error occurred"; + query_result = std::make_unique(error_message); + } + + return query_result; +} + +static QueryResultPtr createStreamingIterateQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req) +{ + QueryResultPtr query_result; + const auto & streaming_iter_request = static_cast(req); + const auto old_processed_rows = server->getProcessedRows(); + const auto old_processed_bytes = server->getProcessedBytes(); + const auto old_storage_rows_read = server->getStorgaeRowsRead(); + const auto old_storage_bytes_read = server->getStorageBytesRead(); + const auto old_elapsed_time = server->getElapsedTime(); + + try + { + if (!server->processStreamingQuery(streaming_iter_request.streaming_result, streaming_iter_request.is_canceled)) + { + query_result = std::make_unique(server->getErrorMsg()); + } + else + { + const auto processed_rows = server->getProcessedRows(); + const auto processed_bytes = server->getProcessedBytes(); + const auto storage_rows_read = server->getStorgaeRowsRead(); + const auto storage_bytes_read = server->getStorageBytesRead(); + const auto elapsed_time = server->getElapsedTime(); + if (processed_rows <= old_processed_rows) + query_result = std::make_unique(nullptr, 0.0, 0, 0, 0, 0); + else + query_result = std::make_unique( + ResultBuffer(server->stealQueryOutputVector()), + elapsed_time - old_elapsed_time, + processed_rows - old_processed_rows, + processed_bytes - old_processed_bytes, + storage_rows_read - old_storage_rows_read, + storage_bytes_read - old_storage_bytes_read); + } + } + catch (const DB::Exception& e) + { + query_result = std::make_unique(DB::getExceptionMessage(e, false)); + } + catch (...) + { + String error_message = "Unknown error occurred"; + query_result = std::make_unique(error_message); + } + + return query_result; +} + +static std::pair createQueryResult(DB::LocalServer * server, const CHDB::QueryRequestBase & req) +{ + QueryResultPtr query_result; + bool is_end = false; + + if (!req.isStreaming()) + { + query_result = createMaterializedLocalQueryResult(server, req); + is_end = true; + } + else if (!req.isIteration()) + { + server->streaming_query_context = std::make_shared(); + query_result = createStreamingQueryResult(server, req); + is_end = !query_result->getError().empty(); + + if (!is_end) + server->streaming_query_context->streaming_result = query_result.get(); + else + server->streaming_query_context.reset(); + } + else + { + query_result = createStreamingIterateQueryResult(server, req); + const auto & streaming_iter_request = static_cast(req); + auto materialized_query_result_ptr = static_cast(query_result.get()); + + is_end = !materialized_query_result_ptr->getError().empty() + || materialized_query_result_ptr->rows_read == 0 + || streaming_iter_request.is_canceled; + } + + if (is_end) + { + server->streaming_query_context.reset(); +#if USE_PYTHON + if (auto * local_connection = static_cast(server->connection.get())) + { + /// Must clean up Context objects whether the query succeeds or fails. + /// During process exit, if LocalServer destructor triggers while cached PythonStorage + /// objects still exist in Context, their destruction will attempt to acquire GIL. + /// Acquiring GIL during process termination leads to immediate thread termination. + local_connection->resetQueryContext(); + } + + CHDB::PythonTableCache::clear(); +#endif + } + + return std::make_pair(std::move(query_result), is_end); +} + +static bool checkConnectionValidity(chdb_conn * conn) +{ + return conn && conn->connected && conn->queue; +} + +static QueryResultPtr executeQueryRequest( + CHDB::QueryQueue * queue, + const char * query, + const char * format, + CHDB::QueryType query_type, + void * streaming_result_ = nullptr, + bool is_canceled = false) +{ + QueryResultPtr query_result; + + try + { + { + std::unique_lock lock(queue->mutex); + // Wait until any ongoing query completes + if (query_type == CHDB::QueryType::TYPE_STREAMING_ITER) + queue->result_cv.wait(lock, [queue]() { return (!queue->has_query && !queue->has_result) || queue->shutdown; }); + else + queue->result_cv.wait(lock, [queue]() { return (!queue->has_query && !queue->has_result && !queue->has_streaming_query) || queue->shutdown; }); + + if (queue->shutdown) + { + String error_message = "connection is shutting down"; + if (query_type == CHDB::QueryType::TYPE_STREAMING_INIT) + { + query_result.reset(new StreamQueryResult(error_message)); + } + else + { + query_result.reset(new MaterializedQueryResult(error_message)); + } + return query_result; + } + + if (query_type == CHDB::QueryType::TYPE_STREAMING_INIT) + { + auto streaming_req = std::make_unique(); + streaming_req->query = query; + streaming_req->format = format; + queue->current_query = std::move(streaming_req); +#if USE_PYTHON + CHDB::SetCurrentFormat(format); +#endif + } + else if (query_type == CHDB::QueryType::TYPE_MATERIALIZED) + { + auto materialized_req = std::make_unique(); + materialized_req->query = query; + materialized_req->format = format; + queue->current_query = std::move(materialized_req); +#if USE_PYTHON + CHDB::SetCurrentFormat(format); +#endif + } + else + { + auto streaming_iter_req = std::make_unique(); + streaming_iter_req->streaming_result = streaming_result_; + streaming_iter_req->is_canceled = is_canceled; + queue->current_query = std::move(streaming_iter_req); + } + + queue->has_query = true; + queue->current_result.reset(); + queue->has_result = false; + } + queue->query_cv.notify_one(); + + { + std::unique_lock lock(queue->mutex); + queue->result_cv.wait(lock, [queue]() { return queue->has_result || queue->shutdown; }); + + if (!queue->shutdown && queue->has_result) + { + query_result = std::move(queue->current_result); + queue->has_result = false; + queue->has_query = false; + } + } + queue->result_cv.notify_all(); + } + catch (...) + { + // Handle any exceptions during query processing + String error_message = "Error occurred while processing query"; + if (query_type == CHDB::QueryType::TYPE_STREAMING_INIT) + query_result.reset(new StreamQueryResult(error_message)); + else + query_result.reset(new MaterializedQueryResult(error_message)); + } + + return query_result; +} + +void chdbCleanupConnection() +{ + try + { + close_conn(&global_conn_ptr); + } + catch (...) + { + } +} + +void cancelStreamQuery(DB::LocalServer * server, void * stream_result) +{ + auto streaming_iter_req = std::make_unique(); + streaming_iter_req->streaming_result = stream_result; + streaming_iter_req->is_canceled = true; + + createStreamingIterateQueryResult(server, *streaming_iter_req); +} + +std::unique_ptr pyEntryClickHouseLocal(int argc, char ** argv) +{ + try + { + std::lock_guard lock(CHDB_MUTEX); + DB::LocalServer app; + app.init(argc, argv); + int ret = app.run(); + if (ret == 0) + { + return std::make_unique( + ResultBuffer(app.stealQueryOutputVector()), + app.getElapsedTime(), + app.getProcessedRows(), + app.getProcessedBytes(), + app.getStorgaeRowsRead(), + app.getStorageBytesRead()); + } else { + return std::make_unique(app.getErrorMsg()); + } + } + catch (const DB::Exception & e) + { + // wrap the error message into a new std::exception + throw std::domain_error(DB::getExceptionMessage(e, false)); + } + catch (const boost::program_options::error & e) + { + throw std::invalid_argument("Bad arguments: " + std::string(e.what())); + } + catch (...) + { + throw std::domain_error(DB::getCurrentExceptionMessage(true)); + } +} + +} // namespace CHDB + +using namespace CHDB; + +local_result * query_stable(int argc, char ** argv) +{ + auto query_result = pyEntryClickHouseLocal(argc, argv); + if (!query_result->getError().empty() || query_result->result_buffer == nullptr) + return nullptr; + + local_result * res = new local_result; + res->len = query_result->result_buffer->size(); + res->buf = query_result->result_buffer->data(); + res->_vec = query_result->result_buffer.release(); + res->rows_read = query_result->rows_read; + res->bytes_read = query_result->bytes_read; + res->elapsed = query_result->elapsed; + return res; +} + +void free_result(local_result * result) +{ + if (!result) + { + return; + } + if (result->_vec) + { + std::vector * vec = reinterpret_cast *>(result->_vec); + delete vec; + result->_vec = nullptr; + } + delete result; +} + +local_result_v2 * query_stable_v2(int argc, char ** argv) +{ + // pyEntryClickHouseLocal may throw some serious exceptions, although it's not likely + // to happen in the context of clickhouse-local. we catch them here and return an error + local_result_v2 * res = nullptr; + try + { + auto query_result = pyEntryClickHouseLocal(argc, argv); + + return convert2LocalResultV2(query_result.get()); + } + catch (const std::exception & e) + { + res = new local_result_v2(); + res->error_message = new char[strlen(e.what()) + 1]; + std::strcpy(res->error_message, e.what()); + } + catch (...) + { + res = new local_result_v2(); + const char * unknown_exception_msg = "Unknown exception"; + size_t len = std::strlen(unknown_exception_msg) + 1; + res->error_message = new char[len]; + std::strcpy(res->error_message, unknown_exception_msg); + } + + return res; +} + +void free_result_v2(local_result_v2 * result) +{ + if (!result) + return; + + delete reinterpret_cast *>(result->_vec); + delete[] result->error_message; + delete result; +} + +chdb_conn ** connect_chdb(int argc, char ** argv) +{ + std::lock_guard global_lock(global_connection_mutex); + + std::string path = ":memory:"; // Default path + for (int i = 1; i < argc; i++) + { + if (strncmp(argv[i], "--path=", 7) == 0) + { + path = argv[i] + 7; + break; + } + } + + if (global_conn_ptr != nullptr) + { + if (path == global_db_path) + return &global_conn_ptr; + + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Another connection is already active with different path. Old path = {}, new path = {}, " + "please close the existing connection first.", + global_db_path, + path); + } + + auto * conn = new chdb_conn(); + auto * q_queue = new CHDB::QueryQueue(); + conn->queue = q_queue; + + std::mutex init_mutex; + std::condition_variable init_cv; + bool init_done = false; + bool init_success = false; + std::exception_ptr init_exception; + + // Start query processing thread + std::thread( + [&]() + { + auto * queue = static_cast(conn->queue); + try + { + DB::LocalServer * server = bgClickHouseLocal(argc, argv); + conn->server = server; + conn->connected = true; + + global_conn_ptr = conn; + global_db_path = path; + + // Signal successful initialization + { + std::lock_guard init_lock(init_mutex); + init_success = true; + init_done = true; + } + init_cv.notify_one(); + + while (true) + { + { + std::unique_lock lock(queue->mutex); + queue->query_cv.wait(lock, [queue]() { return queue->has_query || queue->shutdown; }); + + if (queue->shutdown) + { + try + { + server->cleanup(); + delete server; + } + catch (...) + { + // Log error but continue shutdown + LOG_ERROR(&Poco::Logger::get("LocalServer"), "Error during server cleanup"); + } + queue->cleanup_done = true; + queue->query_cv.notify_all(); + break; + } + + } + + CHDB::QueryRequestBase & req = *(queue->current_query); + auto result = createQueryResult(server, req); + bool is_end = result.second; + + { + std::lock_guard lock(queue->mutex); + if (req.isStreaming() && !req.isIteration() && !is_end) + queue->has_streaming_query = true; + + if (req.isStreaming() && req.isIteration() && is_end) + queue->has_streaming_query = false; + + queue->current_result = std::move(result.first); + queue->has_result = true; + queue->has_query = false; + } + queue->result_cv.notify_all(); + } + } + catch (const DB::Exception & e) + { + // Log the error + LOG_ERROR(&Poco::Logger::get("LocalServer"), "Query thread terminated with error: {}", e.what()); + + // Signal thread termination + { + std::lock_guard init_lock(init_mutex); + init_exception = std::current_exception(); + init_done = true; + std::lock_guard lock(queue->mutex); + queue->shutdown = true; + queue->cleanup_done = true; + } + init_cv.notify_one(); + queue->query_cv.notify_all(); + queue->result_cv.notify_all(); + } + catch (...) + { + LOG_ERROR(&Poco::Logger::get("LocalServer"), "Query thread terminated with unknown error"); + + { + std::lock_guard init_lock(init_mutex); + init_exception = std::current_exception(); + init_done = true; + std::lock_guard lock(queue->mutex); + queue->shutdown = true; + queue->cleanup_done = true; + } + init_cv.notify_one(); + queue->query_cv.notify_all(); + queue->result_cv.notify_all(); + } + }) + .detach(); + + // Wait for initialization to complete + { + std::unique_lock init_lock(init_mutex); + init_cv.wait(init_lock, [&init_done]() { return init_done; }); + + if (!init_success) + { + delete q_queue; + delete conn; + if (init_exception) + std::rethrow_exception(init_exception); + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to create connection"); + } + } + + return &global_conn_ptr; +} + +void close_conn(chdb_conn ** conn) +{ + std::lock_guard global_lock(global_connection_mutex); + + if (!conn || !*conn) + return; + + if ((*conn)->connected) + { + if ((*conn)->queue) + { + auto * queue = static_cast((*conn)->queue); + + { + std::unique_lock queue_lock(queue->mutex); + queue->shutdown = true; + queue->query_cv.notify_all(); // Wake up query processing thread + queue->result_cv.notify_all(); // Wake up any waiting result threads + + // Wait for server cleanup + queue->query_cv.wait(queue_lock, [queue] { return queue->cleanup_done; }); + + // Clean up current result if any + queue->current_result.reset(); + queue->has_result = false; + } + + delete queue; + (*conn)->queue = nullptr; + } + + // Mark as disconnected BEFORE deleting queue and nulling global pointer + (*conn)->connected = false; + } + // Clear global pointer under lock before queue deletion + if (*conn != global_conn_ptr) + { + LOG_ERROR(&Poco::Logger::get("LocalServer"), "Connection mismatch during close_conn"); + } + + delete *conn; + *conn = nullptr; +} + +struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format) +{ + // Add connection validity check under global lock + std::shared_lock global_lock(global_connection_mutex); + + if (!checkConnectionValidity(conn)) + return createErrorLocalResultV2("Invalid or closed connection"); + + auto * queue = static_cast(conn->queue); + auto query_result = executeQueryRequest(queue, query, format, CHDB::QueryType::TYPE_MATERIALIZED); + + return convert2LocalResultV2(query_result.get()); +} + +chdb_streaming_result * query_conn_streaming(chdb_conn * conn, const char * query, const char * format) +{ + // Add connection validity check under global lock + std::shared_lock global_lock(global_connection_mutex); + + if (!checkConnectionValidity(conn)) + { + auto * result = new StreamQueryResult("Invalid or closed connection"); + return reinterpret_cast(result); + } + + auto * queue = static_cast(conn->queue); + auto query_result = executeQueryRequest(queue, query, format, CHDB::QueryType::TYPE_STREAMING_INIT); + + if (!query_result) + { + auto * result = new StreamQueryResult("Query processing failed"); + return reinterpret_cast(result); + } + + return reinterpret_cast(query_result.release()); +} + +const char * chdb_streaming_result_error(chdb_streaming_result * result) +{ + if (!result) + return nullptr; + + auto stream_query_result = reinterpret_cast(result); + + const auto & error_message = stream_query_result->getError(); + if (!error_message.empty()) + return error_message.c_str(); + + return nullptr; +} + +local_result_v2 * chdb_streaming_fetch_result(chdb_conn * conn, chdb_streaming_result * result) +{ + // Add connection validity check under global lock + std::shared_lock global_lock(global_connection_mutex); + + if (!checkConnectionValidity(conn)) + return createErrorLocalResultV2("Invalid or closed connection"); + + auto * queue = static_cast(conn->queue); + auto query_result = executeQueryRequest(queue, nullptr, nullptr, CHDB::QueryType::TYPE_STREAMING_ITER, result); + + return convert2LocalResultV2(query_result.get()); +} + +void chdb_streaming_cancel_query(chdb_conn * conn, chdb_streaming_result * result) +{ + // Add connection validity check under global lock + std::shared_lock global_lock(global_connection_mutex); + + if (!checkConnectionValidity(conn)) + return; + + auto * queue = static_cast(conn->queue); + auto query_result = executeQueryRequest(queue, nullptr, nullptr, CHDB::QueryType::TYPE_STREAMING_ITER, result, true); + query_result.reset(); +} + +void chdb_destroy_result(chdb_streaming_result * result) +{ + if (!result) + return; + + auto stream_query_result = reinterpret_cast(result); + delete stream_query_result; +} + +// ============== New API Implementation ============== + +chdb_connection * chdb_connect(int argc, char ** argv) +{ + auto connection = connect_chdb(argc, argv); + + return reinterpret_cast(connection); +} + +void chdb_close_conn(chdb_connection * conn) +{ + auto connection = reinterpret_cast(conn); + + close_conn(connection); +} + +chdb_result * chdb_query(chdb_connection conn, const char * query, const char * format) +{ + std::shared_lock global_lock(global_connection_mutex); + + auto connection = reinterpret_cast(conn); + if (!checkConnectionValidity(connection)) + { + auto * result = new MaterializedQueryResult("Invalid or closed connection"); + return reinterpret_cast(result); + } + + auto * queue = static_cast(connection->queue); + auto query_result = executeQueryRequest(queue, query, format, CHDB::QueryType::TYPE_MATERIALIZED); + + return reinterpret_cast(query_result.release()); + +} + +chdb_result * chdb_stream_query(chdb_connection conn, const char * query, const char * format) +{ + std::shared_lock global_lock(global_connection_mutex); + + auto connection = reinterpret_cast(conn); + if (!checkConnectionValidity(connection)) + { + auto * result = new StreamQueryResult("Invalid or closed connection"); + return reinterpret_cast(result); + } + + auto * queue = static_cast(connection->queue); + auto query_result = executeQueryRequest(queue, query, format, CHDB::QueryType::TYPE_STREAMING_INIT); + + if (!query_result) + { + auto * result = new StreamQueryResult("Query processing failed"); + return reinterpret_cast(result); + } + + return reinterpret_cast(query_result.release()); +} + +chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_result * result) +{ + std::shared_lock global_lock(global_connection_mutex); + + auto connection = reinterpret_cast(conn); + if (!checkConnectionValidity(connection)) + { + auto * query_result = new MaterializedQueryResult("Invalid or closed connection"); + return reinterpret_cast(query_result); + } + + auto * queue = static_cast(connection->queue); + auto query_result = executeQueryRequest(queue, nullptr, nullptr, CHDB::QueryType::TYPE_STREAMING_ITER, result); + + return reinterpret_cast(query_result.release()); +} + +void chdb_stream_cancel_query(chdb_connection conn, chdb_result * result) +{ + std::shared_lock global_lock(global_connection_mutex); + + auto connection = reinterpret_cast(conn); + if (!checkConnectionValidity(connection)) + return; + + auto * queue = static_cast(connection->queue); + auto query_result = executeQueryRequest(queue, nullptr, nullptr, CHDB::QueryType::TYPE_STREAMING_ITER, result, true); + query_result.reset(); +} + +void chdb_destroy_query_result(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + delete query_result; +} + +const char * chdb_result_buffer(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + + if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) + { + auto materialized_result = reinterpret_cast(query_result); + return materialized_result->result_buffer->data(); + } + + return nullptr; +} + +size_t chdb_result_length(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + + if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) + { + auto materialized_result = reinterpret_cast(query_result); + return materialized_result->result_buffer->size(); + } + + return 0; +} + +double chdb_result_elapsed(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + + if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) + { + auto materialized_result = reinterpret_cast(query_result); + return materialized_result->elapsed; + } + + return 0.0; +} + +uint64_t chdb_result_rows_read(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + + if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) + { + auto materialized_result = reinterpret_cast(query_result); + return materialized_result->rows_read; + } + + return 0; +} + +uint64_t chdb_result_bytes_read(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + + if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) + { + auto materialized_result = reinterpret_cast(query_result); + return materialized_result->bytes_read; + } + + return 0; +} + +uint64_t chdb_result_storage_rows_read(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + + if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) + { + auto materialized_result = reinterpret_cast(query_result); + return materialized_result->storage_rows_read; + } + + return 0; +} + +uint64_t chdb_result_storage_bytes_read(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + + if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) + { + auto materialized_result = reinterpret_cast(query_result); + return materialized_result->storage_bytes_read; + } + + return 0; +} + +const char * chdb_result_error(chdb_result * result) +{ + auto query_result = reinterpret_cast(result); + if (query_result->getError().empty()) + return nullptr; + + return query_result->getError().c_str(); +} diff --git a/programs/local/chdb.h b/programs/local/chdb.h index c1e7a8775a0..ead4d3e2d3d 100644 --- a/programs/local/chdb.h +++ b/programs/local/chdb.h @@ -59,29 +59,29 @@ struct chdb_conn void * queue; /* Query processing queue */ }; +typedef struct +{ + void * internal_data; +} chdb_streaming_result; + #endif // Opaque handle for query results. // Internal data structure managed by chDB implementation. // Users should only interact through API functions. -typedef struct { +typedef struct chdb_result_ +{ void * internal_data; } chdb_result; // Connection handle wrapping database session state. // Internal data structure managed by chDB implementation. // Users should only interact through API functions. -typedef struct _chdb_connection { +typedef struct chdb_connection_ +{ void * internal_data; } * chdb_connection; -// Opaque handle for streaming query results. -// Internal data structure managed by chDB implementation. -// Users should only interact through API functions. -typedef struct { - void * internal_data; -} chdb_streaming_result; - #ifndef CHDB_NO_DEPRECATED // WARNING: The following interfaces are deprecated and will be removed in a future version. CHDB_EXPORT struct local_result * query_stable(int argc, char ** argv); @@ -133,6 +133,14 @@ CHDB_EXPORT struct local_result_v2 * query_conn(struct chdb_conn * conn, const c */ CHDB_EXPORT chdb_streaming_result * query_conn_streaming(struct chdb_conn * conn, const char * query, const char * format); +/** + * Retrieves error message from streaming result. + * @brief Gets error message associated with streaming query execution + * @param result Streaming result handle from query_conn_streaming() + * @return Null-terminated error message string, or NULL if no error occurred + */ + CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result); + /** * Fetches next chunk of streaming results. * @brief Iterates through streaming query results @@ -151,6 +159,14 @@ CHDB_EXPORT struct local_result_v2 * chdb_streaming_fetch_result(struct chdb_con */ CHDB_EXPORT void chdb_streaming_cancel_query(struct chdb_conn * conn, chdb_streaming_result * result); +/** + * Releases resources associated with streaming result. + * @brief Destroys streaming result handle and frees allocated memory + * @param result Streaming result handle to destroy + * @warning Must be called even if query was finished or canceled + */ + CHDB_EXPORT void chdb_destroy_result(chdb_streaming_result * result); + #endif /** @@ -194,7 +210,7 @@ CHDB_EXPORT chdb_result * chdb_query(chdb_connection conn, const char * query, c * @return Streaming result handle containing query state or error message * @note Returns error result if connection is invalid or closed */ -CHDB_EXPORT chdb_streaming_result * chdb_stream_query(chdb_connection conn, const char * query, const char * format); +CHDB_EXPORT chdb_result * chdb_stream_query(chdb_connection conn, const char * query, const char * format); /** * Fetches next chunk of streaming results. @@ -204,7 +220,7 @@ CHDB_EXPORT chdb_streaming_result * chdb_stream_query(chdb_connection conn, cons * @return Materialized result chunk with data * @note Returns empty result when stream ends */ -CHDB_EXPORT chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_streaming_result * result); +CHDB_EXPORT chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_result * result); /** * Cancels ongoing streaming query. @@ -212,23 +228,69 @@ CHDB_EXPORT chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_st * @param conn Active connection handle * @param result Streaming result handle to cancel */ -CHDB_EXPORT void chdb_stream_cancel_query(chdb_connection conn, chdb_streaming_result * result); +CHDB_EXPORT void chdb_stream_cancel_query(chdb_connection conn, chdb_result * result); /** - * Retrieves error message from streaming result. - * @brief Gets error message associated with streaming query execution - * @param result Streaming result handle from query_conn_streaming() - * @return Null-terminated error message string, or NULL if no error occurred + * Destroys a query result and releases all associated resources + * @param result The result handle to destroy */ -CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result); +CHDB_EXPORT void chdb_destroy_query_result(chdb_result * result); /** - * Releases resources associated with streaming result. - * @brief Destroys streaming result handle and frees allocated memory - * @param result Streaming result handle to destroy - * @warning Must be called even if query was finished or canceled + * Gets pointer to the result data buffer + * @param result The query result handle + * @return Read-only pointer to the result data + */ +CHDB_EXPORT const char * chdb_result_buffer(chdb_result * result); + +/** + * Gets the length of the result data + * @param result The query result handle + * @return Size of result data in bytes + */ +CHDB_EXPORT size_t chdb_result_length(chdb_result * result); + +/** + * Gets query execution time + * @param result The query result handle + * @return Elapsed time in seconds + */ +CHDB_EXPORT double chdb_result_elapsed(chdb_result * result); + +/** + * Gets total rows in query result + * @param result The query result handle + * @return Number of rows contained in the result set + */ +CHDB_EXPORT uint64_t chdb_result_rows_read(chdb_result * result); + +/** + * Gets the total bytes occupied by the result set in internal binary format + * @param result The query result handle + * @return Number of bytes occupied by the result set in internal binary representation + */ +CHDB_EXPORT uint64_t chdb_result_bytes_read(chdb_result * result); + +/** + * Gets rows read from storage engine + * @param result The query result handle + * @return Number of rows read from storage + */ +CHDB_EXPORT uint64_t chdb_result_storage_rows_read(chdb_result * result); + +/** + * Gets bytes read from storage engine + * @param result The query result handle + * @return Number of bytes read from storage engine + */ +CHDB_EXPORT uint64_t chdb_result_storage_bytes_read(chdb_result * result); + +/** + * Retrieves error message from query execution + * @param result The query result handle + * @return Null-terminated error description, NULL if no error */ -CHDB_EXPORT void chdb_destroy_result(chdb_streaming_result * result); +CHDB_EXPORT const char * chdb_result_error(chdb_result * result); #ifdef __cplusplus } diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index f82c3a275dc..55bde0de1f5 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -109,14 +109,6 @@ class ClientBase bool tryStopQuery() { return query_interrupt_handler.tryStop(); } void stopQuery() { query_interrupt_handler.stop(); } - std::vector * getQueryOutputVector() - { - //get Buffer and convert to vector - // auto buf = query_result_buf->buffer(); - // std::vector vec(buf.begin(), buf.end()); - return query_result_memory; - } - /// Steals and returns the query output vector. /// Afterward, the content of query_result_memory is released by the Python side. std::vector * stealQueryOutputVector() @@ -126,10 +118,6 @@ class ClientBase query_result_buf.reset(); return result; } - - size_t getStorgaeRowsRead() const { return connection->getCHDBProgress().read_rows; } - size_t getStorageBytesRead() const { return connection->getCHDBProgress().read_bytes; } - size_t getProcessedRows() const { return processed_rows; } size_t getProcessedBytes() const { return processed_bytes; } double getElapsedTime() const { return progress_indication.elapsedSeconds(); } From cc0f2bfc7f79e9fad0f61aeed2509df9538ae5cc Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Wed, 4 Jun 2025 14:35:12 +0800 Subject: [PATCH 3/6] feat: add storage_rows_read and storage_bytes_read interfaces for retrieving storage metrics --- programs/local/LocalChdb.cpp | 55 ++++++----- programs/local/LocalChdb.h | 182 ++++++++++++----------------------- programs/local/chdb.cpp | 101 +++++++++++++++++-- programs/local/chdb.h | 10 +- tests/test_statistics.py | 45 ++++++++- 5 files changed, 235 insertions(+), 158 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index 324a5a32373..c411acc48cb 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -1,19 +1,17 @@ #include "LocalChdb.h" -#include "LocalServer.h" #include "chdb.h" #include "chdb-internal.h" #include "PythonImporter.h" #include "PythonTableCache.h" -#include "TableFunctionPython.h" +#include "StoragePython.h" -#include #include namespace py = pybind11; extern bool inside_main = true; -local_result_v2 * queryToBuffer( +chdb_result * queryToBuffer( const std::string & queryStr, const std::string & output_format = "CSV", const std::string & path = {}, @@ -61,7 +59,7 @@ local_result_v2 * queryToBuffer( argv_char.push_back(const_cast(arg.c_str())); py::gil_scoped_release release; - return query_stable_v2(argv_char.size(), argv_char.data()); + return chdb_query_cmdline(argv_char.size(), argv_char.data()); } // Pybind11 will take over the ownership of the `query_result` object @@ -219,7 +217,7 @@ connection_wrapper::connection_wrapper(const std::string & conn_str) argv_char.push_back(const_cast(arg.c_str())); } - conn = connect_chdb(argv_char.size(), argv_char.data()); + conn = chdb_connect(argv_char.size(), argv_char.data()); db_path = path; is_memory_db = (path == ":memory:"); } @@ -227,14 +225,14 @@ connection_wrapper::connection_wrapper(const std::string & conn_str) connection_wrapper::~connection_wrapper() { py::gil_scoped_release release; - close_conn(conn); + chdb_close_conn(conn); } void connection_wrapper::close() { { py::gil_scoped_release release; - close_conn(conn); + chdb_close_conn(conn); } // Ensure that if a new connection is created before this object is destroyed that we don't try to close it. conn = nullptr; @@ -255,15 +253,17 @@ query_result * connection_wrapper::query(const std::string & query_str, const st CHDB::PythonTableCache::findQueryableObjFromQuery(query_str); py::gil_scoped_release release; - auto * result = query_conn(*conn, query_str.c_str(), format.c_str()); - if (result->len == 0) + auto * result = chdb_query(*conn, query_str.c_str(), format.c_str()); + if (chdb_result_length(result)) { LOG_DEBUG(getLogger("CHDB"), "Empty result returned for query: {}", query_str); } - if (result->error_message) + + auto * error_msg = chdb_result_error(result); + if (error_msg) { - std::string msg_copy(result->error_message); - free_result_v2(result); + std::string msg_copy(error_msg); + chdb_destroy_query_result(result); throw std::runtime_error(msg_copy); } return new query_result(result, false); @@ -274,12 +274,12 @@ streaming_query_result * connection_wrapper::send_query(const std::string & quer CHDB::PythonTableCache::findQueryableObjFromQuery(query_str); py::gil_scoped_release release; - auto * result = query_conn_streaming(*conn, query_str.c_str(), format.c_str()); - const auto * error_msg = chdb_streaming_result_error(result); + auto * result = chdb_stream_query(*conn, query_str.c_str(), format.c_str()); + auto * error_msg = chdb_result_error(result); if (error_msg) { std::string msg_copy(error_msg); - chdb_destroy_result(result); + chdb_destroy_query_result(result); throw std::runtime_error(msg_copy); } @@ -293,15 +293,16 @@ query_result * connection_wrapper::streaming_fetch_result(streaming_query_result if (!streaming_result || !streaming_result->get_result()) return nullptr; - auto * result = chdb_streaming_fetch_result(*conn, streaming_result->get_result()); + auto * result = chdb_stream_fetch_result(*conn, streaming_result->get_result()); - if (result->len == 0) + if (chdb_result_length(result) == 0) LOG_DEBUG(getLogger("CHDB"), "Empty result returned for streaming query"); - if (result->error_message) + auto * error_msg = chdb_result_error(result); + if (error_msg) { - std::string msg_copy(result->error_message); - free_result_v2(result); + std::string msg_copy(error_msg); + chdb_destroy_query_result(result); throw std::runtime_error(msg_copy); } @@ -315,7 +316,7 @@ void connection_wrapper::streaming_cancel_query(streaming_query_result * streami if (!streaming_result || !streaming_result->get_result()) return; - chdb_streaming_cancel_query(*conn, streaming_result->get_result()); + chdb_stream_cancel_query(*conn, streaming_result->get_result()); } void cursor_wrapper::execute(const std::string & query_str) @@ -325,7 +326,7 @@ void cursor_wrapper::execute(const std::string & query_str) // Use JSONCompactEachRowWithNamesAndTypes format for better type support py::gil_scoped_release release; - current_result = query_conn(conn->get_conn(), query_str.c_str(), "JSONCompactEachRowWithNamesAndTypes"); + current_result = chdb_query(conn->get_conn(), query_str.c_str(), "JSONCompactEachRowWithNamesAndTypes"); } @@ -390,7 +391,7 @@ PYBIND11_MODULE(_chdb, m) .def("view", &memoryview_wrapper::view); py::class_(m, "query_result") - .def(py::init(), py::return_value_policy::take_ownership) + .def(py::init(), py::return_value_policy::take_ownership) .def("data", &query_result::data) .def("bytes", &query_result::bytes) .def("__str__", &query_result::str) @@ -400,13 +401,15 @@ PYBIND11_MODULE(_chdb, m) .def("size", &query_result::size) .def("rows_read", &query_result::rows_read) .def("bytes_read", &query_result::bytes_read) + .def("storage_rows_read", &query_result::storage_rows_read) + .def("storage_bytes_read", &query_result::storage_bytes_read) .def("elapsed", &query_result::elapsed) .def("get_memview", &query_result::get_memview) .def("has_error", &query_result::has_error) .def("error_message", &query_result::error_message); py::class_(m, "streaming_query_result") - .def(py::init(), py::return_value_policy::take_ownership) + .def(py::init(), py::return_value_policy::take_ownership) .def("has_error", &streaming_query_result::has_error) .def("error_message", &streaming_query_result::error_message); @@ -448,6 +451,8 @@ PYBIND11_MODULE(_chdb, m) .def("data_size", &cursor_wrapper::data_size) .def("rows_read", &cursor_wrapper::rows_read) .def("bytes_read", &cursor_wrapper::bytes_read) + .def("storage_rows_read", &cursor_wrapper::storage_rows_read) + .def("storage_bytes_read", &cursor_wrapper::storage_bytes_read) .def("elapsed", &cursor_wrapper::elapsed) .def("has_error", &cursor_wrapper::has_error) .def("error_message", &cursor_wrapper::error_message); diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index 00254dcfcea..5cf30058135 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -1,21 +1,10 @@ #pragma once -#include -#include +#include "chdb.h" +#include "PybindWrapper.h" #include "config.h" -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include -# include "chdb.h" +#include namespace py = pybind11; @@ -29,14 +18,14 @@ class __attribute__((visibility("default"))) streaming_query_result; class connection_wrapper { private: - chdb_conn ** conn; + chdb_connection * conn; std::string db_path; bool is_memory_db; bool is_readonly; public: explicit connection_wrapper(const std::string & conn_str); - chdb_conn * get_conn() { return *conn; } + chdb_connection get_conn() { return *conn; } ~connection_wrapper(); cursor_wrapper * cursor(); void commit(); @@ -54,43 +43,24 @@ class connection_wrapper class local_result_wrapper { private: - local_result_v2 * result; + chdb_result * result; bool keep_buf; // background server mode will handle buf in ClickHouse engine public: - local_result_wrapper(local_result_v2 * result) : result(result), keep_buf(false) { } - local_result_wrapper(local_result_v2 * result, bool keep_buf) : result(result), keep_buf(keep_buf) { } + local_result_wrapper(chdb_result * result) : result(result), keep_buf(false) { } + local_result_wrapper(chdb_result * result, bool keep_buf) : result(result), keep_buf(keep_buf) { } ~local_result_wrapper() { - if (keep_buf) - { - if (!result) - return; - - result->_vec = nullptr; - delete[] result->error_message; - delete result; - } - else - { - free_result_v2(result); - } + /// keep_buf is always false + chdb_destroy_query_result(result); } char * data() { - if (result == nullptr) - { - return nullptr; - } - return result->buf; + return chdb_result_buffer(result); } size_t size() { - if (result == nullptr) - { - return 0; - } - return result->len; + return chdb_result_length(result); } py::bytes bytes() { @@ -98,7 +68,7 @@ class local_result_wrapper { return py::bytes(); } - return py::bytes(result->buf, result->len); + return py::bytes(chdb_result_buffer(result), chdb_result_length(result)); } py::str str() { @@ -106,62 +76,39 @@ class local_result_wrapper { return py::str(); } - return py::str(result->buf, result->len); + return py::str(chdb_result_buffer(result), chdb_result_length(result)); } // Query statistics size_t rows_read() { - if (result == nullptr) - { - return 0; - } - return result->rows_read; + return chdb_result_rows_read(result); } size_t bytes_read() { - if (result == nullptr) - { - return 0; - } - return result->bytes_read; + return chdb_result_bytes_read(result); } size_t storage_rows_read() { - if (result == nullptr) - { - return 0; - } - return 0; + return chdb_result_storage_rows_read(result); } size_t storage_bytes_read() { - if (result == nullptr) - { - return 0; - } - return 0; + return chdb_result_storage_bytes_read(result); } double elapsed() { - if (result == nullptr) - { - return 0; - } - return result->elapsed; + return chdb_result_elapsed(result); } bool has_error() { - if (result == nullptr) - { - return false; - } - return result->error_message != nullptr; + return chdb_result_error(result) != nullptr; } py::str error_message() { - if (has_error()) + auto error_message = chdb_result_error(result); + if (error_message) { - return py::str(result->error_message); + return py::str(error_message); } return py::str(); } @@ -173,8 +120,8 @@ class query_result std::shared_ptr result_wrapper; public: - query_result(local_result_v2 * result) : result_wrapper(std::make_shared(result)) { } - query_result(local_result_v2 * result, bool keep_buf) : result_wrapper(std::make_shared(result, keep_buf)) { } + query_result(chdb_result * result) : result_wrapper(std::make_shared(result)) { } + query_result(chdb_result * result, bool keep_buf) : result_wrapper(std::make_shared(result, keep_buf)) { } ~query_result() = default; char * data() { return result_wrapper->data(); } py::bytes bytes() { return result_wrapper->bytes(); } @@ -182,7 +129,7 @@ class query_result size_t size() { return result_wrapper->size(); } size_t rows_read() { return result_wrapper->rows_read(); } size_t bytes_read() { return result_wrapper->bytes_read(); } - size_t storgae_rows_read() { return result_wrapper->storage_rows_read(); } + size_t storage_rows_read() { return result_wrapper->storage_rows_read(); } size_t storage_bytes_read() { return result_wrapper->storage_bytes_read(); } double elapsed() { return result_wrapper->elapsed(); } bool has_error() { return result_wrapper->has_error(); } @@ -193,17 +140,25 @@ class query_result class streaming_query_result { private: - chdb_streaming_result * result; + chdb_result * result; public: - streaming_query_result(chdb_streaming_result * result_) : result(result_) {} + streaming_query_result(chdb_result * result_) : result(result_) {} ~streaming_query_result() { - chdb_destroy_result(result); + chdb_destroy_query_result(result); } - bool has_error() { return chdb_streaming_result_error(result) != nullptr; } - py::str error_message() { return chdb_streaming_result_error(result); } - chdb_streaming_result * get_result() { return result; } + bool has_error() { return chdb_result_error(result) != nullptr; } + py::str error_message() + { + auto error_message = chdb_result_error(result); + if (error_message) + { + return py::str(error_message); + } + return py::str(); + } + chdb_result * get_result() { return result; } }; class memoryview_wrapper @@ -248,19 +203,13 @@ class cursor_wrapper { private: connection_wrapper * conn; - local_result_v2 * current_result; + chdb_result * current_result; void release_result() { if (current_result) { - if (current_result->_vec) - { - auto * vec = reinterpret_cast *>(current_result->_vec); - delete vec; - current_result->_vec = nullptr; - } - free_result_v2(current_result); + chdb_destroy_query_result(current_result); current_result = nullptr; } @@ -286,59 +235,50 @@ class cursor_wrapper { return py::memoryview(py::memoryview::from_memory(nullptr, 0, true)); } - return py::memoryview(py::memoryview::from_memory(current_result->buf, current_result->len, true)); + return py::memoryview(py::memoryview::from_memory(chdb_result_buffer(current_result), chdb_result_length(current_result), true)); } size_t data_size() { - if (current_result == nullptr) - { - return 0; - } - return current_result->len; + return chdb_result_length(current_result); } size_t rows_read() { - if (current_result == nullptr) - { - return 0; - } - return current_result->rows_read; + return chdb_result_rows_read(current_result); } size_t bytes_read() { - if (current_result == nullptr) - { - return 0; - } - return current_result->bytes_read; + return chdb_result_bytes_read(current_result); + } + + size_t storage_rows_read() + { + return chdb_result_storage_rows_read(current_result); + } + + size_t storage_bytes_read() + { + return chdb_result_storage_bytes_read(current_result); } double elapsed() { - if (current_result == nullptr) - { - return 0; - } - return current_result->elapsed; + return chdb_result_elapsed(current_result); } bool has_error() { - if (current_result == nullptr) - { - return false; - } - return current_result->error_message != nullptr; + return chdb_result_error(current_result) != nullptr; } py::str error_message() { - if (has_error()) + auto error_message = chdb_result_error(current_result); + if (error_message) { - return py::str(current_result->error_message); + return py::str(error_message); } return py::str(); } diff --git a/programs/local/chdb.cpp b/programs/local/chdb.cpp index a59dea5880e..3f5a6004ed1 100644 --- a/programs/local/chdb.cpp +++ b/programs/local/chdb.cpp @@ -1,4 +1,5 @@ #include "chdb.h" +#include #include "chdb-internal.h" #include "LocalServer.h" #include "QueryResult.h" @@ -790,6 +791,9 @@ chdb_connection * chdb_connect(int argc, char ** argv) void chdb_close_conn(chdb_connection * conn) { + if (!conn || !*conn) + return; + auto connection = reinterpret_cast(conn); close_conn(connection); @@ -799,6 +803,12 @@ chdb_result * chdb_query(chdb_connection conn, const char * query, const char * { std::shared_lock global_lock(global_connection_mutex); + if (!conn) + { + auto * result = new MaterializedQueryResult("Unexepected null connection"); + return reinterpret_cast(result); + } + auto connection = reinterpret_cast(conn); if (!checkConnectionValidity(connection)) { @@ -813,10 +823,37 @@ chdb_result * chdb_query(chdb_connection conn, const char * query, const char * } +chdb_result * chdb_query_cmdline(int argc, char ** argv) +{ + MaterializedQueryResult * result = nullptr; + try + { + auto query_result = pyEntryClickHouseLocal(argc, argv); + + return reinterpret_cast(query_result.release()); + } + catch (const std::exception & e) + { + result = new MaterializedQueryResult(e.what()); + } + catch (...) + { + result = new MaterializedQueryResult("Unknown exception"); + } + + return reinterpret_cast(result); +} + chdb_result * chdb_stream_query(chdb_connection conn, const char * query, const char * format) { std::shared_lock global_lock(global_connection_mutex); + if (!conn) + { + auto * result = new StreamQueryResult("Unexepected null connection"); + return reinterpret_cast(result); + } + auto connection = reinterpret_cast(conn); if (!checkConnectionValidity(connection)) { @@ -840,6 +877,19 @@ chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_result * resul { std::shared_lock global_lock(global_connection_mutex); + if (!conn) + { + auto * query_result = new MaterializedQueryResult("Unexepected null connection"); + return reinterpret_cast(query_result); + } + + if (!result) + { + auto * query_result = new MaterializedQueryResult("Unexepected null result"); + return reinterpret_cast(query_result); + } + + auto connection = reinterpret_cast(conn); if (!checkConnectionValidity(connection)) { @@ -857,6 +907,9 @@ void chdb_stream_cancel_query(chdb_connection conn, chdb_result * result) { std::shared_lock global_lock(global_connection_mutex); + if (!result) + return; + auto connection = reinterpret_cast(conn); if (!checkConnectionValidity(connection)) return; @@ -868,18 +921,24 @@ void chdb_stream_cancel_query(chdb_connection conn, chdb_result * result) void chdb_destroy_query_result(chdb_result * result) { + if (!result) + return; + auto query_result = reinterpret_cast(result); delete query_result; } -const char * chdb_result_buffer(chdb_result * result) +char * chdb_result_buffer(chdb_result * result) { + if (!result) + return nullptr; + auto query_result = reinterpret_cast(result); if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) { - auto materialized_result = reinterpret_cast(query_result); - return materialized_result->result_buffer->data(); + auto materialized_result = reinterpret_cast(result); + return materialized_result->result_buffer ? materialized_result->result_buffer->data() : nullptr; } return nullptr; @@ -887,12 +946,15 @@ const char * chdb_result_buffer(chdb_result * result) size_t chdb_result_length(chdb_result * result) { + if (!result) + return 0; + auto query_result = reinterpret_cast(result); if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) { - auto materialized_result = reinterpret_cast(query_result); - return materialized_result->result_buffer->size(); + auto materialized_result = reinterpret_cast(result); + return materialized_result->result_buffer ? materialized_result->result_buffer->size() : 0; } return 0; @@ -900,11 +962,14 @@ size_t chdb_result_length(chdb_result * result) double chdb_result_elapsed(chdb_result * result) { + if (!result) + return 0.0; + auto query_result = reinterpret_cast(result); if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) { - auto materialized_result = reinterpret_cast(query_result); + auto materialized_result = reinterpret_cast(result); return materialized_result->elapsed; } @@ -913,11 +978,14 @@ double chdb_result_elapsed(chdb_result * result) uint64_t chdb_result_rows_read(chdb_result * result) { + if (!result) + return 0; + auto query_result = reinterpret_cast(result); if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) { - auto materialized_result = reinterpret_cast(query_result); + auto materialized_result = reinterpret_cast(result); return materialized_result->rows_read; } @@ -926,11 +994,14 @@ uint64_t chdb_result_rows_read(chdb_result * result) uint64_t chdb_result_bytes_read(chdb_result * result) { + if (!result) + return 0; + auto query_result = reinterpret_cast(result); if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) { - auto materialized_result = reinterpret_cast(query_result); + auto materialized_result = reinterpret_cast(result); return materialized_result->bytes_read; } @@ -939,11 +1010,14 @@ uint64_t chdb_result_bytes_read(chdb_result * result) uint64_t chdb_result_storage_rows_read(chdb_result * result) { + if (!result) + return 0; + auto query_result = reinterpret_cast(result); if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) { - auto materialized_result = reinterpret_cast(query_result); + auto materialized_result = reinterpret_cast(result); return materialized_result->storage_rows_read; } @@ -952,11 +1026,14 @@ uint64_t chdb_result_storage_rows_read(chdb_result * result) uint64_t chdb_result_storage_bytes_read(chdb_result * result) { + if (!result) + return 0; + auto query_result = reinterpret_cast(result); if (query_result->getType() == QueryResultType::RESULT_TYPE_MATERIALIZED) { - auto materialized_result = reinterpret_cast(query_result); + auto materialized_result = reinterpret_cast(result); return materialized_result->storage_bytes_read; } @@ -965,7 +1042,11 @@ uint64_t chdb_result_storage_bytes_read(chdb_result * result) const char * chdb_result_error(chdb_result * result) { + if (!result) + return nullptr; + auto query_result = reinterpret_cast(result); + if (query_result->getError().empty()) return nullptr; diff --git a/programs/local/chdb.h b/programs/local/chdb.h index ead4d3e2d3d..7f2ef821e66 100644 --- a/programs/local/chdb.h +++ b/programs/local/chdb.h @@ -201,6 +201,14 @@ CHDB_EXPORT chdb_connection * chdb_connect(int argc, char ** argv); */ CHDB_EXPORT chdb_result * chdb_query(chdb_connection conn, const char * query, const char * format); +/** + * @brief Execute a query with command-line interface + * @param argc Argument count (same as main()'s argc) + * @param argv Argument vector (same as main()'s argv) + * @return Query result structure containing output or error message + */ +CHDB_EXPORT chdb_result * chdb_query_cmdline(int argc, char ** argv); + /** * Executes a streaming query on the given connection. * @brief Initializes streaming query execution and returns result handle @@ -241,7 +249,7 @@ CHDB_EXPORT void chdb_destroy_query_result(chdb_result * result); * @param result The query result handle * @return Read-only pointer to the result data */ -CHDB_EXPORT const char * chdb_result_buffer(chdb_result * result); +CHDB_EXPORT char * chdb_result_buffer(chdb_result * result); /** * Gets the length of the result data diff --git a/tests/test_statistics.py b/tests/test_statistics.py index 87ee86cf429..0aa337e7156 100644 --- a/tests/test_statistics.py +++ b/tests/test_statistics.py @@ -3,10 +3,11 @@ import os import unittest import chdb +import shutil +from chdb import session N = 1000 - class TestQueryStatistics(unittest.TestCase): def setUp(self) -> None: # create tmp csv file @@ -28,6 +29,48 @@ def test_csv_stats(self): self.assertEqual(ret.bytes_read(), 27000) print(f"SQL read {ret.rows_read()} rows, {ret.bytes_read()} bytes, elapsed {ret.elapsed()} seconds") + def test_storage_stats(self): + test_query_statistics_dir = ".tmp_test_query_statistics_dir" + shutil.rmtree(test_query_statistics_dir, ignore_errors=True) + sess = session.Session(test_query_statistics_dir) + + total_rows = 0 + total_bytes = 0 + total_storage_rows_read = 0 + total_storage_bytes_read = 0 + with sess.send_query("SELECT * FROM numbers(1000000)") as stream: + for chunk in stream: + total_rows += chunk.rows_read() + total_bytes += chunk.bytes_read() + total_storage_rows_read += chunk.storage_rows_read() + total_storage_bytes_read += chunk.storage_bytes_read() + + self.assertEqual(total_rows, 1000000) + self.assertEqual(total_storage_rows_read, 1000000) + self.assertEqual(total_storage_bytes_read, 8000000) + self.assertEqual(total_storage_rows_read, 1000000) + + ret = sess.query("SELECT WatchID,JavaEnable FROM url('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet', 'Parquet')") + self.assertEqual(ret.rows_read(), 1000000) + self.assertEqual(ret.bytes_read(), 12000000) + self.assertEqual(ret.storage_rows_read(), 1000000) + self.assertEqual(ret.storage_bytes_read(), 122401411) + + ret = sess.query("SELECT WatchID,JavaEnable FROM url('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet', 'Parquet') limit 10") + self.assertEqual(ret.rows_read(), 10) + self.assertEqual(ret.bytes_read(), 120) + self.assertEqual(ret.storage_rows_read(), 65409) + self.assertEqual(ret.storage_bytes_read(), 7563614) + + ret = sess.query("SELECT WatchID,JavaEnable FROM url('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet', 'Parquet') where JavaEnable < 0") + self.assertEqual(ret.rows_read(), 0) + self.assertEqual(ret.bytes_read(), 0) + self.assertEqual(ret.storage_rows_read(), 0) + self.assertEqual(ret.storage_bytes_read(), 0) + + sess.close() + shutil.rmtree(test_query_statistics_dir, ignore_errors=True) + def test_non_exist_stats(self): with self.assertRaises(Exception): ret = chdb.query("SELECT * FROM file('notexist.parquet', Parquet)", "Parquet") From efa1932406e038ef273b9edafbd439b775757fd8 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Wed, 4 Jun 2025 17:57:32 +0800 Subject: [PATCH 4/6] fix: fix pyEntryClickHouseLocal --- programs/local/chdb.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/programs/local/chdb.cpp b/programs/local/chdb.cpp index 3f5a6004ed1..6078e8f6abf 100644 --- a/programs/local/chdb.cpp +++ b/programs/local/chdb.cpp @@ -396,8 +396,8 @@ std::unique_ptr pyEntryClickHouseLocal(int argc, char * app.getElapsedTime(), app.getProcessedRows(), app.getProcessedBytes(), - app.getStorgaeRowsRead(), - app.getStorageBytesRead()); + 0, + 0); } else { return std::make_unique(app.getErrorMsg()); } From 8a1c8d1ff19eb94e337a854e313fb0980f7532f7 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Wed, 4 Jun 2025 18:20:46 +0800 Subject: [PATCH 5/6] fix: fix test_storage_stats --- tests/test_statistics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_statistics.py b/tests/test_statistics.py index 0aa337e7156..4b4f958fd5c 100644 --- a/tests/test_statistics.py +++ b/tests/test_statistics.py @@ -60,7 +60,7 @@ def test_storage_stats(self): self.assertEqual(ret.rows_read(), 10) self.assertEqual(ret.bytes_read(), 120) self.assertEqual(ret.storage_rows_read(), 65409) - self.assertEqual(ret.storage_bytes_read(), 7563614) + # self.assertEqual(ret.storage_bytes_read(), 7563614) # storage_bytes_read() is unstable ret = sess.query("SELECT WatchID,JavaEnable FROM url('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet', 'Parquet') where JavaEnable < 0") self.assertEqual(ret.rows_read(), 0) From a188d9233944427c00e6f6248a46e8fbb88ca696 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 5 Jun 2025 15:34:06 +0800 Subject: [PATCH 6/6] chore: minor code adjustment in chdb.cpp --- programs/local/chdb.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/programs/local/chdb.cpp b/programs/local/chdb.cpp index 6078e8f6abf..5aa0ea22483 100644 --- a/programs/local/chdb.cpp +++ b/programs/local/chdb.cpp @@ -907,7 +907,7 @@ void chdb_stream_cancel_query(chdb_connection conn, chdb_result * result) { std::shared_lock global_lock(global_connection_mutex); - if (!result) + if (!result || !conn) return; auto connection = reinterpret_cast(conn);