 from google.cloud import storage
 import googleapiclient.discovery
 
-# Currently only the "global" region is supported
-REGION = 'global'
 DEFAULT_FILENAME = 'pyspark_sort.py'
 
 
@@ -36,6 +34,14 @@ def get_pyspark_file(filename):
     return f, os.path.basename(filename)
 
 
+def get_region_from_zone(zone):
+    try:
+        region_as_list = zone.split('-')[:-1]
+        return '-'.join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError('Invalid zone provided, please check your input.')
+
+
 def upload_pyspark_file(project_id, bucket_name, filename, file):
     """Uploads the PySpark file in this directory to the configured
     input bucket."""
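For reference, the new get_region_from_zone() helper simply drops the trailing zone suffix to recover the region. A minimal standalone sketch of the same mapping, using illustrative zone names that are not taken from this change:

def region_from_zone(zone):
    # e.g. 'us-central1-b' -> ['us', 'central1'] -> 'us-central1'
    return '-'.join(zone.split('-')[:-1])

assert region_from_zone('us-central1-b') == 'us-central1'
assert region_from_zone('europe-west1-d') == 'europe-west1'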
@@ -59,8 +65,8 @@ def download_output(project_id, cluster_id, output_bucket, job_id):
 
 
 # [START create_cluster]
-def create_cluster(dataproc, project, cluster_name, zone):
-    print('Creating cluster.')
+def create_cluster(dataproc, project, zone, region, cluster_name):
+    print('Creating cluster...')
     zone_uri = \
         'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
             project, zone)
@@ -75,19 +81,19 @@ def create_cluster(dataproc, project, cluster_name, zone):
     }
     result = dataproc.projects().regions().clusters().create(
         projectId=project,
-        region=REGION,
+        region=region,
         body=cluster_data).execute()
     return result
 # [END create_cluster]
 
 
-def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
-    print('Waiting for cluster creation')
+def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
+    print('Waiting for cluster creation...')
 
     while True:
         result = dataproc.projects().regions().clusters().list(
             projectId=project_id,
-            region=REGION).execute()
+            region=region).execute()
         cluster_list = result['clusters']
         cluster = [c
                    for c in cluster_list
@@ -100,10 +106,10 @@ def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
 
 
 # [START list_clusters_with_detail]
-def list_clusters_with_details(dataproc, project):
+def list_clusters_with_details(dataproc, project, region):
     result = dataproc.projects().regions().clusters().list(
         projectId=project,
-        region=REGION).execute()
+        region=region).execute()
     cluster_list = result['clusters']
     for cluster in cluster_list:
         print("{} - {}"
@@ -120,7 +126,8 @@ def get_cluster_id_by_name(cluster_list, cluster_name):
 
 
 # [START submit_pyspark_job]
-def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
+def submit_pyspark_job(dataproc, project, region,
+                       cluster_name, bucket_name, filename):
     """Submits the Pyspark job to the cluster, assuming `filename` has
     already been uploaded to `bucket_name`"""
     job_details = {
@@ -136,7 +143,7 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
     }
     result = dataproc.projects().regions().jobs().submit(
         projectId=project,
-        region=REGION,
+        region=region,
         body=job_details).execute()
     job_id = result['reference']['jobId']
     print('Submitted job ID {}'.format(job_id))
@@ -145,29 +152,29 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
 
 
 # [START delete]
-def delete_cluster(dataproc, project, cluster):
+def delete_cluster(dataproc, project, region, cluster):
     print('Tearing down cluster')
     result = dataproc.projects().regions().clusters().delete(
         projectId=project,
-        region=REGION,
+        region=region,
         clusterName=cluster).execute()
     return result
 # [END delete]
 
 
 # [START wait]
-def wait_for_job(dataproc, project, job_id):
+def wait_for_job(dataproc, project, region, job_id):
     print('Waiting for job to finish...')
     while True:
         result = dataproc.projects().regions().jobs().get(
             projectId=project,
-            region=REGION,
+            region=region,
             jobId=job_id).execute()
         # Handle exceptions
         if result['status']['state'] == 'ERROR':
             raise Exception(result['status']['details'])
         elif result['status']['state'] == 'DONE':
-            print('Job finished')
+            print('Job finished.')
             return result
 # [END wait]
 
@@ -181,34 +188,44 @@ def get_client():
 # [END get_client]
 
 
-def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
+def main(project_id, zone, cluster_name, bucket_name,
+         pyspark_file=None, create_new_cluster=True):
     dataproc = get_client()
+    region = get_region_from_zone(zone)
     try:
         if pyspark_file:
             spark_file, spark_filename = get_pyspark_file(pyspark_file)
         else:
             spark_file, spark_filename = get_default_pyspark_file()
 
-        create_cluster(dataproc, project_id, cluster_name, zone)
-        wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
-        upload_pyspark_file(project_id, bucket_name,
-                            spark_filename, spark_file)
+        if create_new_cluster:
+            create_cluster(
+                dataproc, project_id, zone, region, cluster_name)
+            wait_for_cluster_creation(
+                dataproc, project_id, region, cluster_name)
+
+        upload_pyspark_file(
+            project_id, bucket_name, spark_filename, spark_file)
+
         cluster_list = list_clusters_with_details(
-            dataproc, project_id)['clusters']
+            dataproc, project_id, region)['clusters']
 
         (cluster_id, output_bucket) = (
             get_cluster_id_by_name(cluster_list, cluster_name))
+
         # [START call_submit_pyspark_job]
         job_id = submit_pyspark_job(
-            dataproc, project_id, cluster_name, bucket_name, spark_filename)
+            dataproc, project_id, region,
+            cluster_name, bucket_name, spark_filename)
         # [END call_submit_pyspark_job]
-        wait_for_job(dataproc, project_id, job_id)
+        wait_for_job(dataproc, project_id, region, job_id)
 
         output = download_output(project_id, cluster_id, output_bucket, job_id)
         print('Received job output {}'.format(output))
         return output
     finally:
-        delete_cluster(dataproc, project_id, cluster_name)
+        if create_new_cluster:
+            delete_cluster(dataproc, project_id, region, cluster_name)
         spark_file.close()
 
 
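With the new create_new_cluster flag, the sample can target an already-running cluster and skip both creation and teardown. A hypothetical call to main() illustrating that path; every value below is a placeholder, and it assumes Google Cloud application default credentials are configured:

main(
    project_id='my-project',             # placeholder project ID
    zone='us-central1-b',                # region 'us-central1' is derived from this zone
    cluster_name='my-existing-cluster',  # reuse this cluster instead of creating one
    bucket_name='my-staging-bucket',     # bucket the PySpark file is uploaded to
    pyspark_file=None,                   # falls back to pyspark_sort.py
    create_new_cluster=False)            # skip create_cluster() and delete_cluster()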
@@ -220,15 +237,19 @@ def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
     parser.add_argument(
         '--project_id', help='Project ID you want to access.', required=True),
     parser.add_argument(
-        '--zone', help='Region to create clusters in', required=True)
+        '--zone', help='Zone to create clusters in/connect to', required=True)
     parser.add_argument(
-        '--cluster_name', help='Name of the cluster to create', required=True)
+        '--cluster_name',
+        help='Name of the cluster to create/connect to', required=True)
     parser.add_argument(
         '--gcs_bucket', help='Bucket to upload Pyspark file to', required=True)
     parser.add_argument(
         '--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py')
+    parser.add_argument(
+        '--create_new_cluster',
+        action='store_true', help='States if the cluster should be created')
 
     args = parser.parse_args()
     main(
-        args.project_id, args.zone,
-        args.cluster_name, args.gcs_bucket, args.pyspark_file)
+        args.project_id, args.zone, args.cluster_name,
+        args.gcs_bucket, args.pyspark_file, args.create_new_cluster)