From 28304a8e750f3d0118ad6d8ef82fcafc3e4c7ab0 Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Mon, 25 Aug 2025 17:15:05 +0000 Subject: [PATCH 1/5] cache schema fields to avoid memory allocation on every Next() call --- chdb/driver/driver.go | 2 ++ chdb/driver/parquet.go | 14 +++++++------- chdb/driver/parquet_streaming.go | 20 +++++++++++--------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/chdb/driver/driver.go b/chdb/driver/driver.go index e01c320..351f55d 100644 --- a/chdb/driver/driver.go +++ b/chdb/driver/driver.go @@ -54,6 +54,7 @@ func (d DriverType) PrepareRows(result chdbpurego.ChdbResult, buf []byte, bufSiz localResult: result, reader: reader, bufferSize: bufSize, needNewBuffer: true, useUnsafeStringReader: useUnsafe, + schemaFields: reader.Schema().Fields(), }, nil } @@ -73,6 +74,7 @@ func (d DriverType) PrepareStreamingRows(result chdbpurego.ChdbStreamResult, buf stream: result, curChunk: nextRes, reader: reader, bufferSize: bufSize, needNewBuffer: true, useUnsafeStringReader: useUnsafe, + schemaFields: reader.Schema().Fields(), }, nil } diff --git a/chdb/driver/parquet.go b/chdb/driver/parquet.go index 700468e..96325ae 100644 --- a/chdb/driver/parquet.go +++ b/chdb/driver/parquet.go @@ -28,6 +28,7 @@ type parquetRows struct { reader *parquet.GenericReader[any] // parquet reader curRecord parquet.Row // TODO: delete this? buffer []parquet.Row // record buffer + schemaFields []parquet.Field // schema fields bufferSize int // amount of records to preload into buffer bufferIndex int64 // index in the current buffer curRow int64 // row counter @@ -36,8 +37,7 @@ type parquetRows struct { } func (r *parquetRows) Columns() (out []string) { - sch := r.reader.Schema() - for _, f := range sch.Fields() { + for _, f := range r.schemaFields { out = append(out, f.Name()) } @@ -53,7 +53,7 @@ func (r *parquetRows) Close() error { r.reader = nil r.localResult.Free() r.localResult = nil - + r.schemaFields = nil r.buffer = nil return nil } @@ -90,7 +90,7 @@ func (r *parquetRows) Next(dest []driver.Value) error { } r.curRecord = r.buffer[r.bufferIndex] - if r.curRecord == nil || len(r.curRecord) == 0 { + if len(r.curRecord) == 0 { return fmt.Errorf("empty row") } var scanError error @@ -166,11 +166,11 @@ func (r *parquetRows) Next(dest []driver.Value) error { } func (r *parquetRows) ColumnTypeDatabaseTypeName(index int) string { - return r.reader.Schema().Fields()[index].Type().String() + return r.schemaFields[index].Type().String() } func (r *parquetRows) ColumnTypeNullable(index int) (nullable, ok bool) { - return r.reader.Schema().Fields()[index].Optional(), true + return r.schemaFields[index].Optional(), true } func (r *parquetRows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) { @@ -178,7 +178,7 @@ func (r *parquetRows) ColumnTypePrecisionScale(index int) (precision, scale int6 } func (r *parquetRows) ColumnTypeScanType(index int) reflect.Type { - switch r.reader.Schema().Fields()[index].Type().Kind() { + switch r.schemaFields[index].Type().Kind() { case parquet.Boolean: return reflect.TypeOf(false) case parquet.Int32: diff --git a/chdb/driver/parquet_streaming.go b/chdb/driver/parquet_streaming.go index 9c3420c..80092b9 100644 --- a/chdb/driver/parquet_streaming.go +++ b/chdb/driver/parquet_streaming.go @@ -18,17 +18,17 @@ type parquetStreamingRows struct { curChunk chdbpurego.ChdbResult // current chunk reader *parquet.GenericReader[any] // parquet reader curRecord parquet.Row - buffer []parquet.Row // record buffer - bufferSize int // amount of records to preload into buffer - bufferIndex int64 // index in the current buffer - curRow int64 // row counter + buffer []parquet.Row // record buffer + schemaFields []parquet.Field // schema fields + bufferSize int // amount of records to preload into buffer + bufferIndex int64 // index in the current buffer + curRow int64 // row counter needNewBuffer bool useUnsafeStringReader bool } func (r *parquetStreamingRows) Columns() (out []string) { - sch := r.reader.Schema() - for _, f := range sch.Fields() { + for _, f := range r.schemaFields { out = append(out, f.Name()) } @@ -45,6 +45,7 @@ func (r *parquetStreamingRows) Close() error { r.stream.Free() r.curChunk = nil r.stream = nil + r.schemaFields = nil r.buffer = nil return nil @@ -85,6 +86,7 @@ func (r *parquetStreamingRows) readNextChunkFromStream() error { return io.EOF } r.reader = parquet.NewGenericReader[any](bytes.NewReader(r.curChunk.Buf())) + r.schemaFields = r.reader.Schema().Fields() return nil } @@ -182,11 +184,11 @@ func (r *parquetStreamingRows) Next(dest []driver.Value) error { } func (r *parquetStreamingRows) ColumnTypeDatabaseTypeName(index int) string { - return r.reader.Schema().Fields()[index].Type().String() + return r.schemaFields[index].Type().String() } func (r *parquetStreamingRows) ColumnTypeNullable(index int) (nullable, ok bool) { - return r.reader.Schema().Fields()[index].Optional(), true + return r.schemaFields[index].Optional(), true } func (r *parquetStreamingRows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) { @@ -194,7 +196,7 @@ func (r *parquetStreamingRows) ColumnTypePrecisionScale(index int) (precision, s } func (r *parquetStreamingRows) ColumnTypeScanType(index int) reflect.Type { - switch r.reader.Schema().Fields()[index].Type().Kind() { + switch r.schemaFields[index].Type().Kind() { case parquet.Boolean: return reflect.TypeOf(false) case parquet.Int32: From e203b4813854597182295cc139dc0703efe3904b Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Mon, 25 Aug 2025 17:16:36 +0000 Subject: [PATCH 2/5] remove unused method --- chdb-purego/chdb.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/chdb-purego/chdb.go b/chdb-purego/chdb.go index 6bb4e7a..3f55a6e 100644 --- a/chdb-purego/chdb.go +++ b/chdb-purego/chdb.go @@ -98,11 +98,6 @@ type connection struct { conn **chdb_conn } -// CancelQuery implements ChdbConn. -func (c *connection) CancelQuery(query ChdbResult) (err error) { - panic("unimplemented") -} - func newChdbConn(conn **chdb_conn) ChdbConn { c := &connection{ conn: conn, From c37c4eeead2b7a3aa757a7bfd57f2f38bc72acbb Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Sun, 31 Aug 2025 11:51:42 +0000 Subject: [PATCH 3/5] bump chdb version --- chdb-purego/binding.go | 36 +++++ chdb-purego/chdb.go | 82 ++++++----- chdb-purego/streaming.go | 21 +-- chdb-purego/types.go | 8 ++ chdb.h | 187 +++++++++++++++++++++++--- chdb/driver/parquet_streaming.go | 2 + chdb/driver/parquet_streaming_test.go | 1 + 7 files changed, 271 insertions(+), 66 deletions(-) diff --git a/chdb-purego/binding.go b/chdb-purego/binding.go index aa7aab1..24c6203 100644 --- a/chdb-purego/binding.go +++ b/chdb-purego/binding.go @@ -3,6 +3,7 @@ package chdbpurego import ( "os" "os/exec" + "unsafe" "github.com/ebitengine/purego" ) @@ -35,6 +36,7 @@ func findLibrary() string { } var ( + // old API queryStable func(argc int, argv []string) *local_result freeResult func(result *local_result) queryStableV2 func(argc int, argv []string) *local_result_v2 @@ -47,6 +49,23 @@ var ( streamingResultNext func(conn *chdb_conn, result *chdb_streaming_result) *local_result_v2 streamingResultDestroy func(result *chdb_streaming_result) streamingResultCancel func(conn *chdb_conn, result *chdb_streaming_result) + + // new API + chdbConnect func(argc int, argv []*byte) *chdb_connection + chdbCloseConn func(conn unsafe.Pointer) + chdbQuery func(conn unsafe.Pointer, query string, format string) *chdb_result + chdbStreamQuery func(conn unsafe.Pointer, query string, format string) *chdb_result + chdbStreamFetchResult func(conn unsafe.Pointer, result *chdb_result) *chdb_result + chdbStreamCancelQuery func(conn unsafe.Pointer, result *chdb_result) + chdbDestroyQueryResult func(result *chdb_result) + chdbResultBuffer func(result *chdb_result) *byte + chdbResultLen func(result *chdb_result) uint //size_t + chdbResultElapsed func(result *chdb_result) float64 // double + chdbResultRowsRead func(result *chdb_result) uint64 + chdbResultBytesRead func(result *chdb_result) uint64 + chdbResultStorageRowsRead func(result *chdb_result) uint64 + chdbResultStorageBytesRead func(result *chdb_result) uint64 + chdbResultError func(result *chdb_result) string ) func init() { @@ -69,4 +88,21 @@ func init() { purego.RegisterLibFunc(&streamingResultCancel, libchdb, "chdb_streaming_cancel_query") purego.RegisterLibFunc(&streamingResultDestroy, libchdb, "chdb_destroy_result") + // new API + purego.RegisterLibFunc(&chdbConnect, libchdb, "chdb_connect") + purego.RegisterLibFunc(&chdbCloseConn, libchdb, "chdb_close_conn") + purego.RegisterLibFunc(&chdbQuery, libchdb, "chdb_query") + purego.RegisterLibFunc(&chdbStreamQuery, libchdb, "chdb_stream_query") + purego.RegisterLibFunc(&chdbStreamFetchResult, libchdb, "chdb_stream_fetch_result") + purego.RegisterLibFunc(&chdbStreamCancelQuery, libchdb, "chdb_stream_cancel_query") + purego.RegisterLibFunc(&chdbDestroyQueryResult, libchdb, "chdb_destroy_query_result") + purego.RegisterLibFunc(&chdbResultBuffer, libchdb, "chdb_result_buffer") + purego.RegisterLibFunc(&chdbResultLen, libchdb, "chdb_result_length") + purego.RegisterLibFunc(&chdbResultElapsed, libchdb, "chdb_result_elapsed") + purego.RegisterLibFunc(&chdbResultRowsRead, libchdb, "chdb_result_rows_read") + purego.RegisterLibFunc(&chdbResultBytesRead, libchdb, "chdb_result_bytes_read") + purego.RegisterLibFunc(&chdbResultStorageRowsRead, libchdb, "chdb_result_storage_rows_read") + purego.RegisterLibFunc(&chdbResultStorageBytesRead, libchdb, "chdb_result_storage_bytes_read") + purego.RegisterLibFunc(&chdbResultError, libchdb, "chdb_result_error") + } diff --git a/chdb-purego/chdb.go b/chdb-purego/chdb.go index 3f55a6e..63b9bb7 100644 --- a/chdb-purego/chdb.go +++ b/chdb-purego/chdb.go @@ -12,12 +12,12 @@ import ( ) type result struct { - localResv2 *local_result_v2 + chdb_result *chdb_result } -func newChdbResult(cRes *local_result_v2) ChdbResult { +func newChdbResult(cRes *chdb_result) ChdbResult { res := &result{ - localResv2: cRes, + chdb_result: cRes, } // runtime.SetFinalizer(res, res.Free) return res @@ -26,35 +26,40 @@ func newChdbResult(cRes *local_result_v2) ChdbResult { // Buf implements ChdbResult. func (c *result) Buf() []byte { - if c.localResv2 != nil { - if c.localResv2.buf != nil && c.localResv2.len > 0 { - return unsafe.Slice(c.localResv2.buf, c.localResv2.len) + if c.chdb_result != nil { + buf := chdbResultBuffer(c.chdb_result) + if buf != nil { + // Assuming we have a way to get the length of the buffer + // Thlis is a placeholder; replace with actual length retrieva logic + length := c.Len() // Replace with actual length + return unsafe.Slice(buf, length) } + } return nil } // BytesRead implements ChdbResult. func (c *result) BytesRead() uint64 { - if c.localResv2 != nil { - return c.localResv2.bytes_read + if c.chdb_result != nil { + return chdbResultBytesRead(c.chdb_result) } return 0 } // Elapsed implements ChdbResult. func (c *result) Elapsed() float64 { - if c.localResv2 != nil { - return c.localResv2.elapsed + if c.chdb_result != nil { + return chdbResultElapsed(c.chdb_result) } return 0 } // Error implements ChdbResult. func (c *result) Error() error { - if c.localResv2 != nil { - if c.localResv2.error_message != nil { - return errors.New(ptrToGoString(c.localResv2.error_message)) + if c.chdb_result != nil { + if s := chdbResultError(c.chdb_result); s != "" { + return errors.New("test") } } return nil @@ -62,25 +67,25 @@ func (c *result) Error() error { // Free implements ChdbResult. func (c *result) Free() { - if c.localResv2 != nil { - freeResultV2(c.localResv2) - c.localResv2 = nil + if c.chdb_result != nil { + chdbDestroyQueryResult(c.chdb_result) + c.chdb_result = nil } } // Len implements ChdbResult. func (c *result) Len() int { - if c.localResv2 != nil { - return int(c.localResv2.len) + if c.chdb_result != nil { + return int(chdbResultLen(c.chdb_result)) } return 0 } // RowsRead implements ChdbResult. func (c *result) RowsRead() uint64 { - if c.localResv2 != nil { - return c.localResv2.rows_read + if c.chdb_result != nil { + return chdbResultRowsRead(c.chdb_result) } return 0 } @@ -95,10 +100,10 @@ func (c *result) String() string { } type connection struct { - conn **chdb_conn + conn *chdb_connection } -func newChdbConn(conn **chdb_conn) ChdbConn { +func newChdbConn(conn *chdb_connection) ChdbConn { c := &connection{ conn: conn, } @@ -109,28 +114,26 @@ func newChdbConn(conn **chdb_conn) ChdbConn { // Close implements ChdbConn. func (c *connection) Close() { if c.conn != nil { - closeConn(c.conn) + chdbCloseConn(c.conn.internal_data) } } // Query implements ChdbConn. func (c *connection) Query(queryStr string, formatStr string) (result ChdbResult, err error) { - if c.conn == nil { return nil, fmt.Errorf("invalid connection") } - rawConn := *c.conn - - res := queryConn(rawConn, queryStr, formatStr) + res := chdbQuery(c.conn.internal_data, queryStr, formatStr) if res == nil { // According to the C ABI of chDB v1.2.0, the C function query_stable_v2 // returns nil if the query returns no data. This is not an error. We // will change this behavior in the future. return newChdbResult(res), nil } - if res.error_message != nil { - return nil, errors.New(ptrToGoString(res.error_message)) + errMsg := chdbResultError(res) + if errMsg != "" { + return nil, errors.New("test") } return newChdbResult(res), nil @@ -143,28 +146,23 @@ func (c *connection) QueryStreaming(queryStr string, formatStr string) (result C return nil, fmt.Errorf("invalid connection") } - rawConn := *c.conn - - res := queryConnStreaming(rawConn, queryStr, formatStr) + res := chdbStreamQuery(c.conn.internal_data, queryStr, formatStr) if res == nil { // According to the C ABI of chDB v1.2.0, the C function query_stable_v2 // returns nil if the query returns no data. This is not an error. We // will change this behavior in the future. - return newStreamingResult(rawConn, res), nil + return newStreamingResult(c.conn, res), nil } - if s := streamingResultError(res); s != nil { - return nil, errors.New(*s) + if s := chdbResultError(res); s != "" { + return nil, errors.New(s) } - return newStreamingResult(rawConn, res), nil + return newStreamingResult(c.conn, res), nil } func (c *connection) Ready() bool { if c.conn != nil { - deref := *c.conn - if deref != nil { - return deref.connected - } + return true } return false } @@ -216,7 +214,7 @@ func NewConnection(argc int, argv []string) (ChdbConn, error) { // fmt.Println("arg: ", arg) // } - var conn **chdb_conn + var conn *chdb_connection var err error func() { defer func() { @@ -224,7 +222,7 @@ func NewConnection(argc int, argv []string) (ChdbConn, error) { err = fmt.Errorf("C++ exception: %v", r) } }() - conn = connectChdb(len(new_argv), c_argv) + conn = chdbConnect(len(new_argv), c_argv) }() if err != nil { diff --git a/chdb-purego/streaming.go b/chdb-purego/streaming.go index 08a144f..b298564 100644 --- a/chdb-purego/streaming.go +++ b/chdb-purego/streaming.go @@ -3,12 +3,12 @@ package chdbpurego import "errors" type streamingResult struct { - curConn *chdb_conn - stream *chdb_streaming_result + curConn *chdb_connection + stream *chdb_result curChunk ChdbResult } -func newStreamingResult(conn *chdb_conn, cRes *chdb_streaming_result) ChdbStreamResult { +func newStreamingResult(conn *chdb_connection, cRes *chdb_result) ChdbStreamResult { // nextChunk := streamingResultNext(conn, cRes) // if nextChunk == nil { @@ -28,16 +28,19 @@ func newStreamingResult(conn *chdb_conn, cRes *chdb_streaming_result) ChdbStream // Error implements ChdbStreamResult. func (c *streamingResult) Error() error { - if s := streamingResultError(c.stream); s != nil { - return errors.New(*s) + if s := chdbResultError(c.stream); s != "" { + return errors.New("test") } return nil } // Free implements ChdbStreamResult. func (c *streamingResult) Free() { - streamingResultCancel(c.curConn, c.stream) - streamingResultDestroy(c.stream) + if c.curConn != nil && c.stream != nil { + //chdbStreamCancelQuery(c.curConn.internal_data, c.stream) + chdbDestroyQueryResult(c.stream) + } + c.stream = nil if c.curChunk != nil { c.curChunk.Free() @@ -53,7 +56,7 @@ func (c *streamingResult) Cancel() { // GetNext implements ChdbStreamResult. func (c *streamingResult) GetNext() ChdbResult { if c.curChunk == nil { - nextChunk := streamingResultNext(c.curConn, c.stream) + nextChunk := chdbStreamFetchResult(c.curConn.internal_data, c.stream) if nextChunk == nil { return nil } @@ -63,7 +66,7 @@ func (c *streamingResult) GetNext() ChdbResult { // free the current chunk before getting the next one c.curChunk.Free() c.curChunk = nil - nextChunk := streamingResultNext(c.curConn, c.stream) + nextChunk := chdbStreamFetchResult(c.curConn.internal_data, c.stream) if nextChunk == nil { return nil } diff --git a/chdb-purego/types.go b/chdb-purego/types.go index 3a67868..2bac48e 100644 --- a/chdb-purego/types.go +++ b/chdb-purego/types.go @@ -36,6 +36,14 @@ type chdb_conn struct { queue unsafe.Pointer } +type chdb_connection struct { + internal_data unsafe.Pointer +} + +type chdb_result struct { + internal_data unsafe.Pointer +} + type ChdbResult interface { Buf() []byte // String rapresentation of the the buffer diff --git a/chdb.h b/chdb.h index 498ca61..7f2ef82 100644 --- a/chdb.h +++ b/chdb.h @@ -1,16 +1,19 @@ #pragma once #ifdef __cplusplus -# include -# include +#include +#include extern "C" { #else -# include -# include -# include +#include +#include +#include #endif #define CHDB_EXPORT __attribute__((visibility("default"))) + +#ifndef CHDB_NO_DEPRECATED +// WARNING: The following structs are deprecated and will be removed in a future version. struct local_result { char * buf; @@ -45,12 +48,6 @@ struct local_result_v2 }; #endif -CHDB_EXPORT struct local_result * query_stable(int argc, char ** argv); -CHDB_EXPORT void free_result(struct local_result * result); - -CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv); -CHDB_EXPORT void free_result_v2(struct local_result_v2 * result); - /** * Connection structure for chDB * Contains server instance, connection state, and query processing queue @@ -62,10 +59,37 @@ struct chdb_conn void * queue; /* Query processing queue */ }; -typedef struct { +typedef struct +{ void * internal_data; } chdb_streaming_result; +#endif + +// Opaque handle for query results. +// Internal data structure managed by chDB implementation. +// Users should only interact through API functions. +typedef struct chdb_result_ +{ + void * internal_data; +} chdb_result; + +// Connection handle wrapping database session state. +// Internal data structure managed by chDB implementation. +// Users should only interact through API functions. +typedef struct chdb_connection_ +{ + void * internal_data; +} * chdb_connection; + +#ifndef CHDB_NO_DEPRECATED +// WARNING: The following interfaces are deprecated and will be removed in a future version. +CHDB_EXPORT struct local_result * query_stable(int argc, char ** argv); +CHDB_EXPORT void free_result(struct local_result * result); + +CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv); +CHDB_EXPORT void free_result_v2(struct local_result_v2 * result); + /** * Creates a new chDB connection. * Only one active connection is allowed per process. @@ -115,7 +139,7 @@ CHDB_EXPORT chdb_streaming_result * query_conn_streaming(struct chdb_conn * conn * @param result Streaming result handle from query_conn_streaming() * @return Null-terminated error message string, or NULL if no error occurred */ -CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result); + CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result); /** * Fetches next chunk of streaming results. @@ -141,8 +165,141 @@ CHDB_EXPORT void chdb_streaming_cancel_query(struct chdb_conn * conn, chdb_strea * @param result Streaming result handle to destroy * @warning Must be called even if query was finished or canceled */ -CHDB_EXPORT void chdb_destroy_result(chdb_streaming_result * result); + CHDB_EXPORT void chdb_destroy_result(chdb_streaming_result * result); + +#endif + +/** + * Creates a new chDB connection. + * Only one active connection is allowed per process. + * Creating a new connection with different path requires closing existing connection. + * + * @param argc Number of command-line arguments + * @param argv Command-line arguments array (--path= to specify database location) + * @return Pointer to connection pointer, or NULL on failure + * @note Default path is ":memory:" if not specified + */ +CHDB_EXPORT chdb_connection * chdb_connect(int argc, char ** argv); + +/** + * Closes an existing chDB connection and cleans up resources. + * Thread-safe function that handles connection shutdown and cleanup. + * + * @param conn Pointer to connection pointer to close + */ + CHDB_EXPORT void chdb_close_conn(chdb_connection * conn); + +/** + * Executes a query on the given connection. + * Thread-safe function that handles query execution in a separate thread. + * + * @param conn Connection to execute query on + * @param query SQL query string to execute + * @param format Output format string (e.g., "CSV", default format) + * @return Query result structure containing output or error message + * @note Returns error result if connection is invalid or closed + */ +CHDB_EXPORT chdb_result * chdb_query(chdb_connection conn, const char * query, const char * format); + +/** + * @brief Execute a query with command-line interface + * @param argc Argument count (same as main()'s argc) + * @param argv Argument vector (same as main()'s argv) + * @return Query result structure containing output or error message + */ +CHDB_EXPORT chdb_result * chdb_query_cmdline(int argc, char ** argv); + +/** + * Executes a streaming query on the given connection. + * @brief Initializes streaming query execution and returns result handle + * @param conn Connection to execute query on + * @param query SQL query string to execute + * @param format Output format string (e.g. "CSV", default format) + * @return Streaming result handle containing query state or error message + * @note Returns error result if connection is invalid or closed + */ +CHDB_EXPORT chdb_result * chdb_stream_query(chdb_connection conn, const char * query, const char * format); + +/** + * Fetches next chunk of streaming results. + * @brief Iterates through streaming query results + * @param conn Active connection handle + * @param result Streaming result handle from query_conn_streaming() + * @return Materialized result chunk with data + * @note Returns empty result when stream ends + */ +CHDB_EXPORT chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_result * result); + +/** + * Cancels ongoing streaming query. + * @brief Aborts streaming query execution and cleans up resources + * @param conn Active connection handle + * @param result Streaming result handle to cancel + */ +CHDB_EXPORT void chdb_stream_cancel_query(chdb_connection conn, chdb_result * result); + +/** + * Destroys a query result and releases all associated resources + * @param result The result handle to destroy + */ +CHDB_EXPORT void chdb_destroy_query_result(chdb_result * result); + +/** + * Gets pointer to the result data buffer + * @param result The query result handle + * @return Read-only pointer to the result data + */ +CHDB_EXPORT char * chdb_result_buffer(chdb_result * result); + +/** + * Gets the length of the result data + * @param result The query result handle + * @return Size of result data in bytes + */ +CHDB_EXPORT size_t chdb_result_length(chdb_result * result); + +/** + * Gets query execution time + * @param result The query result handle + * @return Elapsed time in seconds + */ +CHDB_EXPORT double chdb_result_elapsed(chdb_result * result); + +/** + * Gets total rows in query result + * @param result The query result handle + * @return Number of rows contained in the result set + */ +CHDB_EXPORT uint64_t chdb_result_rows_read(chdb_result * result); + +/** + * Gets the total bytes occupied by the result set in internal binary format + * @param result The query result handle + * @return Number of bytes occupied by the result set in internal binary representation + */ +CHDB_EXPORT uint64_t chdb_result_bytes_read(chdb_result * result); + +/** + * Gets rows read from storage engine + * @param result The query result handle + * @return Number of rows read from storage + */ +CHDB_EXPORT uint64_t chdb_result_storage_rows_read(chdb_result * result); + +/** + * Gets bytes read from storage engine + * @param result The query result handle + * @return Number of bytes read from storage engine + */ +CHDB_EXPORT uint64_t chdb_result_storage_bytes_read(chdb_result * result); + +/** + * Retrieves error message from query execution + * @param result The query result handle + * @return Null-terminated error description, NULL if no error + */ +CHDB_EXPORT const char * chdb_result_error(chdb_result * result); #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/chdb/driver/parquet_streaming.go b/chdb/driver/parquet_streaming.go index 80092b9..71348b4 100644 --- a/chdb/driver/parquet_streaming.go +++ b/chdb/driver/parquet_streaming.go @@ -75,6 +75,8 @@ func (r *parquetStreamingRows) readNextChunkFromStream() error { if err := r.reader.Close(); err != nil { return err } + // free the previous chunk + r.curChunk.Free() r.curChunk = r.stream.GetNext() if r.curChunk == nil { return io.EOF diff --git a/chdb/driver/parquet_streaming_test.go b/chdb/driver/parquet_streaming_test.go index 4b8184f..54c3e55 100644 --- a/chdb/driver/parquet_streaming_test.go +++ b/chdb/driver/parquet_streaming_test.go @@ -41,6 +41,7 @@ func TestDbWithParquetStreaming(t *testing.T) { } } + } func TestDBWithParquetStreamingSession(t *testing.T) { From fda725ce28f9226ec66bceadf3146e25d602568d Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Sun, 31 Aug 2025 11:52:31 +0000 Subject: [PATCH 4/5] fix error messages --- chdb-purego/chdb.go | 4 ++-- chdb-purego/streaming.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/chdb-purego/chdb.go b/chdb-purego/chdb.go index 63b9bb7..bfa1a92 100644 --- a/chdb-purego/chdb.go +++ b/chdb-purego/chdb.go @@ -59,7 +59,7 @@ func (c *result) Elapsed() float64 { func (c *result) Error() error { if c.chdb_result != nil { if s := chdbResultError(c.chdb_result); s != "" { - return errors.New("test") + return errors.New(s) } } return nil @@ -133,7 +133,7 @@ func (c *connection) Query(queryStr string, formatStr string) (result ChdbResult } errMsg := chdbResultError(res) if errMsg != "" { - return nil, errors.New("test") + return nil, errors.New(errMsg) } return newChdbResult(res), nil diff --git a/chdb-purego/streaming.go b/chdb-purego/streaming.go index b298564..42a4964 100644 --- a/chdb-purego/streaming.go +++ b/chdb-purego/streaming.go @@ -29,7 +29,7 @@ func newStreamingResult(conn *chdb_connection, cRes *chdb_result) ChdbStreamResu // Error implements ChdbStreamResult. func (c *streamingResult) Error() error { if s := chdbResultError(c.stream); s != "" { - return errors.New("test") + return errors.New(s) } return nil } From 8872c024a967e1003e7c7f2a5bfa726e10179c51 Mon Sep 17 00:00:00 2001 From: s0und0fs1lence Date: Sun, 31 Aug 2025 12:32:24 +0000 Subject: [PATCH 5/5] fix close conn --- chdb-purego/binding.go | 4 ++-- chdb-purego/chdb.go | 2 +- chdb-purego/streaming.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chdb-purego/binding.go b/chdb-purego/binding.go index 24c6203..f65920f 100644 --- a/chdb-purego/binding.go +++ b/chdb-purego/binding.go @@ -52,11 +52,11 @@ var ( // new API chdbConnect func(argc int, argv []*byte) *chdb_connection - chdbCloseConn func(conn unsafe.Pointer) + chdbCloseConn func(conn *chdb_connection) chdbQuery func(conn unsafe.Pointer, query string, format string) *chdb_result chdbStreamQuery func(conn unsafe.Pointer, query string, format string) *chdb_result chdbStreamFetchResult func(conn unsafe.Pointer, result *chdb_result) *chdb_result - chdbStreamCancelQuery func(conn unsafe.Pointer, result *chdb_result) + chdbStreamCancelQuery func(conn *chdb_connection, result *chdb_result) chdbDestroyQueryResult func(result *chdb_result) chdbResultBuffer func(result *chdb_result) *byte chdbResultLen func(result *chdb_result) uint //size_t diff --git a/chdb-purego/chdb.go b/chdb-purego/chdb.go index bfa1a92..6b9c3b0 100644 --- a/chdb-purego/chdb.go +++ b/chdb-purego/chdb.go @@ -114,7 +114,7 @@ func newChdbConn(conn *chdb_connection) ChdbConn { // Close implements ChdbConn. func (c *connection) Close() { if c.conn != nil { - chdbCloseConn(c.conn.internal_data) + chdbCloseConn(c.conn) } } diff --git a/chdb-purego/streaming.go b/chdb-purego/streaming.go index 42a4964..7c7db2a 100644 --- a/chdb-purego/streaming.go +++ b/chdb-purego/streaming.go @@ -37,7 +37,7 @@ func (c *streamingResult) Error() error { // Free implements ChdbStreamResult. func (c *streamingResult) Free() { if c.curConn != nil && c.stream != nil { - //chdbStreamCancelQuery(c.curConn.internal_data, c.stream) + chdbStreamCancelQuery(c.curConn, c.stream) chdbDestroyQueryResult(c.stream) }