Description
Datastax JIRA issue: https://datastax-oss.atlassian.net/browse/PYTHON-1312
The `Cluster` class stores an `executor` object, which can be a `ThreadPoolExecutor` or a different type of executor with the same interface:

```python
self.executor = self._create_thread_pool_executor(max_workers=executor_threads)
```

The default value for `executor_threads` is 2.
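For reference, the pool size comes straight from the `Cluster` constructor. A minimal sketch (the contact point is made up; `executor_threads` is the standard constructor parameter):

```python
from cassandra.cluster import Cluster

# executor_threads becomes max_workers of the internal thread pool;
# the default of 2 is what the scenario described below relies on.
cluster = Cluster(contact_points=["127.0.0.1"], executor_threads=2)
session = cluster.connect()
```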
One possible task running on the executor is waiting for schema agreement:

```python
def _handle_schema_change(self, event):
    if self._schema_event_refresh_window < 0:
        return
    delay = self._delay_for_event_type('schema_change', self._schema_event_refresh_window)
    self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, **event)
```
`scheduler.schedule_unique` sends a task to the executor; `refresh_schema` calls `wait_for_schema_agreement`.
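As a rough mental model (not the driver's exact implementation; presumably the real `schedule_unique` also deduplicates identical pending tasks, hence the name), scheduling boils down to handing the callable to the same shared executor after a delay:

```python
import threading

class Scheduler:
    """Hypothetical sketch: a scheduled task ultimately competes for the
    same worker threads as every other task submitted to the executor."""
    def __init__(self, executor):
        self.executor = executor

    def schedule_unique(self, delay, fn, **kwargs):
        # After `delay` seconds, hand the callable to the shared executor.
        threading.Timer(delay, lambda: self.executor.submit(fn, **kwargs)).start()
```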
Another possible task is handling an `on_down` notification, pushed by the cluster to the driver when the cluster notices that a node is DOWN:
```python
@run_in_executor
def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
```
The `run_in_executor` decorator causes this function to run as a task scheduled inside the executor.
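Conceptually, the decorator is a thin wrapper that submits the call to the object's executor instead of running it inline; a minimal sketch (the driver's real decorator does more, e.g. shutdown handling) could look like this:

```python
import functools

def run_in_executor(f):
    """Minimal sketch: run the wrapped method on the object's executor."""
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        # The method body runs on a pool worker thread, not on the caller's thread.
        return self.executor.submit(f, self, *args, **kwargs)
    return wrapper
```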
In `wait_for_schema_agreement`, we wait until every peer of the node we started the wait on either:
- has the same schema version as that node,
- or is dead.
```python
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
if schema_mismatches is None:
    return True
```

```python
def _get_schema_mismatches(self, peers_result, local_result, local_address):
    peers_result = dict_factory(peers_result.column_names, peers_result.parsed_rows)

    versions = defaultdict(set)
    if local_result.parsed_rows:
        local_row = dict_factory(local_result.column_names, local_result.parsed_rows)[0]
        if local_row.get("schema_version"):
            versions[local_row.get("schema_version")].add(local_address)

    for row in peers_result:
        schema_ver = row.get('schema_version')
        if not schema_ver:
            continue
        endpoint = self._cluster.endpoint_factory.create(row)
        peer = self._cluster.metadata.get_host(endpoint)
        if peer and peer.is_up is not False:
            versions[schema_ver].add(endpoint)

    if len(versions) == 1:
        log.debug("[control connection] Schemas match")
        return None

    return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
```
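The key detail is the `peer.is_up is not False` check: a peer that is actually dead, but has not yet been marked down, keeps contributing its stale schema version, so the function keeps returning a non-`None` mismatch dict. A hypothetical snapshot (made-up versions and endpoints) of what the loop above produces in that state:

```python
from collections import defaultdict

# Made-up data: node B has been killed, but peer.is_up is still True for it.
versions = defaultdict(set)
versions["schema-v1"].add("A:9042")  # version reported by node A
versions["schema-v2"].add("B:9042")  # stale version still attributed to B

mismatches = (None if len(versions) == 1
              else {ver: list(nodes) for ver, nodes in versions.items()})
print(mismatches)  # {'schema-v1': ['A:9042'], 'schema-v2': ['B:9042']}
```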
Now the following scenario can happen:
- the driver has control_connection established to node A
- we kill a node B forcefully
- then we immediately schedule a schema change on A
- A sends a notification to the driver
- the driver schedules `wait_for_schema_agreement` tasks in the executor; for some reason it schedules more than one (I didn't investigate why, but it isn't important: liveness shouldn't depend on the number of available threads in the executor)
- `wait_for_schema_agreement` gets stuck because A has a different schema version than B, and B is still considered up by the driver
- eventually, A notices that B is down and delivers a notification to the driver
- the driver submits the `on_down` task, but there are no available threads in the pool, so we don't set `is_up = False` for B
- `wait_for_schema_agreement` never finishes and `on_down` is never executed: we've deadlocked (a toy model of this follows below)
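To make the failure mode concrete, here is a toy model of the same pattern using nothing but the standard library: a 2-thread pool stands in for the driver's default executor, and all names are made up.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)   # driver default: 2 worker threads
peer_marked_down = threading.Event()

def wait_for_schema_agreement(name):
    # Occupies a worker thread until the dead peer is marked down.
    peer_marked_down.wait()
    return f"{name}: schemas agree"

def on_down():
    # The only task that could unblock the waiters, queued behind them.
    peer_marked_down.set()

waiters = [executor.submit(wait_for_schema_agreement, f"wait-{i}") for i in range(2)]
executor.submit(on_down)

time.sleep(1)
print([w.done() for w in waiters])  # [False, False] -- the pool is saturated
# on_down is still sitting in the queue; without outside help this never resolves.
peer_marked_down.set()              # break the cycle by hand so the demo can exit
executor.shutdown()
```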