Skip to content

Commit 2509b1c

Browse files
committed
Fix dict handling in pool and metrics
There are certain rules you need to following in the cases when dicts are modified and read in parallel. Otherwise you end up with `RuntimeError: dictionary changed size during iteration`. Rules are the following: 1. Any iteration over items or keys, needs to be done over a snapshot, i.e. `list()` or `set()` 2. Avoid unnecessary iterations like `len(d.keys())`, you can replace them with `len(d)` This commit fixes code to match these rules in the `pool.py` and `metrics.py`
1 parent 6eaff9c commit 2509b1c

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

cassandra/metrics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@ def __init__(self, cluster_proxy):
134134
scales.Stat('known_hosts',
135135
lambda: len(cluster_proxy.metadata.all_hosts())),
136136
scales.Stat('connected_to',
137-
lambda: len(set(chain.from_iterable(s._pools.keys() for s in cluster_proxy.sessions)))),
137+
lambda: len(set(chain.from_iterable(list(s._pools.keys()) for s in cluster_proxy.sessions)))),
138138
scales.Stat('open_connections',
139-
lambda: sum(sum(p.open_count for p in s._pools.values()) for s in cluster_proxy.sessions)))
139+
lambda: sum(sum(p.open_count for p in list(s._pools.values())) for s in cluster_proxy.sessions)))
140140

141141
# TODO, to be removed in 4.0
142142
# /cassandra contains the metrics of the first cluster registered

cassandra/pool.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table
492492
"Connection to shard_id=%i reached orphaned stream limit, replacing on host %s (%s/%i)",
493493
shard_id,
494494
self.host,
495-
len(self._connections.keys()),
495+
len(self._connections),
496496
self.host.sharding_info.shards_count
497497
)
498498
elif shard_id not in self._connecting:
@@ -503,7 +503,7 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table
503503
"Trying to connect to missing shard_id=%i on host %s (%s/%i)",
504504
shard_id,
505505
self.host,
506-
len(self._connections.keys()),
506+
len(self._connections),
507507
self.host.sharding_info.shards_count
508508
)
509509

@@ -607,7 +607,7 @@ def _replace(self, connection):
607607

608608
log.debug("Replacing connection (%s) to %s", id(connection), self.host)
609609
try:
610-
if connection.features.shard_id in self._connections.keys():
610+
if connection.features.shard_id in self._connections:
611611
del self._connections[connection.features.shard_id]
612612
if self.host.sharding_info and not self._session.cluster.shard_aware_options.disable:
613613
self._connecting.add(connection.features.shard_id)
@@ -759,7 +759,7 @@ def _open_connection_to_missing_shard(self, shard_id):
759759
with self._lock:
760760
is_shutdown = self.is_shutdown
761761
if not is_shutdown:
762-
if conn.features.shard_id in self._connections.keys():
762+
if conn.features.shard_id in self._connections:
763763
# Move the current connection to the trash and use the new one from now on
764764
old_conn = self._connections[conn.features.shard_id]
765765
log.debug(
@@ -804,7 +804,7 @@ def _open_connection_to_missing_shard(self, shard_id):
804804
num_missing_or_needing_replacement = self.num_missing_or_needing_replacement
805805
log.debug(
806806
"Connected to %s/%i shards on host %s (%i missing or needs replacement)",
807-
len(self._connections.keys()),
807+
len(self._connections),
808808
self.host.sharding_info.shards_count,
809809
self.host,
810810
num_missing_or_needing_replacement
@@ -816,7 +816,7 @@ def _open_connection_to_missing_shard(self, shard_id):
816816
len(self._excess_connections)
817817
)
818818
self._close_excess_connections()
819-
elif self.host.sharding_info.shards_count == len(self._connections.keys()) and self.num_missing_or_needing_replacement == 0:
819+
elif self.host.sharding_info.shards_count == len(self._connections) and self.num_missing_or_needing_replacement == 0:
820820
log.debug(
821821
"All shards are already covered, closing newly opened excess connection %s for host %s",
822822
id(self),
@@ -917,7 +917,7 @@ def get_state(self):
917917
@property
918918
def num_missing_or_needing_replacement(self):
919919
return self.host.sharding_info.shards_count \
920-
- sum(1 for c in self._connections.values() if not c.orphaned_threshold_reached)
920+
- sum(1 for c in list(self._connections.values()) if not c.orphaned_threshold_reached)
921921

922922
@property
923923
def open_count(self):

0 commit comments

Comments
 (0)