Source code for clowdr.endpoint.AWS

#!/usr/bin/env python
#
# This software is distributed with the MIT license:
# https://github.com/gkiar/clowdr/blob/master/LICENSE
#
# clowdr/endpoint/AWS.py
# Created by Greg Kiar on 2018-02-28.
# Email: gkiar@mcin.ca

from botocore.exceptions import *
import time
import boto3
import os.path as op
import json
import warnings
import os
import re

warnings.filterwarnings("ignore", message="numpy.dtype size changed")

import pandas as pd

from clowdr.endpoint.remote import Endpoint
from clowdr import __path__ as clowfile


clowfile = clowfile[0]


[docs]class AWS(Endpoint): # TODO: document
[docs] def setCredentials(self, **kwargs): # TODO: document creds = pd.read_csv(self.credentials) self.access_key = creds['Access key ID'][0] self.secret_access = creds['Secret access key'][0] os.environ["AWS_ACCESS_KEY_ID"] = self.access_key os.environ["AWS_SECRET_ACCESS_KEY"] = self.secret_access if kwargs.get("region"): self.region = kwargs["region"] else: self.region = "us-east-1"
[docs] def startSession(self): # TODO: document self.session = boto3.Session(aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_access, region_name=self.region) self.iam = self.session.client("iam") self.ec2 = self.session.client("ec2") self.batch = self.session.client("batch")
[docs] def configureIAM(self, **kwargs): # TODO: document template = op.join(op.realpath(clowfile), "templates", "AWS", "userRoles.json") with open(template) as fhandle: roles = json.load(fhandle) basestr = "arn:aws:iam::aws:policy/service-role/" policy = {"batch": basestr + "AWSBatchServiceRole", "ecs": basestr + "AmazonEC2ContainerServiceforEC2Role"} for rolename in roles: role = roles[rolename] try: name = role["RoleName"] response = self.iam.get_role(RoleName=name) role["Arn"] = response["Role"]["Arn"] except ClientError as e: if e.response["Error"]["Code"] == "NoSuchEntity": if kwargs.get("verbose"): print("Role '{}' not found- creating.".format(name), flush=True) role["AssumeRolePolicyDocument"] = json.dumps(role["AssumeRolePolicyDocument"]) response = self.iam.create_role(**role) role["Arn"] = response["Role"]["Arn"] self.iam.create_instance_profile(InstanceProfileName=name) self.iam.add_role_to_instance_profile(InstanceProfileName=name, RoleName=name) self.iam.attach_role_policy(RoleName=name, PolicyArn=policy[rolename]) roles[rolename] = role if kwargs.get("verbose"): print("Role ARN: {}".format(roles[rolename]["Arn"]), flush=True) self.roles = roles
[docs] def configureBatch(self, **kwargs): # TODO: document sg = [sg["GroupId"] for sg in self.ec2.describe_security_groups()["SecurityGroups"] if sg["GroupName"] == "default"] net = [nets["SubnetId"] for nets in self.ec2.describe_subnets()["Subnets"]] def waitUntilDone(name, status): while True: env = self.batch.describe_compute_environments(computeEnvironments=[name]) stat = env["computeEnvironments"][0]["status"] if stat != status: return template = op.join(op.realpath(clowfile), "templates", "AWS", "computeEnvironment.json") with open(template) as fhandle: compute = json.load(fhandle) try: name = compute["computeEnvironmentName"] response = self.batch.describe_compute_environments(computeEnvironments=[name]) if len(response["computeEnvironments"]): if (response["computeEnvironments"][0]["status"] != "VALID" or response["computeEnvironments"][0]["state"] != "ENABLED"): raise ClientError({"Error": {"Code": "InvalidEnvironment"}}, "describe_compute_environments") else: compute["computeEnvironmentArn"] = response["computeEnvironments"][0]["computeEnvironmentArn"] else: raise ClientError({"Error": {"Code": "NoSuchEntity"}}, "describe_compute_environments") except ClientError as e: if e.response["Error"]["Code"] == "InvalidEnvironment": if kwargs.get("verbose"): print("Environment '{}' invalid- deleting.".format(name), flush=True) response = self.batch.update_compute_environment(computeEnvironment=name, state="DISABLED") waitUntilDone(name, "UPDATING") response = self.batch.delete_compute_environment(computeEnvironment=name) waitUntilDone(name, "DELETING") if (e.response["Error"]["Code"] == "NoSuchEntity" or e.response["Error"]["Code"] == "InvalidEnvironment"): if kwargs.get("verbose"): print("Environment '{}' not found- creating.".format(name), flush=True) compute["computeResources"]["subnets"] = net compute["computeResources"]["securityGroupIds"] = sg compute["computeResources"]["instanceRole"] = self.roles["ecs"]["Arn"].replace("role", "instance-profile") compute["serviceRole"] = self.roles["batch"]["Arn"] response = self.batch.create_compute_environment(**compute) waitUntilDone(name, "CREATING") waitUntilDone(name, "UPDATING") time.sleep(2) compute["computeEnvironmentArn"] = response["computeEnvironmentArn"] if kwargs.get("verbose"): print("Compute Environment ARN: {}".format(compute["computeEnvironmentArn"]), flush=True) template = op.join(op.realpath(clowfile), "templates", "AWS", "jobQueue.json") with open(template) as fhandle: queue = json.load(fhandle) try: name = queue["jobQueueName"] response = self.batch.describe_job_queues() if not len(response["jobQueues"]): raise ClientError({"Error": {"Code": "NoSuchEntity"}}, "describe_job_queues") else: queue_names = [response["jobQueues"][i]["jobQueueName"] for i in range(len(response["jobQueues"]))] if name not in queue_names: raise ClientError({"Error": {"Code": "NoSuchEntity"}}, "describe_job_queues") queue["jobQueueArn"] = response["jobQueues"][0]["jobQueueArn"] except ClientError as e: if e.response["Error"]["Code"] == "NoSuchEntity": if kwargs.get("verbose"): print("Queue '{}' not found- creating.".format(name), flush=True) response = self.batch.create_job_queue(**queue) queue["jobQueueArn"] = response["jobQueueArn"] time.sleep(2) if kwargs.get("verbose"): print("Job Queue ARN: {}".format(queue["jobQueueArn"]), flush=True) template = op.join(op.realpath(clowfile), "templates", "AWS", "jobDefinition.json") with open(template) as fhandle: job = json.load(fhandle) try: name = job["jobDefinitionName"] response = self.batch.describe_job_definitions() if (not len(response["jobDefinitions"]) or response["jobDefinitions"][0]["status"] == "INACTIVE"): raise ClientError({"Error": {"Code": "NoSuchEntity"}}, "describe_job_definitions") else: job["jobDefinitionArn"] = response["jobDefinitions"][0]["jobDefinitionArn"] except ClientError as e: if e.response["Error"]["Code"] == "NoSuchEntity": if kwargs.get("verbose"): print("Job '{}' not found- creating.".format(name), flush=True) response = self.batch.register_job_definition(**job) job["jobDefinitionArn"] = response["jobDefinitionArn"] if kwargs.get("verbose"): print("Job Definition ARN: {}".format(job["jobDefinitionArn"]), flush=True)
[docs] def launchJob(self, taskloc): # TODO: document orides = {"environment":[{"name": "AWS_ACCESS_KEY_ID", "value": self.access_key}, {"name": "AWS_SECRET_ACCESS_KEY", "value": self.secret_access}], "command":["task", taskloc, "-V"]} # p1, p2 = re.match('.+/.+-([0-9a-zA-Z_]+)/clowdr/task-([A-Za-z0-9]+).json', taskloc).group(1, 2) p1, p2 = re.match('.+\/.+-(\w+)\/clowdr\/task-([A-Za-z0-9]+).json', taskloc).group(1, 2) response = self.batch.submit_job(jobName="clowdr_{}-{}".format(p1, p2), jobQueue="clowdr-q", jobDefinition="clowdr", containerOverrides=orides) jid = response['jobId'] return jid