Skip to content

Commit 53cfb30

Browse files
authored
CPP-785 - Concurrent execution async example (#447)
1 parent 17611ef commit 53cfb30

File tree

3 files changed

+194
-0
lines changed

3 files changed

+194
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
concurrent_executions
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
cmake_minimum_required(VERSION 2.6.4)
2+
3+
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ".")
4+
set(PROJECT_EXAMPLE_NAME concurrent_executions)
5+
6+
file(GLOB EXAMPLE_SRC_FILES ${CASS_ROOT_DIR}/examples/concurrent_executions/*.c)
7+
include_directories(${INCLUDES})
8+
add_executable(${PROJECT_EXAMPLE_NAME} ${EXAMPLE_SRC_FILES})
9+
target_link_libraries(${PROJECT_EXAMPLE_NAME} ${PROJECT_LIB_NAME_TARGET} ${CASS_LIBS})
10+
add_dependencies(${PROJECT_EXAMPLE_NAME} ${PROJECT_LIB_NAME_TARGET})
11+
12+
set_property(
13+
TARGET ${PROJECT_EXAMPLE_NAME}
14+
APPEND PROPERTY COMPILE_FLAGS ${CASS_EXAMPLE_C_FLAGS})
15+
set_property(TARGET ${PROJECT_EXAMPLE_NAME} PROPERTY FOLDER "Examples")
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
This is free and unencumbered software released into the public domain.
3+
4+
Anyone is free to copy, modify, publish, use, compile, sell, or
5+
distribute this software, either in source code form or as a compiled
6+
binary, for any purpose, commercial or non-commercial, and by any
7+
means.
8+
9+
In jurisdictions that recognize copyright laws, the author or authors
10+
of this software dedicate any and all copyright interest in the
11+
software to the public domain. We make this dedication for the benefit
12+
of the public at large and to the detriment of our heirs and
13+
successors. We intend this dedication to be an overt act of
14+
relinquishment in perpetuity of all present and future rights to this
15+
software under copyright law.
16+
17+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18+
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19+
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20+
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21+
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22+
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23+
OTHER DEALINGS IN THE SOFTWARE.
24+
25+
For more information, please refer to <http://unlicense.org/>
26+
*/
27+
28+
#include <stdio.h>
29+
#include <string.h>
30+
31+
#include "cassandra.h"
32+
33+
#define CONCURRENCY_LEVEL 32
34+
#define NUM_REQUESTS 10000
35+
36+
CassSession* session = NULL;
37+
const CassPrepared* prepared = NULL;
38+
CassUuidGen* uuid_gen = NULL;
39+
40+
int num_requests = 0;
41+
int num_outstanding_requests = 0;
42+
43+
void print_error(CassFuture* future) {
44+
const char* message;
45+
size_t message_length;
46+
cass_future_error_message(future, &message, &message_length);
47+
fprintf(stderr, "Error: %.*s\n", (int)message_length, message);
48+
}
49+
50+
CassCluster* create_cluster(const char* hosts) {
51+
CassCluster* cluster = cass_cluster_new();
52+
cass_cluster_set_contact_points(cluster, hosts);
53+
return cluster;
54+
}
55+
56+
CassError connect_session(CassSession* session, const CassCluster* cluster) {
57+
CassError rc = CASS_OK;
58+
CassFuture* future = cass_session_connect(session, cluster);
59+
60+
cass_future_wait(future);
61+
rc = cass_future_error_code(future);
62+
if (rc != CASS_OK) {
63+
print_error(future);
64+
}
65+
cass_future_free(future);
66+
67+
return rc;
68+
}
69+
70+
CassError execute_query(CassSession* session, const char* query) {
71+
CassError rc = CASS_OK;
72+
CassStatement* statement = cass_statement_new(query, 0);
73+
74+
CassFuture* future = cass_session_execute(session, statement);
75+
cass_future_wait(future);
76+
77+
rc = cass_future_error_code(future);
78+
if (rc != CASS_OK) {
79+
print_error(future);
80+
}
81+
82+
cass_statement_free(statement);
83+
cass_future_free(future);
84+
return rc;
85+
}
86+
87+
CassError prepare_insert(CassSession* session, const CassPrepared** prepared) {
88+
CassError rc = CASS_OK;
89+
const char* query = "INSERT INTO examples.concurrent_executions (id, value) VALUES (?, ?);";
90+
91+
CassFuture* future = cass_session_prepare(session, query);
92+
cass_future_wait(future);
93+
94+
rc = cass_future_error_code(future);
95+
if (rc != CASS_OK) {
96+
print_error(future);
97+
} else {
98+
*prepared = cass_future_get_prepared(future);
99+
}
100+
101+
cass_future_free(future);
102+
103+
return rc;
104+
}
105+
106+
void insert_into_concurrent_executions() {
107+
CassFuture* futures[CONCURRENCY_LEVEL];
108+
int num_requests = NUM_REQUESTS;
109+
110+
while (num_requests > 0) {
111+
int i;
112+
int num_outstanding_requests = CONCURRENCY_LEVEL;
113+
if (num_requests < num_outstanding_requests) {
114+
num_outstanding_requests = num_requests;
115+
}
116+
num_requests -= num_outstanding_requests;
117+
118+
for (i = 0; i < num_outstanding_requests; ++i) {
119+
CassUuid uuid;
120+
char value_buffer[16];
121+
CassStatement* statement = cass_prepared_bind(prepared);
122+
cass_statement_set_is_idempotent(statement, cass_true);
123+
cass_uuid_gen_random(uuid_gen, &uuid);
124+
cass_statement_bind_uuid_by_name(statement, "id", uuid);
125+
snprintf(value_buffer, sizeof(value_buffer), "%d", i);
126+
cass_statement_bind_string_by_name(statement, "value", value_buffer);
127+
128+
futures[i] = cass_session_execute(session, statement);
129+
cass_statement_free(statement);
130+
}
131+
132+
for (i = 0; i < num_outstanding_requests; ++i) {
133+
CassFuture* future = futures[i];
134+
CassError rc = cass_future_error_code(future);
135+
if (rc != CASS_OK) {
136+
print_error(future);
137+
}
138+
cass_future_free(future);
139+
}
140+
}
141+
}
142+
143+
int main(int argc, char* argv[]) {
144+
CassCluster* cluster = NULL;
145+
char* hosts = "127.0.0.1";
146+
if (argc > 1) {
147+
hosts = argv[1];
148+
}
149+
session = cass_session_new();
150+
uuid_gen = cass_uuid_gen_new();
151+
cluster = create_cluster(hosts);
152+
153+
if (connect_session(session, cluster) != CASS_OK) {
154+
cass_uuid_gen_free(uuid_gen);
155+
cass_cluster_free(cluster);
156+
cass_session_free(session);
157+
return -1;
158+
}
159+
160+
execute_query(session, "CREATE KEYSPACE IF NOT EXISTS examples WITH replication = { \
161+
'class': 'SimpleStrategy', \
162+
'replication_factor': '1' }");
163+
execute_query(session, "CREATE TABLE IF NOT EXISTS examples.concurrent_executions ( \
164+
id uuid, \
165+
value text, \
166+
PRIMARY KEY (id))");
167+
168+
if (prepare_insert(session, &prepared) == CASS_OK) {
169+
insert_into_concurrent_executions();
170+
cass_prepared_free(prepared);
171+
}
172+
173+
cass_uuid_gen_free(uuid_gen);
174+
cass_cluster_free(cluster);
175+
cass_session_free(session);
176+
177+
return 0;
178+
}

0 commit comments

Comments
 (0)