Upgrading Spark jobs on Google Dataproc with ML_DSL
Sometimes it is necessary to change the behaviour of a currently running Spark streaming job. There are two ways to run Spark jobs on Dataproc with ml_dsl: through the API or with Jupyter magic functions; both are shown below. The API-based approach looks like this:
from com.griddynamics.dsl.ml.executors.executors import JobUpgradeExecutor
from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.jobs.builder import JobBuilder
from com.griddynamics.dsl.ml.sessions import SessionFactory
from com.griddynamics.dsl.ml.settings.description import Platform
from runpy import run_path
# define a Profile for the PySpark job
profile = PySparkJobProfile(bucket='test_bucket', cluster='test_cluster',
                            region='global', job_prefix='test_job',
                            root_path='scripts', project='test_project',
                            ai_region='us-central1', job_async=True)
# name of the main Python script
script_name = 'test_job.py'
# additional Python files and jars for the Spark job
profile.py_files = ['py_file1.py', ..., 'py_filen.py']
profile.jars = ['jar1.jar', ..., 'jarn.jar']
# job properties
profile.properties = {"spark.executor.cores": "1",
                      "spark.executor.memory": "4G"}
# job arguments
profile.args = {'--data_path': 'gs://test_bucket/data'}
# execute the code at the given filesystem location and return the resulting
# module globals dictionary using runpy's run_path
validator_module = run_path('{path_to_validator_script.py}')
# name of the Validator class defined in the validator module
validator = 'MyValidator'
# id of the currently running job that will be upgraded
old_job_id = 'test_job_123456'
# JobBuilder instance for the new Spark job
builder = JobBuilder(Platform.GCP)
job_name = profile.job_prefix  # assumed: reuse the profile's job prefix as the new job name
job = (builder.files_root(profile.root_path)
       .job_file(script_name)
       .job_id(job_name)
       .build_job(profile, Platform.GCP))
# Session instance for the Dataproc JobController client
session = (SessionFactory(platform=Platform.GCP)
           .build_session(job_bucket=profile.bucket,
                          job_region=profile.region,
                          cluster=profile.cluster,
                          job_project_id=profile.project,
                          ml_region=profile.ai_region))
# Executor instance for submitting the upgrade job to the Dataproc cluster
executor = JobUpgradeExecutor(job, session, old_job_id)
executor.submit_upgrade_job(validator=validator,
                            validator_path=validator_module,
                            run_async=profile.job_async)
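The validator referenced above lives in a separate script that run_path executes. The exact interface ml_dsl expects from the validator class is not shown in this example, so the sketch below is only an assumption of what such a module might contain; the important part is that the file defines a class whose name matches the validator string:

# path_to_validator_script.py (hypothetical contents, for illustration only)
class MyValidator:
    def validate(self, job):
        # assumed hook: return True if the upgraded job looks healthy
        return job is not None

Since job_async=True in the profile, submit_upgrade_job is expected to return right after submission instead of blocking until the job finishes. The same upgrade can also be started from a notebook with Jupyter magic functions, here with job_async=False, presumably so the call waits for completion: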
from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.settings.description import Platform
from runpy import run_path
# define a Profile for the PySpark job
profile = PySparkJobProfile(bucket='test_bucket', cluster='test_cluster',
                            region='global', job_prefix='test_job',
                            root_path='scripts', project='test_project',
                            ai_region='us-central1', job_async=False)
# additional Python files and jars for the Spark job
profile.py_files = ['py_file1.py', ..., 'py_filen.py']
profile.jars = ['jar1.jar', ..., 'jarn.jar']
# job properties
profile.properties = {"spark.executor.cores": "1",
                      "spark.executor.memory": "4G"}
# job arguments
profile.args = {'--data_path': 'gs://test_bucket/data'}
# execute the validator script and keep its module globals
validator_module = run_path('{path_to_validator_script.py}')
# name of the Validator class and id of the job to be upgraded
validator = 'MyValidator'
old_job_id = 'test_job_123456'
# register the profile under a name that the magic functions can reference
PySparkJobProfile.set('DslTestJobProfile', profile)
platform = Platform.GCP
Open or load the task script using one of the magic functions %py_script, %py_script_open or %py_load:
%py_script_open -n test_upgrade_job.py -p demo/scripts -o dev
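As a rough illustration only (this file is not part of the example above), the task script opened here could be a small PySpark streaming job that consumes the --data_path argument set in the profile:

# test_upgrade_job.py -- hypothetical contents, for illustration only
import argparse
from pyspark.sql import SparkSession

parser = argparse.ArgumentParser()
parser.add_argument('--data_path', required=True)
args, _ = parser.parse_known_args()

spark = SparkSession.builder.appName('test_upgrade_job').getOrCreate()

# stream newline-delimited JSON files from the bucket path and echo them to the console
stream = spark.readStream.schema('value STRING').json(args.data_path)
query = stream.writeStream.format('console').outputMode('append').start()
query.awaitTermination()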
Start the upgrade job using the magic function %job_upgrade:
%job_upgrade -n test_upgrade_job.py -p DslTestJobProfile -pm $platform -o gs://test_bucket/ --old_job_id $old_job_id -v MyValidator -vp path_to_validator_script.py
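Reading the flags against the API example above: -n names the job script, -p the registered profile, -pm the platform, -o the output bucket, --old_job_id the job being replaced, and -v / -vp the validator class name and the path to the script it is defined in (this mapping is inferred from the earlier example rather than from a full option reference for %job_upgrade).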