Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion programs/local/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
set (CLICKHOUSE_LOCAL_SOURCES LocalServer.cpp)
set (CLICKHOUSE_LOCAL_SOURCES
chdb.cpp
LocalServer.cpp
)

if (USE_PYTHON)
set (CHDB_SOURCES
chdb.cpp
FormatHelper.cpp
ListScan.cpp
LocalChdb.cpp
Expand Down
58 changes: 32 additions & 26 deletions programs/local/LocalChdb.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
#include "LocalChdb.h"
#include "LocalServer.h"
#include "chdb.h"
#include "chdb-internal.h"
#include "PythonImporter.h"
#include "PythonTableCache.h"
#include "TableFunctionPython.h"
#include "StoragePython.h"

#include <mutex>
#include <Common/logger_useful.h>

namespace py = pybind11;

extern bool inside_main = true;

local_result_v2 * queryToBuffer(
chdb_result * queryToBuffer(
const std::string & queryStr,
const std::string & output_format = "CSV",
const std::string & path = {},
Expand Down Expand Up @@ -60,7 +59,7 @@ local_result_v2 * queryToBuffer(
argv_char.push_back(const_cast<char *>(arg.c_str()));

py::gil_scoped_release release;
return query_stable_v2(argv_char.size(), argv_char.data());
return chdb_query_cmdline(argv_char.size(), argv_char.data());
}

// Pybind11 will take over the ownership of the `query_result` object
Expand Down Expand Up @@ -218,22 +217,22 @@ connection_wrapper::connection_wrapper(const std::string & conn_str)
argv_char.push_back(const_cast<char *>(arg.c_str()));
}

conn = connect_chdb(argv_char.size(), argv_char.data());
conn = chdb_connect(argv_char.size(), argv_char.data());
db_path = path;
is_memory_db = (path == ":memory:");
}

connection_wrapper::~connection_wrapper()
{
py::gil_scoped_release release;
close_conn(conn);
chdb_close_conn(conn);
}

void connection_wrapper::close()
{
{
py::gil_scoped_release release;
close_conn(conn);
chdb_close_conn(conn);
}
// Ensure that if a new connection is created before this object is destroyed that we don't try to close it.
conn = nullptr;
Expand All @@ -254,15 +253,17 @@ query_result * connection_wrapper::query(const std::string & query_str, const st
CHDB::PythonTableCache::findQueryableObjFromQuery(query_str);

py::gil_scoped_release release;
auto * result = query_conn(*conn, query_str.c_str(), format.c_str());
if (result->len == 0)
auto * result = chdb_query(*conn, query_str.c_str(), format.c_str());
if (chdb_result_length(result))
{
LOG_DEBUG(getLogger("CHDB"), "Empty result returned for query: {}", query_str);
}
if (result->error_message)

auto * error_msg = chdb_result_error(result);
if (error_msg)
{
std::string msg_copy(result->error_message);
free_result_v2(result);
std::string msg_copy(error_msg);
chdb_destroy_query_result(result);
throw std::runtime_error(msg_copy);
}
return new query_result(result, false);
Expand All @@ -273,12 +274,12 @@ streaming_query_result * connection_wrapper::send_query(const std::string & quer
CHDB::PythonTableCache::findQueryableObjFromQuery(query_str);

py::gil_scoped_release release;
auto * result = query_conn_streaming(*conn, query_str.c_str(), format.c_str());
const auto * error_msg = chdb_streaming_result_error(result);
auto * result = chdb_stream_query(*conn, query_str.c_str(), format.c_str());
auto * error_msg = chdb_result_error(result);
if (error_msg)
{
std::string msg_copy(error_msg);
chdb_destroy_result(result);
chdb_destroy_query_result(result);
throw std::runtime_error(msg_copy);
}

Expand All @@ -292,15 +293,16 @@ query_result * connection_wrapper::streaming_fetch_result(streaming_query_result
if (!streaming_result || !streaming_result->get_result())
return nullptr;

auto * result = chdb_streaming_fetch_result(*conn, streaming_result->get_result());
auto * result = chdb_stream_fetch_result(*conn, streaming_result->get_result());

if (result->len == 0)
if (chdb_result_length(result) == 0)
LOG_DEBUG(getLogger("CHDB"), "Empty result returned for streaming query");

if (result->error_message)
auto * error_msg = chdb_result_error(result);
if (error_msg)
{
std::string msg_copy(result->error_message);
free_result_v2(result);
std::string msg_copy(error_msg);
chdb_destroy_query_result(result);
throw std::runtime_error(msg_copy);
}

Expand All @@ -314,7 +316,7 @@ void connection_wrapper::streaming_cancel_query(streaming_query_result * streami
if (!streaming_result || !streaming_result->get_result())
return;

chdb_streaming_cancel_query(*conn, streaming_result->get_result());
chdb_stream_cancel_query(*conn, streaming_result->get_result());
}

void cursor_wrapper::execute(const std::string & query_str)
Expand All @@ -324,7 +326,7 @@ void cursor_wrapper::execute(const std::string & query_str)

// Use JSONCompactEachRowWithNamesAndTypes format for better type support
py::gil_scoped_release release;
current_result = query_conn(conn->get_conn(), query_str.c_str(), "JSONCompactEachRowWithNamesAndTypes");
current_result = chdb_query(conn->get_conn(), query_str.c_str(), "JSONCompactEachRowWithNamesAndTypes");
}


Expand Down Expand Up @@ -389,7 +391,7 @@ PYBIND11_MODULE(_chdb, m)
.def("view", &memoryview_wrapper::view);

py::class_<query_result>(m, "query_result")
.def(py::init<local_result_v2 *>(), py::return_value_policy::take_ownership)
.def(py::init<chdb_result *>(), py::return_value_policy::take_ownership)
.def("data", &query_result::data)
.def("bytes", &query_result::bytes)
.def("__str__", &query_result::str)
Expand All @@ -399,13 +401,15 @@ PYBIND11_MODULE(_chdb, m)
.def("size", &query_result::size)
.def("rows_read", &query_result::rows_read)
.def("bytes_read", &query_result::bytes_read)
.def("storage_rows_read", &query_result::storage_rows_read)
.def("storage_bytes_read", &query_result::storage_bytes_read)
.def("elapsed", &query_result::elapsed)
.def("get_memview", &query_result::get_memview)
.def("has_error", &query_result::has_error)
.def("error_message", &query_result::error_message);

py::class_<streaming_query_result>(m, "streaming_query_result")
.def(py::init<chdb_streaming_result *>(), py::return_value_policy::take_ownership)
.def(py::init<chdb_result *>(), py::return_value_policy::take_ownership)
.def("has_error", &streaming_query_result::has_error)
.def("error_message", &streaming_query_result::error_message);

Expand Down Expand Up @@ -447,6 +451,8 @@ PYBIND11_MODULE(_chdb, m)
.def("data_size", &cursor_wrapper::data_size)
.def("rows_read", &cursor_wrapper::rows_read)
.def("bytes_read", &cursor_wrapper::bytes_read)
.def("storage_rows_read", &cursor_wrapper::storage_rows_read)
.def("storage_bytes_read", &cursor_wrapper::storage_bytes_read)
.def("elapsed", &cursor_wrapper::elapsed)
.def("has_error", &cursor_wrapper::has_error)
.def("error_message", &cursor_wrapper::error_message);
Expand Down Expand Up @@ -492,7 +498,7 @@ PYBIND11_MODULE(_chdb, m)

auto destroy_import_cache = []()
{
DB::LocalServer::cleanupConnection();
CHDB::chdbCleanupConnection();
CHDB::PythonTableCache::clear();
CHDB::PythonImporter::destroy();
};
Expand Down
Loading