Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,10 +1083,19 @@
used for columns in this cluster.
"""

metadata_request_timeout = datetime.timedelta(seconds=2)
metadata_request_timeout: Optional[float] = None
"""
Timeout for all queries used by driver it self.
Supported only by Scylla clusters.
Specifies a server-side timeout (in seconds) for all internal driver queries,
such as schema metadata lookups and cluster topology requests.

The timeout is enforced by appending `USING TIMEOUT <timeout>` to queries
executed by the driver.

- A value of `0` disables explicit timeout enforcement. In this case,
the driver does not add `USING TIMEOUT`, and the timeout is determined
by the server's defaults.
- Only supported when connected to Scylla clusters.
- If not explicitly set, defaults to the value of `control_connection_timeout`.
"""

@property
Expand Down Expand Up @@ -1205,7 +1214,7 @@
cloud=None,
scylla_cloud=None,
shard_aware_options=None,
metadata_request_timeout=None,
metadata_request_timeout: Optional[float] = None,
column_encryption_policy=None,
application_info:Optional[ApplicationInfoBase]=None
):
Expand Down Expand Up @@ -1303,8 +1312,6 @@
self.no_compact = no_compact

self.auth_provider = auth_provider
if metadata_request_timeout is not None:
self.metadata_request_timeout = metadata_request_timeout

if load_balancing_policy is not None:
if isinstance(load_balancing_policy, type):
Expand Down Expand Up @@ -1415,6 +1422,7 @@
self.cql_version = cql_version
self.max_schema_agreement_wait = max_schema_agreement_wait
self.control_connection_timeout = control_connection_timeout
self.metadata_request_timeout = self.control_connection_timeout if metadata_request_timeout is None else metadata_request_timeout
self.idle_heartbeat_interval = idle_heartbeat_interval
self.idle_heartbeat_timeout = idle_heartbeat_timeout
self.schema_event_refresh_window = schema_event_refresh_window
Expand Down Expand Up @@ -3614,9 +3622,11 @@
if connection.features.sharding_info is not None:
self._uses_peers_v2 = False

# Cassandra does not support "USING TIMEOUT"
self._metadata_request_timeout = None if connection.features.sharding_info is None \
else datetime.timedelta(seconds=self._cluster.control_connection_timeout)
# Only ScyllaDB supports "USING TIMEOUT"
# Sharding information signals it is ScyllaDB
self._metadata_request_timeout = None if connection.features.sharding_info is None or not self._cluster.metadata_request_timeout \
else datetime.timedelta(seconds=self._cluster.metadata_request_timeout)

self._tablets_routing_v1 = connection.features.tablets_routing_v1

# use weak references in both directions
Expand Down Expand Up @@ -4313,7 +4323,7 @@
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 4326 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test libev (3.9)

cannot schedule new futures after shutdown

Check failure on line 4326 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncore (3.9)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down
40 changes: 32 additions & 8 deletions tests/integration/standard/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import sys
import time
import os
from typing import Optional

from packaging.version import Version
from unittest.mock import Mock, patch
import pytest
Expand Down Expand Up @@ -1323,20 +1325,39 @@ def test_token(self):
cluster.shutdown()


class MetadataTimeoutTest(unittest.TestCase):
class TestMetadataTimeout:
"""
Test of TokenMap creation and other behavior.
"""
def test_timeout(self):
cluster = TestCluster()
cluster.metadata_request_timeout = None

@pytest.mark.parametrize(
"opts, expected_query_chunk",
[
(
{"metadata_request_timeout": None},
# Should be borrowed from control_connection_timeout
"USING TIMEOUT 2000ms"
),
(
{"metadata_request_timeout": 0.0},
False
),
(
{"metadata_request_timeout": 4.0},
"USING TIMEOUT 4000ms"
),
(
{"metadata_request_timeout": None, "control_connection_timeout": None},
False,
)
],
ids=["default", "zero", "4s", "both none"]
)
def test_timeout(self, opts, expected_query_chunk):
cluster = TestCluster(**opts)
stmts = []

class ConnectionWrapper(cluster.connection_class):
def __init__(self, *args, **kwargs):
super(ConnectionWrapper, self).__init__(*args, **kwargs)

def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message,
decoder=ProtocolHandler.decode_message, result_metadata=None):
if isinstance(msg, QueryMessage):
Expand All @@ -1351,7 +1372,10 @@ def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message,
for stmt in stmts:
if "SELECT now() FROM system.local WHERE key='local'" in stmt:
continue
assert "USING TIMEOUT 2000ms" in stmt, f"query `{stmt}` does not contain `USING TIMEOUT 2000ms`"
if expected_query_chunk:
assert expected_query_chunk in stmt, f"query `{stmt}` does not contain `{expected_query_chunk}`"
else:
assert 'USING TIMEOUT' not in stmt, f"query `{stmt}` should not contain `USING TIMEOUT`"


class KeyspaceAlterMetadata(unittest.TestCase):
Expand Down
Loading