From d9f9266b573bb981c458d1b2ca3ad214187c249c Mon Sep 17 00:00:00 2001 From: lazaa32 Date: Thu, 20 Dec 2018 18:37:09 +1300 Subject: [PATCH 1/5] Docker extension --- pywps/app/Process.py | 17 +++- pywps/exceptions.py | 4 + pywps/processing/__init__.py | 4 + pywps/processing/container.py | 175 ++++++++++++++++++++++++++++++++++ pywps/processing/job.py | 2 +- pywps/response/execute.py | 10 ++ requirements.txt | 3 +- 7 files changed, 211 insertions(+), 4 deletions(-) create mode 100644 pywps/processing/container.py diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 27f6dc5e3..46069db9f 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -17,6 +17,7 @@ from pywps.response.execute import ExecuteResponse from pywps.app.WPSRequest import WPSRequest import pywps.configuration as config +import pywps.processing from pywps._compat import PY2 from pywps.exceptions import (StorageNotSupported, OperationNotSupported, ServerBusy, NoApplicableCode) @@ -172,20 +173,32 @@ def _execute_process(self, async, wps_request, wps_response): if running >= maxparallel and maxparallel != -1: raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0) - wps_response = self._run_process(wps_request, wps_response) + wps_response = self._run_sync(wps_request, wps_response) return wps_response # This function may not raise exception and must return a valid wps_response # Failure must be reported as wps_response.status = WPS_STATUS.FAILED def _run_async(self, wps_request, wps_response): - import pywps.processing process = pywps.processing.Process( process=self, wps_request=wps_request, wps_response=wps_response) process.start() + def _run_sync(self, wps_request, wps_response): + mode = config.get_config_value('processing', 'mode') + if mode=='docker': + process = pywps.processing.Container( + process=self, + wps_request=wps_request, + wps_response=wps_response) + process.start() + else: + wps_response = self._run_process(wps_request, wps_response) + + return wps_response + # This function may not raise exception and must return a valid wps_response # Failure must be reported as wps_response.status = WPS_STATUS.FAILED def _run_process(self, wps_request, wps_response): diff --git a/pywps/exceptions.py b/pywps/exceptions.py index 368c97818..26236663d 100644 --- a/pywps/exceptions.py +++ b/pywps/exceptions.py @@ -159,3 +159,7 @@ class SchedulerNotAvailable(NoApplicableCode): """Job scheduler not available exception implementation """ code = 400 + +class NoAvailablePort(NoApplicableCode): + """No port available for a new docker container""" + code = 400 diff --git a/pywps/processing/__init__.py b/pywps/processing/__init__.py index 266946593..8b2115920 100644 --- a/pywps/processing/__init__.py +++ b/pywps/processing/__init__.py @@ -6,6 +6,7 @@ import pywps.configuration as config from pywps.processing.basic import MultiProcessing from pywps.processing.scheduler import Scheduler +from pywps.processing.container import Container # api only from pywps.processing.basic import Processing # noqa: F401 from pywps.processing.job import Job # noqa: F401 @@ -15,6 +16,7 @@ MULTIPROCESSING = 'multiprocessing' SCHEDULER = 'scheduler' +DOCKER = 'docker' DEFAULT = MULTIPROCESSING @@ -29,6 +31,8 @@ def Process(process, wps_request, wps_response): LOGGER.info("Processing mode: {}".format(mode)) if mode == SCHEDULER: process = Scheduler(process, wps_request, wps_response) + elif mode == DOCKER: + process = Container(process, wps_request, wps_response) else: process = MultiProcessing(process, wps_request, wps_response) return process diff --git a/pywps/processing/container.py b/pywps/processing/container.py new file mode 100644 index 000000000..72fcf6d3e --- /dev/null +++ b/pywps/processing/container.py @@ -0,0 +1,175 @@ +################################################################## +# Copyright 2018 Open Source Geospatial Foundation and others # +# licensed under MIT, Please consult LICENSE.txt for details # +################################################################## + +import os +import pywps.configuration as config +from pywps.processing.basic import Processing + +from owslib.wps import WebProcessingService as WPS +from pywps.response.status import WPS_STATUS +from pywps.exceptions import NoAvailablePort + +import docker +import socket +import time +import threading + +from pywps.inout.basic import LiteralInput, ComplexInput, BBoxInput +import owslib +from pywps.dblog import store_status + + +import logging +LOGGER = logging.getLogger("PYWPS") + + +class ClientError: + pass + + +class Container(Processing): + def __init__(self, process, wps_request, wps_response): + super().__init__(process, wps_request, wps_response) + self.port = self._assign_port() + self.client = docker.from_env() + self.cntnr = self._create() + + def _create(self): + cntnr_img = config.get_config_value("processing", "docker_img") + prcs_inp_dir = self.job.wps_response.process.workdir + prcs_out_dir = config.get_config_value("server", "outputpath") + dckr_inp_dir = config.get_config_value("processing", "dckr_inp_dir") + dckr_out_dir = config.get_config_value("processing", "dckr_out_dir") + container = self.client.containers.create(cntnr_img, ports={"5000/tcp": self.port}, detach=True, + volumes={ + prcs_out_dir: {'bind': dckr_out_dir, 'mode': 'rw'}, + prcs_inp_dir: {'bind': dckr_inp_dir, 'mode': 'ro'} + }) + return container + + def _assign_port(self): + port_min = int(config.get_config_value("processing", "port_min")) + port_max = int(config.get_config_value("processing", "port_max")) + for port in range(port_min, port_max): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + res = sock.connect_ex(('127.0.0.1', port)) + # TODO find better solution for errno + if res != 0: + return port + raise NoAvailablePort("No port from range {}-{} available.".format(port_min, port_max)) + + def start(self): + self.cntnr.start() + # it takes some time to start the container + time.sleep(1) + self._execute() + + if self.job.process.async: + self._parse_status() + daemon = threading.Thread(target=check_status, args=(self,)) + daemon.start() + else: + self._parse_outputs() + daemon = threading.Thread(target=self.dirty_clean) + daemon.start() + + def stop(self): + self.cntnr.stop() + + def cancel(self): + self.cntnr.kill() + + def pause(self): + self.cntnr.pause() + + def unpause(self): + self.cntnr.unpause() + + def _execute(self): + url_execute = "http://localhost:{}/wps".format(self.port) + inputs = get_inputs(self.job.wps_request.inputs) + output = get_output(self.job.wps_request.outputs) + wps = WPS(url=url_execute, skip_caps=True) + if self.job.process.async: + mode = "async" + else: + mode = "sync" + self.execution = wps.execute(self.job.wps_request.identifier, inputs=inputs, output=output, mode=mode) + + def _parse_outputs(self): + for output in self.execution.processOutputs: + # TODO what if len(data) > 1 ?? + if output.data: + self.job.wps_response.outputs[output.identifier].data = output.data[0] + if output.reference: + rp = output.reference[output.reference.index('outputs/'):] + self.job.wps_response.outputs[output.identifier].file = rp + + self.job.wps_response.update_status_succeeded('PyWPS Process {} finished'.format(self.job.process.title)) + store_status(self.job.wps_response.uuid, self.job.wps_response.status, self.job.wps_response.message) + + def _parse_status(self): + self.job.process.status_url = self.execution.statusLocation + self.job.wps_response.update_status(message=self.execution.statusMessage) + + def dirty_clean(self): + self.cntnr.stop() + self.cntnr.remove() + self.job.process.clean() + self.update_status() + + def update_status(self): + self.job.wps_response.message = 'PyWPS Process {} finished'.format(self.job.process.title) + self.job.wps_response.percentage = 100 + self.job.wps_response.status = WPS_STATUS.SUCCEEDED + store_status(self.job.wps_response.uuid, self.job.wps_response.status, self.job.wps_response.message, + self.job.wps_response.percentage) + + +def get_inputs(job_inputs): + """ + Return all inputs in [(input_name1, input_value1), (input_name2, input_value2)] + Return value can be used for WPS.execute method. + :return: input values + :rtype:list of tuples + """ + the_inputs = [] + for key in job_inputs.keys(): + inp = job_inputs[key][0] + if isinstance(inp, LiteralInput): + ows_inp = str(inp.data) + elif isinstance(inp, ComplexInput): + fp = os.path.basename(inp.file) + dckr_inp_dir = config.get_config_value('processing', 'dckr_inp_dir') + ows_inp = owslib.wps.ComplexDataInput("file://" + os.path.join(dckr_inp_dir, fp)) + elif isinstance(inp, BBoxInput): + ows_inp = owslib.wps.BoundingBoxDataInput(inp.data) + else: + raise Exception + the_inputs.append((key, ows_inp)) + + return the_inputs + + +def get_output(job_output): + """ + Return all outputs name + Return value can be used for WPS.execute method. + :return: output names + :rtype:list + """ + the_output = [] + for key in job_output.keys(): + the_output.append((key, job_output[key]['asReference'])) + return the_output + + +def check_status(container): + sleep_secs = int(config.get_config_value('processing', 'sleep_secs')) + while True: + container.execution.checkStatus(sleepSecs=sleep_secs) + if container.execution.isComplete(): + container.dirty_clean() + break diff --git a/pywps/processing/job.py b/pywps/processing/job.py index dc569c3e4..d68b56749 100644 --- a/pywps/processing/job.py +++ b/pywps/processing/job.py @@ -37,7 +37,7 @@ def dump(self): LOGGER.debug('dump job ...') import dill filename = tempfile.mkstemp(prefix='job_', suffix='.dump', dir=self.workdir)[1] - with open(filename, 'w') as fp: + with open(filename, 'wb') as fp: dill.dump(self, fp) LOGGER.debug("dumped job status to {}".format(filename)) return filename diff --git a/pywps/response/execute.py b/pywps/response/execute.py index 844ca64e4..a9c0ae4c4 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -83,6 +83,16 @@ def update_status(self, message, status_percentage=None): status_percentage = self.status_percentage self._update_status(self.status, message, status_percentage, False) + def update_status_succeeded(self, message): + """ + Update status report of succeeded process instance. + + This method is for Docker container processing. + + :param str message: Message you need to share with the client + """ + self._update_status(WPS_STATUS.SUCCEEDED, message, 100, True) + def _update_status_doc(self): try: # rebuild the doc diff --git a/requirements.txt b/requirements.txt index e3cf5a6e9..82e6573a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -flufl.enum +flufl.enum jinja2 jsonschema lxml @@ -8,3 +8,4 @@ python-dateutil requests SQLAlchemy werkzeug +docker From e5d992c29c0661833e1c724b89c52ce976d1a313 Mon Sep 17 00:00:00 2001 From: lazaa32 Date: Tue, 25 Dec 2018 16:40:37 +1300 Subject: [PATCH 2/5] docker moved to requirements-processing.txt --- requirements-processing.txt | 1 + requirements.txt | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-processing.txt b/requirements-processing.txt index 2b6c5c8c0..3862b84c5 100644 --- a/requirements-processing.txt +++ b/requirements-processing.txt @@ -1,2 +1,3 @@ dill drmaa +docker diff --git a/requirements.txt b/requirements.txt index 82e6573a3..e6300d86e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,3 @@ python-dateutil requests SQLAlchemy werkzeug -docker From 353abe1cdaba1e0cb2571339c00abef3db2c7d68 Mon Sep 17 00:00:00 2001 From: lazaa32 Date: Tue, 25 Dec 2018 17:07:25 +1300 Subject: [PATCH 3/5] method _assign_port() is now function --- pywps/processing/container.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pywps/processing/container.py b/pywps/processing/container.py index 72fcf6d3e..fe8961e33 100644 --- a/pywps/processing/container.py +++ b/pywps/processing/container.py @@ -32,7 +32,7 @@ class ClientError: class Container(Processing): def __init__(self, process, wps_request, wps_response): super().__init__(process, wps_request, wps_response) - self.port = self._assign_port() + self.port = _assign_port() self.client = docker.from_env() self.cntnr = self._create() @@ -49,17 +49,6 @@ def _create(self): }) return container - def _assign_port(self): - port_min = int(config.get_config_value("processing", "port_min")) - port_max = int(config.get_config_value("processing", "port_max")) - for port in range(port_min, port_max): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - res = sock.connect_ex(('127.0.0.1', port)) - # TODO find better solution for errno - if res != 0: - return port - raise NoAvailablePort("No port from range {}-{} available.".format(port_min, port_max)) - def start(self): self.cntnr.start() # it takes some time to start the container @@ -166,6 +155,18 @@ def get_output(job_output): return the_output +def _assign_port(): + port_min = int(config.get_config_value("processing", "port_min")) + port_max = int(config.get_config_value("processing", "port_max")) + for port in range(port_min, port_max): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + res = sock.connect_ex(('127.0.0.1', port)) + # TODO find better solution for errno + if res != 0: + return port + raise NoAvailablePort("No port from range {}-{} available.".format(port_min, port_max)) + + def check_status(container): sleep_secs = int(config.get_config_value('processing', 'sleep_secs')) while True: From 22c52e4ba2c486e4df6ffcf9ead1a3407eac79b8 Mon Sep 17 00:00:00 2001 From: lazaa32 Date: Tue, 25 Dec 2018 17:30:02 +1300 Subject: [PATCH 4/5] requirements-processing added to travis --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 0ccf5cc14..c4a3f07d7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,6 +32,7 @@ install: - pip install -r requirements-gdal.txt - pip install -r requirements-extra.txt - pip install -r requirements-dev.txt + - pip install -r requirements-processing.txt - pip install coveralls script: From d9e3fe34f963b0a5540aa6a768431afd863db660 Mon Sep 17 00:00:00 2001 From: lazaa32 Date: Tue, 25 Dec 2018 17:45:51 +1300 Subject: [PATCH 5/5] PEP8 cosmetics --- pywps/app/Process.py | 2 +- pywps/exceptions.py | 1 + pywps/processing/container.py | 6 ++---- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 46069db9f..68d229abb 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -188,7 +188,7 @@ def _run_async(self, wps_request, wps_response): def _run_sync(self, wps_request, wps_response): mode = config.get_config_value('processing', 'mode') - if mode=='docker': + if mode == 'docker': process = pywps.processing.Container( process=self, wps_request=wps_request, diff --git a/pywps/exceptions.py b/pywps/exceptions.py index 26236663d..4840cf005 100644 --- a/pywps/exceptions.py +++ b/pywps/exceptions.py @@ -160,6 +160,7 @@ class SchedulerNotAvailable(NoApplicableCode): """ code = 400 + class NoAvailablePort(NoApplicableCode): """No port available for a new docker container""" code = 400 diff --git a/pywps/processing/container.py b/pywps/processing/container.py index fe8961e33..ad5ec5334 100644 --- a/pywps/processing/container.py +++ b/pywps/processing/container.py @@ -43,10 +43,8 @@ def _create(self): dckr_inp_dir = config.get_config_value("processing", "dckr_inp_dir") dckr_out_dir = config.get_config_value("processing", "dckr_out_dir") container = self.client.containers.create(cntnr_img, ports={"5000/tcp": self.port}, detach=True, - volumes={ - prcs_out_dir: {'bind': dckr_out_dir, 'mode': 'rw'}, - prcs_inp_dir: {'bind': dckr_inp_dir, 'mode': 'ro'} - }) + volumes={prcs_out_dir: {'bind': dckr_out_dir, 'mode': 'rw'}, + prcs_inp_dir: {'bind': dckr_inp_dir, 'mode': 'ro'}}) return container def start(self):