Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/include/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,8 @@ PyObject *AerospikeQuery_Get_Partitions_status(AerospikeQuery *self);
PyObject *StoreUnicodePyObject(AerospikeQuery *self, PyObject *obj);

int64_t pyobject_to_int64(PyObject *py_obj);

PyObject *AerospikeQuery_Foreach_Invoke(AerospikeQuery *self,
PyObject *py_callback,
PyObject *py_policy,
PyObject *py_options);
152 changes: 97 additions & 55 deletions src/main/query/foreach.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@

// Struct for Python User-Data for the Callback
typedef struct {
PyObject *callback;
PyObject *py_obj;
AerospikeClient *client;
int partition_query;
as_vector thread_errors;
pthread_mutex_t thread_errors_mutex;
// If false, it is a python list
bool is_pyobj_callback;
} LocalData;

static bool each_result(const as_val *val, void *udata)
Expand All @@ -50,7 +52,7 @@ static bool each_result(const as_val *val, void *udata)

// Extract callback user-data
LocalData *data = (LocalData *)udata;
PyObject *py_callback = data->callback;
PyObject *py_callback_or_list_of_results = data->py_obj;

// Python Function Arguments and Result Value
PyObject *py_arglist = NULL;
Expand All @@ -72,45 +74,60 @@ static bool each_result(const as_val *val, void *udata)
goto EXIT_CALLBACK;
}

// Build Python Function Arguments
if (data->partition_query) {
if (data->is_pyobj_callback == false) {
// query.results()
if (py_result) {
int retval =
PyList_Append(py_callback_or_list_of_results, py_result);
Py_DECREF(py_result);
if (retval == -1) {
// TODO: should fail, not return true
goto EXIT_CALLBACK;
}
}
}
else {
// Build Python Function Arguments
if (data->partition_query) {

uint32_t part_id = 0;
uint32_t part_id = 0;

as_record *rec = as_record_fromval(val);
as_record *rec = as_record_fromval(val);

if (rec->key.digest.init) {
part_id =
as_partition_getid(rec->key.digest.value, CLUSTER_NPARTITIONS);
}
if (rec->key.digest.init) {
part_id = as_partition_getid(rec->key.digest.value,
CLUSTER_NPARTITIONS);
}

py_arglist = PyTuple_New(2);
py_arglist = PyTuple_New(2);

PyTuple_SetItem(py_arglist, 0, PyLong_FromUnsignedLong(part_id));
PyTuple_SetItem(py_arglist, 1, py_result);
}
else {
py_arglist = PyTuple_New(1);
PyTuple_SetItem(py_arglist, 0, py_result);
}
PyTuple_SetItem(py_arglist, 0, PyLong_FromUnsignedLong(part_id));
PyTuple_SetItem(py_arglist, 1, py_result);
}
else {
py_arglist = PyTuple_New(1);
PyTuple_SetItem(py_arglist, 0, py_result);
}

// Invoke Python Callback
py_return = PyObject_Call(py_callback, py_arglist, NULL);

// Release Python Function Arguments
Py_DECREF(py_arglist);
// handle return value
if (!py_return) {
// an exception was raised, handle it (someday)
// for now, we bail from the loop
as_error_update(&thread_err_local, AEROSPIKE_ERR_CLIENT,
"Callback function contains an error");
retval = false;
}
else if (py_return == Py_False) {
retval = false;
// Invoke Python Callback
py_return =
PyObject_Call(py_callback_or_list_of_results, py_arglist, NULL);

// Release Python Function Arguments
Py_DECREF(py_arglist);
// handle return value
if (!py_return) {
// an exception was raised, handle it (someday)
// for now, we bail from the loop
as_error_update(&thread_err_local, AEROSPIKE_ERR_CLIENT,
"Callback function contains an error");
retval = false;
}
else if (py_return == Py_False) {
retval = false;
}
Py_XDECREF(py_return);
}
Py_XDECREF(py_return);

EXIT_CALLBACK:
if (thread_err_local.code != AEROSPIKE_OK) {
Expand All @@ -129,27 +146,13 @@ static bool each_result(const as_val *val, void *udata)
return retval;
}

PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
PyObject *kwds)
PyObject *AerospikeQuery_Foreach_Invoke(AerospikeQuery *self,
PyObject *py_callback,
PyObject *py_policy,
PyObject *py_options)
{
// Python Function Arguments
PyObject *py_callback = NULL;
PyObject *py_policy = NULL;
PyObject *py_options = NULL;
// Python Function Keyword Arguments
static char *kwlist[] = {"callback", "policy", "options", NULL};

// Python Function Argument Parsing
if (PyArg_ParseTupleAndKeywords(args, kwds, "O|OO:foreach", kwlist,
&py_callback, &py_policy,
&py_options) == false) {
as_query_destroy(&self->query);
return NULL;
}

// Initialize callback user data
LocalData data;
data.callback = py_callback;
LocalData data = {0};
data.client = self->client;
data.partition_query = 0;

Expand Down Expand Up @@ -185,6 +188,17 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
goto CLEANUP;
}

if (!py_callback) {
data.py_obj = PyList_New(0);
if (data.py_obj == NULL) {
goto CLEANUP;
}
}
else {
data.py_obj = py_callback;
}
data.is_pyobj_callback = py_callback != NULL;

