|
18 | 18 | package org.apache.spark; |
19 | 19 |
|
20 | 20 | import java.io.Serializable; |
21 | | -import java.util.ArrayList; |
22 | | -import java.util.Collections; |
23 | | -import java.util.List; |
24 | 21 |
|
25 | 22 | import scala.Function0; |
26 | 23 | import scala.Function1; |
27 | 24 | import scala.Unit; |
28 | | -import scala.collection.JavaConversions; |
29 | 25 |
|
30 | 26 | import org.apache.spark.annotation.DeveloperApi; |
31 | 27 | import org.apache.spark.executor.TaskMetrics; |
32 | 28 | import org.apache.spark.util.TaskCompletionListener; |
33 | | -import org.apache.spark.util.TaskCompletionListenerException; |
34 | 29 |
|
35 | 30 | /** |
36 | | -* :: DeveloperApi :: |
37 | | -* Contextual information about a task which can be read or mutated during execution. |
38 | | -*/ |
39 | | -@DeveloperApi |
40 | | -public class TaskContext implements Serializable { |
41 | | - |
42 | | - private int stageId; |
43 | | - private int partitionId; |
44 | | - private long attemptId; |
45 | | - private boolean runningLocally; |
46 | | - private TaskMetrics taskMetrics; |
47 | | - |
48 | | - /** |
49 | | - * :: DeveloperApi :: |
50 | | - * Contextual information about a task which can be read or mutated during execution. |
51 | | - * |
52 | | - * @param stageId stage id |
53 | | - * @param partitionId index of the partition |
54 | | - * @param attemptId the number of attempts to execute this task |
55 | | - * @param runningLocally whether the task is running locally in the driver JVM |
56 | | - * @param taskMetrics performance metrics of the task |
57 | | - */ |
58 | | - @DeveloperApi |
59 | | - public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally, |
60 | | - TaskMetrics taskMetrics) { |
61 | | - this.attemptId = attemptId; |
62 | | - this.partitionId = partitionId; |
63 | | - this.runningLocally = runningLocally; |
64 | | - this.stageId = stageId; |
65 | | - this.taskMetrics = taskMetrics; |
66 | | - } |
67 | | - |
68 | | - /** |
69 | | - * :: DeveloperApi :: |
70 | | - * Contextual information about a task which can be read or mutated during execution. |
71 | | - * |
72 | | - * @param stageId stage id |
73 | | - * @param partitionId index of the partition |
74 | | - * @param attemptId the number of attempts to execute this task |
75 | | - * @param runningLocally whether the task is running locally in the driver JVM |
76 | | - */ |
77 | | - @DeveloperApi |
78 | | - public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally) { |
79 | | - this.attemptId = attemptId; |
80 | | - this.partitionId = partitionId; |
81 | | - this.runningLocally = runningLocally; |
82 | | - this.stageId = stageId; |
83 | | - this.taskMetrics = TaskMetrics.empty(); |
84 | | - } |
85 | | - |
| 31 | + * Contextual information about a task which can be read or mutated during |
| 32 | + * execution. To access the TaskContext for a running task use |
| 33 | + * TaskContext.get(). |
| 34 | + */ |
| 35 | +public abstract class TaskContext implements Serializable { |
86 | 36 | /** |
87 | | - * :: DeveloperApi :: |
88 | | - * Contextual information about a task which can be read or mutated during execution. |
89 | | - * |
90 | | - * @param stageId stage id |
91 | | - * @param partitionId index of the partition |
92 | | - * @param attemptId the number of attempts to execute this task |
| 37 | + * Return the currently active TaskContext. This can be called inside of |
| 38 | + * user functions to access contextual information about running tasks. |
93 | 39 | */ |
94 | | - @DeveloperApi |
95 | | - public TaskContext(int stageId, int partitionId, long attemptId) { |
96 | | - this.attemptId = attemptId; |
97 | | - this.partitionId = partitionId; |
98 | | - this.runningLocally = false; |
99 | | - this.stageId = stageId; |
100 | | - this.taskMetrics = TaskMetrics.empty(); |
| 40 | + public static TaskContext get() { |
| 41 | + return taskContext.get(); |
101 | 42 | } |
102 | 43 |
|
103 | 44 | private static ThreadLocal<TaskContext> taskContext = |
104 | 45 | new ThreadLocal<TaskContext>(); |
105 | 46 |
|
106 | | - /** |
107 | | - * :: Internal API :: |
108 | | - * This is spark internal API, not intended to be called from user programs. |
109 | | - */ |
110 | | - public static void setTaskContext(TaskContext tc) { |
| 47 | + static void setTaskContext(TaskContext tc) { |
111 | 48 | taskContext.set(tc); |
112 | 49 | } |
113 | 50 |
|
114 | | - public static TaskContext get() { |
115 | | - return taskContext.get(); |
116 | | - } |
117 | | - |
118 | | - /** :: Internal API :: */ |
119 | | - public static void unset() { |
| 51 | + static void unset() { |
120 | 52 | taskContext.remove(); |
121 | 53 | } |
122 | 54 |
|
123 | | - // List of callback functions to execute when the task completes. |
124 | | - private transient List<TaskCompletionListener> onCompleteCallbacks = |
125 | | - new ArrayList<TaskCompletionListener>(); |
126 | | - |
127 | | - // Whether the corresponding task has been killed. |
128 | | - private volatile boolean interrupted = false; |
129 | | - |
130 | | - // Whether the task has completed. |
131 | | - private volatile boolean completed = false; |
132 | | - |
133 | 55 | /** |
134 | | - * Checks whether the task has completed. |
| 56 | + * Whether the task has completed. |
135 | 57 | */ |
136 | | - public boolean isCompleted() { |
137 | | - return completed; |
138 | | - } |
| 58 | + public abstract boolean isCompleted(); |
139 | 59 |
|
140 | 60 | /** |
141 | | - * Checks whether the task has been killed. |
| 61 | + * Whether the task has been killed. |
142 | 62 | */ |
143 | | - public boolean isInterrupted() { |
144 | | - return interrupted; |
145 | | - } |
| 63 | + public abstract boolean isInterrupted(); |
| 64 | + |
| 65 | + /** @deprecated: use isRunningLocally() */ |
| 66 | + @Deprecated |
| 67 | + public abstract boolean runningLocally(); |
| 68 | + |
| 69 | + public abstract boolean isRunningLocally(); |
146 | 70 |
|
147 | 71 | /** |
148 | 72 | * Add a (Java friendly) listener to be executed on task completion. |
149 | 73 | * This will be called in all situation - success, failure, or cancellation. |
150 | 74 | * <p/> |
151 | 75 | * An example use is for HadoopRDD to register a callback to close the input stream. |
152 | 76 | */ |
153 | | - public TaskContext addTaskCompletionListener(TaskCompletionListener listener) { |
154 | | - onCompleteCallbacks.add(listener); |
155 | | - return this; |
156 | | - } |
| 77 | + public abstract TaskContext addTaskCompletionListener(TaskCompletionListener listener); |
157 | 78 |
|
158 | 79 | /** |
159 | 80 | * Add a listener in the form of a Scala closure to be executed on task completion. |
160 | 81 | * This will be called in all situations - success, failure, or cancellation. |
161 | 82 | * <p/> |
162 | 83 | * An example use is for HadoopRDD to register a callback to close the input stream. |
163 | 84 | */ |
164 | | - public TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f) { |
165 | | - onCompleteCallbacks.add(new TaskCompletionListener() { |
166 | | - @Override |
167 | | - public void onTaskCompletion(TaskContext context) { |
168 | | - f.apply(context); |
169 | | - } |
170 | | - }); |
171 | | - return this; |
172 | | - } |
| 85 | + public abstract TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f); |
173 | 86 |
|
174 | 87 | /** |
175 | 88 | * Add a callback function to be executed on task completion. An example use |
176 | 89 | * is for HadoopRDD to register a callback to close the input stream. |
177 | 90 | * Will be called in any situation - success, failure, or cancellation. |
178 | 91 | * |
179 | | - * Deprecated: use addTaskCompletionListener |
180 | | - * |
| 92 | + * @deprecated: use addTaskCompletionListener |
| 93 | + * |
181 | 94 | * @param f Callback function. |
182 | 95 | */ |
183 | 96 | @Deprecated |
184 | | - public void addOnCompleteCallback(final Function0<Unit> f) { |
185 | | - onCompleteCallbacks.add(new TaskCompletionListener() { |
186 | | - @Override |
187 | | - public void onTaskCompletion(TaskContext context) { |
188 | | - f.apply(); |
189 | | - } |
190 | | - }); |
191 | | - } |
192 | | - |
193 | | - /** |
194 | | - * ::Internal API:: |
195 | | - * Marks the task as completed and triggers the listeners. |
196 | | - */ |
197 | | - public void markTaskCompleted() throws TaskCompletionListenerException { |
198 | | - completed = true; |
199 | | - List<String> errorMsgs = new ArrayList<String>(2); |
200 | | - // Process complete callbacks in the reverse order of registration |
201 | | - List<TaskCompletionListener> revlist = |
202 | | - new ArrayList<TaskCompletionListener>(onCompleteCallbacks); |
203 | | - Collections.reverse(revlist); |
204 | | - for (TaskCompletionListener tcl: revlist) { |
205 | | - try { |
206 | | - tcl.onTaskCompletion(this); |
207 | | - } catch (Throwable e) { |
208 | | - errorMsgs.add(e.getMessage()); |
209 | | - } |
210 | | - } |
211 | | - |
212 | | - if (!errorMsgs.isEmpty()) { |
213 | | - throw new TaskCompletionListenerException(JavaConversions.asScalaBuffer(errorMsgs)); |
214 | | - } |
215 | | - } |
216 | | - |
217 | | - /** |
218 | | - * ::Internal API:: |
219 | | - * Marks the task for interruption, i.e. cancellation. |
220 | | - */ |
221 | | - public void markInterrupted() { |
222 | | - interrupted = true; |
223 | | - } |
224 | | - |
225 | | - @Deprecated |
226 | | - /** Deprecated: use getStageId() */ |
227 | | - public int stageId() { |
228 | | - return stageId; |
229 | | - } |
230 | | - |
231 | | - @Deprecated |
232 | | - /** Deprecated: use getPartitionId() */ |
233 | | - public int partitionId() { |
234 | | - return partitionId; |
235 | | - } |
236 | | - |
237 | | - @Deprecated |
238 | | - /** Deprecated: use getAttemptId() */ |
239 | | - public long attemptId() { |
240 | | - return attemptId; |
241 | | - } |
242 | | - |
243 | | - @Deprecated |
244 | | - /** Deprecated: use isRunningLocally() */ |
245 | | - public boolean runningLocally() { |
246 | | - return runningLocally; |
247 | | - } |
248 | | - |
249 | | - public boolean isRunningLocally() { |
250 | | - return runningLocally; |
251 | | - } |
| 97 | + public abstract void addOnCompleteCallback(final Function0<Unit> f); |
252 | 98 |
|
253 | | - public int getStageId() { |
254 | | - return stageId; |
255 | | - } |
| 99 | + public abstract int stageId(); |
256 | 100 |
|
257 | | - public int getPartitionId() { |
258 | | - return partitionId; |
259 | | - } |
| 101 | + public abstract int partitionId(); |
260 | 102 |
|
261 | | - public long getAttemptId() { |
262 | | - return attemptId; |
263 | | - } |
| 103 | + public abstract long attemptId(); |
264 | 104 |
|
265 | | - /** ::Internal API:: */ |
266 | | - public TaskMetrics taskMetrics() { |
267 | | - return taskMetrics; |
268 | | - } |
| 105 | + /** ::DeveloperApi:: */ |
| 106 | + @DeveloperApi |
| 107 | + public abstract TaskMetrics taskMetrics(); |
269 | 108 | } |
0 commit comments