diff --git a/.gitignore b/.gitignore index 70ff284a5103c..23bc8ea38615d 100644 --- a/.gitignore +++ b/.gitignore @@ -95,3 +95,14 @@ example/tutorial/R/*.nb.html # travis_wait command logs travis_wait*.log + +.history + +mlflow/java/scoring/bin/* +mlflow/java/client/bin/* +mlflow/java/.settings/* +mlflow/java/client/.classpath +mlflow/java/scoring/.classpath +mlflow/java/scoring/.settings/ +mlflow/java/client/.settings/ + diff --git a/examples/qubole/readme.md b/examples/qubole/readme.md new file mode 100644 index 0000000000000..e613f670512f5 --- /dev/null +++ b/examples/qubole/readme.md @@ -0,0 +1,79 @@ +# Running MLFlow in Qubole Mode + + +When run in `"qubole"` mode, a `ShellCommand` is launched on QDS from the MLFlow project. + +## Setting up cluster + +Install `mlflow` package on cluster using the node-bootstrap. + +``` +source /usr/lib/environs/a-2019.03-py-3.7.3/bin/activate /usr/lib/environs/a-2019.03-py-3.7.3/ +conda install mlflow +``` + +## Start tracking server + +To run a long-lived, shared MLflow tracking server, launch an EC2 instance to run the MLflow Tracking server. + +Create an Anaconda with Python 3 AMI EC2 instance.You can use a t2.micro (Free-tier) instance for test environment. This AMI already has conda and many other packages needed pre-installed. +Install mlflow: +``` +wget https://github.com/qubole/mlflow/releases/download/v1.5.0-q/mlflow-1.5.0-py3-none-any.whl +pip install mlflow-1.5.0-py3-none-any.whl +``` +Open port 5000 for MLflow server; an example of how to do this via How to open a web server port on EC2 instance. Opening up port 5000 to the Internet will allow anyone to access your server, so it is recommended to only open up the port within an AWS VPC that your Qubole clusters have access to. +Configure your AWS credentials on the instance. The optimal configuration for MLflow Remote Tracking is to use the default-artifact-root option to store your artifacts in an S3 bucket. +SSH into your EC2 instance, e.g. 
ssh -i ~/.ssh/.pem ubuntu@..compute.amazonaws.com. +Configure your S3 credentials via aws cli; for more information, refer to Configuring the AWS CLI. +Run the Tracking Server +Start the tracking server: +```sh +mlflow server --default-artifact-root s3:// --host 0.0.0.0. +``` +For more information, refer to MLflow > Running a Tracking Server. +Test connectivity of your tracking server. Go to http://:5000; it should look similar to + +![](https://docs.databricks.com/_static/images/mlflow/mlflow-web-ui.png) + +## Run the job + +### Set tracking server variable + +Set environment variable `MLFLOW_TRACKING_URI`. + +### Create cluster spec file +Running the remote job requires `backend-spec.json` to be passed as follows, + +```json +{ + "aws": { + "s3_experiment_bucket": "", + "s3_experiment_base_path": "" + }, + "qubole": { + "api_token": "" , + "api_url": "https://api.qubole.com/api/", + "version": "v1.2", + "poll_interval": 5, + "skip_ssl_cert_check": false, + "cloud_name": "AWS" + }, + "cluster": { + "label": "mlflow-test" + }, + "command": { + "name": "mlflow-test", + "tags": ["mlflow"], + "notify": false + } +} +``` + +### Example + +A toy example can be launch using the following command, + +```sh +mlflow run git@github.com:agrawalamey/mlflow-example.git -P alpha=0.5 -b qubole --backend-config backend-spec.json +``` diff --git a/mlflow/projects/__init__.py b/mlflow/projects/__init__.py index e904bbae59ab8..e02e6ddd023f7 100644 --- a/mlflow/projects/__init__.py +++ b/mlflow/projects/__init__.py @@ -19,6 +19,7 @@ import docker import mlflow.projects.databricks +import mlflow.projects.qubole import mlflow.tracking as tracking import mlflow.tracking.fluent as fluent from mlflow.entities import RunStatus, SourceType @@ -201,8 +202,15 @@ def _run(uri, experiment_id, entry_point="main", version=None, parameters=None, kube_config['kube-job-template'] ) return submitted_run - - supported_backends = ["local", "databricks", "kubernetes"] + elif backend == "qubole": + 
tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_BACKEND, + "qubole") + from mlflow.projects.qubole import run_qubole + return run_qubole( + remote_run=active_run, + uri=uri, entry_point=entry_point, work_dir=work_dir, parameters=parameters, + experiment_id=experiment_id, cluster_spec=backend_config) + supported_backends = ["local", "databricks", "kubernetes", "qubole"] raise ExecutionException("Got unsupported execution mode %s. Supported " "values: %s" % (backend, supported_backends)) @@ -276,7 +284,8 @@ def run(uri, entry_point="main", version=None, parameters=None, if backend == "databricks": mlflow.projects.databricks.before_run_validations(mlflow.get_tracking_uri(), backend_config) - + elif backend == "qubole": + mlflow.projects.qubole.before_run_validations(mlflow.get_tracking_uri(), cluster_spec_dict) experiment_id = _resolve_experiment_id(experiment_name=experiment_name, experiment_id=experiment_id) diff --git a/mlflow/projects/qubole.py b/mlflow/projects/qubole.py new file mode 100644 index 0000000000000..5117b2b8b85e2 --- /dev/null +++ b/mlflow/projects/qubole.py @@ -0,0 +1,311 @@ +import boto3 +import botocore +import hashlib +import json +import logging +import os +import shutil +import tempfile +import textwrap +import time + +from six.moves import shlex_quote + +from mlflow import tracking +from mlflow.entities import RunStatus +from mlflow.projects.submitted_run import SubmittedRun +from mlflow.utils import rest_utils, file_utils +from mlflow.exceptions import ExecutionException +from mlflow.utils import env +from mlflow.utils.mlflow_tags import MLFLOW_QUBOLE_COMMAND_URL, MLFLOW_QUBOLE_COMMAND_ID +from mlflow.version import VERSION + +from qds_sdk.commands import ShellCommand +from qds_sdk.qubole import Qubole + +QUBOLE_TARFILE_ARCHIVE_NAME = "mlflow-project" + + +_logger = logging.getLogger(__name__) + + +class S3Utils(object): + def __init__(self, conf): + self.bucket = conf["s3_experiment_bucket"] + self.base_path = 
conf["s3_experiment_base_path"] + + def _get_bucket(self): + return boto3.session.Session()\ + .resource('s3').Bucket(self.bucket) + + def _path_exists(self, path): + """ + Returns True if the passed-in path exists in s3. + """ + try: + self._get_bucket().Object(path).load() + except botocore.exceptions.ClientError as e: + if e.response['Error']['Code'] == "404": + return False # The object does not exist. + else: + raise e # Something else has gone wrong. + else: + return True # The object exists. + + def _upload(self, src_path, path): + """ + Uploads the file at `src_path` to the specified S3 path. + """ + _logger.info("=== Uploading to S3 path %s ===" % path) + self._get_bucket().upload_file(src_path, path) + + def upload_project(self, project_dir, experiment_id): + """ + Tars a project directory into an archive in a temp dir and uploads it to S3, returning + the URI of the tarball in S3 (e.g. s3:/path/to/tar). + :param project_dir: Path to a directory containing an MLflow project to upload to S3 (e.g. + a directory containing an MLproject file). 
+ """ + temp_tarfile_dir = tempfile.mkdtemp() + temp_tar_filename = os.path.join(temp_tarfile_dir, "project.tar.gz") + + def custom_filter(x): + return None if os.path.basename(x.name) == "mlruns" else x + + try: + file_utils.make_tarfile(temp_tar_filename, project_dir, QUBOLE_TARFILE_ARCHIVE_NAME, + custom_filter=custom_filter) + with open(temp_tar_filename, "rb") as tarred_project: + tarfile_hash = hashlib.sha256(tarred_project.read()).hexdigest() + # TODO: Get subdirectory for experiment from the tracking server + s3_path = os.path.join(self.base_path, str(experiment_id), + "projects-code", "%s.tar.gz" % tarfile_hash) + if not self._path_exists(s3_path): + self._upload(temp_tar_filename, s3_path) + _logger.info("=== Finished uploading project to %s ===" % s3_path) + else: + _logger.info("=== Project already exists in S3 ===") + finally: + shutil.rmtree(temp_tarfile_dir) + + full_path = "s3://{}/{}".format(self.bucket, s3_path) + + return full_path + + def upload_script(self, script, experiment_id): + """ + Stores the scrip in a temp file and uploads it to S3, returning + the URI of the script in S3 (e.g. s3:/path/to/tar). + :param script: String containing the commands to be run on QDS. 
+ """ + temp_dir = tempfile.mkdtemp() + temp_filename = os.path.join(temp_dir, "script.sh") + try: + file_utils.write_to(temp_filename, script) + with open(temp_filename, "r") as f: + script_hash = hashlib.sha256(f.read().encode('utf-8')).hexdigest() + # TODO: Get subdirectory for experiment from the tracking server + s3_path = os.path.join(self.base_path, str(experiment_id), + "script", "%s.sh" % script_hash) + if not self._path_exists(s3_path): + self._upload(temp_filename, s3_path) + _logger.info("=== Uploaded script to %s ===" % s3_path) + else: + _logger.info("=== Script already exists in S3 ===") + finally: + shutil.rmtree(temp_dir) + + full_path = "s3://{}/{}".format(self.bucket, s3_path) + + return full_path + + +def before_run_validations(tracking_uri, cluster_spec): + """Validations to perform before running a project on Qubole.""" + if not cluster_spec or type(cluster_spec) != dict: + raise ExecutionException("Cluster spec must be provided when launching MLflow project " + "runs on Qubole.") + # if tracking.utils._is_local_uri(tracking_uri): + # raise ExecutionException( + # "When running on Qubole, the MLflow tracking URI must be " + # "a remote HTTP URI accessible to both the " + # "current client and code running on Qubole. Got local tracking URI %s. " + # "Please specify a valid tracking URI via mlflow.set_tracking_uri or by setting the " + # "MLFLOW_TRACKING_URI environment variable." 
% tracking_uri) + +def _get_qubole_run_script(project_s3_path, run_id, entry_point, parameters, env_vars): + """ + Generates MLflow CLI command to run on Qubole cluster + """ + project_dir = QUBOLE_TARFILE_ARCHIVE_NAME + + conda_env = _get_qubole_conda_home() + + script_template = \ + """ + set -x + # Activate mlflow environment + source {}/bin/activate {}/ + # Export environment variables + {} + # Untar project + tar -xf {} + # Configure boto creds + source /usr/lib/hustler/bin/qubole-bash-lib.sh + set +x + export AWS_ACCESS_KEY_ID=`nodeinfo s3_access_key_id` + export AWS_SECRET_ACCESS_KEY=`nodeinfo s3_secret_access_key` + set -x + export HOME=/home/yarn + # Run mlflow + mlflow run {} \ + --entry-point {} \ + {} {} + """ + + env_var_export = " && ".join(["export {}={}".format(k, v) for + (k, v) in env_vars.items()]) + + project_tar = project_s3_path.rstrip("/").split("/")[-1] + + run_id_param = "--run-id {}".format(run_id) if run_id else "" + + program_params = "" + if parameters: + program_params = " ".join(["-P {}={}".format(k, v) for + (k, v) in parameters.items()]) + + mlflow_run_cmd = script_template.format(conda_env, conda_env, + env_var_export, project_tar, + project_dir, entry_point, + run_id_param, program_params) + + return mlflow_run_cmd + + +def _run_shell_command_job(project_s3_path, script_s3_path, cluster_spec): + """ + Runs the specified shell command on a Qubole cluster. + :param project_s3_path: S3 path of archive + :param script_s3_path: S3 path of shell command to run + :param cluster_spec: Dictionary describing the cluster, expected to contain the fields for a + :return: ShellCommand Object. 
+ """ + + args_template = """ + --script_location {} \\ + --files {} \\ + --cluster-label {} \\ + {} \\ + --tags {} \\ + --name {} \\ + """ + notify = "--notify" if cluster_spec["command"]["notify"] else "" + args = args_template.format(script_s3_path, project_s3_path, + cluster_spec["cluster"]["label"], + notify, ",".join(cluster_spec["command"]["tags"]), + cluster_spec["command"]["name"]) + + _logger.info("=== Launching MLflow run as Qubole job ===") + + Qubole.configure(**cluster_spec["qubole"]) + args = ShellCommand.parse([shlex_quote(x) for x in args.split()]) + command = ShellCommand.create(**args) + + return command + +def _get_qubole_conda_home(): + if env.get_env("QUBOLE_CONDA_HOME") is not None: + return env.get_env("QUBOLE_CONDA_HOME") + else: + return "/usr/lib/environs/a-2019.03-py-3.7.3/" + +def run_qubole(remote_run, uri, entry_point, work_dir, parameters, experiment_id, cluster_spec): + """ + Runs the project at the specified URI on Qubole, returning a `SubmittedRun` that can be + used to query the run's status or wait for the resulting Qubole command to terminate. + """ + tracking_uri = tracking.get_tracking_uri() + + project_s3_path = S3Utils(cluster_spec["aws"]).upload_project(work_dir, experiment_id) + + env_vars = { + tracking._TRACKING_URI_ENV_VAR: tracking_uri, + tracking._EXPERIMENT_ID_ENV_VAR: experiment_id, + "MLFLOW_CONDA_HOME": _get_qubole_conda_home() + } + + run_id = remote_run.info.run_uuid + _logger.info("=== Running entry point %s of project %s on Qubole. 
===" % (entry_point, uri)) + + # Get the shell command to run + script = _get_qubole_run_script(project_s3_path, run_id, entry_point, parameters, env_vars) + + script_s3_path = S3Utils(cluster_spec["aws"]).upload_script(script, experiment_id) + + # Launch run on Qubole + command = _run_shell_command_job(project_s3_path, script_s3_path, cluster_spec) + submitted_run = QuboleSubmittedRun(cluster_spec, command, run_id) + submitted_run._print_description_and_log_tags() + + return submitted_run + + +class QuboleSubmittedRun(SubmittedRun): + """ + Instance of SubmittedRun corresponding to a Qubole Job run launched to run an MLflow + project. Note that run_id may be None, e.g. if we did not launch the run against a tracking + server accessible to the local client. + """ + # How often to poll run status when waiting on a run + POLL_STATUS_INTERVAL = 30 + + def __init__(self, cluster_spec, command, run_id): + super(QuboleSubmittedRun, self).__init__() + self.cluster_spec = cluster_spec + self.command = command + self._mlflow_run_id = run_id + + @property + def run_id(self): + return self._mlflow_run_id + + def wait(self): + while not self.command.is_done(self.command.status):  # NOTE(review): verify qds_sdk refreshes command.status while polling + time.sleep(self.POLL_STATUS_INTERVAL) + return self.command.is_success(self.command.status) + + def cancel(self): + self.command.cancel() + self.wait() + + def _get_status(self): + status = self.command.status + if not self.command.is_done(status): + return RunStatus.RUNNING + if self.command.is_success(status): + return RunStatus.FINISHED + return RunStatus.FAILED + + def get_command_url(self): + qubole_env_base_url = "/".join(self.cluster_spec["qubole"]["api_url"]\ + .rstrip("/")\ + .split("/")[:-1]) + command_url = "{}/v2/analyze?command_id={}".format( + qubole_env_base_url, self.command.id) + + return command_url + + def _print_description_and_log_tags(self): + _logger.info("=== Launched MLflow run as Qubole command with ID %s. Getting run status " + "page URL... 
===" % self.run_id) + command_url = self.get_command_url() + tracking.MlflowClient().set_tag(self._mlflow_run_id, + MLFLOW_QUBOLE_COMMAND_URL, command_url) + tracking.MlflowClient().set_tag(self._mlflow_run_id, + MLFLOW_QUBOLE_COMMAND_ID, self.command.id) + + _logger.info("=== Check the run's status at %s ===" % command_url) + + def get_status(self): + return RunStatus.to_string(self._get_status()) \ No newline at end of file diff --git a/mlflow/server/js/src/components/RunView.js b/mlflow/server/js/src/components/RunView.js index 211405b25a1c4..fe945a3ee81b6 100644 --- a/mlflow/server/js/src/components/RunView.js +++ b/mlflow/server/js/src/components/RunView.js @@ -180,6 +180,16 @@ class RunView extends Component { ) : null} + {tags['mlflow.qubole.commandURL'] !== undefined ? ( + + + Logs + + + ) : null} {/* Page Sections */} diff --git a/mlflow/utils/mlflow_tags.py b/mlflow/utils/mlflow_tags.py index 788efc7a43d97..ee76f93e8c668 100644 --- a/mlflow/utils/mlflow_tags.py +++ b/mlflow/utils/mlflow_tags.py @@ -34,6 +34,10 @@ MLFLOW_PROJECT_BACKEND = "mlflow.project.backend" +MLFLOW_QUBOLE_COMMAND_URL = "mlflow.qubole.commandURL" +MLFLOW_QUBOLE_COMMAND_ID = "mlflow.qubole.commandID" + # The following legacy tags are deprecated and will be removed by MLflow 1.0. LEGACY_MLFLOW_GIT_BRANCH_NAME = "mlflow.gitBranchName" # Replaced with mlflow.source.git.branch LEGACY_MLFLOW_GIT_REPO_URL = "mlflow.gitRepoURL" # Replaced with mlflow.source.git.repoURL + diff --git a/setup.py b/setup.py index c2e3080a73d60..d72b6b141bbc2 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ def package_files(directory): 'sqlalchemy', 'gorilla', 'prometheus-flask-exporter', + 'qds-sdk' ], extras_require={ 'extras':[