Skip to content
Merged
4 changes: 2 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class KafkaAdminClient(object):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
connections_max_idle_ms: Close idle connections after the number of
Expand Down Expand Up @@ -156,7 +156,7 @@ class KafkaAdminClient(object):
'request_timeout_ms': 30000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down
59 changes: 38 additions & 21 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class KafkaClient(object):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
connections_max_idle_ms: Close idle connections after the number of
Expand Down Expand Up @@ -164,7 +164,7 @@ class KafkaClient(object):
'wakeup_timeout_ms': 3000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down Expand Up @@ -464,9 +464,8 @@ def is_disconnected(self, node_id):
def connection_delay(self, node_id):
"""
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
the reconnect backoff time. When connecting, returns 0 to allow
non-blocking connect to finish. When connected, returns a very large
state, before attempting to send data. When connecting or disconnected,
this respects the reconnect backoff time. When connected, returns a very large
number to handle slow/stalled connections.

Arguments:
Expand Down Expand Up @@ -537,7 +536,8 @@ def send(self, node_id, request, wakeup=True):
# we will need to call send_pending_requests()
# to trigger network I/O
future = conn.send(request, blocking=False)
self._sending.add(conn)
if not future.is_done:
self._sending.add(conn)

# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
Expand All @@ -563,9 +563,7 @@ def poll(self, timeout_ms=None, future=None):
Returns:
list: responses received (can be empty)
"""
if future is not None:
timeout_ms = 100
elif timeout_ms is None:
if timeout_ms is None:
timeout_ms = self.config['request_timeout_ms']
elif not isinstance(timeout_ms, (int, float)):
raise TypeError('Invalid type for timeout: %s' % type(timeout_ms))
Expand All @@ -577,26 +575,25 @@ def poll(self, timeout_ms=None, future=None):
if self._closed:
break

# Send a metadata request if needed (or initiate new connection)
metadata_timeout_ms = self._maybe_refresh_metadata()

# Attempt to complete pending connections
for node_id in list(self._connecting):
self._maybe_connect(node_id)

# Send a metadata request if needed
metadata_timeout_ms = self._maybe_refresh_metadata()

# If we got a future that is already done, don't block in _poll
if future is not None and future.is_done:
timeout = 0
else:
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
request_timeout_ms = self._next_ifr_request_timeout_ms()
log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms)
timeout = min(
timeout_ms,
metadata_timeout_ms,
idle_connection_timeout_ms,
self.config['request_timeout_ms'])
# if there are no requests in flight, do not block longer than the retry backoff
if self.in_flight_request_count() == 0:
timeout = min(timeout, self.config['retry_backoff_ms'])
request_timeout_ms)
timeout = max(0, timeout) # avoid negative timeouts

self._poll(timeout / 1000)
Expand All @@ -615,6 +612,8 @@ def poll(self, timeout_ms=None, future=None):
def _register_send_sockets(self):
while self._sending:
conn = self._sending.pop()
if conn._sock is None:
continue
try:
key = self._selector.get_key(conn._sock)
events = key.events | selectors.EVENT_WRITE
Expand Down Expand Up @@ -772,6 +771,17 @@ def least_loaded_node(self):

return found

def least_loaded_node_refresh_ms(self):
    """Return connection delay in milliseconds for next available node.

    This method is used primarily for retry/backoff during metadata refresh
    during / after a cluster outage, in which there are no available nodes.

    Returns:
        float: milliseconds to wait before retrying a metadata refresh
    """
    delays = [self.connection_delay(broker.nodeId)
              for broker in self.cluster.brokers()]
    # min([]) raises ValueError. If cluster metadata holds no brokers yet
    # (e.g. bootstrap has not completed), fall back to the configured
    # reconnect backoff instead of crashing the poll loop.
    if not delays:
        return self.config['reconnect_backoff_ms']
    return min(delays)

def set_topics(self, topics):
"""Set specific topics to track for metadata.

Expand Down Expand Up @@ -803,12 +813,18 @@ def add_topic(self, topic):
self._topics.add(topic)
return self.cluster.request_update()

def _next_ifr_request_timeout_ms(self):
    """Return milliseconds until the oldest in-flight request across all
    broker connections reaches its request timeout.

    Returns float('inf') when there are no connections at all, so callers
    can feed the result directly into a min()-based poll timeout.
    """
    if not self._conns:
        return float('inf')
    return min([conn.next_ifr_request_timeout_ms()
                for conn in six.itervalues(self._conns)])

# This method should be locked when running multi-threaded
def _maybe_refresh_metadata(self, wakeup=False):
"""Send a metadata request if needed.

