CPP-785 - Concurrent execution async example #447
Conversation
This doesn't seem to be an example of "concurrent execution async"; it's more an example of executing synchronous requests from multiple threads. There are two different ways (that I know of) to handle asynchronous concurrent requests with our driver. They each have their own trade-offs.
Note: These examples are essentially pseudo-code.
Chunking
void insert_into_concurrent_chunking(void* data) {
  int i;
  CassFuture* futures[CONCURRENCY_LEVEL];
  int num_requests = NUM_REQUESTS;

  while (num_requests > 0) {
    /* Execute at most CONCURRENCY_LEVEL requests per chunk */
    int num_outstanding_requests = CONCURRENCY_LEVEL;
    if (num_requests < num_outstanding_requests) {
      num_outstanding_requests = num_requests;
    }
    num_requests -= num_outstanding_requests; /* Consume this chunk so the loop terminates */

    /* Start the chunk's requests without waiting on them */
    for (i = 0; i < num_outstanding_requests; ++i) {
      CassUuid uuid;
      char value_buffer[16];
      CassStatement* statement = cass_prepared_bind(prepared);
      cass_statement_set_is_idempotent(statement, cass_true);
      cass_uuid_gen_random(uuid_gen, &uuid);
      cass_statement_bind_uuid_by_name(statement, "id", uuid);
      snprintf(value_buffer, sizeof(value_buffer), "%d", i);
      cass_statement_bind_string_by_name(statement, "value", value_buffer);
      futures[i] = cass_session_execute(session, statement);
      cass_statement_free(statement);
    }

    /* Wait for the whole chunk to complete before starting the next one */
    for (i = 0; i < num_outstanding_requests; ++i) {
      CassFuture* future = futures[i];
      CassError rc = cass_future_error_code(future);
      if (rc != CASS_OK) {
        print_error(future);
      }
      cass_future_free(future);
    }
  }
}
Callback
uv_mutex_t callback_mutex; /* Needs to be initialized with uv_mutex_init() */
int num_requests = 0;
int num_outstanding_requests = 0;

void run_query(int i);

void on_request(CassFuture* future, void* data) {
  int i;
  int is_done;
  uv_mutex_lock(&callback_mutex);
  is_done = (--num_outstanding_requests == 0); /* This request has completed */
  i = num_requests++;                          /* Index of the next request to start */
  if (i < NUM_REQUESTS) {
    uv_mutex_unlock(&callback_mutex);
    run_query(i); /* Chain the next request from the completion callback */
  } else {
    uv_mutex_unlock(&callback_mutex);
    if (is_done) {
      /* Do something to notify we're done. */
    }
  }
}

void run_query(int i) {
  CassUuid uuid;
  CassFuture* future;
  char value_buffer[16];
  CassStatement* statement = cass_prepared_bind(prepared);

  uv_mutex_lock(&callback_mutex);
  num_outstanding_requests++;
  uv_mutex_unlock(&callback_mutex);

  cass_statement_set_is_idempotent(statement, cass_true);
  cass_uuid_gen_random(uuid_gen, &uuid);
  cass_statement_bind_uuid_by_name(statement, "id", uuid);
  snprintf(value_buffer, sizeof(value_buffer), "%d", i);
  cass_statement_bind_string_by_name(statement, "value", value_buffer);

  future = cass_session_execute(session, statement);
  cass_future_set_callback(future, on_request, NULL);
  cass_future_free(future); /* The callback still receives the future when it completes */
  cass_statement_free(statement);
}

void insert_into_concurrent_callback(void* data) {
  int i;
  int initial_requests = CONCURRENCY_LEVEL;
  if (NUM_REQUESTS < initial_requests) {
    initial_requests = NUM_REQUESTS;
  }
  num_requests = initial_requests;

  /* Start the first batch; each completion callback starts the next
     request until NUM_REQUESTS have been executed. */
  for (i = 0; i < initial_requests; ++i) {
    run_query(i);
  }
  /* Wait until notified we're done */
}
#include <uv.h>

#define CONCURRENCY_LEVEL 32 /* Maximum amount of parallel async executions (threads) */
This should probably be NUM_THREADS and should be much smaller than 32, or we could just eliminate threads entirely from this example to simplify it and focus on doing concurrent asynchronous requests. I think that CONCURRENCY_LEVEL is a better name for the number of outstanding asynchronous requests.
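As a rough sketch, the suggested split might look like this (the values here are illustrative, not from the PR):

#define NUM_THREADS 2         /* Threads driving requests; keep small, or drop threads entirely */
#define CONCURRENCY_LEVEL 32  /* Maximum outstanding asynchronous requests */
#define NUM_REQUESTS 10000    /* Total number of requests to execute */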
Maybe a 3rd way is using synchronous requests from multiple threads...but it's not really async.
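A minimal sketch of that third approach, assuming the same globals as the snippets above (session, prepared, uuid_gen, print_error), the NUM_THREADS define suggested earlier, and that NUM_REQUESTS divides evenly across the threads (the function names are just for the sketch):

void insert_sync_worker(void* data) {
  int i;
  for (i = 0; i < NUM_REQUESTS / NUM_THREADS; ++i) {
    CassUuid uuid;
    char value_buffer[16];
    CassFuture* future;
    CassStatement* statement = cass_prepared_bind(prepared);

    cass_statement_set_is_idempotent(statement, cass_true);
    cass_uuid_gen_random(uuid_gen, &uuid);
    cass_statement_bind_uuid_by_name(statement, "id", uuid);
    snprintf(value_buffer, sizeof(value_buffer), "%d", i);
    cass_statement_bind_string_by_name(statement, "value", value_buffer);

    /* Synchronous: cass_future_error_code() blocks this thread until the request completes */
    future = cass_session_execute(session, statement);
    if (cass_future_error_code(future) != CASS_OK) {
      print_error(future);
    }
    cass_future_free(future);
    cass_statement_free(statement);
  }
}

void insert_into_threads_sync(void) {
  int i;
  uv_thread_t threads[NUM_THREADS];
  for (i = 0; i < NUM_THREADS; ++i) {
    uv_thread_create(&threads[i], insert_sync_worker, NULL);
  }
  for (i = 0; i < NUM_THREADS; ++i) {
    uv_thread_join(&threads[i]);
  }
}

Concurrency here is capped by the number of threads, which is why it doesn't scale the way the asynchronous approaches do.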
I chose the chunking method because I wanted to keep things off the callback for our examples as a "best practice" approach.
It looks like https://github.com/datastax/nodejs-driver/blob/master/examples/concurrent-executions/execute-in-loop.js is doing what I call "chunking".
One minor nit. Good work.
if (num_requests < num_outstanding_requests) {
  num_outstanding_requests = num_requests;
}
Might be clearer if num_requests was updated here instead of below in the future check loop:
num_requests -= num_outstanding_requests;
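For illustration, the suggested placement would read something like this in the chunking loop (context reconstructed from the pseudo-code above):

while (num_requests > 0) {
  int num_outstanding_requests = CONCURRENCY_LEVEL;
  if (num_requests < num_outstanding_requests) {
    num_outstanding_requests = num_requests;
  }
  num_requests -= num_outstanding_requests; /* Updated here rather than in the future check loop */

  /* ... execute num_outstanding_requests statements, then wait on their futures ... */
}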