
Deadlock when performing a schema change right after killing a node #168

@kbr-

Description

Datastax JIRA issue: https://datastax-oss.atlassian.net/browse/PYTHON-1312

The Cluster class stores an executor object, which can be a ThreadPoolExecutor or a different type of executor with the same interface:

self.executor = self._create_thread_pool_executor(max_workers=executor_threads)

The default value of executor_threads is 2.
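
The pool size is set when the Cluster is constructed; for example (a sketch assuming the driver's public executor_threads parameter and a node reachable at 127.0.0.1):

    from cassandra.cluster import Cluster

    # executor_threads defaults to 2; every background task discussed below
    # competes for this same small pool.
    cluster = Cluster(contact_points=["127.0.0.1"], executor_threads=2)
    session = cluster.connect()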

One possible task running on the executor is waiting for schema agreement:

def _handle_schema_change(self, event):
    if self._schema_event_refresh_window < 0:
        return
    delay = self._delay_for_event_type('schema_change', self._schema_event_refresh_window)
    self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, **event)

scheduler.schedule_unique submits a task to the executor after the given delay; refresh_schema in turn calls wait_for_schema_agreement.
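
A rough sketch of that hand-off (illustrative only, not the driver's actual _Scheduler): a schedule_unique call records the task once and, after the delay, submits it to the same shared executor.

    import threading
    import time

    class TinyScheduler:
        def __init__(self, executor):
            self._executor = executor          # the Cluster's small thread pool
            self._scheduled = set()
            self._lock = threading.Lock()

        def schedule_unique(self, delay, fn, *args, **kwargs):
            # De-duplicate: if fn is already pending, do nothing.
            with self._lock:
                if fn in self._scheduled:
                    return
                self._scheduled.add(fn)

            def fire():
                time.sleep(delay)
                with self._lock:
                    self._scheduled.discard(fn)
                # The actual work (e.g. refresh_schema) lands on the shared pool.
                self._executor.submit(fn, *args, **kwargs)

            threading.Thread(target=fire, daemon=True).start()

The important property is that refresh_schema, and therefore wait_for_schema_agreement, ends up running on the same two-thread pool as everything else.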

Another possible task handles an on_down notification, which the cluster pushes to the driver when it notices that a node is DOWN:

    @run_in_executor
    def on_down(self, host, is_host_addition, expect_host_to_be_down=False):

The run_in_executor decorator causes this method to run as a task scheduled on the same executor.
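
A minimal sketch of what such a decorator boils down to (illustrative; the driver's real decorator also handles shutdown and logging, which is omitted here):

    import functools

    def run_in_executor(f):
        # Instead of running inline, the call is handed to the object's
        # shared executor and the wrapper returns immediately.
        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            return self.executor.submit(f, self, *args, **kwargs)
        return wrapper

So on_down does not run where the notification is received; it has to wait for a free thread in the same two-thread pool.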

In wait_for_schema_agreement, we wait until every peer of the node we started the wait on either:

  • has the same schema version as that node,
  • or is dead.

    schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
    if schema_mismatches is None:
        return True
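
The wait itself is a polling loop that occupies its executor thread for the whole duration; a simplified sketch of that shape (names here are illustrative, not the driver's):

    import time

    def wait_until_agreement(get_schema_mismatches, poll_interval=0.2):
        # The entire wait runs inside a single executor task, so the worker
        # thread stays busy until the mismatch check finally returns None.
        while True:
            if get_schema_mismatches() is None:
                return True
            time.sleep(poll_interval)

Agreement itself is decided by _get_schema_mismatches: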
def _get_schema_mismatches(self, peers_result, local_result, local_address):
    peers_result = dict_factory(peers_result.column_names, peers_result.parsed_rows)

    versions = defaultdict(set)
    if local_result.parsed_rows:
        local_row = dict_factory(local_result.column_names, local_result.parsed_rows)[0]
        if local_row.get("schema_version"):
            versions[local_row.get("schema_version")].add(local_address)

    for row in peers_result:
        schema_ver = row.get('schema_version')
        if not schema_ver:
            continue
        endpoint = self._cluster.endpoint_factory.create(row)
        peer = self._cluster.metadata.get_host(endpoint)
        if peer and peer.is_up is not False:
            versions[schema_ver].add(endpoint)

    if len(versions) == 1:
        log.debug("[control connection] Schemas match")
        return None

    return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
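
Note the peer filter: peer.is_up is not False means that only hosts explicitly marked down are excluded; a peer whose is_up flag is True or unknown (None) still counts toward the version map. A quick illustration:

    # Only an explicit False excludes a peer from the version comparison.
    for is_up in (True, None, False):
        print(is_up, "->", "counted" if is_up is not False else "skipped")
    # True -> counted
    # None -> counted
    # False -> skipped

And the thing that flips is_up to False for a dead peer is the on_down handler, which itself needs a free executor thread to run.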

Now the following scenario can happen:

  • the driver has control_connection established to node A
  • we kill a node B forcefully
  • then we immediately schedule a schema change on A
  • A sends a notification to the driver
  • the driver schedules wait_for_schema_agreement tasks in the executor; for some reason, it schedules more than one (didn't investigate why, but it's not important - liveness shouldn't depend on the number of available threads in the executor)
  • wait_for_schema_agreement gets stuck because A has a different schema version than B and B is considered up by the driver
  • eventually, A notices that B is down and delivers a notification to the driver
  • the driver submits the on_down task, but both executor threads are already occupied by wait_for_schema_agreement, so the task never runs and is_up is never set to False for B
  • wait_for_schema_agreement never finishes and on_down never executes. We've deadlocked.
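
The failure mode is ordinary thread-pool starvation: the tasks occupying the pool can only complete after a task that is stuck behind them in the queue. A standalone sketch of the same pattern, outside the driver (all names here are illustrative):

    import time
    from concurrent.futures import ThreadPoolExecutor, TimeoutError
    from threading import Event

    executor = ThreadPoolExecutor(max_workers=2)   # mirrors executor_threads=2
    b_marked_down = Event()                        # stands in for "is_up = False on B"

    def wait_for_schema_agreement():
        # Blocks its worker thread until B is marked down.
        while not b_marked_down.is_set():
            time.sleep(0.1)
        return True

    def on_down():
        # Would mark B down, but it only runs once a worker thread frees up.
        b_marked_down.set()

    # Two schema-agreement waits occupy both worker threads ...
    executor.submit(wait_for_schema_agreement)
    executor.submit(wait_for_schema_agreement)
    # ... so the on_down task sits in the queue and can never run.
    f = executor.submit(on_down)
    try:
        f.result(timeout=2)
    except TimeoutError:
        print("on_down still queued; both threads stuck in wait_for_schema_agreement")

    # Release the workers so this demo can exit; the driver has no such escape hatch.
    b_marked_down.set()

As noted above, bumping executor_threads only makes the race harder to hit; liveness should not depend on the number of available threads in the executor.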
