@@ -379,6 +379,30 @@ class AbstractRedisCluster:
379379
380380 ERRORS_ALLOW_RETRY = (ConnectionError , TimeoutError , ClusterDownError )
381381
382+ def replace_default_node (self , target_node : "ClusterNode" = None ) -> None :
383+ """Replace the default cluster node.
384+ A random cluster node will be chosen if target_node isn't passed, and primaries
385+ will be prioritized. The default node will not be changed if there are no other
386+ nodes in the cluster.
387+
388+ Args:
389+ target_node (ClusterNode, optional): Target node to replace the default
390+ node. Defaults to None.
391+ """
392+ if target_node :
393+ self .nodes_manager .default_node = target_node
394+ else :
395+ curr_node = self .get_default_node ()
396+ primaries = [node for node in self .get_primaries () if node != curr_node ]
397+ if primaries :
398+ # Choose a primary if the cluster contains different primaries
399+ self .nodes_manager .default_node = random .choice (primaries )
400+ else :
401+ # Otherwise, hoose a primary if the cluster contains different primaries
402+ replicas = [node for node in self .get_replicas () if node != curr_node ]
403+ if replicas :
404+ self .nodes_manager .default_node = random .choice (replicas )
405+
382406
383407class RedisCluster (AbstractRedisCluster , RedisClusterCommands ):
384408 @classmethod
@@ -811,7 +835,9 @@ def set_response_callback(self, command, callback):
811835 """Set a custom Response Callback"""
812836 self .cluster_response_callbacks [command ] = callback
813837
814- def _determine_nodes (self , * args , ** kwargs ):
838+ def _determine_nodes (self , * args , ** kwargs ) -> List ["ClusterNode" ]:
839+ # Determine which nodes should be executed the command on.
840+ # Returns a list of target nodes.
815841 command = args [0 ].upper ()
816842 if len (args ) >= 2 and f"{ args [0 ]} { args [1 ]} " .upper () in self .command_flags :
817843 command = f"{ args [0 ]} { args [1 ]} " .upper ()
@@ -990,6 +1016,7 @@ def execute_command(self, *args, **kwargs):
9901016 dict<Any, ClusterNode>
9911017 """
9921018 target_nodes_specified = False
1019+ is_default_node = False
9931020 target_nodes = None
9941021 passed_targets = kwargs .pop ("target_nodes" , None )
9951022 if passed_targets is not None and not self ._is_nodes_flag (passed_targets ):
@@ -1020,12 +1047,20 @@ def execute_command(self, *args, **kwargs):
10201047 raise RedisClusterException (
10211048 f"No targets were found to execute { args } command on"
10221049 )
1050+ if (
1051+ len (target_nodes ) == 1
1052+ and target_nodes [0 ] == self .get_default_node ()
1053+ ):
1054+ is_default_node = True
10231055 for node in target_nodes :
10241056 res [node .name ] = self ._execute_command (node , * args , ** kwargs )
10251057 # Return the processed result
10261058 return self ._process_result (args [0 ], res , ** kwargs )
10271059 except Exception as e :
10281060 if retry_attempts > 0 and type (e ) in self .__class__ .ERRORS_ALLOW_RETRY :
1061+ if is_default_node :
1062+ # Replace the default cluster node
1063+ self .replace_default_node ()
10291064 # The nodes and slots cache were reinitialized.
10301065 # Try again with the new cluster setup.
10311066 retry_attempts -= 1
@@ -1883,7 +1918,7 @@ def _send_cluster_commands(
18831918 # if we have to run through it again, we only retry
18841919 # the commands that failed.
18851920 attempt = sorted (stack , key = lambda x : x .position )
1886-
1921+ is_default_node = False
18871922 # build a list of node objects based on node names we need to
18881923 nodes = {}
18891924
@@ -1913,6 +1948,8 @@ def _send_cluster_commands(
19131948 )
19141949
19151950 node = target_nodes [0 ]
1951+ if node == self .get_default_node ():
1952+ is_default_node = True
19161953
19171954 # now that we know the name of the node
19181955 # ( it's just a string in the form of host:port )
@@ -1926,6 +1963,8 @@ def _send_cluster_commands(
19261963 # Connection retries are being handled in the node's
19271964 # Retry object. Reinitialize the node -> slot table.
19281965 self .nodes_manager .initialize ()
1966+ if is_default_node :
1967+ self .replace_default_node ()
19291968 raise
19301969 nodes [node_name ] = NodeCommands (
19311970 redis_node .parse_response ,
@@ -2007,6 +2046,8 @@ def _send_cluster_commands(
20072046 self .reinitialize_counter += 1
20082047 if self ._should_reinitialized ():
20092048 self .nodes_manager .initialize ()
2049+ if is_default_node :
2050+ self .replace_default_node ()
20102051 for c in attempt :
20112052 try :
20122053 # send each command individually like we
0 commit comments