|
1 | 1 | import binascii |
2 | 2 | import datetime |
| 3 | +import random |
3 | 4 | import uuid |
4 | 5 | import warnings |
5 | 6 | from queue import LifoQueue, Queue |
6 | 7 | from time import sleep |
| 8 | +from unittest import mock |
7 | 9 | from unittest.mock import DEFAULT, Mock, call, patch |
8 | 10 |
|
9 | 11 | import pytest |
|
21 | 23 | REDIS_CLUSTER_HASH_SLOTS, |
22 | 24 | REPLICA, |
23 | 25 | ClusterNode, |
| 26 | + LoadBalancer, |
24 | 27 | NodesManager, |
25 | 28 | RedisCluster, |
26 | 29 | get_node_name, |
@@ -812,7 +815,7 @@ def raise_error(target_node, *args, **kwargs): |
812 | 815 | rc = get_mocked_redis_client( |
813 | 816 | host=default_host, |
814 | 817 | port=default_port, |
815 | | - retry=Retry(ConstantBackoff(1), 3), |
| 818 | + retry=Retry(ConstantBackoff(1), 10), |
816 | 819 | ) |
817 | 820 |
|
818 | 821 | with pytest.raises(error): |
@@ -2577,6 +2580,37 @@ def test_allow_custom_queue_class(self, queue_class): |
2577 | 2580 | for node in rc.nodes_manager.nodes_cache.values(): |
2578 | 2581 | assert node.redis_connection.connection_pool.queue_class == queue_class |
2579 | 2582 |
|
| 2583 | + @pytest.mark.parametrize("invalid_index", [-10, 10]) |
| 2584 | + def test_return_primary_if_invalid_node_index_is_returned(self, invalid_index): |
| 2585 | + rc = get_mocked_redis_client( |
| 2586 | + url="redis://[email protected]:7000", |
| 2587 | + cluster_slots=default_cluster_slots, |
| 2588 | + ) |
| 2589 | + random_slot = random.randint( |
| 2590 | + default_cluster_slots[0][0], default_cluster_slots[0][1] |
| 2591 | + ) |
| 2592 | + |
| 2593 | + ports = set() |
| 2594 | + for _ in range(0, 10): |
| 2595 | + ports.add( |
| 2596 | + rc.nodes_manager.get_node_from_slot( |
| 2597 | + random_slot, read_from_replicas=True |
| 2598 | + ).port |
| 2599 | + ) |
| 2600 | + assert ports == {default_port, 7003} |
| 2601 | + |
| 2602 | + ports = set() |
| 2603 | + with mock.patch.object( |
| 2604 | + LoadBalancer, "get_server_index", return_value=invalid_index |
| 2605 | + ): |
| 2606 | + for _ in range(0, 10): |
| 2607 | + ports.add( |
| 2608 | + rc.nodes_manager.get_node_from_slot( |
| 2609 | + random_slot, read_from_replicas=True |
| 2610 | + ).port |
| 2611 | + ) |
| 2612 | + assert ports == {default_port} |
| 2613 | + |
2580 | 2614 |
|
2581 | 2615 | @pytest.mark.onlycluster |
2582 | 2616 | class TestClusterPubSubObject: |
@@ -3007,6 +3041,33 @@ def test_empty_stack(self, r): |
3007 | 3041 | result = p.execute() |
3008 | 3042 | assert result == [] |
3009 | 3043 |
|
| 3044 | + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) |
| 3045 | + def test_additional_backoff_cluster_pipeline(self, r, error): |
| 3046 | + with patch.object(ConstantBackoff, "compute") as compute: |
| 3047 | + |
| 3048 | + def _compute(target_node, *args, **kwargs): |
| 3049 | + return 1 |
| 3050 | + |
| 3051 | + compute.side_effect = _compute |
| 3052 | + with patch("redis.cluster.get_connection") as get_connection: |
| 3053 | + |
| 3054 | + def raise_error(target_node, *args, **kwargs): |
| 3055 | + get_connection.failed_calls += 1 |
| 3056 | + raise error("mocked error") |
| 3057 | + |
| 3058 | + get_connection.side_effect = raise_error |
| 3059 | + |
| 3060 | + r.set_retry(Retry(ConstantBackoff(1), 10)) |
| 3061 | + pipeline = r.pipeline() |
| 3062 | + |
| 3063 | + with pytest.raises(error): |
| 3064 | + pipeline.get("bar") |
| 3065 | + pipeline.get("bar") |
| 3066 | + pipeline.execute() |
| 3067 | + # cluster pipeline does one more back off than a single Redis command |
| 3068 | + # this is not required, but it's just how it's implemented as of now |
| 3069 | + assert compute.call_count == r.cluster_error_retry_attempts + 1 |
| 3070 | + |
3010 | 3071 |
|
3011 | 3072 | @pytest.mark.onlycluster |
3012 | 3073 | class TestReadOnlyPipeline: |
|
0 commit comments