2222#include " utils.hpp"
2323
2424#include < algorithm>
25+ #include < numeric>
2526
2627using namespace datastax ;
2728using namespace datastax ::internal::core;
@@ -77,46 +78,65 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
7778 set_pointer_keys (reconnection_schedules_);
7879 set_pointer_keys (to_flush_);
7980
81+ if (host->sharding_info ()) {
82+ const auto hosts_shard_cnt = host->sharding_info ()->get_shards_count ();
83+ connections_by_shard_.resize (hosts_shard_cnt);
84+ num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt
85+ + (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u );
86+ } else {
87+ connections_by_shard_.resize (1 );
88+ num_connections_per_shard_ = settings_.num_connections_per_host ;
89+ }
90+
8091 for (Connection::Vec::const_iterator it = connections.begin (), end = connections.end (); it != end;
8192 ++it) {
8293 const Connection::Ptr& connection (*it);
83- if (!connection->is_closing ()) {
94+ if (!connection->is_closing ()
95+ && connections_by_shard_[connection->shard_id ()].size () < num_connections_per_shard_) {
8496 add_connection (PooledConnection::Ptr (new PooledConnection (this , connection)));
8597 }
8698 }
8799
88100 notify_up_or_down ();
89101
90102 // We had non-critical errors or some connections closed
91- assert (connections.size () <= settings_.num_connections_per_host );
92- size_t needed = settings_.num_connections_per_host - connections_.size ();
103+ size_t needed = num_connections_per_shard_ * connections_by_shard_.size ()
104+ - std::accumulate (connections_by_shard_.begin (), connections_by_shard_.end (), 0u ,
105+ [] (size_t acc, const PooledConnection::Vec& v) {
106+ return acc + v.size ();
107+ });
93108 for (size_t i = 0 ; i < needed; ++i) {
94109 schedule_reconnect ();
95110 }
96111}
97112
98113PooledConnection::Ptr ConnectionPool::find_least_busy (int64_t token) const {
99- if (token == CASS_INT64_MIN) {
100- PooledConnection::Vec::const_iterator it =
101- std::min_element (connections_.begin (), connections_.end (), least_busy_comp);
102- if (it == connections_.end () || (*it)->is_closing ()) {
103- return PooledConnection::Ptr ();
114+ if (token == CASS_INT64_MIN || !host_->sharding_info ()) {
115+ // We got a placeholder token, or a sensible token that is useless without the sharding info.
116+ // In both cases we return the least busy connection of the *entire pool* (or NULL).
117+ PooledConnection::Ptr least_busy; // NULL by default
118+ for (const auto & shard_pool : connections_by_shard_) {
119+ for (const auto & conn : shard_pool) {
120+ least_busy = least_busy ? std::min (least_busy, conn, least_busy_comp) : conn;
121+ }
104122 }
105- return *it ;
123+ return least_busy ;
106124 }
107125
108- const auto desired_shard_num = host_-> sharding_info ()-> shard_id (token);
109- const auto conn_it = std::find_if (connections_. begin (), connections_. end (), [desired_shard_num] ( const PooledConnection::Ptr& c) {
110- return c-> shard_id () == desired_shard_num;
111- } );
112- if (conn_it != connections_ .end ()) {
113- return *conn_it ;
126+ // Otherwise, find the least busy connection pointing to the right shard
127+ const auto & shard_pool = connections_by_shard_[host_-> sharding_info ()-> shard_id (token)];
128+ PooledConnection::Vec::const_iterator it =
129+ std::min_element (shard_pool. begin (), shard_pool. end (), least_busy_comp );
130+ if (it == shard_pool .end () || (*it)-> is_closing ()) {
131+ return PooledConnection::Ptr () ;
114132 }
115-
116- return find_least_busy (CASS_INT64_MIN);
133+ return *it;
117134}
118135
119- bool ConnectionPool::has_connections () const { return !connections_.empty (); }
136+ bool ConnectionPool::has_connections () const {
137+ return std::any_of (connections_by_shard_.begin (), connections_by_shard_.end (),
138+ [] (const PooledConnection::Vec& v) { return !v.empty (); });
139+ }
120140
121141void ConnectionPool::flush () {
122142 for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin (),
@@ -154,8 +174,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
154174 if (metrics_) {
155175 metrics_->total_connections .dec ();
156176 }
157- connections_.erase (std::remove (connections_.begin (), connections_.end (), connection),
158- connections_.end ());
177+ auto & shard_pool = connections_by_shard_[connection->shard_id ()];
178+ shard_pool.erase (std::remove (shard_pool.begin (), shard_pool.end (), connection),
179+ shard_pool.end ());
159180 to_flush_.erase (connection);
160181
161182 if (close_state_ != CLOSE_STATE_OPEN) {
@@ -173,16 +194,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
173194 if (metrics_) {
174195 metrics_->total_connections .inc ();
175196 }
176- connections_.push_back (connection);
197+ const size_t new_connections_shard = connection->shard_id ();
198+ LOG_INFO (" add_connection: to host %s to shard %ld" , host_->address_string ().c_str (), new_connections_shard);
199+ connections_by_shard_[new_connections_shard].push_back (connection);
177200}
178201
179202void ConnectionPool::notify_up_or_down () {
180- if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
181- connections_.empty ()) {
203+ if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections ()) {
182204 notify_state_ = NOTIFY_STATE_DOWN;
183205 listener_->on_pool_down (host_->address ());
184- } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
185- !connections_.empty ()) {
206+ } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections ()) {
186207 notify_state_ = NOTIFY_STATE_UP;
187208 listener_->on_pool_up (host_->address ());
188209 }
@@ -223,11 +244,12 @@ void ConnectionPool::internal_close() {
223244 // Make copies of connection/connector data structures to prevent iterator
224245 // invalidation.
225246
226- PooledConnection::Vec connections (connections_);
227- for (PooledConnection::Vec::iterator it = connections.begin (), end = connections.end ();
228- it != end; ++it) {
229- (*it)->close ();
230- }
247+ auto connections_per_shards = connections_by_shard_;
248+ std::for_each (connections_per_shards.begin (), connections_per_shards.end (), [] (PooledConnection::Vec& v) {
249+ for (auto & c : v) {
250+ c->close ();
251+ }
252+ });
231253
232254 DelayedConnector::Vec pending_connections (pending_connections_);
233255 for (DelayedConnector::Vec::iterator it = pending_connections.begin (),
@@ -244,8 +266,7 @@ void ConnectionPool::internal_close() {
244266void ConnectionPool::maybe_closed () {
245267 // Remove the pool once all current connections and pending connections
246268 // are terminated.
247- if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty () &&
248- pending_connections_.empty ()) {
269+ if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections () && pending_connections_.empty ()) {
249270 close_state_ = CLOSE_STATE_CLOSED;
250271 // Only mark DOWN if it's UP otherwise we might get multiple DOWN events
251272 // when connecting the pool.
@@ -275,9 +296,17 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) {
275296 }
276297
277298 if (connector->is_ok ()) {
278- add_connection (
279- PooledConnection::Ptr (new PooledConnection (this , connector->release_connection ())));
280- notify_up_or_down ();
299+ PooledConnection::Ptr pooled_conn {new PooledConnection (this , connector->release_connection ())};
300+ const size_t new_connections_shard = pooled_conn->shard_id ();
301+ if (connections_by_shard_.size () > new_connections_shard
302+ && connections_by_shard_[new_connections_shard].size () < num_connections_per_shard_) {
303+ add_connection (pooled_conn);
304+ notify_up_or_down ();
305+ } else {
306+ LOG_INFO (" Reconnection to host %s connected us to shard %ld, reconnecting again" ,
307+ address ().to_string ().c_str (), new_connections_shard);
308+ schedule_reconnect (schedule.release ());
309+ }
281310 } else if (!connector->is_canceled ()) {
282311 if (connector->is_critical_error ()) {
283312 LOG_ERROR (" Closing established connection pool to host %s because of the following error: %s" ,
0 commit comments