// Convert python policy object to as_policy_exists
pyobject_to_policy_query(
self->client, &err, py_policy, &query_policy, &query_policy_p,
Expand Down Expand Up @@ -261,10 +275,38 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
pthread_mutex_destroy(&data.thread_errors_mutex);

if (err.code != AEROSPIKE_OK) {
// TODO: results() used raise_exception();
Py_XDECREF(data.py_obj);
raise_exception_base(&err, Py_None, Py_None, Py_None, Py_None, Py_None);
return NULL;
}

Py_INCREF(Py_None);
return Py_None;
if (data.is_pyobj_callback) {
Py_RETURN_NONE;
}
else {
return data.py_obj;
}
}

PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
PyObject *kwds)
{
// Python Function Arguments
PyObject *py_callback = NULL;
PyObject *py_policy = NULL;
PyObject *py_options = NULL;
// Python Function Keyword Arguments
static char *kwlist[] = {"callback", "policy", "options", NULL};

// Python Function Argument Parsing
if (PyArg_ParseTupleAndKeywords(args, kwds, "O|OO:foreach", kwlist,
&py_callback, &py_policy,
&py_options) == false) {
as_query_destroy(&self->query);
return NULL;
}

return AerospikeQuery_Foreach_Invoke(self, py_callback, py_policy,
py_options);
}
132 changes: 1 addition & 131 deletions src/main/query/results.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,148 +32,18 @@
#undef TRACE
#define TRACE()

typedef struct {
PyObject *py_results;
AerospikeClient *client;
} LocalData;

static bool each_result(const as_val *val, void *udata)
{
if (!val) {
return false;
}

PyObject *py_results = NULL;
LocalData *data = (LocalData *)udata;
py_results = data->py_results;
PyObject *py_result = NULL;

as_error err;

PyGILState_STATE gstate;
gstate = PyGILState_Ensure();

val_to_pyobject(data->client, &err, val, &py_result);

if (py_result) {
PyList_Append(py_results, py_result);
Py_DECREF(py_result);
}
PyGILState_Release(gstate);

return true;
}

PyObject *AerospikeQuery_Results(AerospikeQuery *self, PyObject *args,
PyObject *kwds)
{
PyObject *py_policy = NULL;
PyObject *py_results = NULL;
PyObject *py_options = NULL;

static char *kwlist[] = {"policy", "options", NULL};

LocalData data;
data.client = self->client;

if (PyArg_ParseTupleAndKeywords(args, kwds, "|OO:results", kwlist,
&py_policy, &py_options) == false) {
return NULL;
}

as_error err;
as_error_init(&err);

as_policy_query query_policy;
as_policy_query *query_policy_p = NULL;

// For converting expressions.
as_exp exp_list;
as_exp *exp_list_p = NULL;

as_partition_filter partition_filter = {0};
as_partition_filter *partition_filter_p = NULL;
as_partitions_status *ps = NULL;

if (!self || !self->client->as) {
as_error_update(&err, AEROSPIKE_ERR_PARAM, "Invalid aerospike object");
goto CLEANUP;
}

if (!self->client->is_conn_16) {
as_error_update(&err, AEROSPIKE_ERR_CLUSTER,
"No connection to aerospike cluster");
goto CLEANUP;
}

// Convert python policy object to as_policy_query
pyobject_to_policy_query(
self->client, &err, py_policy, &query_policy, &query_policy_p,
&self->client->as->config.policies.query, &exp_list, &exp_list_p);
if (err.code != AEROSPIKE_OK) {
goto CLEANUP;
}

if (set_query_options(&err, py_options, &self->query) != AEROSPIKE_OK) {
goto CLEANUP;
}

if (py_policy) {
PyObject *py_partition_filter =
PyDict_GetItemString(py_policy, "partition_filter");
if (py_partition_filter) {
if (convert_partition_filter(self->client, py_partition_filter,
&partition_filter, &ps,
&err) == AEROSPIKE_OK) {
partition_filter_p = &partition_filter;
}
else {
goto CLEANUP;
}
}
}
as_error_reset(&err);

py_results = PyList_New(0);
data.py_results = py_results;

Py_BEGIN_ALLOW_THREADS

if (partition_filter_p) {
if (ps) {
as_partition_filter_set_partitions(partition_filter_p, ps);
}

aerospike_query_partitions(self->client->as, &err, query_policy_p,
&self->query, partition_filter_p,
each_result, &data);

if (ps) {
as_partitions_status_release(ps);
}
}
else {
aerospike_query_foreach(self->client->as, &err, query_policy_p,
&self->query, each_result, &data);
}

Py_END_ALLOW_THREADS

CLEANUP: /*??trace()*/
if (exp_list_p) {
as_exp_destroy(exp_list_p);
}

if (err.code != AEROSPIKE_OK) {
Py_XDECREF(py_results);
raise_exception(&err);
return NULL;
}

if (self->query.apply.arglist) {
as_arraylist_destroy((as_arraylist *)self->query.apply.arglist);
}
self->query.apply.arglist = NULL;

return py_results;
return AerospikeQuery_Foreach_Invoke(self, NULL, py_policy, py_options);
}
Loading