@@ -4220,15 +4220,14 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4220
4220
if self._is_shutdown:
4221
4221
return
4222
4222
4223
- if not connection:
4224
- connection = self._connection
4223
+ current_connection = connection or self._connection
4225
4224
4226
4225
if preloaded_results:
4227
4226
log.debug("[control connection] Attempting to use preloaded results for schema agreement")
4228
4227
4229
4228
peers_result = preloaded_results[0]
4230
4229
local_result = preloaded_results[1]
4231
- schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection .endpoint)
4230
+ schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection .endpoint)
4232
4231
if schema_mismatches is None:
4233
4232
return True
4234
4233
@@ -4237,16 +4236,19 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4237
4236
elapsed = 0
4238
4237
cl = ConsistencyLevel.ONE
4239
4238
schema_mismatches = None
4240
- select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)
4239
+ select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, current_connection)
4240
+ error_signaled = False
4241
4241
4242
4242
while elapsed < total_timeout:
4243
+ current_connection = connection or self._connection
4244
+
4243
4245
peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout),
4244
4246
consistency_level=cl)
4245
4247
local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
4246
4248
consistency_level=cl)
4247
4249
try:
4248
4250
timeout = min(self._timeout, total_timeout - elapsed)
4249
- peers_result, local_result = connection .wait_for_responses(
4251
+ peers_result, local_result = current_connection .wait_for_responses(
4250
4252
peers_query, local_query, timeout=timeout)
4251
4253
except OperationTimedOut as timeout:
4252
4254
log.debug("[control connection] Timed out waiting for "
@@ -4257,10 +4259,12 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4257
4259
if self._is_shutdown:
4258
4260
log.debug("[control connection] Aborting wait for schema match due to shutdown")
4259
4261
return None
4260
- else:
4261
- raise
4262
+ elif not error_signaled:
4263
+ self._signal_error()
4264
+ error_signaled = True
4265
+ continue
4262
4266
4263
- schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection .endpoint)
4267
+ schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection .endpoint)
4264
4268
if schema_mismatches is None:
4265
4269
return True
4266
4270
@@ -4269,7 +4273,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4269
4273
elapsed = self._time.time() - start
4270
4274
4271
4275
log.warning("Node %s is reporting a schema disagreement: %s",
4272
- connection .endpoint, schema_mismatches)
4276
+ current_connection .endpoint, schema_mismatches)
4273
4277
return False
4274
4278
4275
4279
def _get_schema_mismatches(self, peers_result, local_result, local_address):
0 commit comments