Skip to content

Commit 41e45f2

Browse files
ethanwharrisBorda
authored andcommitted
[App] Support running on multiple clusters (#16016)
(cherry picked from commit d3a7226)
1 parent 5b11bdd commit 41e45f2

File tree

4 files changed

+223
-141
lines changed

4 files changed

+223
-141
lines changed

src/lightning_app/runners/cloud.py

Lines changed: 120 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import fnmatch
22
import json
33
import random
4+
import re
45
import string
56
import sys
67
import time
78
from dataclasses import dataclass
89
from pathlib import Path
9-
from textwrap import dedent
1010
from typing import Any, List, Optional, Union
1111

1212
import click
@@ -19,6 +19,7 @@
1919
Externalv1LightningappInstance,
2020
Gridv1ImageSpec,
2121
V1BuildSpec,
22+
V1ClusterType,
2223
V1DependencyFileInfo,
2324
V1Drive,
2425
V1DriveSpec,
@@ -210,8 +211,6 @@ def dispatch(
210211
# Determine the root of the project: Start at the entrypoint_file and look for nearby Lightning config files,
211212
# going up the directory structure. The root of the project is where the Lightning config file is located.
212213

213-
# TODO: verify lightning version
214-
# _verify_lightning_version()
215214
config_file = _get_config_file(self.entrypoint_file)
216215
app_config = AppConfig.load_from_file(config_file) if config_file.exists() else AppConfig()
217216
root = Path(self.entrypoint_file).absolute().parent
@@ -228,10 +227,6 @@ def dispatch(
228227
# Override the name if provided by the CLI
229228
app_config.name = name
230229

231-
if cluster_id:
232-
# Override the cluster ID if provided by the CLI
233-
app_config.cluster_id = cluster_id
234-
235230
print(f"The name of the app is: {app_config.name}")
236231

237232
v1_env_vars = [V1EnvVar(name=k, value=v) for k, v in self.env_vars.items()]
@@ -293,17 +288,92 @@ def dispatch(
293288
project = _get_project(self.backend.client)
294289

295290
try:
296-
list_apps_resp = self.backend.client.lightningapp_v2_service_list_lightningapps_v2(
297-
project_id=project.project_id, name=app_config.name
291+
if cluster_id is not None:
292+
# Verify that the cluster exists
293+
list_clusters_resp = self.backend.client.cluster_service_list_clusters()
294+
cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters]
295+
if cluster_id not in cluster_ids:
296+
raise ValueError(f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist.")
297+
298+
self._ensure_cluster_project_binding(project.project_id, cluster_id)
299+
300+
# Resolve the app name, instance, and cluster ID
301+
existing_instance = None
302+
app_name = app_config.name
303+
304+
# List existing instances
305+
# TODO: Add pagination, otherwise this could break if users have a lot of apps.
306+
find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances(
307+
project_id=project.project_id
298308
)
299-
if list_apps_resp.lightningapps:
300-
# There can be only one app with unique project_id<>name pair
301-
lit_app = list_apps_resp.lightningapps[0]
302-
else:
303-
app_body = Body7(name=app_config.name, can_download_source_code=True)
309+
310+
# Seach for instances with the given name (possibly with some random characters appended)
311+
pattern = re.escape(f"{app_name}-") + ".{4}"
312+
instances = [
313+
lightningapp
314+
for lightningapp in find_instances_resp.lightningapps
315+
if lightningapp.name == app_name or (re.fullmatch(pattern, lightningapp.name) is not None)
316+
]
317+
318+
# If instances exist and cluster is None, mimic cluster selection logic to choose a default
319+
if cluster_id is None and len(instances) > 0:
320+
# Determine the cluster ID
321+
cluster_id = self._get_default_cluster(project.project_id)
322+
323+
# If an instance exists on the cluster with the same base name - restart it
324+
for instance in instances:
325+
if instance.spec.cluster_id == cluster_id:
326+
existing_instance = instance
327+
break
328+
329+
# If instances exist but not on the cluster - choose a randomised name
330+
if len(instances) > 0 and existing_instance is None:
331+
name_exists = True
332+
while name_exists:
333+
random_name = self._randomise_name(app_name)
334+
name_exists = any([instance.name == random_name for instance in instances])
335+
336+
app_name = random_name
337+
338+
# Create the app if it doesn't exist
339+
if existing_instance is None:
340+
app_body = Body7(name=app_name, can_download_source_code=True)
304341
lit_app = self.backend.client.lightningapp_v2_service_create_lightningapp_v2(
305342
project_id=project.project_id, body=app_body
306343
)
344+
app_id = lit_app.id
345+
else:
346+
app_id = existing_instance.spec.app_id
347+
348+
# check if user has sufficient credits to run an app
349+
# if so set the desired state to running otherwise, create the app in stopped state,
350+
# and open the admin ui to add credits and running the app.
351+
has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app)
352+
app_release_desired_state = (
353+
V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED
354+
)
355+
if not has_sufficient_credits:
356+
logger.warn("You may need Lightning credits to run your apps on the cloud.")
357+
358+
# Stop the instance if it isn't stopped yet
359+
if existing_instance and existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
360+
# TODO(yurij): Implement release switching in the UI and remove this
361+
# We can only switch release of the stopped instance
362+
existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance(
363+
project_id=project.project_id,
364+
id=existing_instance.id,
365+
body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)),
366+
)
367+
# wait for the instance to stop for up to 150 seconds
368+
for _ in range(150):
369+
existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance(
370+
project_id=project.project_id, id=existing_instance.id
371+
)
372+
if existing_instance.status.phase == V1LightningappInstanceState.STOPPED:
373+
break
374+
time.sleep(1)
375+
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
376+
raise RuntimeError("Failed to stop the existing instance.")
307377

308378
network_configs: Optional[List[V1NetworkConfig]] = None
309379
if enable_multiple_works_in_default_container():
@@ -318,90 +388,18 @@ def dispatch(
318388
)
319389
initial_port += 1
320390

321-
# check if user has sufficient credits to run an app
322-
# if so set the desired state to running otherwise, create the app in stopped state,
323-
# and open the admin ui to add credits and running the app.
324-
has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app)
325-
app_release_desired_state = (
326-
V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED
327-
)
328-
if not has_sufficient_credits:
329-
logger.warn("You may need Lightning credits to run your apps on the cloud.")
330-
331-
# right now we only allow a single instance of the app
332-
find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances(
333-
project_id=project.project_id, app_id=lit_app.id
334-
)
335-
336391
queue_server_type = V1QueueServerType.UNSPECIFIED
337392
if CLOUD_QUEUE_TYPE == "http":
338393
queue_server_type = V1QueueServerType.HTTP
339394
elif CLOUD_QUEUE_TYPE == "redis":
340395
queue_server_type = V1QueueServerType.REDIS
341396

342-
existing_instance: Optional[Externalv1LightningappInstance] = None
343-
if find_instances_resp.lightningapps:
344-
existing_instance = find_instances_resp.lightningapps[0]
345-
346-
if not app_config.cluster_id:
347-
# Re-run the app on the same cluster
348-
app_config.cluster_id = existing_instance.spec.cluster_id
349-
350-
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
351-
# TODO(yurij): Implement release switching in the UI and remove this
352-
# We can only switch release of the stopped instance
353-
existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance(
354-
project_id=project.project_id,
355-
id=existing_instance.id,
356-
body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)),
357-
)
358-
# wait for the instance to stop for up to 150 seconds
359-
for _ in range(150):
360-
existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance(
361-
project_id=project.project_id, id=existing_instance.id
362-
)
363-
if existing_instance.status.phase == V1LightningappInstanceState.STOPPED:
364-
break
365-
time.sleep(1)
366-
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
367-
raise RuntimeError("Failed to stop the existing instance.")
368-
369-
if app_config.cluster_id is not None:
370-
# Verify that the cluster exists
371-
list_clusters_resp = self.backend.client.cluster_service_list_clusters()
372-
cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters]
373-
if app_config.cluster_id not in cluster_ids:
374-
if cluster_id:
375-
msg = f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist."
376-
else:
377-
msg = (
378-
f"Your app last ran on cluster {app_config.cluster_id}, but that cluster "
379-
"doesn't exist anymore."
380-
)
381-
raise ValueError(msg)
382-
if existing_instance and existing_instance.spec.cluster_id != app_config.cluster_id:
383-
raise ValueError(
384-
dedent(
385-
f"""\
386-
An app names {app_config.name} is already running on cluster {existing_instance.spec.cluster_id}, and you requested it to run on cluster {app_config.cluster_id}.
387-
388-
In order to proceed, please either:
389-
a. rename the app to run on {app_config.cluster_id} with the --name option
390-
lightning run app {app_entrypoint_file} --name (new name) --cloud --cluster-id {app_config.cluster_id}
391-
b. delete the app running on {existing_instance.spec.cluster_id} in the UI before running this command.
392-
""" # noqa: E501
393-
)
394-
)
395-
396-
if app_config.cluster_id is not None:
397-
self._ensure_cluster_project_binding(project.project_id, app_config.cluster_id)
398-
399397
release_body = Body8(
400398
app_entrypoint_file=app_spec.app_entrypoint_file,
401399
enable_app_server=app_spec.enable_app_server,
402400
flow_servers=app_spec.flow_servers,
403401
image_spec=app_spec.image_spec,
404-
cluster_id=app_config.cluster_id,
402+
cluster_id=cluster_id,
405403
network_config=network_configs,
406404
works=works,
407405
local_source=True,
@@ -412,14 +410,13 @@ def dispatch(
412410

413411
# create / upload the new app release
414412
lightning_app_release = self.backend.client.lightningapp_v2_service_create_lightningapp_release(
415-
project_id=project.project_id, app_id=lit_app.id, body=release_body
413+
project_id=project.project_id, app_id=app_id, body=release_body
416414
)
417415

418416
if lightning_app_release.source_upload_url == "":
419417
raise RuntimeError("The source upload url is empty.")
420418

421419
if getattr(lightning_app_release, "cluster_id", None):
422-
app_config.cluster_id = lightning_app_release.cluster_id
423420
logger.info(f"Running app on {lightning_app_release.cluster_id}")
424421

425422
# Save the config for re-runs
@@ -428,7 +425,7 @@ def dispatch(
428425
repo.package()
429426
repo.upload(url=lightning_app_release.source_upload_url)
430427

431-
if find_instances_resp.lightningapps:
428+
if existing_instance is not None:
432429
lightning_app_instance = (
433430
self.backend.client.lightningapp_instance_service_update_lightningapp_instance_release(
434431
project_id=project.project_id,
@@ -452,12 +449,12 @@ def dispatch(
452449
lightning_app_instance = (
453450
self.backend.client.lightningapp_v2_service_create_lightningapp_release_instance(
454451
project_id=project.project_id,
455-
app_id=lit_app.id,
452+
app_id=app_id,
456453
id=lightning_app_release.id,
457454
body=Body9(
458-
cluster_id=app_config.cluster_id,
455+
cluster_id=cluster_id,
459456
desired_state=app_release_desired_state,
460-
name=lit_app.name,
457+
name=app_name,
461458
env=v1_env_vars,
462459
queue_server_type=queue_server_type,
463460
),
@@ -490,6 +487,36 @@ def _ensure_cluster_project_binding(self, project_id: str, cluster_id: str):
490487
body=V1ProjectClusterBinding(cluster_id=cluster_id, project_id=project_id),
491488
)
492489

490+
def _get_default_cluster(self, project_id: str) -> str:
491+
"""This utility implements a minimal version of the cluster selection logic used in the cloud.
492+
493+
TODO: This should be requested directly from the platform.
494+
"""
495+
cluster_bindings = self.backend.client.projects_service_list_project_cluster_bindings(
496+
project_id=project_id
497+
).clusters
498+
499+
if not cluster_bindings:
500+
raise ValueError(f"No clusters are bound to the project {project_id}.")
501+
502+
if len(cluster_bindings) == 1:
503+
return cluster_bindings[0].cluster_id
504+
505+
clusters = [
506+
self.backend.client.cluster_service_get_cluster(cluster_binding.cluster_id)
507+
for cluster_binding in cluster_bindings
508+
]
509+
510+
# Filter global clusters
511+
clusters = [cluster for cluster in clusters if cluster.spec.cluster_type == V1ClusterType.GLOBAL]
512+
513+
return random.choice(clusters).id
514+
515+
@staticmethod
516+
def _randomise_name(app_name: str) -> str:
517+
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
518+
return app_name + "-" + "".join(random.sample(letters, 4))
519+
493520
@staticmethod
494521
def _check_uploaded_folder(root: Path, repo: LocalSourceCodeDir) -> None:
495522
"""This method is used to inform the users if their folder files are large and how to filter them."""

src/lightning_app/utilities/packaging/app_config.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pathlib
22
from dataclasses import asdict, dataclass, field
3-
from typing import Optional, Union
3+
from typing import Union
44

55
import yaml
66

@@ -18,7 +18,6 @@ class AppConfig:
1818
"""
1919

2020
name: str = field(default_factory=get_unique_name)
21-
cluster_id: Optional[str] = field(default=None)
2221

2322
def save_to_file(self, path: Union[str, pathlib.Path]) -> None:
2423
"""Save the configuration to the given file in YAML format."""
@@ -35,6 +34,8 @@ def load_from_file(cls, path: Union[str, pathlib.Path]) -> "AppConfig":
3534
"""Load the configuration from the given file."""
3635
with open(path) as file:
3736
config = yaml.safe_load(file)
37+
# Ignore `cluster_id` without error for backwards compatibility.
38+
config.pop("cluster_id", None)
3839
return cls(**config)
3940

4041
@classmethod

tests/tests_app/cli/test_cloud_cli.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from lightning_cloud.openapi import (
1212
V1LightningappV2,
1313
V1ListLightningappInstancesResponse,
14-
V1ListLightningappsV2Response,
1514
V1ListMembershipsResponse,
1615
V1Membership,
1716
)
@@ -102,8 +101,8 @@ def __init__(self, *args, create_response, **kwargs):
102101
super().__init__()
103102
self.create_response = create_response
104103

105-
def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
106-
return V1ListLightningappsV2Response(lightningapps=[V1LightningappV2(id="my_app", name="app")])
104+
def lightningapp_v2_service_create_lightningapp_v2(self, *args, **kwargs):
105+
return V1LightningappV2(id="my_app", name="app")
107106

108107
def lightningapp_v2_service_create_lightningapp_release(self, project_id, app_id, body):
109108
assert project_id == "test-project-id"
@@ -183,7 +182,7 @@ def __init__(self, *args, message, **kwargs):
183182
super().__init__()
184183
self.message = message
185184

186-
def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
185+
def lightningapp_instance_service_list_lightningapp_instances(self, *args, **kwargs):
187186
raise ApiException(
188187
http_resp=HttpHeaderDict(
189188
data=self.message,

0 commit comments

Comments
 (0)