@@ -75,7 +75,7 @@ class KafkaClient(object):
7575 reconnection attempts will continue periodically with this fixed
7676 rate. To avoid connection storms, a randomization factor of 0.2
7777 will be applied to the backoff resulting in a random range between
78- 20% below and 20% above the computed value. Default: 1000 .
78+ 20% below and 20% above the computed value. Default: 30000 .
7979 request_timeout_ms (int): Client request timeout in milliseconds.
8080 Default: 30000.
8181 connections_max_idle_ms: Close idle connections after the number of
@@ -164,7 +164,7 @@ class KafkaClient(object):
164164 'wakeup_timeout_ms' : 3000 ,
165165 'connections_max_idle_ms' : 9 * 60 * 1000 ,
166166 'reconnect_backoff_ms' : 50 ,
167- 'reconnect_backoff_max_ms' : 1000 ,
167+ 'reconnect_backoff_max_ms' : 30000 ,
168168 'max_in_flight_requests_per_connection' : 5 ,
169169 'receive_buffer_bytes' : None ,
170170 'send_buffer_bytes' : None ,
@@ -464,9 +464,8 @@ def is_disconnected(self, node_id):
464464 def connection_delay (self , node_id ):
465465 """
466466 Return the number of milliseconds to wait, based on the connection
467- state, before attempting to send data. When disconnected, this respects
468- the reconnect backoff time. When connecting, returns 0 to allow
469- non-blocking connect to finish. When connected, returns a very large
467+ state, before attempting to send data. When connecting or disconnected,
468+ this respects the reconnect backoff time. When connected, returns a very large
470469 number to handle slow/stalled connections.
471470
472471 Arguments:
@@ -537,7 +536,8 @@ def send(self, node_id, request, wakeup=True):
537536 # we will need to call send_pending_requests()
538537 # to trigger network I/O
539538 future = conn .send (request , blocking = False )
540- self ._sending .add (conn )
539+ if not future .is_done :
540+ self ._sending .add (conn )
541541
542542 # Wakeup signal is useful in case another thread is
543543 # blocked waiting for incoming network traffic while holding
@@ -563,9 +563,7 @@ def poll(self, timeout_ms=None, future=None):
563563 Returns:
564564 list: responses received (can be empty)
565565 """
566- if future is not None :
567- timeout_ms = 100
568- elif timeout_ms is None :
566+ if timeout_ms is None :
569567 timeout_ms = self .config ['request_timeout_ms' ]
570568 elif not isinstance (timeout_ms , (int , float )):
571569 raise TypeError ('Invalid type for timeout: %s' % type (timeout_ms ))
@@ -577,26 +575,25 @@ def poll(self, timeout_ms=None, future=None):
577575 if self ._closed :
578576 break
579577
578+ # Send a metadata request if needed (or initiate new connection)
579+ metadata_timeout_ms = self ._maybe_refresh_metadata ()
580+
580581 # Attempt to complete pending connections
581582 for node_id in list (self ._connecting ):
582583 self ._maybe_connect (node_id )
583584
584- # Send a metadata request if needed
585- metadata_timeout_ms = self ._maybe_refresh_metadata ()
586-
587585 # If we got a future that is already done, don't block in _poll
588586 if future is not None and future .is_done :
589587 timeout = 0
590588 else :
591589 idle_connection_timeout_ms = self ._idle_expiry_manager .next_check_ms ()
590+ request_timeout_ms = self ._next_ifr_request_timeout_ms ()
591+ log .debug ("Timeouts: user %f, metadata %f, idle connection %f, request %f" , timeout_ms , metadata_timeout_ms , idle_connection_timeout_ms , request_timeout_ms )
592592 timeout = min (
593593 timeout_ms ,
594594 metadata_timeout_ms ,
595595 idle_connection_timeout_ms ,
596- self .config ['request_timeout_ms' ])
597- # if there are no requests in flight, do not block longer than the retry backoff
598- if self .in_flight_request_count () == 0 :
599- timeout = min (timeout , self .config ['retry_backoff_ms' ])
596+ request_timeout_ms )
600597 timeout = max (0 , timeout ) # avoid negative timeouts
601598
602599 self ._poll (timeout / 1000 )
@@ -615,6 +612,8 @@ def poll(self, timeout_ms=None, future=None):
615612 def _register_send_sockets (self ):
616613 while self ._sending :
617614 conn = self ._sending .pop ()
615+ if conn ._sock is None :
616+ continue
618617 try :
619618 key = self ._selector .get_key (conn ._sock )
620619 events = key .events | selectors .EVENT_WRITE
@@ -772,6 +771,17 @@ def least_loaded_node(self):
772771
773772 return found
774773
774+ def least_loaded_node_refresh_ms (self ):
775+ """Return connection delay in milliseconds for next available node.
776+
777+ This method is used primarily for retry/backoff during metadata refresh
778+ during / after a cluster outage, in which there are no available nodes.
779+
780+ Returns:
781+ float: delay_ms
782+ """
783+ return min ([self .connection_delay (broker .nodeId ) for broker in self .cluster .brokers ()])
784+
775785 def set_topics (self , topics ):
776786 """Set specific topics to track for metadata.
777787
@@ -803,12 +813,18 @@ def add_topic(self, topic):
803813 self ._topics .add (topic )
804814 return self .cluster .request_update ()
805815
816+ def _next_ifr_request_timeout_ms (self ):
817+ if self ._conns :
818+ return min ([conn .next_ifr_request_timeout_ms () for conn in six .itervalues (self ._conns )])
819+ else :
820+ return float ('inf' )
821+
806822 # This method should be locked when running multi-threaded
807823 def _maybe_refresh_metadata (self , wakeup = False ):
808824 """Send a metadata request if needed.
809825
810826 Returns:
811- int : milliseconds until next refresh
827+ float : milliseconds until next refresh
812828 """
813829 ttl = self .cluster .ttl ()
814830 wait_for_in_progress_ms = self .config ['request_timeout_ms' ] if self ._metadata_refresh_in_progress else 0
@@ -822,8 +838,9 @@ def _maybe_refresh_metadata(self, wakeup=False):
822838 # least_loaded_node()
823839 node_id = self .least_loaded_node ()
824840 if node_id is None :
825- log .debug ("Give up sending metadata request since no node is available" );
826- return self .config ['reconnect_backoff_ms' ]
841+ next_connect_ms = self .least_loaded_node_refresh_ms ()
842+ log .debug ("Give up sending metadata request since no node is available. (reconnect delay %d ms)" , next_connect_ms )
843+ return next_connect_ms
827844
828845 if self ._can_send_request (node_id ):
829846 topics = list (self ._topics )
@@ -850,11 +867,11 @@ def refresh_done(val_or_error):
850867 # the client from unnecessarily connecting to additional nodes while a previous connection
851868 # attempt has not been completed.
852869 if self ._connecting :
853- return self . config [ 'reconnect_backoff_ms' ]
870+ return float ( 'inf' )
854871
855872 if self .maybe_connect (node_id , wakeup = wakeup ):
856873 log .debug ("Initializing connection to node %s for metadata request" , node_id )
857- return self . config [ 'reconnect_backoff_ms' ]
874+ return float ( 'inf' )
858875
859876 # connected but can't send more, OR connecting
860877 # In either case we just need to wait for a network event
0 commit comments