Returns:
int: milliseconds until next refresh
float: milliseconds until next refresh
"""
ttl = self.cluster.ttl()
wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
Expand All @@ -822,8 +838,9 @@ def _maybe_refresh_metadata(self, wakeup=False):
# least_loaded_node()
node_id = self.least_loaded_node()
if node_id is None:
log.debug("Give up sending metadata request since no node is available");
return self.config['reconnect_backoff_ms']
next_connect_ms = self.least_loaded_node_refresh_ms()
log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms)
return next_connect_ms

if self._can_send_request(node_id):
topics = list(self._topics)
Expand All @@ -850,11 +867,11 @@ def refresh_done(val_or_error):
# the client from unnecessarily connecting to additional nodes while a previous connection
# attempt has not been completed.
if self._connecting:
return self.config['reconnect_backoff_ms']
return float('inf')

if self.maybe_connect(node_id, wakeup=wakeup):
log.debug("Initializing connection to node %s for metadata request", node_id)
return self.config['reconnect_backoff_ms']
return float('inf')

# connected but can't send more, OR connecting
# In either case we just need to wait for a network event
Expand Down
36 changes: 22 additions & 14 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class BrokerConnection(object):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
Expand Down Expand Up @@ -198,7 +198,7 @@ class BrokerConnection(object):
'node_id': 0,
'request_timeout_ms': 30000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down Expand Up @@ -848,20 +848,22 @@ def blacked_out(self):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
if time.time() < self.last_attempt + self._reconnect_backoff:
return True
return self.connection_delay() > 0
return False

def connection_delay(self):
"""
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
the reconnect backoff time. When connecting or connected, returns a very
state, before attempting to send data. When connecting or disconnected,
this respects the reconnect backoff time. When connected, returns a very
large number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited, 0) * 1000
if self.disconnected() or self.connecting():
if len(self._gai) > 0:
return 0
else:
time_waited = time.time() - self.last_attempt
return max(self._reconnect_backoff - time_waited, 0) * 1000
else:
# When connecting or connected, we should be able to delay
# indefinitely since other events (connection or data acked) will
Expand All @@ -887,6 +889,9 @@ def _reset_reconnect_backoff(self):
self._failures = 0
self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0

def _reconnect_jitter_pct(self):
return uniform(0.8, 1.2)

def _update_reconnect_backoff(self):
# Do not mark as failure if there are more dns entries available to try
if len(self._gai) > 0:
Expand All @@ -895,7 +900,7 @@ def _update_reconnect_backoff(self):
self._failures += 1
self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
self._reconnect_backoff *= uniform(0.8, 1.2)
self._reconnect_backoff *= self._reconnect_jitter_pct()
self._reconnect_backoff /= 1000.0
log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)

Expand Down Expand Up @@ -1136,15 +1141,18 @@ def _recv(self):
return ()

def requests_timed_out(self):
    """Return True if the oldest in-flight request has exceeded
    the configured request timeout, False otherwise (including when
    no requests are in flight)."""
    remaining_ms = self.next_ifr_request_timeout_ms()
    return remaining_ms == 0

def next_ifr_request_timeout_ms(self):
with self._lock:
if self.in_flight_requests:
get_timestamp = lambda v: v[1]
oldest_at = min(map(get_timestamp,
self.in_flight_requests.values()))
timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() >= oldest_at + timeout:
return True
return False
next_timeout = oldest_at + self.config['request_timeout_ms'] / 1000.0
return max(0, (next_timeout - time.time()) * 1000)
else:
return float('inf')

def _handle_api_version_response(self, response):
error_type = Errors.for_code(response.error_code)
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class KafkaConsumer(six.Iterator):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
Expand Down Expand Up @@ -263,7 +263,7 @@ class KafkaConsumer(six.Iterator):
'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'auto_offset_reset': 'latest',
'enable_auto_commit': True,
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class KafkaProducer(object):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Note that if this setting is set to be greater
Expand Down Expand Up @@ -311,7 +311,7 @@ class KafkaProducer(object):
'sock_chunk_bytes': 4096, # undocumented experimental option
'sock_chunk_buffer_count': 1000, # undocumented experimental option
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
Expand Down
10 changes: 5 additions & 5 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ def run_once(self):
self._metadata.request_update()

# remove any nodes we aren't ready to send to
not_ready_timeout = float('inf')
not_ready_timeout_ms = float('inf')
for node in list(ready_nodes):
if not self._client.is_ready(node):
log.debug('Node %s not ready; delaying produce of accumulated batch', node)
node_delay_ms = self._client.connection_delay(node)
log.debug('Node %s not ready; delaying produce of accumulated batch (%f ms)', node, node_delay_ms)
self._client.maybe_connect(node, wakeup=False)
ready_nodes.remove(node)
not_ready_timeout = min(not_ready_timeout,
self._client.connection_delay(node))
not_ready_timeout_ms = min(not_ready_timeout_ms, node_delay_ms)

# create produce requests
batches_by_node = self._accumulator.drain(
Expand All @@ -136,7 +136,7 @@ def run_once(self):
# off). Note that this specifically does not include nodes with
# sendable data that aren't ready to send since they would cause busy
# looping.
poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout)
poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout_ms)
if ready_nodes:
log.debug("Nodes with data ready to send: %s", ready_nodes) # trace
log.debug("Created %d produce requests: %s", len(requests), requests) # trace
Expand Down
2 changes: 2 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ def conn(mocker):
MetadataResponse[0](
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
conn.connection_delay.return_value = 0
conn.blacked_out.return_value = False
conn.next_ifr_request_timeout_ms.return_value = float('inf')
def _set_conn_state(state):
conn.state = state
return state
Expand Down
Loading