Skip to content

Conversation

mikefero
Copy link
Contributor

@mikefero mikefero commented Jun 4, 2019

No description provided.

Copy link
Contributor

@mpenick mpenick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to be an example of "concurrent execution async" and more of an example of executing synchronous requests from multiple threads. There are two different ways (that I know of) to handle asynchronous concurrent requests with our driver. They each have their own trade-offs.

Note: This essentially pseudo-code in these examples.

Chunking

void insert_into_concurrent_chunking(void* data) {
  int i;
  CassFuture* futures[CONCURRENCY_LEVEL];

  int num_requests = NUM_REQUESTS;

  while (num_requests > 0) {
    int num_outstanding_requests = CONCURRENCY_LEVEL;
    if (num_requests < num_outstanding_requests) {
      num_outstanding_requests = num_requests;
    }
    for (i = 0; i < num_outstanding_requests; ++i) {
      CassUuid uuid;
      char value_buffer[16];
      CassStatement* statement = cass_prepared_bind(prepared);
      cass_statement_set_is_idempotent(statement, cass_true);
      cass_uuid_gen_random(uuid_gen, &uuid);
      cass_statement_bind_uuid_by_name(statement, "id", uuid);
      snprintf(value_buffer, sizeof(value_buffer), "%d", i);
      cass_statement_bind_string_by_name(statement, "value", value_buffer);

      futures[i] = cass_session_execute(session, statement);

      cass_statement_free(statement);
    }

    for (i = 0; i < num_outstanding_requests; ++i) {
      CassFuture* future = futures[i];
      CassError rc = cass_future_error_code(future);
      if (rc != CASS_OK) {
        print_error(future);
      }
      cass_future_free(future);
    }
  }
}

Callback

uv_mutex_t callback_mutex; // Needs to be initialized
int num_requests = 0;
int num_outstanding_requests = 0;

void run_query(int i);

void on_request(CassFuture* future, void* data) {
  uv_mutex_lock(&callback_mutex);
  int is_done = num_outstanding_requests == 0;
  int i = ++num_requests;
  if (i < NUM_REQUESTS) {
    uv_mutex_unlock(&callback_mutex);
    run_query(i);
  } else {
    uv_mutex_unlock(&callback_mutex);
    if (is_done) {
      // Do something to notify we're done.
    }
  }
}

void run_query(int i) {
  CassUuid uuid;
  CassFuture* future;
  char value_buffer[16];
  CassStatement* statement = cass_prepared_bind(prepared);

  uv_mutex_lock(&callback_mutex);
  num_outstanding_requests++;
  uv_mutex_unlock(&callback_mutex);

  cass_statement_set_is_idempotent(statement, cass_true);
  cass_uuid_gen_random(uuid_gen, &uuid);
  cass_statement_bind_uuid_by_name(statement, "id", uuid);
  snprintf(value_buffer, sizeof(value_buffer), "%d", i);
  cass_statement_bind_string_by_name(statement, "value", value_buffer);

  future = cass_session_execute(session, statement);

  cass_future_set_callback(future, on_request, NULL);

  cass_future_free(future);
  cass_statement_free(statement);
}

void insert_into_concurrent_callback(void* data) {
  int i;

  num_requests = CONCURRENCY_LEVEL;
  if (NUM_REQUESTS < num_requests) {
    num_requests = NUM_REQUESTS;
  }
  for (i = 0; i < CONCURRENCY_LEVEL; ++i) {
    run_query(i);
  }

  // Wait until notified we're done
}


#include <uv.h>

#define CONCURRENCY_LEVEL 32 /* Maximum amount of parallel async executions (threads) */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be NUM_THREADS and should be much smaller than 32 or we could just eliminate threads entirely from this example to simplify it to focus on doing concurrent asynchronous requests. I think that CONCURRENCY_LEVEL is a better name for outstanding asynchronous requests.

Copy link
Contributor

@mpenick mpenick Jun 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a 3rd way is using synchronous requests from multiple threads...but it's not really async.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose the chunking method because I wanted to keep things off the callback for our examples as a "best practice" approach.

@mpenick
Copy link
Contributor

mpenick commented Jun 21, 2019

Copy link
Contributor

@mpenick mpenick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One minor nit. Good work.

if (num_requests < num_outstanding_requests) {
num_outstanding_requests = num_requests;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be clearer if num_requests was updated here instead of below in the future check loop.

num_request -= num_outstanding_requests;

@mikefero mikefero merged commit 53cfb30 into master Jun 26, 2019
@mikefero mikefero deleted the CPP-785 branch June 26, 2019 15:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants