2222#include " utils.hpp"
2323
2424#include < algorithm>
25+ #include < numeric>
2526
2627using namespace datastax ;
2728using namespace datastax ::internal::core;
@@ -77,46 +78,70 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
7778 set_pointer_keys (reconnection_schedules_);
7879 set_pointer_keys (to_flush_);
7980
81+ if (host->sharding_info ()) {
82+ const auto hosts_shard_cnt = host->sharding_info ()->get_shards_count ();
83+ connections_by_shard_.resize (hosts_shard_cnt);
84+ num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt
85+ + (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u );
86+ } else {
87+ connections_by_shard_.resize (1 );
88+ num_connections_per_shard_ = settings_.num_connections_per_host ;
89+ }
90+
8091 for (Connection::Vec::const_iterator it = connections.begin (), end = connections.end (); it != end;
8192 ++it) {
8293 const Connection::Ptr& connection (*it);
8394 if (!connection->is_closing ()) {
84- add_connection (PooledConnection::Ptr (new PooledConnection (this , connection)));
95+ if (connections_by_shard_[connection->shard_id ()].size () < num_connections_per_shard_) {
96+ add_connection (PooledConnection::Ptr (new PooledConnection (this , connection)));
97+ } else {
98+ connection->close ();
99+ }
85100 }
86101 }
87102
88103 notify_up_or_down ();
89104
90105 // We had non-critical errors or some connections closed
91- assert (connections.size () <= settings_.num_connections_per_host );
92- size_t needed = settings_.num_connections_per_host - connections_.size ();
106+ size_t needed = num_connections_per_shard_ * connections_by_shard_.size ()
107+ - std::accumulate (connections_by_shard_.begin (), connections_by_shard_.end (), 0u ,
108+ [] (size_t acc, const PooledConnection::Vec& v) {
109+ return acc + v.size ();
110+ });
93111 for (size_t i = 0 ; i < needed; ++i) {
94112 schedule_reconnect ();
95113 }
96114}
97115
98116PooledConnection::Ptr ConnectionPool::find_least_busy (int64_t token) const {
99- if (token == CASS_INT64_MIN) {
100- PooledConnection::Vec::const_iterator it =
101- std::min_element (connections_.begin (), connections_.end (), least_busy_comp);
102- if (it == connections_.end () || (*it)->is_closing ()) {
103- return PooledConnection::Ptr ();
117+ if (token == CASS_INT64_MIN || !host_->sharding_info ()) {
118+ // We got a placeholder token, or a sensible token that is useless without the sharding info.
119+ // In both cases we return the least busy connection of the *entire pool* (or NULL).
120+ PooledConnection::Ptr least_busy; // NULL by default
121+ for (const auto & shard_pool : connections_by_shard_) {
122+ for (const auto & conn : shard_pool) {
123+ if (!conn->is_closing ()) {
124+ least_busy = least_busy ? std::min (least_busy, conn, least_busy_comp) : conn;
125+ }
126+ }
104127 }
105- return *it ;
128+ return least_busy ;
106129 }
107130
108- const auto desired_shard_num = host_-> sharding_info ()-> shard_id (token);
109- const auto conn_it = std::find_if (connections_. begin (), connections_. end (), [desired_shard_num] ( const PooledConnection::Ptr& c) {
110- return c-> shard_id () == desired_shard_num;
111- } );
112- if (conn_it != connections_ .end ()) {
113- return *conn_it ;
131+ // Otherwise, find the least busy connection pointing to the right shard (if possible)
132+ const auto & shard_pool = connections_by_shard_[host_-> sharding_info ()-> shard_id (token)];
133+ PooledConnection::Vec::const_iterator it =
134+ std::min_element (shard_pool. begin (), shard_pool. end (), least_busy_comp );
135+ if (it == shard_pool .end () || (*it)-> is_closing ()) {
136+ return find_least_busy (CASS_INT64_MIN) ;
114137 }
115-
116- return find_least_busy (CASS_INT64_MIN);
138+ return *it;
117139}
118140
119- bool ConnectionPool::has_connections () const { return !connections_.empty (); }
141+ bool ConnectionPool::has_connections () const {
142+ return std::any_of (connections_by_shard_.begin (), connections_by_shard_.end (),
143+ [] (const PooledConnection::Vec& v) { return !v.empty (); });
144+ }
120145
121146void ConnectionPool::flush () {
122147 for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin (),
@@ -154,8 +179,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
154179 if (metrics_) {
155180 metrics_->total_connections .dec ();
156181 }
157- connections_.erase (std::remove (connections_.begin (), connections_.end (), connection),
158- connections_.end ());
182+ auto & shard_pool = connections_by_shard_[connection->shard_id ()];
183+ shard_pool.erase (std::remove (shard_pool.begin (), shard_pool.end (), connection),
184+ shard_pool.end ());
159185 to_flush_.erase (connection);
160186
161187 if (close_state_ != CLOSE_STATE_OPEN) {
@@ -173,16 +199,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
173199 if (metrics_) {
174200 metrics_->total_connections .inc ();
175201 }
176- connections_.push_back (connection);
202+ const size_t new_connections_shard = connection->shard_id ();
203+ LOG_INFO (" add_connection: to host %s to shard %ld" , host_->address_string ().c_str (), new_connections_shard);
204+ connections_by_shard_[new_connections_shard].push_back (connection);
177205}
178206
179207void ConnectionPool::notify_up_or_down () {
180- if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
181- connections_.empty ()) {
208+ if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections ()) {
182209 notify_state_ = NOTIFY_STATE_DOWN;
183210 listener_->on_pool_down (host_->address ());
184- } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
185- !connections_.empty ()) {
211+ } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections ()) {
186212 notify_state_ = NOTIFY_STATE_UP;
187213 listener_->on_pool_up (host_->address ());
188214 }
@@ -223,11 +249,12 @@ void ConnectionPool::internal_close() {
223249 // Make copies of connection/connector data structures to prevent iterator
224250 // invalidation.
225251
226- PooledConnection::Vec connections (connections_);
227- for (PooledConnection::Vec::iterator it = connections.begin (), end = connections.end ();
228- it != end; ++it) {
229- (*it)->close ();
230- }
252+ auto connections_per_shards = connections_by_shard_;
253+ std::for_each (connections_per_shards.begin (), connections_per_shards.end (), [] (PooledConnection::Vec& v) {
254+ for (auto & c : v) {
255+ c->close ();
256+ }
257+ });
231258
232259 DelayedConnector::Vec pending_connections (pending_connections_);
233260 for (DelayedConnector::Vec::iterator it = pending_connections.begin (),
@@ -244,8 +271,7 @@ void ConnectionPool::internal_close() {
244271void ConnectionPool::maybe_closed () {
245272 // Remove the pool once all current connections and pending connections
246273 // are terminated.
247- if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty () &&
248- pending_connections_.empty ()) {
274+ if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections () && pending_connections_.empty ()) {
249275 close_state_ = CLOSE_STATE_CLOSED;
250276 // Only mark DOWN if it's UP otherwise we might get multiple DOWN events
251277 // when connecting the pool.
@@ -275,9 +301,18 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) {
275301 }
276302
277303 if (connector->is_ok ()) {
278- add_connection (
279- PooledConnection::Ptr (new PooledConnection (this , connector->release_connection ())));
280- notify_up_or_down ();
304+ PooledConnection::Ptr pooled_conn {new PooledConnection (this , connector->release_connection ())};
305+ const size_t new_connections_shard = pooled_conn->shard_id ();
306+ if (connections_by_shard_.size () > new_connections_shard
307+ && connections_by_shard_[new_connections_shard].size () < num_connections_per_shard_) {
308+ add_connection (pooled_conn);
309+ notify_up_or_down ();
310+ } else {
311+ LOG_INFO (" Reconnection to host %s connected us to shard %ld, reconnecting again" ,
312+ address ().to_string ().c_str (), new_connections_shard);
313+ pooled_conn->close ();
314+ schedule_reconnect (schedule.release ());
315+ }
281316 } else if (!connector->is_canceled ()) {
282317 if (connector->is_critical_error ()) {
283318 LOG_ERROR (" Closing established connection pool to host %s because of the following error: %s" ,
0 commit comments