Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,8 @@ typedef enum CassColumnType_ {
CASS_COLUMN_TYPE_REGULAR,
CASS_COLUMN_TYPE_PARTITION_KEY,
CASS_COLUMN_TYPE_CLUSTERING_KEY,
CASS_COLUMN_TYPE_STATIC
CASS_COLUMN_TYPE_STATIC,
CASS_COLUMN_TYPE_COMPACT_VALUE
} CassColumnType;

typedef enum CassErrorSource_ {
Expand Down Expand Up @@ -2322,6 +2323,9 @@ cass_aggregate_meta_final_func(const CassAggregateMeta* aggregate_meta);
/**
* Gets the initial condition value for the aggregate.
*
* <b>Note:</b> The value of the initial condition will always be
* a "varchar" type for Cassandra 3.0+.
*
* @public @memberof CassAggregateMeta
*
* @param[in] aggregate_meta
Expand Down Expand Up @@ -4278,6 +4282,18 @@ cass_data_type_set_class_name_n(CassDataType* data_type,
const char* class_name,
size_t class_name_length);

/**
* Gets the sub-data type count of a UDT (user defined type), tuple
* or collection.
*
* <b>Note:</b> Only valid for UDT, tuple and collection data types.
*
* @param[in] data_type
* @return Returns the number of sub-data types
*/
CASS_EXPORT size_t
cass_data_sub_type_count(const CassDataType* data_type);

/**
* Gets the sub-data type of a UDT (user defined type), tuple or collection at
* the specified index.
Expand Down
4 changes: 2 additions & 2 deletions src/abstract_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class AbstractData {
protected:
virtual size_t get_indices(StringRef name,
IndexVec* indices) = 0;
virtual const SharedRefPtr<const DataType>& get_type(size_t index) const = 0;
virtual const DataType::ConstPtr& get_type(size_t index) const = 0;

private:
template <class T>
Expand All @@ -152,7 +152,7 @@ class AbstractData {
return CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS;
}
IsValidDataType<T> is_valid_type;
SharedRefPtr<const DataType> data_type(get_type(index));
DataType::ConstPtr data_type(get_type(index));
if (data_type && !is_valid_type(value, data_type)) {
return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
}
Expand Down
6 changes: 3 additions & 3 deletions src/collection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Collection : public RefCounted<Collection> {
items_.reserve(item_count);
}

Collection(const SharedRefPtr<const CollectionType>& data_type,
Collection(const CollectionType::ConstPtr& data_type,
size_t item_count)
: data_type_(data_type) {
items_.reserve(item_count);
Expand All @@ -51,7 +51,7 @@ class Collection : public RefCounted<Collection> {
return static_cast<CassCollectionType>(data_type_->value_type());
}

const SharedRefPtr<const CollectionType>& data_type() const { return data_type_; }
const CollectionType::ConstPtr& data_type() const { return data_type_; }
const BufferVec& items() const { return items_; }

#define APPEND_TYPE(Type) \
Expand Down Expand Up @@ -129,7 +129,7 @@ class Collection : public RefCounted<Collection> {
void encode_items_uint16(char* buf) const;

private:
SharedRefPtr<const CollectionType> data_type_;
CollectionType::ConstPtr data_type_;
BufferVec items_;

private:
Expand Down
2 changes: 1 addition & 1 deletion src/collection_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ char* CollectionIterator::decode_value(char* position) {
int32_t size;
char* buffer = decode_size(protocol_version, position, size);

SharedRefPtr<const DataType> data_type;
DataType::ConstPtr data_type;
if (collection_->value_type() == CASS_VALUE_TYPE_MAP) {
data_type = (index_ % 2 == 0) ? collection_->primary_data_type()
: collection_->secondary_data_type();
Expand Down
2 changes: 1 addition & 1 deletion src/collection_iterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class TupleIterator : public ValueIterator {
: ValueIterator(CASS_ITERATOR_TYPE_TUPLE)
, tuple_(tuple)
, position_(tuple->data()) {
SharedRefPtr<const CollectionType> collection_type(tuple->data_type());
CollectionType::ConstPtr collection_type(tuple->data_type());
next_ = collection_type->types().begin();
end_ = collection_type->types().end();
}
Expand Down
187 changes: 133 additions & 54 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,19 @@
#define SELECT_PEERS "SELECT peer, data_center, rack, release_version, rpc_address FROM system.peers"
#define SELECT_PEERS_TOKENS "SELECT peer, data_center, rack, release_version, rpc_address, tokens FROM system.peers"

#define SELECT_KEYSPACES "SELECT * FROM system.schema_keyspaces"
#define SELECT_COLUMN_FAMILIES "SELECT * FROM system.schema_columnfamilies"
#define SELECT_COLUMNS "SELECT * FROM system.schema_columns"
#define SELECT_USERTYPES "SELECT * FROM system.schema_usertypes"
#define SELECT_FUNCTIONS "SELECT * FROM system.schema_functions"
#define SELECT_AGGREGATES "SELECT * FROM system.schema_aggregates"
#define SELECT_KEYSPACES_20 "SELECT * FROM system.schema_keyspaces"
#define SELECT_COLUMN_FAMILIES_20 "SELECT * FROM system.schema_columnfamilies"
#define SELECT_COLUMNS_20 "SELECT * FROM system.schema_columns"
#define SELECT_USERTYPES_21 "SELECT * FROM system.schema_usertypes"
#define SELECT_FUNCTIONS_22 "SELECT * FROM system.schema_functions"
#define SELECT_AGGREGATES_22 "SELECT * FROM system.schema_aggregates"

#define SELECT_KEYSPACES_30 "SELECT * FROM system_schema.keyspaces"
#define SELECT_TABLES_30 "SELECT * FROM system_schema.tables"
#define SELECT_COLUMNS_30 "SELECT * FROM system_schema.columns"
#define SELECT_USERTYPES_30 "SELECT * FROM system_schema.types"
#define SELECT_FUNCTIONS_30 "SELECT * FROM system_schema.functions"
#define SELECT_AGGREGATES_30 "SELECT * FROM system_schema.aggregates"

namespace cass {

Expand Down Expand Up @@ -194,7 +201,7 @@ void ControlConnection::on_ready(Connection* connection) {

// The control connection has to refresh meta when there's a reconnect because
// events could have been missed while not connected.
query_meta_all();
query_meta_hosts();
}

void ControlConnection::on_close(Connection* connection) {
Expand Down Expand Up @@ -359,40 +366,23 @@ void ControlConnection::on_event(EventResponse* response) {
}
}

//TODO: query and callbacks should be in Metadata
// punting for now because of tight coupling of Session and CC state
void ControlConnection::query_meta_all() {
ScopedRefPtr<ControlMultipleRequestHandler<QueryMetadataAllData> > handler(
new ControlMultipleRequestHandler<QueryMetadataAllData>(this, ControlConnection::on_query_meta_all, QueryMetadataAllData()));
void ControlConnection::query_meta_hosts() {
ScopedRefPtr<ControlMultipleRequestHandler<UnusedData> > handler(
new ControlMultipleRequestHandler<UnusedData>(this, ControlConnection::on_query_hosts, UnusedData()));
handler->execute_query(SELECT_LOCAL_TOKENS);
handler->execute_query(SELECT_PEERS_TOKENS);

if (session_->config().use_schema()) {
handler->execute_query(SELECT_KEYSPACES);
handler->execute_query(SELECT_COLUMN_FAMILIES);
handler->execute_query(SELECT_COLUMNS);
if (protocol_version_ >= 3) {
handler->execute_query(SELECT_USERTYPES);
}
if (protocol_version_ >= 4) {
handler->execute_query(SELECT_FUNCTIONS);
handler->execute_query(SELECT_AGGREGATES);
}
}
}

void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
const QueryMetadataAllData& unused,
const MultipleRequestHandler::ResponseVec& responses) {
void ControlConnection::on_query_hosts(ControlConnection* control_connection,
const UnusedData& data,
const MultipleRequestHandler::ResponseVec& responses) {
Connection* connection = control_connection->connection_;
if (connection == NULL) {
return;
}

Session* session = control_connection->session_;

session->metadata().clear_and_update_back();

bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);

// If the 'system.local' table is empty the connection isn't used as a control
Expand Down Expand Up @@ -460,20 +450,73 @@ void ControlConnection::on_query_meta_all(ControlConnection* control_connection,
session->purge_hosts(is_initial_connection);

if (session->config().use_schema()) {
session->metadata().update_keyspaces(static_cast<ResultResponse*>(responses[2].get()));
session->metadata().update_tables(static_cast<ResultResponse*>(responses[3].get()),
static_cast<ResultResponse*>(responses[4].get()));
if (control_connection->protocol_version_ >= 3) {
session->metadata().update_user_types(static_cast<ResultResponse*>(responses[5].get()));
control_connection->query_meta_schema();
} else {
control_connection->state_ = CONTROL_STATE_READY;
session->on_control_connection_ready();
// Create a new query plan that considers all the new hosts from the
// "system" tables.
control_connection->query_plan_.reset(session->new_query_plan());
}
}

//TODO: query and callbacks should be in Metadata
// punting for now because of tight coupling of Session and CC state
void ControlConnection::query_meta_schema() {
ScopedRefPtr<ControlMultipleRequestHandler<UnusedData> > handler(
new ControlMultipleRequestHandler<UnusedData>(this, ControlConnection::on_query_meta_schema, UnusedData()));

if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
handler->execute_query(SELECT_KEYSPACES_30);
handler->execute_query(SELECT_TABLES_30);
handler->execute_query(SELECT_COLUMNS_30);
handler->execute_query(SELECT_USERTYPES_30);
handler->execute_query(SELECT_FUNCTIONS_30);
handler->execute_query(SELECT_AGGREGATES_30);
} else {
handler->execute_query(SELECT_KEYSPACES_20);
handler->execute_query(SELECT_COLUMN_FAMILIES_20);
handler->execute_query(SELECT_COLUMNS_20);
if (session_->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
handler->execute_query(SELECT_USERTYPES_21);
}
if (control_connection->protocol_version_ >= 4) {
session->metadata().update_functions(static_cast<ResultResponse*>(responses[6].get()));
session->metadata().update_aggregates(static_cast<ResultResponse*>(responses[7].get()));
if (session_->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
handler->execute_query(SELECT_FUNCTIONS_22);
handler->execute_query(SELECT_AGGREGATES_22);
}
session->metadata().swap_to_back_and_update_front();
if (control_connection->should_query_tokens_) session->metadata().build();
}
}

void ControlConnection::on_query_meta_schema(ControlConnection* control_connection,
const UnusedData& unused,
const MultipleRequestHandler::ResponseVec& responses) {
Connection* connection = control_connection->connection_;
if (connection == NULL) {
return;
}

Session* session = control_connection->session_;

session->metadata().clear_and_update_back();

bool is_initial_connection = (control_connection->state_ == CONTROL_STATE_NEW);

session->metadata().update_keyspaces(static_cast<ResultResponse*>(responses[0].get()));
session->metadata().update_tables(static_cast<ResultResponse*>(responses[1].get()),
static_cast<ResultResponse*>(responses[2].get()));

if (session->metadata().cassandra_version() >= VersionNumber(2, 1, 0)) {
session->metadata().update_user_types(static_cast<ResultResponse*>(responses[3].get()));
}

if (session->metadata().cassandra_version() >= VersionNumber(2, 2, 0)) {
session->metadata().update_functions(static_cast<ResultResponse*>(responses[4].get()));
session->metadata().update_aggregates(static_cast<ResultResponse*>(responses[5].get()));
}

session->metadata().swap_to_back_and_update_front();
if (control_connection->should_query_tokens_) session->metadata().build();

if (is_initial_connection) {
control_connection->state_ = CONTROL_STATE_READY;
session->on_control_connection_ready();
Expand Down Expand Up @@ -636,8 +679,9 @@ void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row
}

if (should_query_tokens_) {
bool is_connected_host = connection_ != NULL && host->address().compare(connection_->address()) == 0;
std::string partitioner;
if (row->get_string_by_name("partitioner", &partitioner)) {
if (is_connected_host && row->get_string_by_name("partitioner", &partitioner)) {
session_->metadata().set_partitioner(partitioner);
}
v = row->get_by_name("tokens");
Expand All @@ -655,7 +699,13 @@ void ControlConnection::update_node_info(SharedRefPtr<Host> host, const Row* row
}

void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) {
std::string query(SELECT_KEYSPACES);
std::string query;

if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
query.assign(SELECT_KEYSPACES_30);
} else {
query.assign(SELECT_KEYSPACES_20);
}
query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("'");
Expand Down Expand Up @@ -683,13 +733,26 @@ void ControlConnection::on_refresh_keyspace(ControlConnection* control_connectio

void ControlConnection::refresh_table(const StringRef& keyspace_name,
const StringRef& table_name) {
std::string cf_query(SELECT_COLUMN_FAMILIES);
cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
std::string cf_query;
std::string col_query;

if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
cf_query.assign(SELECT_TABLES_30);
cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND table_name='").append(table_name.data(), table_name.size()).append("'");

std::string col_query(SELECT_COLUMNS);
col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
col_query.assign(SELECT_COLUMNS_30);
col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND table_name='").append(table_name.data(), table_name.size()).append("'");
} else {
cf_query.assign(SELECT_COLUMN_FAMILIES_20);
cf_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");

col_query.assign(SELECT_COLUMNS_20);
col_query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='").append(table_name.data(), table_name.size()).append("'");
}

LOG_DEBUG("Refreshing table %s; %s", cf_query.c_str(), col_query.c_str());

Expand Down Expand Up @@ -720,7 +783,13 @@ void ControlConnection::on_refresh_table(ControlConnection* control_connection,
void ControlConnection::refresh_type(const StringRef& keyspace_name,
const StringRef& type_name) {

std::string query(SELECT_USERTYPES);
std::string query;
if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
query.assign(SELECT_USERTYPES_30);
} else {
query.assign(SELECT_USERTYPES_21);
}

query.append(" WHERE keyspace_name='").append(keyspace_name.data(), keyspace_name.size())
.append("' AND type_name='").append(type_name.data(), type_name.size()).append("'");

Expand Down Expand Up @@ -752,12 +821,22 @@ void ControlConnection::refresh_function(const StringRef& keyspace_name,
bool is_aggregate) {

std::string query;
if (is_aggregate) {
query.assign(SELECT_AGGREGATES);
query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
if (session_->metadata().cassandra_version() >= VersionNumber(3, 0, 0)) {
if (is_aggregate) {
query.assign(SELECT_AGGREGATES_30);
query.append(" WHERE keyspace_name=? AND aggregate_name=? AND argument_types=?");
} else {
query.assign(SELECT_FUNCTIONS_30);
query.append(" WHERE keyspace_name=? AND function_name=? AND argument_types=?");
}
} else {
query.assign(SELECT_FUNCTIONS);
query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
if (is_aggregate) {
query.assign(SELECT_AGGREGATES_22);
query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
} else {
query.assign(SELECT_FUNCTIONS_22);
query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
}
}

LOG_DEBUG("Refreshing %s %s in keyspace %s",
Expand Down
Loading