55 | 55 |
56 | 56 | public class Quickstart {
57 | 57 |
58 | | - public static Job waitForJobCompletion( |
59 | | - JobControllerClient jobControllerClient, String projectId, String region, String jobId) { |
60 | | - while (true) { |
61 | | - // Poll the service periodically until the Job is in a finished state. |
62 | | - Job jobInfo = jobControllerClient.getJob(projectId, region, jobId); |
63 | | - switch (jobInfo.getStatus().getState()) { |
64 | | - case DONE: |
65 | | - case CANCELLED: |
66 | | - case ERROR: |
67 | | - return jobInfo; |
68 | | - default: |
69 | | - try { |
70 | | - // Wait a second in between polling attempts. |
71 | | - TimeUnit.SECONDS.sleep(1); |
72 | | - } catch (InterruptedException e) { |
73 | | - throw new RuntimeException(e); |
74 | | - } |
75 | | - } |
76 | | - } |
77 | | - } |
78 | | - |
79 | 58 | public static void quickstart( |
80 | 59 | String projectId, String region, String clusterName, String jobFilePath) |
81 | 60 | throws IOException, InterruptedException { |
@@ -130,16 +109,8 @@ public static void quickstart( |
130 | 109 | Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build(); |
131 | 110 |
132 | 111 | // Submit an asynchronous request to execute the job. |
133 | | - Job request = jobControllerClient.submitJob(projectId, region, job); |
134 | | - String jobId = request.getReference().getJobId(); |
135 | | - System.out.println(String.format("Submitting job \"%s\"", jobId)); |
136 | | - |
137 | | - // Wait for the job to finish. |
138 | | - System.out.println(String.format("Job %s finished successfully.", jobId)); |
139 | | - |
140 | 112 | OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest = |
141 | 113 | jobControllerClient.submitJobAsOperationAsync(projectId, region, job); |
142 | | - |
143 | 114 | Job jobResponse = submitJobAsOperationAsyncRequest.get(); |
144 | 115 |
145 | 116 | // Print output from Google Cloud Storage. |
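The diff above removes the hand-rolled waitForJobCompletion polling loop and the synchronous submitJob call, and instead submits the job with submitJobAsOperationAsync, which returns an OperationFuture that resolves once the job reaches a terminal state. Below is a minimal, self-contained sketch of the resulting pattern; the client calls (JobControllerSettings, JobControllerClient, submitJobAsOperationAsync) come from the google-cloud-dataproc Java client used in this diff, while the class name, method name, and regional endpoint string are illustrative assumptions, not part of the original file.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.dataproc.v1.PySparkJob;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class SubmitJobAsyncSketch {

  public static void submitPySparkJob(
      String projectId, String region, String clusterName, String jobFilePath)
      throws IOException, InterruptedException, ExecutionException {
    // Regional endpoint for the Dataproc Job Controller (assumed host format).
    JobControllerSettings settings =
        JobControllerSettings.newBuilder()
            .setEndpoint(String.format("%s-dataproc.googleapis.com:443", region))
            .build();

    try (JobControllerClient jobControllerClient = JobControllerClient.create(settings)) {
      // Describe where the job runs and what it executes.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
      PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
      Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();

      // Submit the job as a long-running operation and block until it reaches a
      // terminal state; this replaces the removed waitForJobCompletion polling loop.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
      Job jobResponse = submitJobAsOperationAsyncRequest.get();

      System.out.println("Job finished with state: " + jobResponse.getStatus().getState());
    }
  }
}

Because the OperationFuture tracks the job as a long-running operation, a single blocking get() is enough to wait for completion; there is no need to poll getJob and inspect the status state machine by hand.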