Commit 2e17e08

dnishimura authored and rmatharu-zz committed
SAMZA-2266: Introduce a backoff when there are repeated failures for host-affinity allocations (apache#1104)
* SAMZA-2266: Introduce a backoff when there are repeated failures for host-affinity allocations
1 parent 37de270 commit 2e17e08

14 files changed: 733 additions and 368 deletions

docs/learn/documentation/versioned/jobs/samza-configurations.md

Lines changed: 1 addition & 0 deletions
@@ -295,6 +295,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |--- |--- |--- |
 |cluster-manager.container.retry.count|8|If a container fails, it is automatically restarted by Samza. However, if a container keeps failing shortly after startup, that indicates a deeper problem, so we should kill the job rather than retrying indefinitely. This property determines the maximum number of times we are willing to restart a failed container in quick succession (the time period is configured with `cluster-manager.container.retry.window.ms`). Each container in the job is counted separately. If this property is set to 0, any failed container immediately causes the whole job to fail. If it is set to a negative number, there is no limit on the number of retries.|
 |cluster-manager.container.retry.window.ms|300000|This property determines how frequently a container is allowed to fail before we give up and fail the job. If the same container has failed more than `cluster-manager.container.retry.count` times, and the time between failures was less than this property `cluster-manager.container.retry.window.ms` (in milliseconds), then we fail the job. There is no limit to the number of times we will restart a container if the time between failures is greater than `cluster-manager.container.retry.window.ms`.|
+|cluster-manager.container.preferred-host.last.retry.delay.ms|360000|The delay added to the last retry for a failing container after all but one of cluster-manager.container.retry.count retries have been exhausted. The delay is only added when `job.host-affinity.enabled` is true and the retried request is for a preferred host. This addresses the issue where there may be a delay when a preferred host is marked invalid and the container continuously attempts to restart and fail on the invalid preferred host. This property is useful to prevent the `cluster-manager.container.retry.count` from being exceeded too quickly for such scenarios.|
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
 |cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
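The rule described for `cluster-manager.container.preferred-host.last.retry.delay.ms` can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: the class `LastRetryDelaySketch`, the method `delayForNextRetry`, and the way failures are counted are all assumptions made for the example.

```java
import java.time.Duration;

// Hypothetical illustration; this class and its method are not part of Samza.
final class LastRetryDelaySketch {

  /**
   * Returns the delay to apply before the next container request.
   * Assumption: failureCount counts failures inside the retry window, including the
   * one just observed, so failureCount == retryCount means the upcoming retry is the
   * last one allowed before the job fails.
   */
  static Duration delayForNextRetry(int failureCount, int retryCount, boolean hostAffinityEnabled,
      boolean preferredHostRequested, Duration lastRetryDelay) {
    boolean isLastRetry = retryCount > 0 && failureCount == retryCount;
    // The delay only applies to the last retry of a preferred-host request with host affinity on.
    if (isLastRetry && hostAffinityEnabled && preferredHostRequested) {
      return lastRetryDelay;
    }
    return Duration.ZERO;
  }

  public static void main(String[] args) {
    Duration lastRetryDelay = Duration.ofMillis(360000L); // documented default
    System.out.println(delayForNextRetry(3, 8, true, true, lastRetryDelay));  // PT0S
    System.out.println(delayForNextRetry(8, 8, true, true, lastRetryDelay));  // PT6M
    System.out.println(delayForNextRetry(8, 8, false, true, lastRetryDelay)); // PT0S
  }
}
```

With the documented defaults (retry count 8, delay 360000 ms), only the final retry of a preferred-host request would wait six minutes under this reading; every earlier retry is issued immediately.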

samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java

Lines changed: 48 additions & 12 deletions
@@ -18,6 +18,10 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -28,8 +32,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 
 /**
  * {@link AbstractContainerAllocator} makes requests for physical resources to the resource manager and also runs
@@ -110,6 +112,10 @@ public void run() {
     while (isRunning) {
       try {
         assignResourceRequests();
+
+        // Move delayed requests that are ready to the active request queue
+        resourceRequestState.sendPendingDelayedResourceRequests();
+
         // Release extra resources and update the entire system's state
         resourceRequestState.releaseExtraResources();
 
@@ -151,7 +157,7 @@ protected void runStreamProcessor(SamzaResourceRequest request, String preferred
 
     // Run processor on resource
     log.info("Found Container ID: {} for Processor ID: {} on host: {} for request creation time: {}.",
-        resource.getContainerId(), processorId, preferredHost, request.getRequestTimestampMs());
+        resource.getContainerId(), processorId, preferredHost, request.getRequestTimestamp());
 
     // Update processor state as "pending" and then issue a request to launch it. It's important to perform the state-update
     // prior to issuing the request. Otherwise, there's a race where the response callback may arrive sooner and not see
@@ -175,35 +181,65 @@ protected void runStreamProcessor(SamzaResourceRequest request, String preferred
   public abstract void requestResources(Map<String, String> processorToHostMapping);
 
   /**
-   * Checks if this allocator has a pending resource request.
+   * Checks if this allocator has a pending resource request with a request timestamp equal to or earlier than the current
+   * timestamp.
    * @return {@code true} if there is a pending request, {@code false} otherwise.
    */
-  protected final boolean hasPendingRequest() {
-    return peekPendingRequest() != null;
+  protected final boolean hasReadyPendingRequest() {
+    return peekReadyPendingRequest().isPresent();
   }
 
   /**
-   * Retrieves, but does not remove, the next pending request in the queue.
+   * Retrieves, but does not remove, the next pending request in the queue with the {@link SamzaResourceRequest#getRequestTimestamp()}
+   * that is equal to or earlier than the current timestamp.
    *
    * @return the pending request or {@code null} if there is no pending request.
    */
-  protected final SamzaResourceRequest peekPendingRequest() {
-    return resourceRequestState.peekPendingRequest();
+  protected final Optional<SamzaResourceRequest> peekReadyPendingRequest() {
+    SamzaResourceRequest pendingRequest = resourceRequestState.peekPendingRequest();
+    return Optional.ofNullable(pendingRequest);
   }
 
   /**
    * Requests a resource from the cluster manager
-   *
    * @param processorId Samza processor ID that will be run when a resource is allocated for this request
    * @param preferredHost name of the host that you prefer to run the processor on
    */
   public final void requestResource(String processorId, String preferredHost) {
-    SamzaResourceRequest request = getResourceRequest(processorId, preferredHost);
+    requestResourceWithDelay(processorId, preferredHost, Duration.ZERO);
+  }
+
+  /**
+   * Requests a resource from the cluster manager with a request timestamp of the current time plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   */
+  public final void requestResourceWithDelay(String processorId, String preferredHost, Duration delay) {
+    SamzaResourceRequest request = getResourceRequestWithDelay(processorId, preferredHost, delay);
     issueResourceRequest(request);
   }
 
+  /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @return the created request
+   */
   public final SamzaResourceRequest getResourceRequest(String processorId, String preferredHost) {
-    return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId);
+    return getResourceRequestWithDelay(processorId, preferredHost, Duration.ZERO);
+  }
+
+  /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager with a request timestamp of the current time
+   * plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequestWithDelay(String processorId, String preferredHost, Duration delay) {
+    return new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
   }
 
   public final void issueResourceRequest(SamzaResourceRequest request) {
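The hunks above stamp each request with `Instant.now().plus(delay)` and have the allocator loop promote delayed requests once they are ready. The `ResourceRequestState` side of that mechanism is not shown in this excerpt, so the following is only a sketch of one way such a two-queue scheme could work. `DelayedRequestSketch`, its nested `Request` type, and all method names are hypothetical stand-ins, not Samza classes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical illustration of a delayed-request queue; not the Samza implementation.
final class DelayedRequestSketch {

  // Minimal stand-in for a resource request carrying a request timestamp.
  static final class Request {
    final String processorId;
    final Instant requestTimestamp;

    Request(String processorId, Instant requestTimestamp) {
      this.processorId = processorId;
      this.requestTimestamp = requestTimestamp;
    }
  }

  // Delayed requests ordered by the time at which they become ready.
  private final PriorityQueue<Request> delayed =
      new PriorityQueue<>(Comparator.comparing((Request r) -> r.requestTimestamp));
  // Requests that are ready to be matched with allocated resources.
  private final Queue<Request> active = new ArrayDeque<>();

  // Requests stamped in the future are parked; all others become active at once.
  void add(Request request, Instant now) {
    if (request.requestTimestamp.isAfter(now)) {
      delayed.add(request);
    } else {
      active.add(request);
    }
  }

  // Moves every delayed request whose timestamp is at or before 'now' to the active queue.
  int sendReadyDelayedRequests(Instant now) {
    int moved = 0;
    while (!delayed.isEmpty() && !delayed.peek().requestTimestamp.isAfter(now)) {
      active.add(delayed.poll());
      moved++;
    }
    return moved;
  }

  // Retrieves, but does not remove, the next ready request, if any.
  Optional<Request> peekReady() {
    return Optional.ofNullable(active.peek());
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2019-07-01T00:00:00Z");
    DelayedRequestSketch state = new DelayedRequestSketch();
    state.add(new Request("0", start.plus(Duration.ofMinutes(6))), start);

    System.out.println(state.peekReady().isPresent());                        // false
    System.out.println(state.sendReadyDelayedRequests(start.plusSeconds(60))); // 0
    System.out.println(state.sendReadyDelayedRequests(start.plusSeconds(360))); // 1
    System.out.println(state.peekReady().isPresent());                        // true
  }
}
```

Passing `now` explicitly instead of calling `Instant.now()` inside the methods is a choice made for this sketch so the behavior is deterministic and easy to test.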

samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java

Lines changed: 2 additions & 3 deletions
@@ -51,9 +51,8 @@ public ContainerAllocator(ClusterResourceManager manager,
    * */
   @Override
   public void assignResourceRequests() {
-    while (hasPendingRequest() && hasAllocatedResource(ResourceRequestState.ANY_HOST)) {
-      SamzaResourceRequest request = peekPendingRequest();
-      runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+    while (hasReadyPendingRequest() && hasAllocatedResource(ResourceRequestState.ANY_HOST)) {
+      peekReadyPendingRequest().ifPresent(request -> runStreamProcessor(request, ResourceRequestState.ANY_HOST));
     }
   }
 