Upgrading Spark jobs on Google Dataproc with ML_DSL

Sometimes it is necessary to change the behaviour of a running Spark Streaming job. There are two ways to upgrade Spark jobs on Dataproc: using the API or using Jupyter magic functions.

Using API

from com.griddynamics.dsl.ml.executors.executors import JobUpgradeExecutor
from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.jobs.builder import JobBuilder
from com.griddynamics.dsl.ml.sessions import SessionFactory
from com.griddynamics.dsl.ml.settings.description import Platform
from runpy import run_path

Define a Profile for the PySpark job:

profile = PySparkJobProfile(bucket='test_bucket',cluster='test_cluster',      
                            region='global', job_prefix='test_job',  
                            root_path='scripts', project='test_project', 
                            ai_region='us-central1', job_async=True)

Name of the main Python script:

script_name = 'test_job.py'

Additional files for the Spark job:

profile.py_files = ['py_file1.py', ..., 'py_filen.py']
profile.jars = ['jar1.jar', ..., 'jarn.jar']

Job properties:

profile.properties={"spark.executor.cores":"1",       
                    "spark.executor.memory":"4G"}

Job arguments:

profile.args = {'--data_path': 'gs://test_bucket/data'}
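
For context, the main script named above (test_job.py) is an ordinary PySpark application that receives these arguments on its command line. Its contents are not part of this page, so the following is only a hypothetical sketch of a script that parses --data_path and starts a simple streaming query:

import argparse
from pyspark.sql import SparkSession

# Hypothetical contents of test_job.py (illustration only, not part of ML_DSL)
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_path', required=True)
    args = parser.parse_args()

    spark = SparkSession.builder.appName('test_job').getOrCreate()

    # Watch the given GCS path for new JSON files and echo them to the console
    stream = (spark.readStream
                   .format('json')
                   .schema('value STRING')
                   .load(args.data_path))
    query = stream.writeStream.format('console').start()
    query.awaitTermination()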

Execute the code at the given filesystem location and return the resulting module globals dictionary using the run_path function from runpy:

validator_module = run_path('{path_to_validator_script.py}')
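
run_path here is the standard library function from runpy and is not specific to ML_DSL: the returned dictionary holds the executed script's top-level names, so a class defined in the validator script can be looked up by name. As an illustration (using the same placeholder path and class name as on this page):

module_globals = run_path('{path_to_validator_script.py}')
ValidatorClass = module_globals['MyValidator']   # class defined at the top level of the script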

Define the name of the validator class in the validator module and the ID of the job to be upgraded:

validator = 'MyValidator'
old_job_id = 'test_job_123456'
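
The validator script itself is not shown on this page, and the exact interface that ML_DSL expects from a validator is not documented here. Purely as a hypothetical placeholder, it could be a module that defines a class with the name passed in the validator variable, for example:

# {path_to_validator_script.py} -- hypothetical placeholder only; the real
# validator contract is defined by ML_DSL and is not described on this page.
class MyValidator:
    def validate(self, job_output_path):
        # Assumed example check: confirm the upgraded job produced some output
        return job_output_path is not None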

Create a JobBuilder instance and build the new Spark job:

root_path = 'scripts'   # same value as the profile's root_path
job_name = 'test_job'   # placeholder name for the new job
builder = JobBuilder(Platform.GCP)
job = (builder.files_root(root_path)
              .job_file(script_name)
              .job_id(job_name)
              .build_job(profile, Platform.GCP))

Create a Session instance for the Dataproc JobController client:

session = (SessionFactory(platform=Platform.GCP)
               .build_session(job_bucket=profile.bucket,
                              job_region=profile.region,
                              cluster=profile.cluster,
                              job_project_id=profile.project,
                              ml_region=profile.ai_region))

Create an executor instance to submit the upgrade job to the Dataproc cluster:

executor = JobUpgradeExecutor(job, session, old_job_id)
executor.submit_upgrade_job(validator=validator,
                            validator_path=validator_module,
                            run_async=profile.job_async)

Using Magic Functions

from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.settings.description import Platform
from runpy import run_path

Define a Profile for the PySpark job:

profile = PySparkJobProfile(bucket='test_bucket',cluster='test_cluster',      
                            region='global', job_prefix='test_job',  
                            root_path='scripts', project='test_project', 
                            ai_region='us-central1', job_async=False)

Additional files for the Spark job:

profile.py_files = ['py_file1.py', ..., 'py_filen.py']
profile.jars = ['jar1.jar', ..., 'jarn.jar']

Job properties:

profile.properties={"spark.executor.cores":"1",
                    "spark.executor.memory":"4G"}

Job arguments:

profile.args = {'--data_path': 'gs://test_bucket/data'}

validator_module = run_path('{path_to_validator_script.py}')
validator = 'MyValidator'
old_job_id = 'test_job_123456'

Set the profile:

PySparkJobProfile.set('DslTestJobProfile', profile)
platform = Platform.GCP

Open or load the task script using one of the magic functions %py_script, %py_script_open, or %py_load:

%py_script_open -n test_upgrade_job.py -p demo/scripts -o dev

Start the upgrade job using the magic function %job_upgrade:

%job_upgrade -n test_upgrade_job.py -p DslTestJobProfile -pm $platform -o gs://test_bucket/ --old_job_id $old_job_id -v MyValidator -vp path_to_validator_script.py