diff --git a/src/include/query.h b/src/include/query.h index a00be4aa7..cd95145fd 100644 --- a/src/include/query.h +++ b/src/include/query.h @@ -150,3 +150,8 @@ PyObject *AerospikeQuery_Get_Partitions_status(AerospikeQuery *self); PyObject *StoreUnicodePyObject(AerospikeQuery *self, PyObject *obj); int64_t pyobject_to_int64(PyObject *py_obj); + +PyObject *AerospikeQuery_Foreach_Invoke(AerospikeQuery *self, + PyObject *py_callback, + PyObject *py_policy, + PyObject *py_options); diff --git a/src/main/query/foreach.c b/src/main/query/foreach.c index 643131e12..a76718732 100644 --- a/src/main/query/foreach.c +++ b/src/main/query/foreach.c @@ -33,11 +33,13 @@ // Struct for Python User-Data for the Callback typedef struct { - PyObject *callback; + PyObject *py_obj; AerospikeClient *client; int partition_query; as_vector thread_errors; pthread_mutex_t thread_errors_mutex; + // If false, it is a python list + bool is_pyobj_callback; } LocalData; static bool each_result(const as_val *val, void *udata) @@ -50,7 +52,7 @@ static bool each_result(const as_val *val, void *udata) // Extract callback user-data LocalData *data = (LocalData *)udata; - PyObject *py_callback = data->callback; + PyObject *py_callback_or_list_of_results = data->py_obj; // Python Function Arguments and Result Value PyObject *py_arglist = NULL; @@ -72,45 +74,60 @@ static bool each_result(const as_val *val, void *udata) goto EXIT_CALLBACK; } - // Build Python Function Arguments - if (data->partition_query) { + if (data->is_pyobj_callback == false) { + // query.results() + if (py_result) { + int retval = + PyList_Append(py_callback_or_list_of_results, py_result); + Py_DECREF(py_result); + if (retval == -1) { + // TODO: should fail, not return true + goto EXIT_CALLBACK; + } + } + } + else { + // Build Python Function Arguments + if (data->partition_query) { - uint32_t part_id = 0; + uint32_t part_id = 0; - as_record *rec = as_record_fromval(val); + as_record *rec = as_record_fromval(val); - if (rec->key.digest.init) { - part_id = - as_partition_getid(rec->key.digest.value, CLUSTER_NPARTITIONS); - } + if (rec->key.digest.init) { + part_id = as_partition_getid(rec->key.digest.value, + CLUSTER_NPARTITIONS); + } - py_arglist = PyTuple_New(2); + py_arglist = PyTuple_New(2); - PyTuple_SetItem(py_arglist, 0, PyLong_FromUnsignedLong(part_id)); - PyTuple_SetItem(py_arglist, 1, py_result); - } - else { - py_arglist = PyTuple_New(1); - PyTuple_SetItem(py_arglist, 0, py_result); - } + PyTuple_SetItem(py_arglist, 0, PyLong_FromUnsignedLong(part_id)); + PyTuple_SetItem(py_arglist, 1, py_result); + } + else { + py_arglist = PyTuple_New(1); + PyTuple_SetItem(py_arglist, 0, py_result); + } - // Invoke Python Callback - py_return = PyObject_Call(py_callback, py_arglist, NULL); - - // Release Python Function Arguments - Py_DECREF(py_arglist); - // handle return value - if (!py_return) { - // an exception was raised, handle it (someday) - // for now, we bail from the loop - as_error_update(&thread_err_local, AEROSPIKE_ERR_CLIENT, - "Callback function contains an error"); - retval = false; - } - else if (py_return == Py_False) { - retval = false; + // Invoke Python Callback + py_return = + PyObject_Call(py_callback_or_list_of_results, py_arglist, NULL); + + // Release Python Function Arguments + Py_DECREF(py_arglist); + // handle return value + if (!py_return) { + // an exception was raised, handle it (someday) + // for now, we bail from the loop + as_error_update(&thread_err_local, AEROSPIKE_ERR_CLIENT, + "Callback function contains an error"); + retval = false; + } + else if (py_return == Py_False) { + retval = false; + } + Py_XDECREF(py_return); } - Py_XDECREF(py_return); EXIT_CALLBACK: if (thread_err_local.code != AEROSPIKE_OK) { @@ -129,27 +146,13 @@ static bool each_result(const as_val *val, void *udata) return retval; } -PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args, - PyObject *kwds) +PyObject *AerospikeQuery_Foreach_Invoke(AerospikeQuery *self, + PyObject *py_callback, + PyObject *py_policy, + PyObject *py_options) { - // Python Function Arguments - PyObject *py_callback = NULL; - PyObject *py_policy = NULL; - PyObject *py_options = NULL; - // Python Function Keyword Arguments - static char *kwlist[] = {"callback", "policy", "options", NULL}; - - // Python Function Argument Parsing - if (PyArg_ParseTupleAndKeywords(args, kwds, "O|OO:foreach", kwlist, - &py_callback, &py_policy, - &py_options) == false) { - as_query_destroy(&self->query); - return NULL; - } - // Initialize callback user data - LocalData data; - data.callback = py_callback; + LocalData data = {0}; data.client = self->client; data.partition_query = 0; @@ -185,6 +188,17 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args, goto CLEANUP; } + if (!py_callback) { + data.py_obj = PyList_New(0); + if (data.py_obj == NULL) { + goto CLEANUP; + } + } + else { + data.py_obj = py_callback; + } + data.is_pyobj_callback = py_callback != NULL; + // Convert python policy object to as_policy_exists pyobject_to_policy_query( self->client, &err, py_policy, &query_policy, &query_policy_p, @@ -261,10 +275,38 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args, pthread_mutex_destroy(&data.thread_errors_mutex); if (err.code != AEROSPIKE_OK) { + // TODO: results() used raise_exception(); + Py_XDECREF(data.py_obj); raise_exception_base(&err, Py_None, Py_None, Py_None, Py_None, Py_None); return NULL; } - Py_INCREF(Py_None); - return Py_None; + if (data.is_pyobj_callback) { + Py_RETURN_NONE; + } + else { + return data.py_obj; + } +} + +PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args, + PyObject *kwds) +{ + // Python Function Arguments + PyObject *py_callback = NULL; + PyObject *py_policy = NULL; + PyObject *py_options = NULL; + // Python Function Keyword Arguments + static char *kwlist[] = {"callback", "policy", "options", NULL}; + + // Python Function Argument Parsing + if (PyArg_ParseTupleAndKeywords(args, kwds, "O|OO:foreach", kwlist, + &py_callback, &py_policy, + &py_options) == false) { + as_query_destroy(&self->query); + return NULL; + } + + return AerospikeQuery_Foreach_Invoke(self, py_callback, py_policy, + py_options); } diff --git a/src/main/query/results.c b/src/main/query/results.c index bc0994cb4..656aacc4f 100644 --- a/src/main/query/results.c +++ b/src/main/query/results.c @@ -32,148 +32,18 @@ #undef TRACE #define TRACE() -typedef struct { - PyObject *py_results; - AerospikeClient *client; -} LocalData; - -static bool each_result(const as_val *val, void *udata) -{ - if (!val) { - return false; - } - - PyObject *py_results = NULL; - LocalData *data = (LocalData *)udata; - py_results = data->py_results; - PyObject *py_result = NULL; - - as_error err; - - PyGILState_STATE gstate; - gstate = PyGILState_Ensure(); - - val_to_pyobject(data->client, &err, val, &py_result); - - if (py_result) { - PyList_Append(py_results, py_result); - Py_DECREF(py_result); - } - PyGILState_Release(gstate); - - return true; -} - PyObject *AerospikeQuery_Results(AerospikeQuery *self, PyObject *args, PyObject *kwds) { PyObject *py_policy = NULL; - PyObject *py_results = NULL; PyObject *py_options = NULL; static char *kwlist[] = {"policy", "options", NULL}; - LocalData data; - data.client = self->client; - if (PyArg_ParseTupleAndKeywords(args, kwds, "|OO:results", kwlist, &py_policy, &py_options) == false) { return NULL; } - as_error err; - as_error_init(&err); - - as_policy_query query_policy; - as_policy_query *query_policy_p = NULL; - - // For converting expressions. - as_exp exp_list; - as_exp *exp_list_p = NULL; - - as_partition_filter partition_filter = {0}; - as_partition_filter *partition_filter_p = NULL; - as_partitions_status *ps = NULL; - - if (!self || !self->client->as) { - as_error_update(&err, AEROSPIKE_ERR_PARAM, "Invalid aerospike object"); - goto CLEANUP; - } - - if (!self->client->is_conn_16) { - as_error_update(&err, AEROSPIKE_ERR_CLUSTER, - "No connection to aerospike cluster"); - goto CLEANUP; - } - - // Convert python policy object to as_policy_query - pyobject_to_policy_query( - self->client, &err, py_policy, &query_policy, &query_policy_p, - &self->client->as->config.policies.query, &exp_list, &exp_list_p); - if (err.code != AEROSPIKE_OK) { - goto CLEANUP; - } - - if (set_query_options(&err, py_options, &self->query) != AEROSPIKE_OK) { - goto CLEANUP; - } - - if (py_policy) { - PyObject *py_partition_filter = - PyDict_GetItemString(py_policy, "partition_filter"); - if (py_partition_filter) { - if (convert_partition_filter(self->client, py_partition_filter, - &partition_filter, &ps, - &err) == AEROSPIKE_OK) { - partition_filter_p = &partition_filter; - } - else { - goto CLEANUP; - } - } - } - as_error_reset(&err); - - py_results = PyList_New(0); - data.py_results = py_results; - - Py_BEGIN_ALLOW_THREADS - - if (partition_filter_p) { - if (ps) { - as_partition_filter_set_partitions(partition_filter_p, ps); - } - - aerospike_query_partitions(self->client->as, &err, query_policy_p, - &self->query, partition_filter_p, - each_result, &data); - - if (ps) { - as_partitions_status_release(ps); - } - } - else { - aerospike_query_foreach(self->client->as, &err, query_policy_p, - &self->query, each_result, &data); - } - - Py_END_ALLOW_THREADS - -CLEANUP: /*??trace()*/ - if (exp_list_p) { - as_exp_destroy(exp_list_p); - } - - if (err.code != AEROSPIKE_OK) { - Py_XDECREF(py_results); - raise_exception(&err); - return NULL; - } - - if (self->query.apply.arglist) { - as_arraylist_destroy((as_arraylist *)self->query.apply.arglist); - } - self->query.apply.arglist = NULL; - - return py_results; + return AerospikeQuery_Foreach_Invoke(self, NULL, py_policy, py_options); }