diff --git a/include/cassandra.h b/include/cassandra.h
index 49666cf20..06389f8bd 100644
--- a/include/cassandra.h
+++ b/include/cassandra.h
@@ -2213,6 +2213,51 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
                                          unsigned used_hosts_per_remote_dc,
                                          cass_bool_t allow_remote_dcs_for_local_cl);
 
+/**
+ * Configures the cluster to use Rack-aware load balancing.
+ * For each query, all live nodes in a primary 'local' rack are tried first,
+ * followed by nodes from the local DC, and then by nodes from other DCs.
+ *
+ * If local_rack and local_dc are empty, the default local_dc and local_rack
+ * are chosen from the first connected contact point, and no remote hosts are
+ * considered in query plans. If relying on this mechanism, be sure to use
+ * only contact points from the local rack.
+ *
+ * @public @memberof CassCluster
+ *
+ * @param[in] cluster
+ * @param[in] local_dc The primary data center to try first
+ * @param[in] local_rack The primary rack to try first
+ * @return CASS_OK if successful, otherwise an error occurred
+ */
+CASS_EXPORT CassError
+cass_cluster_set_load_balance_rack_aware(CassCluster* cluster,
+                                         const char* local_dc,
+                                         const char* local_rack);
+
+/**
+ * Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for
+ * string parameters.
+ *
+ * @public @memberof CassCluster
+ *
+ * @param[in] cluster
+ * @param[in] local_dc
+ * @param[in] local_dc_length
+ * @param[in] local_rack
+ * @param[in] local_rack_length
+ * @return same as cass_cluster_set_load_balance_rack_aware()
+ *
+ * @see cass_cluster_set_load_balance_rack_aware()
+ */
+CASS_EXPORT CassError
+cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster,
+                                           const char* local_dc,
+                                           size_t local_dc_length,
+                                           const char* local_rack,
+                                           size_t local_rack_length);
+
 /**
  * Configures the cluster to use token-aware request routing or not.
  *
@@ -3175,6 +3220,17 @@
 CASS_EXPORT void
 cass_session_get_speculative_execution_metrics(const CassSession* session,
                                                CassSpeculativeExecutionMetrics* output);
 
+/**
+ * Gets the current total count of in-flight requests across all hosts.
+ *
+ * @public @memberof CassSession
+ *
+ * @param[in] session
+ * @return The total number of in-flight requests.
+ */
+CASS_EXPORT cass_uint64_t
+cass_session_get_inflight_request_count(const CassSession* session);
+
 /**
  * Get the client id.
  *
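The new entry points mirror the existing DC-aware pair. A minimal sketch of how an application might opt in; the contact point, data center, and rack names here are hypothetical:

    #include <cassandra.h>

    CassCluster* cluster = cass_cluster_new();
    cass_cluster_set_contact_points(cluster, "10.0.1.1"); /* assumed to live in dc1/rack1 */

    /* Prefer nodes in dc1/rack1, then other racks of dc1, then other DCs. */
    CassError rc = cass_cluster_set_load_balance_rack_aware(cluster, "dc1", "rack1");
    if (rc != CASS_OK) {
      /* NULL or empty names are rejected with CASS_ERROR_LIB_BAD_PARAMS. */
    }

    CassSession* session = cass_session_new();
    CassFuture* connect_future = cass_session_connect(session, cluster);
    /* ... use the session ... */
    cass_future_free(connect_future);
    cass_session_free(session);
    cass_cluster_free(cluster);
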
diff --git a/src/cluster.cpp b/src/cluster.cpp
index a46f3ffc1..21ab3eafb 100644
--- a/src/cluster.cpp
+++ b/src/cluster.cpp
@@ -18,6 +18,7 @@
 
 #include "constants.hpp"
 #include "dc_aware_policy.hpp"
+#include "rack_aware_policy.hpp"
 #include "external.hpp"
 #include "logger.hpp"
 #include "resolver.hpp"
@@ -240,6 +241,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener,
                  const ControlConnectionSchema& schema,
                  const LoadBalancingPolicy::Ptr& load_balancing_policy,
                  const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
+                 const String& local_rack,
                  const StringMultimap& supported_options, const ClusterSettings& settings)
     : connection_(connection)
     , listener_(listener ? listener : &nop_cluster_listener__)
@@ -251,6 +253,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener,
     , connected_host_(connected_host)
     , hosts_(hosts)
     , local_dc_(local_dc)
+    , local_rack_(local_rack)
     , supported_options_(supported_options)
     , is_recording_events_(settings.disable_events_on_startup) {
   static const auto optimized_msg = "===== Using optimized driver!!! =====\n";
diff --git a/src/cluster.hpp b/src/cluster.hpp
index 4fe36ff5d..dec6043b6 100644
--- a/src/cluster.hpp
+++ b/src/cluster.hpp
@@ -257,6 +257,7 @@ class Cluster
    * determining the next control connection host.
    * @param load_balancing_policies
    * @param local_dc The local datacenter determined by the metadata service for initializing the
    * load balancing policies.
+   * @param local_rack The local rack determined by the metadata service for initializing the load balancing policies.
    * @param supported_options Supported options discovered during control connection.
    * @param settings The control connection settings to use for reconnecting the
@@ -267,6 +268,7 @@ class Cluster
           const ControlConnectionSchema& schema,
           const LoadBalancingPolicy::Ptr& load_balancing_policy,
           const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
+          const String& local_rack,
           const StringMultimap& supported_options, const ClusterSettings& settings);
 
   /**
@@ -361,6 +363,7 @@ class Cluster
   const Host::Ptr& connected_host() const { return connected_host_; }
   const TokenMap::Ptr& token_map() const { return token_map_; }
   const String& local_dc() const { return local_dc_; }
+  const String& local_rack() const { return local_rack_; }
   const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); }
   const StringMultimap& supported_options() const { return supported_options_; }
   const ShardPortCalculator* shard_port_calculator() const { return shard_port_calculator_.get(); }
@@ -449,6 +452,7 @@ class Cluster
   PreparedMetadata prepared_metadata_;
   TokenMap::Ptr token_map_;
   String local_dc_;
+  String local_rack_;
   StringMultimap supported_options_;
   Timer timer_;
   bool is_recording_events_;
diff --git a/src/cluster_config.cpp b/src/cluster_config.cpp
index ab357ed52..e536515b4 100644
--- a/src/cluster_config.cpp
+++ b/src/cluster_config.cpp
@@ -301,6 +301,27 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const char* local_dc,
   return CASS_OK;
 }
 
+CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc,
+                                                   const char* local_rack) {
+  if (local_dc == NULL || local_rack == NULL) {
+    return CASS_ERROR_LIB_BAD_PARAMS;
+  }
+  return cass_cluster_set_load_balance_rack_aware_n(cluster, local_dc, SAFE_STRLEN(local_dc),
+                                                    local_rack, SAFE_STRLEN(local_rack));
+}
+
+CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc,
+                                                     size_t local_dc_length,
+                                                     const char* local_rack,
+                                                     size_t local_rack_length) {
+  if (local_dc == NULL || local_dc_length == 0 || local_rack == NULL || local_rack_length == 0) {
+    return CASS_ERROR_LIB_BAD_PARAMS;
+  }
+  cluster->config().set_load_balancing_policy(new RackAwarePolicy(
+      String(local_dc, local_dc_length), String(local_rack, local_rack_length)));
+  return CASS_OK;
+}
+
 void cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled) {
   cluster->config().set_token_aware_routing(enabled == cass_true);
 }
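Note the asymmetry with the header comment above: both C entry points reject NULL and empty names, so the auto-detection path (empty local_dc/local_rack filled in from the first connected host) is only reachable when a RackAwarePolicy is constructed internally with its default arguments, not through the public setters. A quick sketch of the observable behavior (assertions are illustrative):

    #include <assert.h>

    /* Empty or NULL names are rejected up front. */
    assert(cass_cluster_set_load_balance_rack_aware(cluster, "", "rack1") ==
           CASS_ERROR_LIB_BAD_PARAMS);
    assert(cass_cluster_set_load_balance_rack_aware(cluster, "dc1", NULL) ==
           CASS_ERROR_LIB_BAD_PARAMS);
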
diff --git a/src/cluster_connector.cpp b/src/cluster_connector.cpp
index e0415e151..eb959e17f 100644
--- a/src/cluster_connector.cpp
+++ b/src/cluster_connector.cpp
@@ -16,6 +16,7 @@
 
 #include "cluster_connector.hpp"
 #include "dc_aware_policy.hpp"
+#include "rack_aware_policy.hpp"
 #include "protocol.hpp"
 #include "random.hpp"
 #include "round_robin_policy.hpp"
@@ -177,6 +178,7 @@ void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) {
   }
 
   local_dc_ = resolver->local_dc();
+  local_rack_ = resolver->local_rack();
   remaining_connector_count_ = resolved_contact_points.size();
   for (AddressVec::const_iterator it = resolved_contact_points.begin(),
                                   end = resolved_contact_points.end();
@@ -231,7 +233,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
   for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
        it != end; ++it) {
     LoadBalancingPolicy::Ptr policy(*it);
-    policy->init(connected_host, hosts, random_, local_dc_);
+    policy->init(connected_host, hosts, random_, local_dc_, local_rack_);
     policy->register_handles(event_loop_->loop());
   }
 
@@ -248,6 +250,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
       message = "No hosts available for the control connection using the "
                 "DC-aware load balancing policy. "
                 "Check to see if the configured local datacenter is valid";
+    } else if (dynamic_cast<RackAwarePolicy::RackAwareQueryPlan*>(query_plan.get()) !=
+               NULL) { // Check if Rack-aware
+      message = "No hosts available for the control connection using the "
+                "Rack-aware load balancing policy. "
+                "Check to see if the configured local datacenter and rack are valid";
     } else {
       message = "No hosts available for the control connection using the "
                 "configured load balancing policy";
@@ -258,7 +265,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
 
   cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_,
                              connected_host, hosts, connector->schema(), default_policy, policies,
-                             local_dc_, connector->supported_options(), settings_));
+                             local_dc_, local_rack_, connector->supported_options(), settings_));
 
   // Clear any connection errors and set the final negotiated protocol version.
   error_code_ = CLUSTER_OK;
diff --git a/src/cluster_connector.hpp b/src/cluster_connector.hpp
index e960fa058..8c8572322 100644
--- a/src/cluster_connector.hpp
+++ b/src/cluster_connector.hpp
@@ -169,6 +169,7 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
   Random* random_;
   Metrics* metrics_;
   String local_dc_;
+  String local_rack_;
   ClusterSettings settings_;
   Callback callback_;
 
diff --git a/src/cluster_metadata_resolver.hpp b/src/cluster_metadata_resolver.hpp
index 90e91acbd..bcca03bf3 100644
--- a/src/cluster_metadata_resolver.hpp
+++ b/src/cluster_metadata_resolver.hpp
@@ -48,6 +48,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
 
   const AddressVec& resolved_contact_points() const { return resolved_contact_points_; }
   const String& local_dc() const { return local_dc_; }
+  const String& local_rack() const { return local_rack_; }
 
 protected:
   virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) = 0;
@@ -57,6 +58,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
 protected:
   AddressVec resolved_contact_points_;
   String local_dc_;
+  String local_rack_;
   Callback callback_;
 };
 
diff --git a/src/dc_aware_policy.cpp b/src/dc_aware_policy.cpp
index d68bec1a2..5c1550227 100644
--- a/src/dc_aware_policy.cpp
+++ b/src/dc_aware_policy.cpp
@@ -43,7 +43,7 @@ DCAwarePolicy::DCAwarePolicy(const String& local_dc, size_t used_hosts_per_remote_dc,
 DCAwarePolicy::~DCAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); }
 
 void DCAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                         const String& local_dc) {
+                         const String& local_dc, const String& local_rack) {
   if (local_dc_.empty()) { // Only override if no local DC was specified.
     local_dc_ = local_dc;
   }
diff --git a/src/dc_aware_policy.hpp b/src/dc_aware_policy.hpp
index f76b7307b..526338c29 100644
--- a/src/dc_aware_policy.hpp
+++ b/src/dc_aware_policy.hpp
@@ -37,7 +37,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
   ~DCAwarePolicy();
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual CassHostDistance distance(const Host::Ptr& host) const;
 
diff --git a/src/execution_profile.hpp b/src/execution_profile.hpp
index 2b5645149..cb4d34f61 100644
--- a/src/execution_profile.hpp
+++ b/src/execution_profile.hpp
@@ -23,6 +23,7 @@
 #include "cassandra.h"
 #include "constants.hpp"
 #include "dc_aware_policy.hpp"
+#include "rack_aware_policy.hpp"
 #include "dense_hash_map.hpp"
 #include "latency_aware_policy.hpp"
 #include "speculative_execution.hpp"
diff --git a/src/latency_aware_policy.cpp b/src/latency_aware_policy.cpp
index 9f77a384f..29541bbb8 100644
--- a/src/latency_aware_policy.cpp
+++ b/src/latency_aware_policy.cpp
@@ -27,13 +27,13 @@ using namespace datastax::internal;
 using namespace datastax::internal::core;
 
 void LatencyAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                              const String& local_dc) {
+                              const String& local_dc, const String& local_rack) {
   hosts_->reserve(hosts.size());
   std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost());
   for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
     i->second->enable_latency_tracking(settings_.scale_ns, settings_.min_measured);
   }
-  ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc);
+  ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack);
 }
 
 void LatencyAwarePolicy::register_handles(uv_loop_t* loop) { start_timer(loop); }
diff --git a/src/latency_aware_policy.hpp b/src/latency_aware_policy.hpp
index 178752a4a..c04430c1a 100644
--- a/src/latency_aware_policy.hpp
+++ b/src/latency_aware_policy.hpp
@@ -51,7 +51,7 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy {
   virtual ~LatencyAwarePolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual void register_handles(uv_loop_t* loop);
   virtual void close_handles();
diff --git a/src/list_policy.cpp b/src/list_policy.cpp
index 7dc9357d4..fa38c838e 100644
--- a/src/list_policy.cpp
+++ b/src/list_policy.cpp
@@ -23,7 +23,7 @@ using namespace datastax::internal;
 using namespace datastax::internal::core;
 
 void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                      const String& local_dc) {
+                      const String& local_dc, const String& local_rack) {
   HostMap valid_hosts;
   for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
     const Host::Ptr& host = i->second;
@@ -36,7 +36,7 @@ void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
     LOG_ERROR("No valid hosts available for list policy");
   }
 
-  ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc);
+  ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc, local_rack);
 }
 
 CassHostDistance ListPolicy::distance(const Host::Ptr& host) const {
diff --git a/src/list_policy.hpp b/src/list_policy.hpp
index bda75f5f5..99b13eb22 100644
--- a/src/list_policy.hpp
+++ b/src/list_policy.hpp
@@ -31,7 +31,7 @@ class ListPolicy : public ChainedLoadBalancingPolicy {
   virtual ~ListPolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual CassHostDistance distance(const Host::Ptr& host) const;
 
diff --git a/src/load_balancing.hpp b/src/load_balancing.hpp
index ba60928a9..16259583e 100644
--- a/src/load_balancing.hpp
+++ b/src/load_balancing.hpp
@@ -43,6 +43,7 @@ typedef enum CassBalancingState_ {
 typedef enum CassHostDistance_ {
   CASS_HOST_DISTANCE_LOCAL,
   CASS_HOST_DISTANCE_REMOTE,
+  CASS_HOST_DISTANCE_REMOTE2,
   CASS_HOST_DISTANCE_IGNORE
 } CassHostDistance;
 
@@ -87,7 +88,7 @@ class LoadBalancingPolicy : public RefCounted<LoadBalancingPolicy> {
   virtual ~LoadBalancingPolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc) = 0;
+                    const String& local_dc, const String& local_rack) = 0;
 
   virtual void register_handles(uv_loop_t* loop) {}
   virtual void close_handles() {}
@@ -124,8 +125,8 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy {
   virtual ~ChainedLoadBalancingPolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc) {
-    return child_policy_->init(connected_host, hosts, random, local_dc);
+                    const String& local_dc, const String& local_rack) {
+    return child_policy_->init(connected_host, hosts, random, local_dc, local_rack);
   }
 
   virtual const LoadBalancingPolicy::Ptr& child_policy() const { return child_policy_; }
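The enum gains a third tier between REMOTE and IGNORE, and its relative order matters: TokenAwarePolicy (further down) compares distances with `>`. A small sketch of the intended topology mapping — an assumption that mirrors RackAwarePolicy::distance() as implemented later in this patch:

    // Sketch: how the three distance tiers map onto topology.
    CassHostDistance classify(const String& dc, const String& rack,
                              const String& local_dc, const String& local_rack) {
      if (dc == local_dc && rack == local_rack) return CASS_HOST_DISTANCE_LOCAL;
      if (dc == local_dc) return CASS_HOST_DISTANCE_REMOTE;   // same DC, different rack
      return CASS_HOST_DISTANCE_REMOTE2;                      // different DC
    }

(The real distance() additionally returns CASS_HOST_DISTANCE_IGNORE for hosts that are absent from the policy's live-host sets.)
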
diff --git a/src/rack_aware_policy.cpp b/src/rack_aware_policy.cpp
new file mode 100644
index 000000000..626a080e4
--- /dev/null
+++ b/src/rack_aware_policy.cpp
@@ -0,0 +1,309 @@
+/*
+  Copyright (c) DataStax, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include "rack_aware_policy.hpp"
+
+#include "logger.hpp"
+#include "request_handler.hpp"
+#include "scoped_lock.hpp"
+
+#include <algorithm>
+
+using namespace datastax;
+using namespace datastax::internal;
+using namespace datastax::internal::core;
+
+RackAwarePolicy::RackAwarePolicy(const String& local_dc, const String& local_rack)
+    : local_dc_(local_dc)
+    , local_rack_(local_rack)
+    , local_rack_live_hosts_(new HostVec())
+    , index_(0) {
+  uv_rwlock_init(&available_rwlock_);
+}
+
+RackAwarePolicy::~RackAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); }
+
+void RackAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
+                           const String& local_dc, const String& local_rack) {
+  if (local_dc_.empty()) { // Only override if no local DC was specified.
+    local_dc_ = local_dc;
+  }
+
+  if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) {
+    LOG_INFO("Using '%s' for the local data center "
+             "(if this is incorrect, please provide the correct data center)",
+             connected_host->dc().c_str());
+    local_dc_ = connected_host->dc();
+  }
+
+  if (local_rack_.empty()) { // Only override if no local rack was specified.
+    local_rack_ = local_rack;
+  }
+
+  if (local_rack_.empty() && connected_host && !connected_host->rack().empty()) {
+    LOG_INFO("Using '%s' for the local rack "
+             "(if this is incorrect, please provide the correct rack)",
+             connected_host->rack().c_str());
+    local_rack_ = connected_host->rack();
+  }
+
+  available_.resize(hosts.size());
+  std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()),
+                 GetAddress());
+
+  for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
+    on_host_added(i->second);
+  }
+  if (random != NULL) {
+    index_ = random->next(std::max(static_cast<size_t>(1), hosts.size()));
+  }
+}
+
+CassHostDistance RackAwarePolicy::distance(const Host::Ptr& host) const {
+  if (local_dc_.empty() || local_rack_.empty() ||
+      (host->dc() == local_dc_ && host->rack() == local_rack_)) {
+    return CASS_HOST_DISTANCE_LOCAL;
+  }
+
+  if (host->dc() == local_dc_) {
+    const CopyOnWriteHostVec& hosts = per_remote_rack_live_hosts_.get_hosts(host->rack());
+    size_t num_hosts = hosts->size();
+    for (size_t i = 0; i < num_hosts; ++i) {
+      if ((*hosts)[i]->address() == host->address()) {
+        return CASS_HOST_DISTANCE_REMOTE;
+      }
+    }
+  }
+
+  const CopyOnWriteHostVec& hosts = per_remote_dc_live_hosts_.get_hosts(host->dc());
+  size_t num_hosts = hosts->size();
+  for (size_t i = 0; i < num_hosts; ++i) {
+    if ((*hosts)[i]->address() == host->address()) {
+      return CASS_HOST_DISTANCE_REMOTE2;
+    }
+  }
+
+  return CASS_HOST_DISTANCE_IGNORE;
+}
+
+QueryPlan* RackAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler,
+                                           const TokenMap* token_map) {
+  CassConsistency cl =
+      request_handler != NULL ? request_handler->consistency() : CASS_DEFAULT_CONSISTENCY;
+  return new RackAwareQueryPlan(this, cl, index_++);
+}
+
+bool RackAwarePolicy::is_host_up(const Address& address) const {
+  ScopedReadLock rl(&available_rwlock_);
+  return available_.count(address) > 0;
+}
+
+void RackAwarePolicy::on_host_added(const Host::Ptr& host) {
+  const String& dc = host->dc();
+  const String& rack = host->rack();
+  if (local_dc_.empty() && !dc.empty()) {
+    LOG_INFO("Using '%s' for the local data center "
+             "(if this is incorrect, please provide the correct data center)",
+             host->dc().c_str());
+    local_dc_ = dc;
+  }
+  if (local_rack_.empty() && !rack.empty()) {
+    LOG_INFO("Using '%s' for the local rack "
+             "(if this is incorrect, please provide the correct rack)",
+             host->rack().c_str());
+    local_rack_ = rack;
+  }
+
+  if (dc == local_dc_ && rack == local_rack_) {
+    add_host(local_rack_live_hosts_, host);
+  } else if (dc == local_dc_) {
+    per_remote_rack_live_hosts_.add_host_to_key(rack, host);
+  } else {
+    per_remote_dc_live_hosts_.add_host_to_key(dc, host);
+  }
+}
+
+void RackAwarePolicy::on_host_removed(const Host::Ptr& host) {
+  const String& dc = host->dc();
+  const String& rack = host->rack();
+  if (dc == local_dc_ && rack == local_rack_) {
+    remove_host(local_rack_live_hosts_, host);
+  } else if (dc == local_dc_) {
+    per_remote_rack_live_hosts_.remove_host_from_key(host->rack(), host);
+  } else {
+    per_remote_dc_live_hosts_.remove_host_from_key(host->dc(), host);
+  }
+
+  ScopedWriteLock wl(&available_rwlock_);
+  available_.erase(host->address());
+}
+
+void RackAwarePolicy::on_host_up(const Host::Ptr& host) {
+  on_host_added(host);
+
+  ScopedWriteLock wl(&available_rwlock_);
+  available_.insert(host->address());
+}
+
+void RackAwarePolicy::on_host_down(const Address& address) {
+  if (!remove_host(local_rack_live_hosts_, address) &&
+      !per_remote_rack_live_hosts_.remove_host(address) &&
+      !per_remote_dc_live_hosts_.remove_host(address)) {
+    LOG_DEBUG("Attempted to mark host %s as DOWN, but it doesn't exist",
+              address.to_string().c_str());
+  }
+
+  ScopedWriteLock wl(&available_rwlock_);
+  available_.erase(address);
+}
+
+const String& RackAwarePolicy::local_dc() const {
+  ScopedReadLock rl(&available_rwlock_);
+  return local_dc_;
+}
+
+const String& RackAwarePolicy::local_rack() const {
+  ScopedReadLock rl(&available_rwlock_);
+  return local_rack_;
+}
+
+void RackAwarePolicy::PerKeyHostMap::add_host_to_key(const String& key, const Host::Ptr& host) {
+  ScopedWriteLock wl(&rwlock_);
+  Map::iterator i = map_.find(key);
+  if (i == map_.end()) {
+    CopyOnWriteHostVec hosts(new HostVec());
+    hosts->push_back(host);
+    map_.insert(Map::value_type(key, hosts));
+  } else {
+    add_host(i->second, host);
+  }
+}
+
+void RackAwarePolicy::PerKeyHostMap::remove_host_from_key(const String& key,
+                                                          const Host::Ptr& host) {
+  ScopedWriteLock wl(&rwlock_);
+  Map::iterator i = map_.find(key);
+  if (i != map_.end()) {
+    core::remove_host(i->second, host);
+  }
+}
+
+bool RackAwarePolicy::PerKeyHostMap::remove_host(const Address& address) {
+  ScopedWriteLock wl(&rwlock_);
+  for (Map::iterator i = map_.begin(), end = map_.end(); i != end; ++i) {
+    if (core::remove_host(i->second, address)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+const CopyOnWriteHostVec& RackAwarePolicy::PerKeyHostMap::get_hosts(const String& key) const {
+  ScopedReadLock rl(&rwlock_);
+  Map::const_iterator i = map_.find(key);
+  if (i == map_.end()) return no_hosts_;
+
+  return i->second;
+}
+
+void RackAwarePolicy::PerKeyHostMap::copy_keys(KeySet* keys) const {
+  ScopedReadLock rl(&rwlock_);
+  for (Map::const_iterator i = map_.begin(), end = map_.end(); i != end; ++i) {
+    keys->insert(i->first);
+  }
+}
+
+// Helper functions to prevent a copy (notice: "const CopyOnWriteHostVec&")
+
+static const Host::Ptr& get_next_host(const CopyOnWriteHostVec& hosts, size_t index) {
+  return (*hosts)[index % hosts->size()];
+}
+
+static size_t get_hosts_size(const CopyOnWriteHostVec& hosts) { return hosts->size(); }
+
+RackAwarePolicy::RackAwareQueryPlan::RackAwareQueryPlan(const RackAwarePolicy* policy,
+                                                        CassConsistency cl, size_t start_index)
+    : policy_(policy)
+    , cl_(cl)
+    , hosts_(policy_->local_rack_live_hosts_)
+    , local_remaining_(get_hosts_size(hosts_))
+    , remote_remaining_(0)
+    , index_(start_index) {}
+
+Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() {
+  while (local_remaining_ > 0) {
+    --local_remaining_;
+    const Host::Ptr& host(get_next_host(hosts_, index_++));
+    if (policy_->is_host_up(host->address())) {
+      return host;
+    }
+  }
+
+  if (!remote_racks_) {
+    remote_racks_.reset(new PerKeyHostMap::KeySet());
+    policy_->per_remote_rack_live_hosts_.copy_keys(remote_racks_.get());
+  }
+
+  while (true) {
+    while (remote_remaining_ > 0) {
+      --remote_remaining_;
+      const Host::Ptr& host(get_next_host(hosts_, index_++));
+      if (policy_->is_host_up(host->address())) {
+        return host;
+      }
+    }
+
+    if (remote_racks_->empty()) {
+      break;
+    }
+
+    PerKeyHostMap::KeySet::iterator i = remote_racks_->begin();
+    hosts_ = policy_->per_remote_rack_live_hosts_.get_hosts(*i);
+    remote_remaining_ = get_hosts_size(hosts_);
+    remote_racks_->erase(i);
+  }
+
+  // Skip remote DCs for LOCAL_ consistency levels.
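PerKeyHostMap generalizes the per-remote-DC bucketing that DCAwarePolicy uses to an arbitrary key (rack or DC): a rwlock-guarded map from key to a copy-on-write host vector. Lookup misses return the shared no_hosts_ sentinel rather than a null pointer, so callers inside the policy can iterate unconditionally — a sketch of the resulting calling convention, with a hypothetical rack name:

    // Inside the policy implementation; "r2" is illustrative.
    const CopyOnWriteHostVec& hosts = per_remote_rack_live_hosts_.get_hosts("r2");
    for (size_t i = 0; i < hosts->size(); ++i) { // safe even when "r2" is unknown
      /* ... */
    }
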
+  if (is_dc_local(cl_)) {
+    return Host::Ptr();
+  }
+
+  if (!remote_dcs_) {
+    remote_dcs_.reset(new PerKeyHostMap::KeySet());
+    policy_->per_remote_dc_live_hosts_.copy_keys(remote_dcs_.get());
+  }
+
+  while (true) {
+    while (remote_remaining_ > 0) {
+      --remote_remaining_;
+      const Host::Ptr& host(get_next_host(hosts_, index_++));
+      if (policy_->is_host_up(host->address())) {
+        return host;
+      }
+    }
+
+    if (remote_dcs_->empty()) {
+      break;
+    }
+
+    PerKeyHostMap::KeySet::iterator i = remote_dcs_->begin();
+    hosts_ = policy_->per_remote_dc_live_hosts_.get_hosts(*i);
+    remote_remaining_ = get_hosts_size(hosts_);
+    remote_dcs_->erase(i);
+  }
+
+  return Host::Ptr();
+}
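Putting the plan together: given local_dc "dc1" and local_rack "r1", with hosts a and b in dc1/r1, c in dc1/r2, and d in dc2, a RackAwareQueryPlan yields a and b (rotating on the start index), then c, then d — and stops before d when the request uses a LOCAL_* consistency level. A sketch of driving the plan through the internal API (names and topology are illustrative):

    RackAwarePolicy policy("dc1", "r1");
    policy.init(connected_host, hosts, /*random=*/NULL, "", "");

    // A NULL request handler falls back to CASS_DEFAULT_CONSISTENCY.
    ScopedPtr<QueryPlan> plan(policy.new_query_plan("ks", NULL, NULL));
    Host::Ptr host;
    while ((host = plan->compute_next())) {
      // Hosts arrive rack-first: dc1/r1, then dc1/r2, then dc2.
    }
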
diff --git a/src/rack_aware_policy.hpp b/src/rack_aware_policy.hpp
new file mode 100644
index 000000000..68282563b
--- /dev/null
+++ b/src/rack_aware_policy.hpp
@@ -0,0 +1,127 @@
+/*
+  Copyright (c) DataStax, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#ifndef DATASTAX_INTERNAL_RACK_AWARE_POLICY_HPP
+#define DATASTAX_INTERNAL_RACK_AWARE_POLICY_HPP
+
+#include "host.hpp"
+#include "load_balancing.hpp"
+#include "map.hpp"
+#include "round_robin_policy.hpp"
+#include "scoped_lock.hpp"
+#include "scoped_ptr.hpp"
+#include "set.hpp"
+
+#include <uv.h>
+
+namespace datastax { namespace internal { namespace core {
+
+class RackAwarePolicy : public LoadBalancingPolicy {
+public:
+  RackAwarePolicy(const String& local_dc = "", const String& local_rack = "");
+
+  ~RackAwarePolicy();
+
+  virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
+                    const String& local_dc, const String& local_rack);
+
+  virtual CassHostDistance distance(const Host::Ptr& host) const;
+
+  virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler,
+                                    const TokenMap* token_map);
+
+  virtual bool is_host_up(const Address& address) const;
+
+  virtual void on_host_added(const Host::Ptr& host);
+  virtual void on_host_removed(const Host::Ptr& host);
+  virtual void on_host_up(const Host::Ptr& host);
+  virtual void on_host_down(const Address& address);
+
+  virtual const String& local_dc() const;
+  virtual const String& local_rack() const;
+
+  virtual LoadBalancingPolicy* new_instance() {
+    return new RackAwarePolicy(local_dc_, local_rack_);
+  }
+
+private:
+  class PerKeyHostMap {
+  public:
+    typedef internal::Map<String, CopyOnWriteHostVec> Map;
+    typedef Set<String> KeySet;
+
+    PerKeyHostMap()
+        : no_hosts_(new HostVec()) {
+      uv_rwlock_init(&rwlock_);
+    }
+    ~PerKeyHostMap() { uv_rwlock_destroy(&rwlock_); }
+
+    void add_host_to_key(const String& key, const Host::Ptr& host);
+    void remove_host_from_key(const String& key, const Host::Ptr& host);
+    bool remove_host(const Address& address);
+    const CopyOnWriteHostVec& get_hosts(const String& key) const;
+    void copy_keys(KeySet* keys) const;
+
+  private:
+    Map map_;
+    mutable uv_rwlock_t rwlock_;
+    const CopyOnWriteHostVec no_hosts_;
+
+  private:
+    DISALLOW_COPY_AND_ASSIGN(PerKeyHostMap);
+  };
+
+  const CopyOnWriteHostVec& get_local_dc_hosts() const;
+  void get_remote_dcs(PerKeyHostMap::KeySet* remote_dcs) const;
+
+public:
+  class RackAwareQueryPlan : public QueryPlan {
+  public:
+    RackAwareQueryPlan(const RackAwarePolicy* policy, CassConsistency cl, size_t start_index);
+
+    virtual Host::Ptr compute_next();
+
+  private:
+    const RackAwarePolicy* policy_;
+    CassConsistency cl_;
+    CopyOnWriteHostVec hosts_;
+    ScopedPtr<PerKeyHostMap::KeySet> remote_racks_;
+    ScopedPtr<PerKeyHostMap::KeySet> remote_dcs_;
+    size_t local_remaining_;
+    size_t remote_remaining_;
+    size_t index_;
+  };
+
+private:
+  mutable uv_rwlock_t available_rwlock_;
+  AddressSet available_;
+
+  String local_dc_;
+  String local_rack_;
+
+  CopyOnWriteHostVec local_rack_live_hosts_;
+  // Live hosts in remote racks of the local DC, keyed by rack.
+  PerKeyHostMap per_remote_rack_live_hosts_;
+  // Live hosts in remote DCs, keyed by DC.
+  PerKeyHostMap per_remote_dc_live_hosts_;
+  size_t index_;
+
+private:
+  DISALLOW_COPY_AND_ASSIGN(RackAwarePolicy);
+};
+
+}}} // namespace datastax::internal::core
+
+#endif
diff --git a/src/request_processor.cpp b/src/request_processor.cpp
index d02ad9e7f..bda526df4 100644
--- a/src/request_processor.cpp
+++ b/src/request_processor.cpp
@@ -170,7 +170,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop* event_loop,
                                    const Host::Ptr& connected_host, const HostMap& hosts,
                                    const TokenMap::Ptr& token_map,
                                    const RequestProcessorSettings& settings, Random* random,
-                                   const String& local_dc)
+                                   const String& local_dc, const String& local_rack)
     : connection_pool_manager_(connection_pool_manager)
     , listener_(listener ? listener : &nop_request_processor_listener__)
     , event_loop_(event_loop)
@@ -213,7 +213,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop* event_loop,
   LoadBalancingPolicy::Vec policies = load_balancing_policies();
   for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(); it != policies.end(); ++it) {
     // Initialize the load balancing policies
-    (*it)->init(connected_host, hosts, random, local_dc);
+    (*it)->init(connected_host, hosts, random, local_dc, local_rack);
     (*it)->register_handles(event_loop_->loop());
   }
 
diff --git a/src/request_processor.hpp b/src/request_processor.hpp
index 67b0bf47c..253ac4828 100644
--- a/src/request_processor.hpp
+++ b/src/request_processor.hpp
@@ -166,12 +166,13 @@ class RequestProcessor
    * @param settings The current settings for the request processor.
    * @param random A RNG for randomizing hosts in the load balancing policies.
    * @param local_dc The local datacenter for initializing the load balancing policies.
+   * @param local_rack The local rack for initializing the load balancing policies.
    */
   RequestProcessor(RequestProcessorListener* listener, EventLoop* event_loop,
                    const ConnectionPoolManager::Ptr& connection_pool_manager,
                    const Host::Ptr& connected_host, const HostMap& hosts,
                    const TokenMap::Ptr& token_map, const RequestProcessorSettings& settings,
-                   Random* random, const String& local_dc);
+                   Random* random, const String& local_dc, const String& local_rack);
 
   /**
    * Close/Terminate the request request processor (thread-safe).
diff --git a/src/request_processor_initializer.cpp b/src/request_processor_initializer.cpp
index 6705b72e9..24d1a6d48 100644
--- a/src/request_processor_initializer.cpp
+++ b/src/request_processor_initializer.cpp
@@ -40,7 +40,7 @@ class RunInitializeProcessor : public Task {
 
 RequestProcessorInitializer::RequestProcessorInitializer(
     const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts,
-    const TokenMap::Ptr& token_map, const String& local_dc, const Callback& callback)
+    const TokenMap::Ptr& token_map, const String& local_dc, const String& local_rack, const Callback& callback)
     : event_loop_(NULL)
     , listener_(NULL)
     , metrics_(NULL)
@@ -51,6 +51,7 @@ RequestProcessorInitializer::RequestProcessorInitializer(
     , hosts_(hosts)
     , token_map_(token_map)
     , local_dc_(local_dc)
+    , local_rack_(local_rack)
     , error_code_(REQUEST_PROCESSOR_OK)
     , callback_(callback) {
   uv_mutex_init(&mutex_);
@@ -166,7 +167,7 @@ void RequestProcessorInitializer::on_initialize(ConnectionPoolManagerInitializer* initializer) {
   } else {
     processor_.reset(new RequestProcessor(listener_, event_loop_, initializer->release_manager(),
                                           connected_host_, hosts_, token_map_, settings_, random_,
-                                          local_dc_));
+                                          local_dc_, local_rack_));
 
     int rc = processor_->init(RequestProcessor::Protected());
     if (rc != 0) {
diff --git a/src/request_processor_initializer.hpp b/src/request_processor_initializer.hpp
index 8d63380d9..b685e5dd2 100644
--- a/src/request_processor_initializer.hpp
+++ b/src/request_processor_initializer.hpp
@@ -60,12 +60,13 @@ class RequestProcessorInitializer
    * @param hosts A mapping of available hosts in the cluster.
    * @param token_map A token map.
    * @param local_dc The local datacenter for initializing the load balancing policies.
+   * @param local_rack The local rack for initializing the load balancing policies.
    * @param callback A callback that is called when the processor is initialized
    *                 or if an error occurred.
    */
   RequestProcessorInitializer(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                               const HostMap& hosts, const TokenMap::Ptr& token_map,
-                              const String& local_dc, const Callback& callback);
+                              const String& local_dc, const String& local_rack, const Callback& callback);
   ~RequestProcessorInitializer();
 
   /**
@@ -176,6 +177,7 @@ class RequestProcessorInitializer
   HostMap hosts_;
   const TokenMap::Ptr token_map_;
   String local_dc_;
+  String local_rack_;
 
   RequestProcessorError error_code_;
   String error_message_;
diff --git a/src/round_robin_policy.cpp b/src/round_robin_policy.cpp
index dd7f2ecff..4c8ac62d7 100644
--- a/src/round_robin_policy.cpp
+++ b/src/round_robin_policy.cpp
@@ -33,7 +33,7 @@ RoundRobinPolicy::RoundRobinPolicy()
 RoundRobinPolicy::~RoundRobinPolicy() { uv_rwlock_destroy(&available_rwlock_); }
 
 void RoundRobinPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                            const String& local_dc) {
+                            const String& local_dc, const String& local_rack) {
   available_.resize(hosts.size());
   std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()),
                  GetAddress());
diff --git a/src/round_robin_policy.hpp b/src/round_robin_policy.hpp
index f5b4f715d..aebc62deb 100644
--- a/src/round_robin_policy.hpp
+++ b/src/round_robin_policy.hpp
@@ -31,7 +31,7 @@ class RoundRobinPolicy : public LoadBalancingPolicy {
   ~RoundRobinPolicy();
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual CassHostDistance distance(const Host::Ptr& host) const;
 
diff --git a/src/session.cpp b/src/session.cpp
index 34de94c13..e2a544f28 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -169,6 +169,16 @@ void cass_session_get_speculative_execution_metrics(const CassSession* session,
 
 CassUuid cass_session_get_client_id(CassSession* session) { return session->client_id(); }
 
+cass_uint64_t cass_session_get_inflight_request_count(const CassSession* session) {
+  cass_uint64_t inflight_request_count = 0;
+  const HostMap hosts = session->cluster()->available_hosts();
+  for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
+    const Host::Ptr& host = it->second;
+    inflight_request_count += host->inflight_request_count();
+  }
+  return inflight_request_count;
+}
+
 } // extern "C"
 
 static inline bool least_busy_comp(const RequestProcessor::Ptr& a, const RequestProcessor::Ptr& b) {
@@ -195,13 +205,14 @@ class SessionInitializer : public RefCounted<SessionInitializer> {
 
   ~SessionInitializer() { uv_mutex_destroy(&mutex_); }
 
   void initialize(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
-                  const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc) {
+                  const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc,
+                  const String& local_rack) {
     inc_ref();
 
     const size_t thread_count_io = remaining_ = session_->config().thread_count_io();
     for (size_t i = 0; i < thread_count_io; ++i) {
       RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-          connected_host, protocol_version, hosts, token_map, local_dc,
+          connected_host, protocol_version, hosts, token_map, local_dc, local_rack,
           bind_callback(&SessionInitializer::on_initialize, this)));
 
       RequestProcessorSettings settings(session_->config());
@@ -360,7 +371,7 @@ void Session::join() {
 
 void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                          const HostMap& hosts, const TokenMap::Ptr& token_map,
-                         const String& local_dc) {
+                         const String& local_dc, const String& local_rack) {
   int rc = 0;
 
   if (hosts.empty()) {
@@ -394,7 +405,7 @@ void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
   request_processor_count_ = 0;
   is_closing_ = false;
   SessionInitializer::Ptr initializer(new SessionInitializer(this));
-  initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc);
+  initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc, local_rack);
 }
 
 void Session::on_close() {
diff --git a/src/session.hpp b/src/session.hpp
index 856833114..45efc5e89 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -54,7 +54,7 @@ class Session
 
   virtual void on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                           const HostMap& hosts, const TokenMap::Ptr& token_map,
-                          const String& local_dc);
+                          const String& local_dc, const String& local_rack);
 
   virtual void on_close();
 
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 6ccb4f7c5..c1e7b6333 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -160,7 +160,7 @@ void SessionBase::notify_closed() {
 
 void SessionBase::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                              const HostMap& hosts, const TokenMap::Ptr& token_map,
-                             const String& local_dc) {
+                             const String& local_dc, const String& local_rack) {
   notify_connected();
 }
 
@@ -200,7 +200,7 @@ void SessionBase::on_initialize(ClusterConnector* connector) {
     }
 
     on_connect(cluster_->connected_host(), cluster_->protocol_version(),
-               cluster_->available_hosts(), cluster_->token_map(), cluster_->local_dc());
+               cluster_->available_hosts(), cluster_->token_map(), cluster_->local_dc(), cluster_->local_rack());
   } else {
     assert(!connector->is_canceled() && "Cluster connection process canceled");
     switch (connector->error_code()) {
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 1c3e6c68f..b0c3c7c16 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -117,7 +117,7 @@ class SessionBase : public ClusterListener {
    */
   virtual void on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                           const HostMap& hosts, const TokenMap::Ptr& token_map,
-                          const String& local_dc);
+                          const String& local_dc, const String& local_rack);
 
   /**
    * A callback called after the control connection fails to connect. By default
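The aggregate in-flight gauge sums Host::inflight_request_count() over the hosts currently known to the cluster, so it is a point-in-time snapshot rather than a monotonic counter. A sketch of polling it alongside the existing metrics call (the logging is illustrative):

    #include <stdio.h>

    CassMetrics metrics;
    cass_session_get_metrics(session, &metrics);
    cass_uint64_t inflight = cass_session_get_inflight_request_count(session);
    printf("in-flight requests: %llu\n", (unsigned long long)inflight);
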
diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp
index cc2cd8394..adb7a5f68 100644
--- a/src/token_aware_policy.cpp
+++ b/src/token_aware_policy.cpp
@@ -37,7 +37,7 @@ static inline bool contains(const CopyOnWriteHostVec& replicas, const Address& address) {
 }
 
 void TokenAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                            const String& local_dc) {
+                            const String& local_dc, const String& local_rack) {
   if (random != NULL) {
     if (shuffle_replicas_) {
       // Store random so that it can be used to shuffle replicas.
@@ -48,7 +48,7 @@ void TokenAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
       index_ = random->next(std::max(static_cast<size_t>(1), hosts.size()));
     }
   }
-  ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc);
+  ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack);
 }
 
 QueryPlan* TokenAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler,
@@ -87,8 +87,8 @@ QueryPlan* TokenAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler,
 }
 
 Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() {
-  while (remaining_ > 0) {
-    --remaining_;
+  while (remaining_local_ > 0) {
+    --remaining_local_;
     const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]);
     if (child_policy_->is_host_up(host->address()) &&
         child_policy_->distance(host) == CASS_HOST_DISTANCE_LOCAL) {
@@ -96,10 +96,28 @@ Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() {
     }
   }
 
+  while (remaining_remote_ > 0) {
+    --remaining_remote_;
+    const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]);
+    if (child_policy_->is_host_up(host->address()) &&
+        child_policy_->distance(host) == CASS_HOST_DISTANCE_REMOTE) {
+      return host;
+    }
+  }
+
+  while (remaining_remote2_ > 0) {
+    --remaining_remote2_;
+    const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]);
+    if (child_policy_->is_host_up(host->address()) &&
+        child_policy_->distance(host) == CASS_HOST_DISTANCE_REMOTE2) {
+      return host;
+    }
+  }
+
   Host::Ptr host;
   while ((host = child_plan_->compute_next())) {
     if (!contains(replicas_, host->address()) ||
-        child_policy_->distance(host) != CASS_HOST_DISTANCE_LOCAL) {
+        child_policy_->distance(host) > CASS_HOST_DISTANCE_REMOTE2) {
       return host;
     }
   }
diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp
index 5a8cee903..637f041cf 100644
--- a/src/token_aware_policy.hpp
+++ b/src/token_aware_policy.hpp
@@ -35,7 +35,7 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
   virtual ~TokenAwarePolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler,
                                     const TokenMap* token_map);
@@ -53,7 +53,9 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
         , child_plan_(child_plan)
         , replicas_(replicas)
         , index_(start_index)
-        , remaining_(replicas->size()) {}
+        , remaining_local_(replicas->size())
+        , remaining_remote_(replicas->size())
+        , remaining_remote2_(replicas->size()) {}
 
     Host::Ptr compute_next();
 
@@ -62,7 +64,9 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
     ScopedPtr<QueryPlan> child_plan_;
     CopyOnWriteHostVec replicas_;
     size_t index_;
-    size_t remaining_;
+    size_t remaining_local_;
+    size_t remaining_remote_;
+    size_t remaining_remote2_;
  };
 
   Random* random_;
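With the rack-aware tiers in place, the token-aware plan now drains replicas in three passes — LOCAL, then REMOTE, then REMOTE2 — before falling back to the child plan, which is only asked for hosts that were never candidates in those passes (distance > CASS_HOST_DISTANCE_REMOTE2, i.e. ignored replicas, or non-replicas). A simplified sketch of the pass structure, with the per-pass counters and wraparound indexing of the real code elided:

    // Simplified: each pass re-scans replicas_ for the next distance tier.
    static const CassHostDistance tiers[] = { CASS_HOST_DISTANCE_LOCAL,
                                              CASS_HOST_DISTANCE_REMOTE,
                                              CASS_HOST_DISTANCE_REMOTE2 };
    for (size_t t = 0; t < 3; ++t) {
      for (size_t i = 0; i < replicas_->size(); ++i) {
        const Host::Ptr& host = (*replicas_)[i];
        if (child_policy_->is_host_up(host->address()) &&
            child_policy_->distance(host) == tiers[t]) {
          return host; // the real code also decrements remaining_* and rotates index_
        }
      }
    }
    // ...then fall through to child_plan_->compute_next().
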