11import fnmatch
22import json
33import random
4+ import re
45import string
56import sys
67import time
78from dataclasses import dataclass
89from functools import partial
910from pathlib import Path
10- from textwrap import dedent
1111from typing import Any , List , Optional , Union
1212
1313import click
2020 Externalv1LightningappInstance ,
2121 Gridv1ImageSpec ,
2222 V1BuildSpec ,
23+ V1ClusterType ,
2324 V1DependencyFileInfo ,
2425 V1Drive ,
2526 V1DriveSpec ,
@@ -212,8 +213,6 @@ def dispatch(
212213 # Determine the root of the project: Start at the entrypoint_file and look for nearby Lightning config files,
213214 # going up the directory structure. The root of the project is where the Lightning config file is located.
214215
215- # TODO: verify lightning version
216- # _verify_lightning_version()
217216 config_file = _get_config_file (self .entrypoint_file )
218217 app_config = AppConfig .load_from_file (config_file ) if config_file .exists () else AppConfig ()
219218 root = Path (self .entrypoint_file ).absolute ().parent
@@ -242,10 +241,6 @@ def dispatch(
242241 # Override the name if provided by the CLI
243242 app_config .name = name
244243
245- if cluster_id :
246- # Override the cluster ID if provided by the CLI
247- app_config .cluster_id = cluster_id
248-
249244 print (f"The name of the app is: { app_config .name } " )
250245
251246 v1_env_vars = [V1EnvVar (name = k , value = v ) for k , v in self .env_vars .items ()]
@@ -307,17 +302,92 @@ def dispatch(
307302 project = _get_project (self .backend .client )
308303
309304 try :
310- list_apps_resp = self .backend .client .lightningapp_v2_service_list_lightningapps_v2 (
311- project_id = project .project_id , name = app_config .name
305+ if cluster_id is not None :
306+ # Verify that the cluster exists
307+ list_clusters_resp = self .backend .client .cluster_service_list_clusters ()
308+ cluster_ids = [cluster .id for cluster in list_clusters_resp .clusters ]
309+ if cluster_id not in cluster_ids :
310+ raise ValueError (f"You requested to run on cluster { cluster_id } , but that cluster doesn't exist." )
311+
312+ self ._ensure_cluster_project_binding (project .project_id , cluster_id )
313+
314+ # Resolve the app name, instance, and cluster ID
315+ existing_instance = None
316+ app_name = app_config .name
317+
318+ # List existing instances
319+ # TODO: Add pagination, otherwise this could break if users have a lot of apps.
320+ find_instances_resp = self .backend .client .lightningapp_instance_service_list_lightningapp_instances (
321+ project_id = project .project_id
312322 )
313- if list_apps_resp .lightningapps :
314- # There can be only one app with unique project_id<>name pair
315- lit_app = list_apps_resp .lightningapps [0 ]
316- else :
317- app_body = Body7 (name = app_config .name , can_download_source_code = True )
323+
324+ # Seach for instances with the given name (possibly with some random characters appended)
325+ pattern = re .escape (f"{ app_name } -" ) + ".{4}"
326+ instances = [
327+ lightningapp
328+ for lightningapp in find_instances_resp .lightningapps
329+ if lightningapp .name == app_name or (re .fullmatch (pattern , lightningapp .name ) is not None )
330+ ]
331+
332+ # If instances exist and cluster is None, mimic cluster selection logic to choose a default
333+ if cluster_id is None and len (instances ) > 0 :
334+ # Determine the cluster ID
335+ cluster_id = self ._get_default_cluster (project .project_id )
336+
337+ # If an instance exists on the cluster with the same base name - restart it
338+ for instance in instances :
339+ if instance .spec .cluster_id == cluster_id :
340+ existing_instance = instance
341+ break
342+
343+ # If instances exist but not on the cluster - choose a randomised name
344+ if len (instances ) > 0 and existing_instance is None :
345+ name_exists = True
346+ while name_exists :
347+ random_name = self ._randomise_name (app_name )
348+ name_exists = any ([instance .name == random_name for instance in instances ])
349+
350+ app_name = random_name
351+
352+ # Create the app if it doesn't exist
353+ if existing_instance is None :
354+ app_body = Body7 (name = app_name , can_download_source_code = True )
318355 lit_app = self .backend .client .lightningapp_v2_service_create_lightningapp_v2 (
319356 project_id = project .project_id , body = app_body
320357 )
358+ app_id = lit_app .id
359+ else :
360+ app_id = existing_instance .spec .app_id
361+
362+ # check if user has sufficient credits to run an app
363+ # if so set the desired state to running otherwise, create the app in stopped state,
364+ # and open the admin ui to add credits and running the app.
365+ has_sufficient_credits = self ._project_has_sufficient_credits (project , app = self .app )
366+ app_release_desired_state = (
367+ V1LightningappInstanceState .RUNNING if has_sufficient_credits else V1LightningappInstanceState .STOPPED
368+ )
369+ if not has_sufficient_credits :
370+ logger .warn ("You may need Lightning credits to run your apps on the cloud." )
371+
372+ # Stop the instance if it isn't stopped yet
373+ if existing_instance and existing_instance .status .phase != V1LightningappInstanceState .STOPPED :
374+ # TODO(yurij): Implement release switching in the UI and remove this
375+ # We can only switch release of the stopped instance
376+ existing_instance = self .backend .client .lightningapp_instance_service_update_lightningapp_instance (
377+ project_id = project .project_id ,
378+ id = existing_instance .id ,
379+ body = Body3 (spec = V1LightningappInstanceSpec (desired_state = V1LightningappInstanceState .STOPPED )),
380+ )
381+ # wait for the instance to stop for up to 150 seconds
382+ for _ in range (150 ):
383+ existing_instance = self .backend .client .lightningapp_instance_service_get_lightningapp_instance (
384+ project_id = project .project_id , id = existing_instance .id
385+ )
386+ if existing_instance .status .phase == V1LightningappInstanceState .STOPPED :
387+ break
388+ time .sleep (1 )
389+ if existing_instance .status .phase != V1LightningappInstanceState .STOPPED :
390+ raise RuntimeError ("Failed to stop the existing instance." )
321391
322392 network_configs : Optional [List [V1NetworkConfig ]] = None
323393 if enable_multiple_works_in_default_container ():
@@ -332,90 +402,18 @@ def dispatch(
332402 )
333403 initial_port += 1
334404
335- # check if user has sufficient credits to run an app
336- # if so set the desired state to running otherwise, create the app in stopped state,
337- # and open the admin ui to add credits and running the app.
338- has_sufficient_credits = self ._project_has_sufficient_credits (project , app = self .app )
339- app_release_desired_state = (
340- V1LightningappInstanceState .RUNNING if has_sufficient_credits else V1LightningappInstanceState .STOPPED
341- )
342- if not has_sufficient_credits :
343- logger .warn ("You may need Lightning credits to run your apps on the cloud." )
344-
345- # right now we only allow a single instance of the app
346- find_instances_resp = self .backend .client .lightningapp_instance_service_list_lightningapp_instances (
347- project_id = project .project_id , app_id = lit_app .id
348- )
349-
350405 queue_server_type = V1QueueServerType .UNSPECIFIED
351406 if CLOUD_QUEUE_TYPE == "http" :
352407 queue_server_type = V1QueueServerType .HTTP
353408 elif CLOUD_QUEUE_TYPE == "redis" :
354409 queue_server_type = V1QueueServerType .REDIS
355410
356- existing_instance : Optional [Externalv1LightningappInstance ] = None
357- if find_instances_resp .lightningapps :
358- existing_instance = find_instances_resp .lightningapps [0 ]
359-
360- if not app_config .cluster_id :
361- # Re-run the app on the same cluster
362- app_config .cluster_id = existing_instance .spec .cluster_id
363-
364- if existing_instance .status .phase != V1LightningappInstanceState .STOPPED :
365- # TODO(yurij): Implement release switching in the UI and remove this
366- # We can only switch release of the stopped instance
367- existing_instance = self .backend .client .lightningapp_instance_service_update_lightningapp_instance (
368- project_id = project .project_id ,
369- id = existing_instance .id ,
370- body = Body3 (spec = V1LightningappInstanceSpec (desired_state = V1LightningappInstanceState .STOPPED )),
371- )
372- # wait for the instance to stop for up to 150 seconds
373- for _ in range (150 ):
374- existing_instance = self .backend .client .lightningapp_instance_service_get_lightningapp_instance (
375- project_id = project .project_id , id = existing_instance .id
376- )
377- if existing_instance .status .phase == V1LightningappInstanceState .STOPPED :
378- break
379- time .sleep (1 )
380- if existing_instance .status .phase != V1LightningappInstanceState .STOPPED :
381- raise RuntimeError ("Failed to stop the existing instance." )
382-
383- if app_config .cluster_id is not None :
384- # Verify that the cluster exists
385- list_clusters_resp = self .backend .client .cluster_service_list_clusters ()
386- cluster_ids = [cluster .id for cluster in list_clusters_resp .clusters ]
387- if app_config .cluster_id not in cluster_ids :
388- if cluster_id :
389- msg = f"You requested to run on cluster { cluster_id } , but that cluster doesn't exist."
390- else :
391- msg = (
392- f"Your app last ran on cluster { app_config .cluster_id } , but that cluster "
393- "doesn't exist anymore."
394- )
395- raise ValueError (msg )
396- if existing_instance and existing_instance .spec .cluster_id != app_config .cluster_id :
397- raise ValueError (
398- dedent (
399- f"""\
400- An app names { app_config .name } is already running on cluster { existing_instance .spec .cluster_id } , and you requested it to run on cluster { app_config .cluster_id } .
401-
402- In order to proceed, please either:
403- a. rename the app to run on { app_config .cluster_id } with the --name option
404- lightning run app { app_entrypoint_file } --name (new name) --cloud --cluster-id { app_config .cluster_id }
405- b. delete the app running on { existing_instance .spec .cluster_id } in the UI before running this command.
406- """ # noqa: E501
407- )
408- )
409-
410- if app_config .cluster_id is not None :
411- self ._ensure_cluster_project_binding (project .project_id , app_config .cluster_id )
412-
413411 release_body = Body8 (
414412 app_entrypoint_file = app_spec .app_entrypoint_file ,
415413 enable_app_server = app_spec .enable_app_server ,
416414 flow_servers = app_spec .flow_servers ,
417415 image_spec = app_spec .image_spec ,
418- cluster_id = app_config . cluster_id ,
416+ cluster_id = cluster_id ,
419417 network_config = network_configs ,
420418 works = works ,
421419 local_source = True ,
@@ -426,14 +424,13 @@ def dispatch(
426424
427425 # create / upload the new app release
428426 lightning_app_release = self .backend .client .lightningapp_v2_service_create_lightningapp_release (
429- project_id = project .project_id , app_id = lit_app . id , body = release_body
427+ project_id = project .project_id , app_id = app_id , body = release_body
430428 )
431429
432430 if lightning_app_release .source_upload_url == "" :
433431 raise RuntimeError ("The source upload url is empty." )
434432
435433 if getattr (lightning_app_release , "cluster_id" , None ):
436- app_config .cluster_id = lightning_app_release .cluster_id
437434 logger .info (f"Running app on { lightning_app_release .cluster_id } " )
438435
439436 # Save the config for re-runs
@@ -442,7 +439,7 @@ def dispatch(
442439 repo .package ()
443440 repo .upload (url = lightning_app_release .source_upload_url )
444441
445- if find_instances_resp . lightningapps :
442+ if existing_instance is not None :
446443 lightning_app_instance = (
447444 self .backend .client .lightningapp_instance_service_update_lightningapp_instance_release (
448445 project_id = project .project_id ,
@@ -466,12 +463,12 @@ def dispatch(
466463 lightning_app_instance = (
467464 self .backend .client .lightningapp_v2_service_create_lightningapp_release_instance (
468465 project_id = project .project_id ,
469- app_id = lit_app . id ,
466+ app_id = app_id ,
470467 id = lightning_app_release .id ,
471468 body = Body9 (
472- cluster_id = app_config . cluster_id ,
469+ cluster_id = cluster_id ,
473470 desired_state = app_release_desired_state ,
474- name = lit_app . name ,
471+ name = app_name ,
475472 env = v1_env_vars ,
476473 queue_server_type = queue_server_type ,
477474 ),
@@ -504,6 +501,36 @@ def _ensure_cluster_project_binding(self, project_id: str, cluster_id: str):
504501 body = V1ProjectClusterBinding (cluster_id = cluster_id , project_id = project_id ),
505502 )
506503
504+ def _get_default_cluster (self , project_id : str ) -> str :
505+ """This utility implements a minimal version of the cluster selection logic used in the cloud.
506+
507+ TODO: This should be requested directly from the platform.
508+ """
509+ cluster_bindings = self .backend .client .projects_service_list_project_cluster_bindings (
510+ project_id = project_id
511+ ).clusters
512+
513+ if not cluster_bindings :
514+ raise ValueError (f"No clusters are bound to the project { project_id } ." )
515+
516+ if len (cluster_bindings ) == 1 :
517+ return cluster_bindings [0 ].cluster_id
518+
519+ clusters = [
520+ self .backend .client .cluster_service_get_cluster (cluster_binding .cluster_id )
521+ for cluster_binding in cluster_bindings
522+ ]
523+
524+ # Filter global clusters
525+ clusters = [cluster for cluster in clusters if cluster .spec .cluster_type == V1ClusterType .GLOBAL ]
526+
527+ return random .choice (clusters ).id
528+
529+ @staticmethod
530+ def _randomise_name (app_name : str ) -> str :
531+ letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
532+ return app_name + "-" + "" .join (random .sample (letters , 4 ))
533+
507534 @staticmethod
508535 def _check_uploaded_folder (root : Path , repo : LocalSourceCodeDir ) -> None :
509536 """This method is used to inform the users if their folder files are large and how to filter them."""
0 commit comments