From 1d0da08bb6ce6e81eb4a6bf29d02a60f4b9a2fcf Mon Sep 17 00:00:00 2001 From: wookiee2187 Date: Wed, 28 Aug 2019 15:41:29 -0500 Subject: [PATCH 1/4] changes made --- etc/tasks.conf | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/etc/tasks.conf b/etc/tasks.conf index dbd7f38..8a0e878 100644 --- a/etc/tasks.conf +++ b/etc/tasks.conf @@ -25,7 +25,9 @@ polling_interval = 360 [vcluster-headnodecycle] -taskplugins = HandleHeadNodes +taskplugins = HandleHeadNodes, HandleHeadNodesK8S +#To run the HandleHeadNodesK8S plugin you will need to add the dependencies +# mentioned in the beginning of the runnew.sh script in the incubator code polling_interval = 10 username = myosuser From d57d090a524c5d3130eddc2ef6ec83987deea42c Mon Sep 17 00:00:00 2001 From: wookiee2187 Date: Wed, 28 Aug 2019 15:41:46 -0500 Subject: [PATCH 2/4] changes to pull request --- vc3master/plugins/task/HandleHeadNodesK8S.py | 595 +++++++++++++++++++ 1 file changed, 595 insertions(+) create mode 100644 vc3master/plugins/task/HandleHeadNodesK8S.py diff --git a/vc3master/plugins/task/HandleHeadNodesK8S.py b/vc3master/plugins/task/HandleHeadNodesK8S.py new file mode 100644 index 0000000..2859391 --- /dev/null +++ b/vc3master/plugins/task/HandleHeadNodesK8S.py @@ -0,0 +1,595 @@ +#!/usr/bin/env python + +from vc3master.task import VC3Task +from vc3infoservice.infoclient import InfoConnectionFailure, InfoEntityMissingException + +from base64 import b64encode +import pluginmanager as pm +import traceback + +import json +import os +import sys +import re +import subprocess +import time +import pprint +import subprocess, yaml + +from kubernetes import client, config, utils +import kubernetes.client +from kubernetes.client.rest import ApiException +from jinja2 import Environment, FileSystemLoader + + +class HandleHeadNodes(VC3Task): + ''' + Plugin to manage the head nodes lifetime. + ''' + + def __init__(self, parent, config, section): + super(HandleHeadNodes, self).__init__(parent, config, section) + self.client = parent.client + self.config = config + self.node_prefix = self.config.get(section, 'node_prefix') + self.node_image = self.config.get(section, 'node_image') + self.node_flavor = self.config.get(section, 'node_flavor') + self.node_user = self.config.get(section, 'node_user') + self.node_network_id = self.config.get(section, 'node_network_id') + self.node_private_key_file = os.path.expanduser(self.config.get(section, 'node_private_key_file')) + self.node_public_key_name = self.config.get(section, 'node_public_key_name') + + self.node_max_no_contact_time = int(self.config.get(section, 'node_max_no_contact_time')) + self.node_max_initializing_count = int(self.config.get(section, 'node_max_initializing_count')) + + groups = self.config.get(section, 'node_security_groups') + self.node_security_groups = groups.split(',') + + self.initializers = {} + # keep las succesful contact to node, to check against node_max_no_contact_time. + self.last_contact_times = {} + + # number of times we have tries to initialize a node. After node_max_initializing_count, declare failure. 
+ self.initializing_count = {} + + self.log.debug("HandleHeadNodes VC3Task initialized.") + + global login_info + def login_info(self, request): + """Finds IP and port for ssh login + Args: request + Returns: IP and port as integers + """ + config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + v1 = client.CoreV1Api() + k8s_client = client.ApiClient() + k8s_api = client.ExtensionsV1beta1Api(k8s_client) + configuration = kubernetes.client.Configuration() + api_instance = kubernetes.client.CoreV1Api(kubernetes.client.ApiClient(configuration)) + try: + service = v1.read_namespaced_service(name = "login-node-service-" + str(request.name), namespace = str(request.name)) + port = service.spec.ports[0].node_port + list_pods = v1.list_namespaced_pod(str(request.name)) + pod = list_pods.items[0] + node = v1.read_node(pod.spec.node_name) + IP = node.status.addresses[0].address + self.log.info('About to return') + self.log.info(IP) + self.log.info(port) + return [IP, port] + except Exception: + self.log.info("Login pod does not exist") + return None + + def template(self): + """Templating for the config files + Args: None + Returns: temp_up, temp_up2, temp_up3, temp_up4, temp_up5 as rendered files + """ + config_data = yaml.load(open('/usr/lib/python2.7/site-packages/vc3master/plugins/task/vals.yaml'),Loader=yaml.FullLoader) + env = Environment(loader = FileSystemLoader('./templates'), trim_blocks=True, lstrip_blocks=True) + template = env.get_template('condor_config.local.j2') + temp_up = template.render(config_data) + config_data2 = yaml.load(open('/etc/cvmfs_vals.yaml'),Loader=yaml.FullLoader) + template2 = env.get_template('cvmfs_default_local.j2') + temp_up2 = template2.render(config_data2) + config_data3 = yaml.load(open('/etc/minio'),Loader=yaml.FullLoader) + template3 = env.get_template('minio.env') + temp_up3 = template3.render(config_data3) + config_data4 = yaml.load(open('/etc/minio'),Loader=yaml.FullLoader) + template4 = env.get_template('core-site.xml.j2') + temp_up4 = template4.render(config_data4) + config_data5 = yaml.load(open('/etc/spark'),Loader=yaml.FullLoader) + template5 = env.get_template('spark.env') + temp_up5 = template5.render(config_data5) + return temp_up, temp_up2, temp_up3, temp_up4, temp_up5 + + def login_create(self, request): + """ Creates the deployment, service, and two config maps (one for adding users, the other for adding the config files) + Args: request + Returns: None + """ + self.log.info('Starting login_create') + config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + v1 = client.CoreV1Api() + k8s_client = client.ApiClient() + k8s_api = client.ExtensionsV1beta1Api(k8s_client) + configuration = kubernetes.client.Configuration() + api_instance = kubernetes.client.CoreV1Api(kubernetes.client.ApiClient(configuration)) + self.create_namespace(request) + self.add_keys_to_pod(request) + try: + # checks if deployment, service, configmap already created - To do add checks for service + configmaps + check = k8s_api.read_namespaced_deployment_status(name= "login-node-n-" + str(request.name), namespace =str(request.name)) + self.log.info("pod already exists") + except Exception: + # rendering template and creating configmap to mount config files + temp_up, temp_up2, temp_up3, temp_up4, temp_up5 = self.template() + name = 'temcon-' + str(request.name) + namespace = str(request.name) + body = kubernetes.client.V1ConfigMap() + body.data = 
{"condor_config.local":str(temp_up),"cvmfs_default_local":str(temp_up2),"minio":str(temp_up3),"core-site.xml":str(temp_up4),"spark":str(temp_up5)} + body.metadata = kubernetes.client.V1ObjectMeta() + body.metadata.name = name + configuration = kubernetes.client.Configuration() + api_instance = kubernetes.client.CoreV1Api(kubernetes.client.ApiClient(configuration)) + try: + api_response = api_instance.create_namespaced_config_map(namespace, body) + except ApiException as e: + print("Exception when calling CoreV1Api->create_namespaced_config_map: %s\n" % e) + self.log.info('CREATING DEPLOYMENT') + self.create_dep(request) + self.create_service(request) + self.create_conf_users(request) + return + + def create_namespace(self, request): + """Creates namespace with request name + Args: request + Returns: None + """ + config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + pp = pprint.PrettyPrinter(indent =4) + configuration = kubernetes.client.Configuration() + api_instance = kubernetes.client.CoreV1Api(kubernetes.client.ApiClient(configuration)) + body = kubernetes.client.V1Namespace(api_version = 'v1', kind = 'Namespace', metadata = kubernetes.client.V1ObjectMeta(name = str(request.name))) + try: + api_response = api_instance.create_namespace(body) + except ApiException as e: + print("Exception when calling CoreV1Api->create_namespace: %s\n" % e) + + def create_dep(self, request): + """Creates deployment with request name + Args: request + Returns: None + """ + config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + pp = pprint.PrettyPrinter(indent =4) + configuration = kubernetes.client.Configuration() + api_instance = kubernetes.client.AppsV1Api(kubernetes.client.ApiClient(configuration)) + core_v1_api = client.CoreV1Api() + namespace = 'default' + body = kubernetes.client.V1Deployment() + body.metadata = kubernetes.client.V1ObjectMeta() + body.metadata.name = 'login-node-n-' + str(request.name) + body.metadata.labels = {'app':'login-node-n-' + str(request.name)} + conf_list = [] + conf_list.append(kubernetes.client.V1VolumeMount(name = 'config-vol', mount_path = '/root/tconfig-file.conf',sub_path = 'tconfig-file.conf')) + conf2_list = [] + volume0 = kubernetes.client.V1Volume(name = 'config-vol', config_map = kubernetes.client.V1ConfigMapVolumeSource(name = 'new-config-' + str(request.name), items = [kubernetes.client.V1KeyToPath(key = "tconfig-file.conf", path = "tconfig-file.conf")])) + volume1 = kubernetes.client.V1Volume(name = 'temcon-vol', config_map = kubernetes.client.V1ConfigMapVolumeSource(name = 'temcon-' + str(request.name), items = [kubernetes.client.V1KeyToPath(key = "condor_config.local", path = "condor_config.local")])) + env_list = [] + env_list.append(kubernetes.client.V1EnvVar(name = 'PASSWDFILE', value = "root/tconfig-file.conf")) + vol_m_list = [] + vol_m_list.append(kubernetes.client.V1VolumeMount(name = 'config-vol', mount_path = '/root/tconfig-file.conf',sub_path = 'tconfig-file.conf')) + vol_m_list.append(kubernetes.client.V1VolumeMount(name = 'temcon-vol', mount_path = '/etc/condor/config.d/condor_config.local', sub_path = 'condor_config.local')) + container0 = kubernetes.client.V1Container(name = 'new-container', env = env_list, image = 'nlingareddy/condor-login', volume_mounts = vol_m_list) + vol_list = [] + vol_list.append(volume0) + vol_list.append(volume1) + cont_list = [] + cont_list.append(container0) + body.spec = kubernetes.client.V1DeploymentSpec(replicas= 1, selector= kubernetes.client.V1LabelSelector(match_labels= 
{'app':'login-node-n-' + str(request.name)}) , template= kubernetes.client.V1PodTemplateSpec(metadata = kubernetes.client.V1ObjectMeta(labels = {'app':'login-node-n-' + str(request.name)}), spec = kubernetes.client.V1PodSpec(volumes= vol_list, containers = cont_list))) + try: + api_response = api_instance.create_namespaced_deployment(namespace=str(request.name), body=body) + except ApiException as e: + print("Exception when calling AppsV1Api->create_namespaced_deployment: %s\n" % e) + + def create_service(self, request): + """Creates service with request name + Args: request + Returns: None + """ + config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + core_v1_api = kubernetes.client.CoreV1Api() + serv_list = [] + serv_list.append(kubernetes.client.V1ServicePort(port=22, protocol='TCP')) + body = kubernetes.client.V1Service(metadata= kubernetes.client.V1ObjectMeta(name = 'login-node-service-' + str(request.name)), spec = kubernetes.client.V1ServiceSpec(type='NodePort', selector={'app':'login-node-n-' + str(request.name)}, ports=serv_list)) + try: + api_response = core_v1_api.create_namespaced_service(namespace=str(request.name), body=body) + except ApiException as e: + print("Exception when calling AppsV1Api->create_namespaced_service: %s\n" % e) + + def create_conf_users(self, request): + """Creates config map for users with request name + Args: request + Returns: None + """ + config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + core_v1_api = kubernetes.client.CoreV1Api() + string_to_append = self.add_keys_to_pod(request) + bodyc = kubernetes.client.V1ConfigMap(api_version = 'v1', kind = 'ConfigMap', metadata = kubernetes.client.V1ObjectMeta(name = "new-config-" + str(request.name), namespace = str(request.name)), data = {'tconfig-file.conf':'\n' +string_to_append}) + try: + api_response = core_v1_api.create_namespaced_config_map(namespace=str(request.name), body = bodyc) + except ApiException as e: + print("Exception when calling CoreV1Api->create_namespaced_config_map: %s\n" % e) + + def login_pending(self, request): + """Waits until pod finishes building + Args: request + Returns: None + """ + config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + k8s_client = client.ApiClient() + k8s_api = client.ExtensionsV1beta1Api(k8s_client) + configuration = kubernetes.client.Configuration() + deps = k8s_api.read_namespaced_deployment_status(name= "login-node-n-" + str(request.name), namespace =str(request.name)) + while(deps.status.available_replicas != 1): + k8s_api = client.ExtensionsV1beta1Api(k8s_client) + deps = k8s_api.read_namespaced_deployment_status(name= "login-node-n-" + str(request.name), namespace =str(request.name)) + self.log.info("LOGIN POD CREATED") + deps.metadata.name = deps.metadata.name + "-" + request.name + return login_info(self, request) + + def add_keys_to_pod(self, request): + """Returns string to append to config map that adds users + Args: request + Returns: None + """ + members = self.get_members_names(request) + attributes = {} + i = 1000 + for member in members: + try: + user = self.client.getUser(member) + except Exception, e: + self.log.warning("Could not find user: %s", member) + raise e + string_to_append = str(user.name) +':x:' + str(i) + ':' + str(i) + ':'+ '/home/' + str(user.name) + '::' + '/bin/bash:' + str(user.sshpubstring) + '\n' + '\n' + self.log.info(string_to_append) + i = i + 1 + return string_to_append + + def runtask(self): + self.log.info("Running task %s" % self.section) + self.log.debug("Polling 
master....") + + try: + requests = self.client.listRequests() + n = len(requests) if requests else 0 + self.log.debug("Processing %d requests" % n) + if requests: + for r in requests: + try: + self.process_request(r) + except Exception, e: + self.log.warning("Request %s had an exception (%s)", r.name, e) + self.log.debug(traceback.format_exc(None)) + except InfoConnectionFailure, e: + self.log.warning("Could not read requests from infoservice. (%s)", e) + + def process_request(self, request): + self.log.debug("Processing headnode for '%s'", request.name) + + headnode = None + next_state = None + reason = None + + if not request.headnode: + # Request has not yet indicated the name it wants for the headnode, + # so we simply return. + return + + try: + headnode = self.client.getNodeset(request.headnode) + except InfoEntityMissingException: + pass + except InfoConnectionFailure: + return + + try: + if headnode is None: + if request.state == 'initializing': + headnode = self.create_headnode_nodeset(request) + elif request.state == 'cleanup' or request.state == 'terminated': + # Nothing to do, the headnode has been cleaned-up + return + else: + # Something went wrong, the headnode should still be there. + self.log.error("Could not find headnode information for %s", request.name) + return + + next_state, reason = headnode.state, headnode.state_reason + + if request.state == 'cleanup' or request.state == 'terminated': + self.log.info("Calling state_terminating") + (next_state, reason) = self.state_terminating(request, headnode) + + if next_state == 'new': + (next_state, reason) = self.state_new(request, headnode) + + if next_state == 'booting': + (next_state, reason) = self.state_booting(request, headnode) + + if next_state == 'pending': + (next_state, reason) = self.state_pending(request, headnode) + + if next_state == 'running': + (next_state, reason) = self.state_running(request, headnode) + + if (next_state != 'terminated') and (next_state != 'failure'): + (next_state, reason) = self.check_timeout(request, next_state, reason) + + except Exception, e: + self.log.debug("Error while processing headnode: %s", e) + self.log.warning(traceback.format_exc(None)) + + if headnode: + (next_state, reason) = ('failure', 'Internal error: %s' % e) + else: + raise + + headnode.state = next_state + headnode.state_reason = reason + + try: + if headnode.state == 'terminated': + self.delete_headnode_nodeset(request) + else: + self.client.storeNodeset(headnode) + except Exception, e: + self.log.warning("Storing the new headnode state failed. 
(%s)", e) + self.log.warning(traceback.format_exc(None)) + + def state_terminating(self, request, headnode): + try: + config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + configuration = kubernetes.client.Configuration() + api_instance = kubernetes.client.CoreV1Api(kubernetes.client.ApiClient(configuration)) + api_response = api_instance.delete_namespace(str(request.name)) + self.initializers.pop(request.name, None) + except Exception, e: + self.log.warning('Could not delete headnode instance for request %s (%s)', request.name, e) + finally: + return ('terminated', 'Headnode succesfully terminated') + + def state_new(self, request, headnode): + self.log.info('Creating new nodeset %s for request %s', request.headnode, request.name) + try: + login = self.boot_pod(request, headnode) + if not login: + self.log.warning('Could not boot headnode for request %s', request.name) + return ('failure', 'Could not boot headnode.', request.name) + else: + self.log.debug('Waiting for headnode for request %s to come online', request.name) + self.log.info("BOOTING") + return ('booting', 'Headnode is booting up.') + except Exception, e: + self.log.warning('Error in request to openstack: %s', e) + return ('failure', 'Could not boot headnode because of an internal error: %s.' % e) + + def state_booting(self, request, headnode): + if not headnode.app_host: + headnode.app_host = self.__get_ip(request) + if headnode.app_host: + self.last_contact_times[request.name] = time.time() + headnode.port = self.__get_port( request) + return ('pending', 'Headnode is being configured.') + + def state_pending(self, request, headnode): + self.login_pending(request) + (next_state, state_reason) = ('running', 'Headnode is ready to be used.') + + if next_state == 'running': + self.log.info('Done initializing pod %s for request %s', request.headnode, request.name) + #self.report_running_server(request, headnode) + elif next_state != 'failure': + self.log.debug('Waiting for headnode for %s to finish initialization.', request.name) + return (next_state, state_reason) + + def state_running(self, request, headnode): + return ('running', 'Headnode is ready to be used.') + + def check_if_online(self, request, headnode): + if headnode.app_host is None: + self.log.debug('Headnode for %s does not have an address yet.', request.name) + return False + + try: + self.log.info('The port is') + self.log.info(headnode.port) + self.log.debug("Connecting to headnode %s with key %s as user %s", headnode.app_host, self.node_private_key_file, self.node_user ) + subprocess.check_call([ + 'ssh', + '-i', + self.node_private_key_file, + '-p', + str(headnode.port), + self.node_user +'@' + + headnode.app_host], shell=True) + + self.log.info('Headnode for %s running at %s', request.name, headnode.app_host) + + return True + except subprocess.CalledProcessError: + self.log.debug('Headnode for %s running at %s could not be accessed.', request.name, headnode.app_host) + return False + + def check_timeout(self, request, next_state, reason): + now = time.time() + + diff = 0 + + if self.last_contact_times.has_key(request.name): + diff = now - self.last_contact_times[request.name] + else: + self.last_contact_times[request.name] = now + return (next_state, reason) + + self.log.debug('Headnode for %s last contacted %d seconds ago.', request.name, diff) + + if diff > self.node_max_no_contact_time: + self.log.warning('Headnode for %s could not be contacted after %d seconds. 
Declaring failure.', request.name, self.node_max_no_contact_time) + return ('failure', 'Headnode could no be contacted after %d seconds.' % self.node_max_no_contact_time) + elif diff > self.node_max_no_contact_time/2: + self.log.warning('Headnode for %s could not be contacted! (waiting for %d seconds before declaring failure)', request.name, self.node_max_no_contact_time - diff) + reason = reason + " (Headnode could not be contacted. This may be a transient error. Waiting for {:.0f} seconds before declaring failure.)".format(self.node_max_no_contact_time - diff) + return (next_state, reason) + else: + return (next_state, reason) + + def boot_pod(self, request, headnode): + self.log.info('Starting boot') + try: + login = login_info(self,request) + if login: + self.log.info('Found headnode at %s for request %s', request.headnode, request.name) + return login + except Exception, e: + pass + + self.log.info('Booting new headnode for request %s...', request.name) + login = self.login_create(request) + self.log.info('past login create') + if login == None: + self.log.info('login is null') + self.log.info('returning from boot server') + return login + + + def check_if_done(self, request, headnode): + return ('running', 'Headnode is ready to be used.') + + def report_running_server(self, request, headnode): + try: + headnode.app_sectoken = self.read_encoded(self.secret_auth_filename(request)) + headnode.state = 'running' + except Exception, e: + self.log.warning('Cound not read file of shared secret for request %s (%s)', request.name, e) + self.log.debug(traceback.format_exc(None)) + headnode.state = 'failure' + + def secret_auth_filename(self, request): + # file created by ansible + return '/tmp/secret.' + request.name + + def read_encoded(self, filename): + with open(filename, 'r') as f: + contents = f.read() + return self.client.encode(contents) + + def get_members_names(self, request): + members = None + + if request.project: + project = self.client.getProject(request.project) + if project: + members = project.members + + if not members: + members = [] + self.log.warning('Could not find user names for request %s.') + + return members + + def _get_members_attributes(self, request, attribute): + members = self.get_members_names(request) + + attributes = {} + for member in members: + try: + user = self.client.getUser(member) + except Exception, e: + self.log.warning("Could not find user: %s", member) + raise e + + attr_value = getattr(user, attribute, None) + if not attr_value: + self.log.warning('Could not find attribute: %s, for user %s', + attribute, member) + else: + attributes[member] = attr_value + + return attributes + + def get_members_keys(self, request): + return self._get_members_attributes(request, 'sshpubstring') + + def get_members_uuids(self, request): + return self._get_members_attributes(request, 'identity_id') + + def get_globusvc3_mapfile(self, request): + members_uuids = self.get_members_uuids(request) + mapfile = {i:j for j, i in members_uuids.iteritems()} + + return mapfile + + def get_builder_options(self, request): + packages = [] + for env_name in request.environments: + env = self.client.getEnvironment(env_name) + if env.packagelist: + packages.extend(env.packagelist) + return " ".join([ "--require %s" % p for p in packages ]) + + def create_headnode_nodeset(self, request): + self.log.debug("Creating new headnode spec '%s'", request.headnode) + + headnode = self.client.defineNodeset( + name = request.headnode, + owner = request.owner, + node_number = 1, + app_type = 
self.getAppType(request), + app_role = 'head-node', + environment = None, + description = 'Headnode nodeset automatically created: ' + request.headnode, + displayname = request.headnode) + + self.last_contact_times[request.name] = time.time() + + return headnode + + def delete_headnode_nodeset(self, request): + if request.headnode: + try: + headnode = self.client.getNodeset(request.headnode) + self.log.debug("Deleting headnode spec '%s'", request.headnode) + self.client.deleteNodeset(request.headnode) + except Exception, e: + self.log.debug("Could not delete headnode nodeset '%s'." % (request.headnode,)) + self.log.debug(traceback.format_exc(None)) + + def getAppType(self, request): + cluster = self.client.getCluster(request.cluster) + nodeset = self.client.getNodeset(cluster.nodesets[0]) + app_type = nodeset.app_type + + return app_type + + def __get_ip(self, request): + login = login_info(self,request) + if login[0] == None: + self.log.debug("Headnode for request %s is not active yet.", request.name) + return None + else: + return login[0] + def __get_port(self, request): + login = login_info(self, request) + if login[1] == None: + self.log.debug("Headnode for request %s is not active yet.", request.name) + return None + else: + return login[1] + def vm_name(self, request): + return self.node_prefix + request.name + From b4187abee5d43afa3cad9dbd95f4bc657108866c Mon Sep 17 00:00:00 2001 From: wookiee2187 Date: Thu, 29 Aug 2019 11:55:41 -0500 Subject: [PATCH 3/4] change ot kube config --- vc3master/plugins/task/HandleHeadNodesK8S.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/vc3master/plugins/task/HandleHeadNodesK8S.py b/vc3master/plugins/task/HandleHeadNodesK8S.py index 2859391..d7e2a29 100644 --- a/vc3master/plugins/task/HandleHeadNodesK8S.py +++ b/vc3master/plugins/task/HandleHeadNodesK8S.py @@ -51,7 +51,7 @@ def __init__(self, parent, config, section): # number of times we have tries to initialize a node. After node_max_initializing_count, declare failure. 
self.initializing_count = {} - + self.kubernetes_config = self.config.get(section, 'kube_config') self.log.debug("HandleHeadNodes VC3Task initialized.") global login_info @@ -60,7 +60,7 @@ def login_info(self, request): Args: request Returns: IP and port as integers """ - config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + config.load_kube_config(config_file = self.kubernetes_config) v1 = client.CoreV1Api() k8s_client = client.ApiClient() k8s_api = client.ExtensionsV1beta1Api(k8s_client) @@ -110,7 +110,7 @@ def login_create(self, request): Returns: None """ self.log.info('Starting login_create') - config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + config.load_kube_config(config_file = self.kubernetes_config) v1 = client.CoreV1Api() k8s_client = client.ApiClient() k8s_api = client.ExtensionsV1beta1Api(k8s_client) @@ -148,7 +148,7 @@ def create_namespace(self, request): Args: request Returns: None """ - config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + config.load_kube_config(config_file = self.kubernetes_config) pp = pprint.PrettyPrinter(indent =4) configuration = kubernetes.client.Configuration() api_instance = kubernetes.client.CoreV1Api(kubernetes.client.ApiClient(configuration)) @@ -163,7 +163,7 @@ def create_dep(self, request): Args: request Returns: None """ - config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + config.load_kube_config(config_file = self.kubernetes_config) pp = pprint.PrettyPrinter(indent =4) configuration = kubernetes.client.Configuration() api_instance = kubernetes.client.AppsV1Api(kubernetes.client.ApiClient(configuration)) @@ -200,7 +200,7 @@ def create_service(self, request): Args: request Returns: None """ - config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + config.load_kube_config(config_file = self.kubernetes_config) core_v1_api = kubernetes.client.CoreV1Api() serv_list = [] serv_list.append(kubernetes.client.V1ServicePort(port=22, protocol='TCP')) @@ -215,7 +215,7 @@ def create_conf_users(self, request): Args: request Returns: None """ - config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + config.load_kube_config(config_file = self.kubernetes_config) core_v1_api = kubernetes.client.CoreV1Api() string_to_append = self.add_keys_to_pod(request) bodyc = kubernetes.client.V1ConfigMap(api_version = 'v1', kind = 'ConfigMap', metadata = kubernetes.client.V1ObjectMeta(name = "new-config-" + str(request.name), namespace = str(request.name)), data = {'tconfig-file.conf':'\n' +string_to_append}) @@ -229,7 +229,7 @@ def login_pending(self, request): Args: request Returns: None """ - config.load_kube_config(config_file = '/etc/kubernetes/admin.conf') + config.load_kube_config(config_file = self.kubernetes_config) k8s_client = client.ApiClient() k8s_api = client.ExtensionsV1beta1Api(k8s_client) configuration = kubernetes.client.Configuration() From 8dc57cac84eceadd0e5acf953ec8d404b2f6a4ec Mon Sep 17 00:00:00 2001 From: wookiee2187 Date: Thu, 29 Aug 2019 12:54:24 -0500 Subject: [PATCH 4/4] Handle Requests added --- vc3master/plugins/task/HandleRequestsK8S.py | 766 ++++++++++++++++++++ 1 file changed, 766 insertions(+) create mode 100644 vc3master/plugins/task/HandleRequestsK8S.py diff --git a/vc3master/plugins/task/HandleRequestsK8S.py b/vc3master/plugins/task/HandleRequestsK8S.py new file mode 100644 index 0000000..019a934 --- /dev/null +++ b/vc3master/plugins/task/HandleRequestsK8S.py @@ -0,0 +1,766 @@ +#!/usr/bin/env python + +import ConfigParser +import StringIO 
+from base64 import b64encode +import socket + +import os +import json +import math +from datetime import datetime + +from vc3master.task import VC3Task +from vc3infoservice.infoclient import InfoConnectionFailure,InfoEntityMissingException + +import pluginmanager as pm +import traceback +import kubernetes + +class HandleRequests(VC3Task): + ''' + Plugin to manage the life cycle of all requests. + + ''' + + def __init__(self, parent, config, section): + super(HandleRequests, self).__init__(parent, config, section) + self.client = parent.client + self.log.debug("HandleRequests VC3Task initialized.") + + def runtask(self): + self.log.info("Running task %s" % self.section) + self.log.debug("Polling master....") + + try: + requests = self.client.listRequests() + n = len(requests) if requests else 0 + self.log.debug("Processing %d requests" % n) + if requests: + for r in requests: + try: + self.process_request(r) + except VC3InvalidRequest, e: + self.log.warning("Request %s is not valid. (%s)", r.name, e) + r.state = 'failure' + r.state_reason = 'Request invalid: ' + str(e) + except Exception, e: + self.log.warning("Request %s had a exception (%s)", r.name, e) + self.log.debug(traceback.format_exc(None)) + r.state = 'failure' + r.state_reason = str(e) + + try: + self.client.storeRequest(r) + except Exception, e: + self.log.warning("Storing the new request state failed. (%s)", e) + self.log.warning(traceback.format_exc(None)) + + except InfoConnectionFailure, e: + self.log.warning("Could not read requests from infoservice. (%s)", e) + + def process_request(self, request): + next_state = None + reason = None + + self.log.debug("Processing request '%s'", request.name) + + (next_state, reason) = (request.state, request.state_reason) + + headnode = self.getHeadNode(request) + + if not self.is_finishing_state(next_state): + if self.request_has_expired(request): + (next_state, reason) = ('terminating', 'virtual cluster has expired') + elif headnode and headnode.state == 'failure': + (next_state, reason) = ('failure', 'There was a failure with headnode: %s \nPlease terminate the virtual cluster.' 
% headnode.state_reason) + + if request.action and request.action == 'terminate': + if not self.is_finishing_state(next_state) or request.state == 'failure': + (next_state, reason) = ('terminating', 'received terminate action') + + nodesets = self.getNodesets(request) + request.statusinfo = self.compute_job_status_summary(request.statusraw, nodesets, next_state) + + if next_state == 'terminated': + (next_state, reason) = self.state_terminated(request) + + if next_state == 'new': + # nexts: initializing, terminating + (next_state, reason) = self.state_new(request) + + if next_state == 'initializing': + # nexts: initializing, pending, terminating + (next_state, reason) = self.state_initializing(request, headnode) + + if next_state == 'pending': + # nexts: pending, running, terminating + (next_state, reason) = self.state_pending(request) + + if next_state == 'running': + # nexts: running, terminating + (next_state, reason) = self.state_running(request) + + if next_state == 'terminating': + # waits until everything has been cleanup + (next_state, reason) = self.state_terminating(request) + + if next_state == 'cleanup': + # waits until everything has been cleanup + (next_state, reason) = self.state_cleanup(request) + + if next_state is not request.state or reason is not request.state_reason: + self.log.debug("request '%s' state '%s' -> %s (%s)'", request.name, request.state, next_state, str(reason)) + request.state = next_state + request.state_reason = reason + else: + self.log.debug("request '%s' remained in state '%s'", request.name, request.state) + + if self.is_configuring_state(request.state): + self.add_queues_conf(request, nodesets) + self.add_auth_conf(request) + else: + request.queuesconf = None + request.authconf = None + + def is_configuring_state(self, state): + if self.is_initializing_state(state): + return False + + if state == 'terminated': + return False + + return True + + def is_initializing_state(self, state): + return state in ['new', 'initializing'] + + def is_finishing_state(self, state): + return state in ['failure', 'terminating', 'cleanup', 'terminated'] + + def request_is_valid(self, request): + + bad_reasons = [] + + try: + if not request.project: + bad_reasons.append("Virtual cluster '%s' does not belong to any project." % request.name) + else: + try: + project = self.client.getProject(request.project) + + if not (request.allocations and len(request.allocations) > 0): + bad_reasons.append("Virtual cluster did not define any allocations.") + + if not (project.allocations and len(project.allocations) > 0): + bad_reasons.append("Project '%s' did not define any allocations." % (request.project, )) + + if request.allocations and project.allocations: + for a in request.allocations: + if a not in project.allocations: + bad_reasons.append("Allocation '%s' does not belong to project '%s'." % (a, request.project)) + + if project.members and len(project.members) > 0: + if request.owner not in project.members: + bad_reasons.append("User '%s' that created the virtual cluster does not belong to project '%s'." % (request.owner, request.project)) + for member_name in project.members: + try: + member = self.client.getUser(member_name) + if not member.sshpubstring: + bad_reasons.append("User '%s' in project '%s' does not have a ssh-key." % (member_name, request.project)) + elif not self.client.validate_ssh_pub_key(member.sshpubstring): + bad_reasons.append("User '%s' in project '%s' has an invalid ssh-key." 
% (member_name, request.project)) + except InfoEntityMissingException: + bad_reasons.append("User '%s' in project '%s' is not defined." % (member_name, request.project)) + else: + bad_reasons.append("Project '%s' did not define any members." % (request.project,)) + except InfoEntityMissingException: + bad_reasons.append("Project '%s' is not defined." % (request.project,)) + except Exception, e: + bad_reasons.append("There was an error while validating virtual cluster request: %s." % (e,)) + + if not bad_reasons: + # fill-in the desired headnode name. This will eventually come from + # the nodesets when the request is first created. + request.headnode = 'headnode-for-' + request.name + return True + else: + raise VC3InvalidRequest(' '.join(bad_reasons)) + + def state_new(self, request): + ''' + Validates all new requests. + ''' + self.log.debug('processing new request %s' % request.name) + + try: + if self.request_is_valid(request): + return ('initializing', 'Waiting for cluster components to come online.') + except VC3InvalidRequest, e: + self.log.warning("Invalid Request: %s" % str(e)) + return ('failure', 'Please terminate the cluster.\nInvalid virtual cluster specification:\n%s' % str(e)) + + def state_initializing(self, request, headnode): + if not headnode: + self.log.debug('waiting for headnode to come online for %s' % request.name) + return ('initializing', 'Headnode is being created.') + if headnode.state == 'running': + return ('pending', 'Requesting compute workers.') + if headnode.state == 'failure': + return ('failure', 'Error while initializing the headnode: %s' % headnode.state_reason) + + # in any other case, use the state from the headnode: + return ('initializing', headnode.state_reason) + + + def state_pending(self, request): + self.log.debug('waiting for factory to start fulfilling request %s' % request.name) + + running = self.job_count_with_state(request, 'running') + idle = self.job_count_with_state(request, 'idle') + err = self.job_count_with_state(request, 'error') + + if (running is None) or (idle is None) or (err is None): + return ('pending', 'Requesting compute workers') + elif running > 0: + return ('running', 'Growing virtual cluster.') + else: + return ('pending', 'Waiting for compute workers to start running.') + + def state_running(self, request): + total_of_nodes = self.total_jobs_requested(request) + running = self.job_count_with_state(request, 'running') + idle = self.job_count_with_state(request, 'idle') + + if (running is None) or (idle is None): + self.log.warning('Failure: status of request %s went away.' % request.name) + return ('terminating', 'Failure: status of virtual cluster %s went away.' % request.name) + elif total_of_nodes > (running + idle): + return ('running', 'Requesting %d more compute worker(s).' % (total_of_nodes - running)) + elif total_of_nodes < (running + idle): + return ('running', 'Requesting %d less compute worker(s).' % (running - total_of_nodes)) + elif total_of_nodes == running: + return ('running', 'All requested compute workers are running.') + else: + return ('running', 'Waiting for %d queued compute workers.' % (idle,)) + + def state_terminating(self, request): + self.log.debug('request %s is terminating' % request.name) + running = self.job_count_with_state(request, 'running') + idle = self.job_count_with_state(request, 'idle') + + if (running is None) or (idle is None): + self.log.warning('Failure: status of request %s went away.' % request.name) + return ('cleanup', 'Failure: status of virtual cluster %s went away.' 
% request.name) + elif (running + idle) == 0: + return ('cleanup', 'All compute workers have been terminated.') + elif request.action == 'terminate': + return ('terminating', 'terminate action is being processed.') + else: + return ('terminating', 'Terminating all compute workers.') + + + def state_cleanup(self, request): + self.log.debug('collecting garbage for request %s' % request.name) + + # to fill cleanup here! + if self.is_everything_cleaned_up(request): + return ('terminated', 'Virtual cluster terminated succesfully') + + return ('cleanup', 'Waiting for headnode/others to terminate') + + + def state_terminated(self, request): + if request.action and request.action == 'relaunch': + request.action = 'new' + return ('new', 'relaunching cluster') + else: + self.log.debug('request %s is done' % request.name) + return ('terminated', 'Virtual cluster terminated succesfully') + + + def add_queues_conf(self, request, nodesets): + ''' + request.allocations = [ alloc1, alloc2 ] + .cluster.nodesets = [ nodeset1, nodeset2 ] + nodeset.node_number # total number to launch. + ''' + config = ConfigParser.RawConfigParser() + + try: + for allocation_name in request.allocations: + self.generate_queues_section(config, request, nodesets, allocation_name) + + conf_as_string = StringIO.StringIO() + config.write(conf_as_string) + + request.queuesconf = b64encode(conf_as_string.getvalue()) + return request.queuesconf + except Exception, e: + self.log.error('Failure to generate queuesconf: %s', e) + self.log.debug(traceback.format_exc(None)) + request.queuesconf = None + raise e + + def add_auth_conf(self, request): + config = ConfigParser.RawConfigParser() + + try: + for allocation_name in request.allocations: + self.generate_auth_section(config, request, allocation_name) + + conf_as_string = StringIO.StringIO() + config.write(conf_as_string) + + request.authconf = b64encode(conf_as_string.getvalue()) + return request.authconf + except Exception, e: + self.log.error('Failure generating auth.conf: %s', e) + request.authconf = None + return None + + def generate_queues_section(self, config, request, nodesets, allocation_name): + allocation = self.client.getAllocation(allocation_name) + if not allocation: + raise VC3InvalidRequest("Allocation '%s' has not been declared." % allocation_name, request = request) + + resource = self.client.getResource(allocation.resource) + if not resource: + raise VC3InvalidRequest("Resource '%s' has not been declared." % allocation.resource, request = request) + + resource_nodesize = self.client.getNodeinfo(resource.nodeinfo) + if not resource_nodesize: + raise VC3InvalidRequest("Resource node size '%s' has not been declared." % resource.nodeinfo, request = request) + + for nodeset in nodesets: + self.add_nodeset_to_queuesconf(config, request, resource, resource_nodesize, allocation, nodeset) + + def __get_ip(self, request): + login = login_info(self, request) + if login[0] == None: + self.log.debug("Headnode for request %s is not active yet.", request.name) + return None + else: + return login[0] + def add_nodeset_to_queuesconf(self, config, request, resource, resource_nodesize, allocation, nodeset): + node_number = self.jobs_to_run_by_policy(request, allocation, nodeset) + section_name = request.name + '.' + nodeset.name + '.' + allocation.name + + self.log.debug("Information finalized for queues configuration section [%s]. Creating config." 
% section_name) + + config.add_section(section_name) + + cores = (resource_nodesize and resource_nodesize.cores) or 1 + disk = (resource_nodesize and resource_nodesize.storage_mb) or 1024 + memory_per_core = (resource_nodesize and resource_nodesize.memory_mb) or 1024 + + + + if resource.accesstype == 'batch': + config.set(section_name, 'batchsubmitplugin', 'CondorSSHRemoteManager') + config.set(section_name, 'batchsubmit.condorsshremotemanager.user', allocation.accountname) + config.set(section_name, 'batchsubmit.condorsshremotemanager.batch', resource.accessflavor) + config.set(section_name, 'batchsubmit.condorsshremotemanager.method', resource.accessmethod) + config.set(section_name, 'batchsubmit.condorsshremotemanager.host', resource.accesshost) + config.set(section_name, 'batchsubmit.condorsshremotemanager.port', str(resource.accessport)) + config.set(section_name, 'batchsubmit.condorsshremotemanager.authprofile', allocation.name) + + config.set(section_name, 'batchsubmit.condorsshremotemanager.condor_attributes.+Nonessential', 'True') + # CMS Connect resources work with singularity CMS images. Only RHEL7 is supported on spark clusters for now. + if resource.name == 'cms-connect' and (nodeset.app_type in ['spark', 'jupyter+spark']): + config.set(section_name, 'batchsubmit.condorsshremotemanager.condor_attributes.+remote_cerequirements', 'required_os=="rhel7"') + config.set(section_name, 'batchsubmit.condorsshremotemanager.condor_attributes.request_cpus', cores) + config.set(section_name, 'batchsubmit.condorsshremotemanager.condor_attributes.request_disk', disk * 1024) + config.set(section_name, 'batchsubmit.condorsshremotemanager.condor_attributes.request_memory', memory_per_core * cores) + # Workaround to (apparently) a bosco bug dealing with multicore + config.set(section_name, 'batchsubmit.condorsshremotemanager.condor_attributes.+remote_nodenumber', cores) + config.set(section_name, 'batchsubmit.condorsshremotemanager.condor_attributes.+remote_SMPGranularity', cores) + + config.set(section_name, 'executable', '%(builder)s') + + # for now, remove all jobs non-peacefully: + config.set(section_name, 'batchsubmit.condorsshremotemanager.overlay.peaceful', 'no') + # if nodeset.app_peaceful is not None: + # if nodeset.app_peaceful: + # config.set(section_name, 'batchsubmit.condorsshremotemanager.overlay.peaceful', 'yes') + # else: + # config.set(section_name, 'batchsubmit.condorsshremotemanager.overlay.peaceful', 'no') + + if nodeset.app_killorder is not None: + config.set(section_name, 'batchsubmit.condorsshremotemanager.overlay.killorder', nodeset.app_killorder) + + if nodeset.app_role == 'worker-nodes': + try: + headnode = self.client.getNodeset(request.headnode) + config.set(section_name, 'shared_secret_file', request.name + 'secret') + config.set(section_name, 'shared_secret', headnode.app_sectoken) + except Exception, e: + self.log.warning("Could not get headnode shared secret for request '%s'. Continuing without password (this probably won't work).", request.name ) + + # Use dynamic resizing. + # This is currently broken. 
+ # configure APF to resize the VC based on the # of jobs in queue + # scalefactor = 1 / float(len(request.allocations)) + + # config.set(section_name, 'wmsstatusplugin', 'Condor') + # config.set(section_name, 'wmsqueue', 'ANY') + # config.set(section_name, 'wmsstatus.condor.scheddhost', headnode.app_host) + # config.set(section_name, 'wmsstatus.condor.collectorhost', headnode.app_host) + # config.set(section_name, 'schedplugin', 'Ready, Scale, KeepNRunning, MaxToRun') + # config.set(section_name, 'sched.scale.factor', scalefactor) + # config.set(section_name, 'sched.maxtorun.maximum', node_number) + + # Use static size + config.set(section_name, 'schedplugin', 'KeepNRunning') + config.set(section_name, 'sched.keepnrunning.keep_running', node_number) + + + elif resource.accesstype == 'cloud': + config.set(section_name, 'batchsubmitplugin', 'CondorEC2') + elif resource.accesstype == 'local': + config.set(section_name, 'batchsubmitplugin', 'CondorLocal') + config.set(section_name, 'batchsubmit.condorlocal.condor_attributes.should_transfer_files', 'YES') + config.set(section_name, 'batchsubmit.condorlocal.condor_attributes.initialdir', '$ENV(TMP)') + + config.set(section_name, 'batchsubmit.condorlocal.condor_attributes.request_cpus', cores) + config.set(section_name, 'batchsubmit.condorlocal.condor_attributes.request_disk', disk * 1024) + config.set(section_name, 'batchsubmit.condorlocal.condor_attributes.request_memory', memory_per_core * cores) + # Workaround to (apparently) a bosco bug dealing with multicore + config.set(section_name, 'batchsubmit.condorlocal.condor_attributes.+remote_nodenumber', cores) + + config.set(section_name, 'executable', '%(builder)s') + else: + raise VC3InvalidRequest("Unknown resource access type '%s'" % str(resource.accesstype), request = request) + + self.add_environment_to_queuesconf(config, request, section_name, nodeset, resource, resource_nodesize) + + self.log.debug("Completed filling in config for allocation %s" % allocation.name) + + + def jobs_to_run_by_policy(self, request, allocation, nodeset): + self.log.debug("Calculating nodes to run...") + node_number = 0 + + if self.is_finishing_state(request.state): + node_number = 0 + self.log.debug("Request in finishing state. Setting keepnrunning to 0") + else: + # For now no policies. 
Just calculated static-balanced + node_number = self.jobs_to_run_by_static_balanced(request, allocation, nodeset) + return node_number + + def jobs_to_run_by_static_balanced(self, request, allocation, nodeset): + numalloc = len(request.allocations) + total_to_run = self.total_jobs_requested(request) + raw = int(math.floor(float(total_to_run) / numalloc)) + total_raw = raw * numalloc + + # since using floor, it is always the case that total_raw <= total_to_run + # we compensate for any difference in the allocation last in the list + node_number = raw + diff = total_to_run - total_raw + + if allocation.name == request.allocations[-1]: + node_number += diff + + self.log.debug("With %d allocations and nodeset.node_number %d this allocation should run %d" % (numalloc, total_to_run, node_number)) + return node_number + + def generate_auth_tokens(self, principle): + """ + Generate SSH priv/pub keys and base64 encode them + """ + self.log.info("Generating or retrieving SSH keys for %s", principle) + self.ssh = self.parent.parent.ssh + (pub, priv) = self.ssh.getkeys(principle) + self.log.debug("public key: %s", pub) + encoded_pub = b64encode(pub) + encoded_priv = b64encode(priv) + return encoded_pub, encoded_priv + + + def generate_auth_section(self, config, request, allocation_name): + + name = allocation_name + config.add_section(name) + + allocation = self.client.getAllocation(allocation_name) + if not allocation: + raise VC3InvalidRequest("Allocation '%s' has not been declared." % allocation_name, request = request) + + if allocation.privtoken is None: + raise VC3InvalidRequest("Allocation '%s' doesn't have priv token." % allocation_name, request = request) + + resource = self.client.getResource(allocation.resource) + if not resource: + raise VC3InvalidRequest("Resource '%s' has not been declared." 
% allocation.resource, request = request) + + if resource.accessmethod == 'ssh': + # credible always generates rsa keys + #allocation.sectype = 'ssh-rsa' + #(allocation.pubtoken, allocation.privtoken) = self.generate_auth_tokens(name) + config.set(name, 'plugin', 'SSH') + config.set(name, 'ssh.type', allocation.sectype) + config.set(name, 'ssh.publickey', allocation.pubtoken) + config.set(name, 'ssh.privatekey', allocation.privtoken) + elif resource.accessmethod == 'gsissh': + config.set(name, 'plugin', 'GSISSH') + config.set(name, 'ssh.privatekey', allocation.privtoken) + elif resource.accessmethod == 'local': + # nothing special is needed + config.set(name, 'plugin', 'Noop') + else: + raise VC3InvalidRequest("Unknown resource access method '%s'" % str(resource.accessmethod), request = request) + + + def add_pilot_to_queuesconf(self, config, request, section_name, nodeset, resource, nodesize): + + try: + headnode = self.client.getNodeset(request.headnode) + except Exception, e: + self.log.warning("Could not find headnode for request '%s'.") + raise e + + s = '' + if nodeset.app_type in ['htcondor', 'jupyter+htcondor', 'reana+htcondor']: + collector = headnode.app_host + + s += ' --require vc3-glidein' + s += ' -- vc3-glidein --vc3-env VC3_SH_PROFILE_ENV' + s += ' -c %s -C %s -p %s -t -D %d -m %d --disk %d' % (collector, collector, '%(shared_secret_file)s', nodesize.cores, nodesize.memory_mb * nodesize.cores, nodesize.storage_mb * 1024) + + if nodeset.app_lingertime: + s += ' --lingertime %d' % (nodeset.app_lingertime, ) + + elif nodeset.app_type == 'workqueue': + s += ' --require cctools' + s += ' -- work_queue_worker -M %s -dall -t %d --cores %d --memory %d --disk %d --password %s' % (request.name, 60*60*2, nodesize.cores, nodesize.memory_mb * nodesize.cores, nodesize.storage_mb, '%(shared_secret_file)s') + if nodeset.app_lingertime: + s += ' --timeout %d' % (nodeset.app_lingertime, ) + elif nodeset.app_type in ['spark', 'jupyter+spark']: + sparkmaster = 'spark://' + headnode.app_host + ':7077' + # Workaround to python-dev issue with singularity CMS images. + if resource.name == 'cms-connect': + s += '--no-sys python' + s += ' --require spark-xrootd' + s += ' --var SPARK_NO_DAEMONIZE=1' + s += ' -- start-slave.sh %s --properties-file %s -c %d -m %dM' % (sparkmaster, '%(shared_secret_file)s', nodesize.cores, nodesize.memory_mb * nodesize.cores) + else: + raise VC3InvalidRequest("Unknown nodeset app_type: '%s'" % nodeset.app_type) + + return s + + + def add_environment_to_queuesconf(self, config, request, section_name, nodeset, resource, resource_nodesize): + #s = " --revar 'VC3_.*'" + s = '" ' # trying to quote the thing + s += ' --home=.' + s += ' --install=.' + s += ' --bosco-workaround' + # Parse common environment variables in HPC resources, + # in case they exist. + s += " --revar '^SCRATCH$'" + s += " --revar 'PBS_.*'" + + #e.g. FACTORY_JOBID=apf.virtualclusters.org#53406.6 + factory_jobid = "$ENV(HOSTNAME)" + '#$(Cluster).$(Process)' + if nodeset.app_type in ['htcondor', 'jupyter+htcondor']: + s += ' --var _CONDOR_FACTORY_JOBID=' + factory_jobid # in the Condor/Jupyterhub case we also make sure its a Condor classad + + # Leaving STARTD_ATTRS out for now, while fixing quoting. + #s += ' --var _CONDOR_STARTD_ATTRS=""$(STARTD_ATTRS) FACTORY_JOBID""' + + s += ' --var FACTORY_JOBID=' + factory_jobid + else: + # otherwise we just put the factory jobid into the environment. 
other middlewares might be able to use it too + s += ' --var FACTORY_JOBID=' + factory_jobid + envs = [] + + if request.environments is not None: + envs.extend(request.environments) + + if nodeset.environment is not None: + envs.append(nodeset.environment) + + for env_name in envs: + environment = self.client.getEnvironment(env_name) + if environment is None: + raise VC3InvalidRequest("Unknown environment '%s' for '%s'" % (env_name, section_name), request = request) + + vs = [ "VC3_REQUESTID=%s" % request.name, "VC3_QUEUE=%s" % section_name] + for k in environment.envmap: + vs.append("%s=%s" % (k, environment.envmap[k])) + + if environment.required_os is not None: + os = '--require-os %s' %environment.required_os + s += ' ' + os + + reqs = ' '.join(['--require %s' % x for x in environment.packagelist]) + + + vars = ' '.join(['--var %s' % x for x in vs]) + + s += ' ' + vars + ' ' + reqs + + if environment.builder_extra_args: + s += ' ' + ' '.join(environment.builder_extra_args) + + if environment.command: + self.log.warning('Ignoring command of environment %s for %s. Adding pilot for %s instead' % (environment.name, section_name, nodeset.name)) + + if nodeset.app_type in ['reana+htcondor']: + s += ' --require container-technology' + + if len(envs) > 0: + config.set(section_name, 'vc3.environments', ','.join(envs)) + + s += ' ' + self.add_pilot_to_queuesconf(config, request, section_name, nodeset, resource, resource_nodesize) + + # add another quote at the end + s += '"' + + config.set(section_name, 'executable.arguments', s) + + def is_everything_cleaned_up(self, request): + headnode = None + if request.headnode: + try: + headnode = self.client.getNodeset(request.headnode) + except InfoConnectionFailure: + # We don't know if headnode has been cleared or not... + return False + except InfoEntityMissingException: + # Headnode is missing, and that's what we want + pass + + if headnode: + return False + # if somethingElseStillThere: + # return False + + # everything has been cleaned up: + return True + + def job_count_with_state(self, request, state): + if not request.statusraw: + return None + + if not request.statusinfo: + return None + + if not self.is_configuring_state(request.state): + return None + + at_least_one_nodeset = False + count = 0 + + for nodeset in request.statusinfo.keys(): + count += request.statusinfo[nodeset].get(state, 0) + at_least_one_nodeset = True + + if at_least_one_nodeset: + self.log.debug('Counting %d jobs in state %s for request %s', count, state, request.name) + return count + + self.log.debug('No nodesets with jobs in state %s for request %s', state, request.name) + return None + + + def total_jobs_requested(self, request): + if self.is_finishing_state(request.state): + return 0 + + cluster = self.client.getCluster(request.cluster) + + if len(cluster.nodesets) < 1: + raise VC3InvalidRequest("No nodesets have been added to Cluster '%s' " % cluster.name, request = request) + + total_jobs = 0 + for nodeset_name in cluster.nodesets: + nodeset = self.client.getNodeset(nodeset_name) + if not nodeset: + raise VC3InvalidRequest("Nodeset '%s' has not been declared." 
% nodeset_name, request = request) + if nodeset.node_number is not None: + total_jobs += nodeset.node_number + return total_jobs + + def compute_job_status_summary(self, statusraw, nodesets, next_state): + if not statusraw: + return None + + if self.is_initializing_state(next_state): + return None + + statusinfo = {} + for nodeset in nodesets: + statusinfo[nodeset.name] = {} + statusinfo[nodeset.name]['running'] = 0 + statusinfo[nodeset.name]['idle'] = 0 + statusinfo[nodeset.name]['error'] = 0 + statusinfo[nodeset.name]['node_number'] = nodeset.node_number + + if self.is_finishing_state(next_state): + statusinfo[nodeset.name]['requested'] = 0 + else: + statusinfo[nodeset.name]['requested'] = statusinfo[nodeset.name]['node_number'] + + try: + for factory in statusraw.keys(): + for allocation in statusraw[factory][nodeset.name].keys(): + try: + for field in ['running', 'idle', 'error']: + statusinfo[nodeset.name][field] += statusraw[factory][nodeset.name][allocation]['aggregated'][field] + except KeyError, e: + pass + except KeyError, e: + pass + + return statusinfo + + def getHeadNode(self, request): + headnode = None + try: + headnode = self.client.getNodeset(request.headnode) + except InfoConnectionFailure: + pass + except InfoEntityMissingException: + pass + return headnode + + def getNodesets(self, request): + cluster = self.client.getCluster(request.cluster) + if not cluster: + raise VC3InvalidRequest("Cluster '%s' has not been declared." % cluster.name, request = request) + + if len(cluster.nodesets) < 1: + raise VC3InvalidRequest("No nodesets have been added to Cluster '%s' " % cluster.name, request = request) + + nodesets = [] + for nodeset_name in cluster.nodesets: + self.log.debug("retrieving nodeset %s for cluster %s " % (nodeset_name, cluster.name)) + nodeset = self.client.getNodeset(nodeset_name) + if not nodeset: + raise VC3InvalidRequest("Nodeset '%s' has not been declared." % nodeset_name, request = request) + self.log.debug("Retrieved %s for name %s" % ( nodeset, nodeset_name)) + nodesets.append(nodeset) + return nodesets + + def request_has_expired(self, request): + if request.expiration is None: + return False + limit = datetime.strptime(request.expiration, '%Y-%m-%dT%H:%M:%S') + + if limit < datetime.utcnow().replace(microsecond=0): + self.log.debug("Request %s has expired.", request.name) + return True + + return False + +class VC3InvalidRequest(Exception): + def __init__(self, reason, request = None): + self.reason = reason + self.request = request + + def __str__(self): + return str(self.reason) +
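
Note (not part of the patches above): PATCH 1/4 registers HandleHeadNodesK8S in the [vcluster-headnodecycle] section of etc/tasks.conf, and PATCH 3/4 makes the plugin read its kubeconfig path from a kube_config option in that same section. A minimal sketch of how the section might look once both patches are applied is shown below. The kube_config value is only an example (the path the plugin hard-coded before PATCH 3/4), other pre-existing options of the section (node_prefix, node_image, etc.) are unchanged and omitted here, and the HandleRequestsK8S plugin added in PATCH 4/4 would still need to be registered in its own task section, which these patches do not include.

[vcluster-headnodecycle]
taskplugins = HandleHeadNodes, HandleHeadNodesK8S
# To run the HandleHeadNodesK8S plugin you will need to add the dependencies
# mentioned in the beginning of the runnew.sh script in the incubator code
polling_interval = 10
username = myosuser
# Read by HandleHeadNodesK8S after PATCH 3/4; the path below is only an example.
kube_config = /etc/kubernetes/admin.conf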