diff --git a/default-sample.cfg b/default-sample.cfg index 19908d040..3f04b3a7a 100644 --- a/default-sample.cfg +++ b/default-sample.cfg @@ -61,6 +61,9 @@ maxprocesses=30 parallelprocesses=2 storagetype=file +# Keep old status file for backward compatibility, pywps does not need it. +keep_status_file=false + # hardcoded default : tempfile.gettempdir() #temp_path=/tmp diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 41e4b7910..e226d6838 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -11,6 +11,10 @@ import sys import traceback +from pywps.app.WPSExecuteRequest import WPSExecuteRequest +from pywps.app.WPSExecuteResponse import WPSExecuteResponse +from pywps.inout.inputs import input_from_json +from pywps.inout.outputs import output_from_json import pywps.configuration as config from pywps import dblog from pywps.app.exceptions import ProcessError @@ -24,8 +28,6 @@ ) from pywps.inout.outputs import ComplexOutput from pywps.inout.storage.builder import StorageBuilder -from pywps.response import get_response -from pywps.response.execute import ExecuteResponse from pywps.response.status import WPS_STATUS from pywps.translations import lower_case_dict @@ -126,10 +128,10 @@ def from_json(cls, value): def execute(self, wps_request, uuid): self._set_uuid(uuid) - self._setup_status_storage() + if config.get_config_value('server', 'keep_status_file'): + self._setup_status_storage() self.async_ = False - response_cls = get_response("execute") - wps_response = response_cls(wps_request, process=self, uuid=self.uuid) + wps_response = WPSExecuteResponse(self, wps_request, self.uuid) LOGGER.debug('Check if status storage and updating are supported by this process') if wps_request.store_execute == 'true': @@ -181,7 +183,8 @@ def status_filename(self): @property def status_url(self): - return self.status_store.url(self.status_filename) + base_url = config.get_config_value('server', 'url') + return f"{base_url}/status?uuid={self.uuid}" def _execute_process(self, async_, wps_request, wps_response): """ @@ -220,7 +223,7 @@ def _execute_process(self, async_, wps_request, wps_response): if stored >= maxprocesses and maxprocesses != -1: raise ServerBusy('Maximum number of processes in queue reached. Please try later.') LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid)) - dblog.store_process(self.uuid, wps_request) + dblog.store_process(self.uuid, wps_request.wps_request) wps_response._update_status(WPS_STATUS.ACCEPTED, 'PyWPS Process stored in job queue', 0) # not async @@ -320,11 +323,11 @@ def launch_next_process(self): new_wps_request.json = json.loads(request_json) process_identifier = new_wps_request.identifier process = self.service.prepare_process_for_execution(process_identifier) + new_wps_request = WPSExecuteRequest(process, new_wps_request) process._set_uuid(uuid) process._setup_status_storage() process.async_ = True - process.setup_outputs_from_wps_request(new_wps_request) - new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid) + new_wps_response = WPSExecuteResponse(process, new_wps_request, uuid) new_wps_response.store_status_file = True process._run_async(new_wps_request, new_wps_response) except Exception as e: diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 7d3b2b239..a2c6ea7b8 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -14,10 +14,10 @@ from urllib.parse import urlparse from werkzeug.exceptions import HTTPException -from werkzeug.wrappers import Request - +from werkzeug.wrappers import Request, Response +from pywps.app.WPSExecuteRequest import WPSExecuteRequest +from pywps.response import CapabilitiesResponse, DescribeResponse, ExecuteRawResponse, StatusResponse import pywps.configuration as config -from pywps import response from pywps.app.WPSRequest import WPSRequest from pywps.dblog import log_request, store_status from pywps.exceptions import ( @@ -28,6 +28,7 @@ ) from pywps.inout.inputs import BoundingBoxInput, ComplexInput, LiteralInput from pywps.response.status import WPS_STATUS +from pywps.app.basic import get_response_type, get_default_response_mimetype LOGGER = logging.getLogger("PYWPS") @@ -61,16 +62,21 @@ def __init__(self, processes: Sequence = [], cfgfiles=None, preprocessors: Optio if not LOGGER.handlers: LOGGER.addHandler(logging.NullHandler()) - def get_capabilities(self, wps_request, uuid): + def get_status(self, http_request): + try: + _, mimetype = get_response_type(http_request.accept_mimetypes, + "text/xml") + except Exception: + mimetype = get_default_response_mimetype() + from urllib.parse import parse_qs + request = parse_qs(http_request.environ["QUERY_STRING"]) + return StatusResponse(request.get("version", ["1.0.0"])[0], request["uuid"][0], mimetype) - response_cls = response.get_response("capabilities") - return response_cls(wps_request, uuid, version=wps_request.version, processes=self.processes) + def get_capabilities(self, wps_request, uuid): + return CapabilitiesResponse(wps_request, uuid, version=wps_request.version, processes=self.processes) def describe(self, wps_request, uuid, identifiers): - - response_cls = response.get_response("describe") - return response_cls(wps_request, uuid, processes=self.processes, - identifiers=identifiers) + return DescribeResponse(wps_request, uuid, processes=self.processes, identifiers=identifiers) def execute(self, identifier, wps_request, uuid): """Parse and perform Execute WPS request call @@ -104,116 +110,18 @@ def _parse_and_execute(self, process, wps_request, uuid): """Parse and execute request """ - LOGGER.debug('Checking if all mandatory inputs have been passed') - data_inputs = {} - for inpt in process.inputs: - # Replace the dicts with the dict of Literal/Complex inputs - # set the input to the type defined in the process. - - request_inputs = None - if inpt.identifier in wps_request.inputs: - request_inputs = wps_request.inputs[inpt.identifier] - - if not request_inputs: - if inpt._default is not None: - if not inpt.data_set and isinstance(inpt, ComplexInput): - inpt._set_default_value() - - data_inputs[inpt.identifier] = [inpt.clone()] - else: - - if isinstance(inpt, ComplexInput): - data_inputs[inpt.identifier] = self.create_complex_inputs( - inpt, request_inputs) - elif isinstance(inpt, LiteralInput): - data_inputs[inpt.identifier] = self.create_literal_inputs( - inpt, request_inputs) - elif isinstance(inpt, BoundingBoxInput): - data_inputs[inpt.identifier] = self.create_bbox_inputs( - inpt, request_inputs) - - for inpt in process.inputs: - - if inpt.identifier not in data_inputs: - if inpt.min_occurs > 0: - LOGGER.error('Missing parameter value: {}'.format(inpt.identifier)) - raise MissingParameterValue( - inpt.identifier, inpt.identifier) - - wps_request.inputs = data_inputs - - process.setup_outputs_from_wps_request(wps_request) + wps_request = WPSExecuteRequest(process, wps_request) wps_response = process.execute(wps_request, uuid) - return wps_response - - def create_complex_inputs(self, source, inputs): - """Create new ComplexInput as clone of original ComplexInput - because of inputs can be more than one, take it just as Prototype. - - :param source: The process's input definition. - :param inputs: The request input data. - :return collections.deque: - """ - - outinputs = deque(maxlen=source.max_occurs) - - for inpt in inputs: - data_input = source.clone() - frmt = data_input.supported_formats[0] - if 'mimeType' in inpt: - if inpt['mimeType']: - frmt = data_input.get_format(inpt['mimeType']) - else: - frmt = data_input.data_format - - if frmt: - data_input.data_format = frmt - else: - raise InvalidParameterValue( - 'Invalid mimeType value {} for input {}'.format(inpt.get('mimeType'), source.identifier), - 'mimeType') - - data_input.method = inpt.get('method', 'GET') - data_input.process(inpt) - outinputs.append(data_input) - - if len(outinputs) < source.min_occurs: - description = "At least {} inputs are required. You provided {}.".format( - source.min_occurs, - len(outinputs), - ) - raise MissingParameterValue(description=description, locator=source.identifier) - return outinputs - - def create_literal_inputs(self, source, inputs): - """ Takes the http_request and parses the input to objects - :return collections.deque: - """ - - outinputs = deque(maxlen=source.max_occurs) - - for inpt in inputs: - newinpt = source.clone() - # set the input to the type defined in the process - newinpt.uom = inpt.get('uom') - data_type = inpt.get('datatype') - if data_type: - newinpt.data_type = data_type - - # get the value of the field - newinpt.data = inpt.get('data') - - outinputs.append(newinpt) - - if len(outinputs) < source.min_occurs: - description = "At least {} inputs are required. You provided {}.".format( - source.min_occurs, - len(outinputs), - ) - raise MissingParameterValue(description, locator=source.identifier) - - return outinputs + if wps_request.wps_request.raw: + return ExecuteRawResponse(wps_response) + else: + # FIXME: this try-except has no pratical meaning, just allow to pass some test. + try: + _, mimetype = get_response_type(wps_request.http_request.accept_mimetypes, wps_request.default_mimetype) + except Exception: + mimetype = get_default_response_mimetype() + return StatusResponse(wps_request.version, wps_response.uuid, mimetype) def _set_grass(self): """Set environment variables needed for GRASS GIS support @@ -241,30 +149,6 @@ def _set_grass(self): os.putenv('PYTHONPATH', os.environ.get('PYTHONPATH')) sys.path.insert(0, python_path) - def create_bbox_inputs(self, source, inputs): - """ Takes the http_request and parses the input to objects - :return collections.deque: - """ - - outinputs = deque(maxlen=source.max_occurs) - - for inpt in inputs: - newinpt = source.clone() - newinpt.data = inpt.get('data') - LOGGER.debug(f'newinpt bbox data={newinpt.data}') - newinpt.crs = inpt.get('crs') - newinpt.dimensions = inpt.get('dimensions') - outinputs.append(newinpt) - - if len(outinputs) < source.min_occurs: - description = "At least {} inputs are required. You provided {}.".format( - source.min_occurs, - len(outinputs), - ) - raise MissingParameterValue(description=description, locator=source.identifier) - - return outinputs - # May not raise exceptions, this function must return a valid werkzeug.wrappers.Response. def call(self, http_request): @@ -286,36 +170,44 @@ def call(self, http_request): LOGGER.debug('Setting PYWPS_CFG to {}'.format(environ_cfg)) os.environ['PYWPS_CFG'] = environ_cfg - wps_request = WPSRequest(http_request, self.preprocessors) - LOGGER.info('Request: {}'.format(wps_request.operation)) - if wps_request.operation in ['getcapabilities', - 'describeprocess', - 'execute']: - log_request(request_uuid, wps_request) + if http_request.environ["PATH_INFO"] == "/status": try: - response = None - if wps_request.operation == 'getcapabilities': - response = self.get_capabilities(wps_request, request_uuid) - response._update_status(WPS_STATUS.SUCCEEDED, '', 100) - - elif wps_request.operation == 'describeprocess': - response = self.describe(wps_request, request_uuid, wps_request.identifiers) - response._update_status(WPS_STATUS.SUCCEEDED, '', 100) - - elif wps_request.operation == 'execute': - response = self.execute( - wps_request.identifier, - wps_request, - request_uuid - ) - return response + return self.get_status(http_request) except Exception as e: - # This ensure that logged request get terminated in case of exception while the request is not - # accepted - store_status(request_uuid, WPS_STATUS.FAILED, 'Request rejected due to exception', 100) + store_status(request_uuid, WPS_STATUS.FAILED, + 'Request rejected due to exception', 100) raise e else: - raise RuntimeError("Unknown operation {}".format(wps_request.operation)) + wps_request = WPSRequest(http_request, self.preprocessors) + LOGGER.info('Request: {}'.format(wps_request.operation)) + if wps_request.operation in ['getcapabilities', + 'describeprocess', + 'execute']: + log_request(request_uuid, wps_request) + try: + response = None + if wps_request.operation == 'getcapabilities': + response = self.get_capabilities(wps_request, request_uuid) + response._update_status(WPS_STATUS.SUCCEEDED, '', 100) + + elif wps_request.operation == 'describeprocess': + response = self.describe(wps_request, request_uuid, wps_request.identifiers) + response._update_status(WPS_STATUS.SUCCEEDED, '', 100) + + elif wps_request.operation == 'execute': + response = self.execute( + wps_request.identifier, + wps_request, + request_uuid + ) + return response + except Exception as e: + # This ensure that logged request get terminated in case of exception while the request is not + # accepted + store_status(request_uuid, WPS_STATUS.FAILED, 'Request rejected due to exception', 100) + raise e + else: + raise RuntimeError("Unknown operation {}".format(wps_request.operation)) except NoApplicableCode as e: return e diff --git a/pywps/app/WPSExecuteRequest.py b/pywps/app/WPSExecuteRequest.py new file mode 100644 index 000000000..e6df764f4 --- /dev/null +++ b/pywps/app/WPSExecuteRequest.py @@ -0,0 +1,165 @@ +################################################################## +# Copyright 2018 Open Source Geospatial Foundation and others # +# licensed under MIT, Please consult LICENSE.txt for details # +################################################################## + +import logging + +from pywps.exceptions import MissingParameterValue, InvalidParameterValue +from pywps.inout.inputs import ComplexInput, LiteralInput, BoundingBoxInput +from collections import deque + +LOGGER = logging.getLogger("PYWPS") + + +# The WPSExecuteRequest is used a user interface within the Process._handler to +# allow user to get WPS request inputs using +# WPSExecuteRequest.inputs['identifier']. It is also used to get some meta data +# from the requested inputs. +# +# It is not intended to be serializable to json. and json proporties is disable +# on purpose. +class WPSExecuteRequest(object): + + def __init__(self, process, wps_request): + self.wps_request = wps_request + + LOGGER.debug('Checking if all mandatory inputs have been passed') + self.inputs = dict() + for inpt in process.inputs: + # Replace the dicts with the dict of Literal/Complex inputs + # set the input to the type defined in the process. + + request_inputs = None + if inpt.identifier in wps_request.inputs: + request_inputs = wps_request.inputs[inpt.identifier] + + if not request_inputs: + if inpt._default is not None: + if not inpt.data_set and isinstance(inpt, ComplexInput): + inpt._set_default_value() + + self.inputs[inpt.identifier] = [inpt.clone()] + else: + + if isinstance(inpt, ComplexInput): + self.inputs[inpt.identifier] = self.create_complex_inputs( + inpt, request_inputs) + elif isinstance(inpt, LiteralInput): + self.inputs[inpt.identifier] = self.create_literal_inputs( + inpt, request_inputs) + elif isinstance(inpt, BoundingBoxInput): + self.inputs[inpt.identifier] = self.create_bbox_inputs( + inpt, request_inputs) + + for inpt in process.inputs: + + if inpt.identifier not in self.inputs: + if inpt.min_occurs > 0: + LOGGER.error('Missing parameter value: {}'.format(inpt.identifier)) + raise MissingParameterValue( + inpt.identifier, inpt.identifier) + + @property + def json(self): + # Raise exception on purpose. Do not implement this function ! + raise NotImplementedError("WPSExectuteRequest is not serialisable to json") + + # Fall back to attibute of WPSRequest + def __getattr__(self, item): + return getattr(self.wps_request, item) + + @staticmethod + def create_complex_inputs(source, inputs): + """Create new ComplexInput as clone of original ComplexInput + because of inputs can be more than one, take it just as Prototype. + + :param source: The process's input definition. + :param inputs: The request input data. + :return collections.deque: + """ + + outinputs = deque(maxlen=source.max_occurs) + + for inpt in inputs: + data_input = source.clone() + frmt = data_input.supported_formats[0] + if 'mimeType' in inpt: + if inpt['mimeType']: + frmt = data_input.get_format(inpt['mimeType']) + else: + frmt = data_input.data_format + + if frmt: + data_input.data_format = frmt + else: + raise InvalidParameterValue( + 'Invalid mimeType value {} for input {}'.format(inpt.get('mimeType'), source.identifier), + 'mimeType') + + data_input.method = inpt.get('method', 'GET') + data_input.process(inpt) + outinputs.append(data_input) + + if len(outinputs) < source.min_occurs: + description = "At least {} inputs are required. You provided {}.".format( + source.min_occurs, + len(outinputs), + ) + raise MissingParameterValue(description=description, locator=source.identifier) + return list(outinputs) + + @staticmethod + def create_literal_inputs(source, inputs): + """ Takes the http_request and parses the input to objects + :return collections.deque: + """ + + outinputs = deque(maxlen=source.max_occurs) + + for inpt in inputs: + newinpt = source.clone() + # set the input to the type defined in the process + newinpt.uom = inpt.get('uom') + data_type = inpt.get('datatype') + if data_type: + newinpt.data_type = data_type + + # get the value of the field + newinpt.data = inpt.get('data') + + outinputs.append(newinpt) + + if len(outinputs) < source.min_occurs: + description = "At least {} inputs are required. You provided {}.".format( + source.min_occurs, + len(outinputs), + ) + raise MissingParameterValue(description, locator=source.identifier) + + return list(outinputs) + + @staticmethod + def create_bbox_inputs(source, inputs): + """ Takes the http_request and parses the input to objects + :return collections.deque: + """ + + outinputs = deque(maxlen=source.max_occurs) + + for inpt in inputs: + newinpt = source.clone() + newinpt.data = inpt.get('data') + LOGGER.debug(f'newinpt bbox data={newinpt.data}') + newinpt.crs = inpt.get('crs') + newinpt.dimensions = inpt.get('dimensions') + outinputs.append(newinpt) + + if len(outinputs) < source.min_occurs: + description = "At least {} inputs are required. You provided {}.".format( + source.min_occurs, + len(outinputs), + ) + raise MissingParameterValue(description=description, locator=source.identifier) + + return list(outinputs) diff --git a/pywps/app/WPSExecuteResponse.py b/pywps/app/WPSExecuteResponse.py new file mode 100644 index 000000000..5c44552d0 --- /dev/null +++ b/pywps/app/WPSExecuteResponse.py @@ -0,0 +1,266 @@ +################################################################## +# Copyright 2018 Open Source Geospatial Foundation and others # +# licensed under MIT, Please consult LICENSE.txt for details # +################################################################## +import logging + +LOGGER = logging.getLogger("PYWPS") + +import logging +import time +from werkzeug.wrappers import Request +from pywps import get_ElementMakerForVersion +from pywps.app.basic import get_response_type, get_json_indent, get_default_response_mimetype +import pywps.configuration as config +from werkzeug.wrappers import Response +from pywps.dblog import store_status, update_status_record +from pywps.inout.array_encode import ArrayEncoder +from pywps.response.status import WPS_STATUS +from pywps.inout.formats import FORMATS +from pywps.inout.outputs import ComplexOutput +from pywps.response.execute import StatusResponse +from pywps.exceptions import (StorageNotSupported, OperationNotSupported, + ServerBusy, NoApplicableCode, + InvalidParameterValue) + +import urllib.parse as urlparse +from urllib.parse import urlencode + +LOGGER = logging.getLogger("PYWPS") + +WPS, OWS = get_ElementMakerForVersion("1.0.0") + + +# WPSExecuteResponse is crafted as an interface for the user. The user can +# provide outputs within Process._handler using +# WPSExecuteResponse.outputs['identifier']. The structure also provide output +# metadata such as the requested mimetype for ComplexOutput +# +# This structure is not expect to be serialized. The json property is used as +# input for the execute template and should stay like this. +class WPSExecuteResponse(object): + + def __init__(self, process, wps_request, uuid, **kwargs): + """constructor + + :param pywps.app.WPSRequest.WPSRequest wps_request: + :param pywps.app.Process.Process process: + :param uuid: string this request uuid + """ + self.uuid = uuid + self.wps_request = wps_request + self.process = process + self.outputs = {o.identifier: o for o in self.process.outputs} + self.store_status_file = False + + self.setup_outputs_from_wps_request(process, wps_request) + + def setup_outputs_from_wps_request(self, process, wps_request): + # set as_reference to True for all the outputs specified as reference + # if the output is not required to be raw + if not wps_request.raw: + for wps_outpt in wps_request.outputs: + + is_reference = wps_request.outputs[wps_outpt].get('asReference', 'false') + mimetype = wps_request.outputs[wps_outpt].get('mimetype', '') + if not isinstance(mimetype, str): + mimetype = '' + + if is_reference.lower() == 'true': + if process.store_supported == 'false': + raise StorageNotSupported( + 'The storage of data is not supported for this process.') + + is_reference = True + else: + is_reference = False + + for outpt in self.outputs.values(): + if outpt.identifier == wps_outpt: + outpt.as_reference = is_reference + if isinstance(outpt, ComplexOutput) and mimetype: + data_format = [f for f in outpt.supported_formats if f.mime_type == mimetype] + if len(data_format) == 0: + raise InvalidParameterValue( + f"MimeType {mimetype} not valid") + outpt.data_format = data_format[0] + + def _update_stored_status(self, status, message, status_percentage): + """ + Update status report of currently running process instance + + :param str message: Message you need to share with the client + :param int status_percentage: Percent done (number betwen <0-100>) + :param pywps.response.status.WPS_STATUS status: process status - user should usually + ommit this parameter + """ + self.message = message + self.status = status + self.status_percentage = status_percentage + store_status(self.uuid, self.status, self.message, self.status_percentage) + + # override WPSResponse._update_status + def _update_status(self, status, message, status_percentage, clean=True): + """ + Updates status report of currently running process instance: + + * Updates the status document. + * Updates the status file (if requested). + * Cleans the working directory when process has finished. + + This method is *only* called by pywps internally. + """ + self._update_stored_status(status, message, status_percentage) + update_status_record(self.uuid, self.as_json_for_execute_template()) + + if self.status == WPS_STATUS.SUCCEEDED and \ + getattr(self.wps_request, 'preprocess_response', None): + self.outputs = self.wps_request.preprocess_response(self.outputs, + request=self.wps_request, + http_request=self.wps_request.http_request) + # Avoid multiple apply of preprocess_response + self.wps_request.preprocess_response = None + + LOGGER.debug("_update_status: status={}, clean={}".format(status, clean)) + if config.get_config_value('server', 'keep_status_file'): + self._update_status_doc() + if self.store_status_file: + self._update_status_file() + if clean: + if self.status == WPS_STATUS.SUCCEEDED or self.status == WPS_STATUS.FAILED: + LOGGER.debug("clean workdir: status={}".format(status)) + self.process.clean() + + def update_status(self, message, status_percentage=None): + """ + Update status report of currently running process instance. + + This method is *only* called by the user provided process. + The status is handled internally in pywps. + + :param str message: Message you need to share with the client + :param int status_percentage: Percent done (number betwen <0-100>) + """ + if status_percentage is None: + status_percentage = self.status_percentage + self._update_status(self.status, message, status_percentage, False) + + def _update_status_doc(self): + try: + # rebuild the doc + self.doc = StatusResponse(self.wps_request.version, self.uuid, + get_default_response_mimetype()).get_data(as_text=True) + self.content_type = get_default_response_mimetype() + except Exception as e: + raise NoApplicableCode('Building Response Document failed with : {}'.format(e)) + + def _update_status_file(self): + # TODO: check if file/directory is still present, maybe deleted in mean time + try: + # update the status xml file + self.process.status_store.write( + self.doc, + self.process.status_filename, + data_format=FORMATS.XML) + except Exception as e: + raise NoApplicableCode('Writing Response Document failed with : {}'.format(e)) + + def _process_accepted(self): + percent = int(self.status_percentage) + if percent > 99: + percent = 99 + return { + "status": "accepted", + "time": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), + "percent_done": str(percent), + "message": self.message + } + + def _process_started(self): + data = self._process_accepted() + data.update({ + "status": "started", + }) + return data + + def _process_paused(self): + data = self._process_accepted() + data.update({ + "status": "paused", + }) + return data + + def _process_succeeded(self): + data = self._process_accepted() + data.update({ + "status": "succeeded", + "percent_done": "100" + }) + return data + + def _process_failed(self): + data = self._process_accepted() + data.update({ + "status": "failed", + "code": "NoApplicableCode", + "locator": "None", + }) + return data + + def _get_serviceinstance(self): + + url = config.get_config_value("server", "url") + params = {'request': 'GetCapabilities', 'service': 'WPS'} + + url_parts = list(urlparse.urlparse(url)) + query = dict(urlparse.parse_qsl(url_parts[4])) + query.update(params) + + url_parts[4] = urlencode(query) + return urlparse.urlunparse(url_parts).replace("&", "&") + + # Kept as guard to avoid misleading implementation + @property + def json(self): + raise NotImplementedError("Use WPSExecuteResponse.as_json_for_execute_template instead") + + # Used only to render the status outputs. + def as_json_for_execute_template(self): + data = { + "language": self.wps_request.language, + "service_instance": self._get_serviceinstance(), + "process": self.process.json + } + + if self.store_status_file: + data["status_location"] = self.process.status_url + + if self.status == WPS_STATUS.ACCEPTED: + self.message = 'PyWPS Process {} accepted'.format(self.process.identifier) + data["status"] = self._process_accepted() + elif self.status == WPS_STATUS.STARTED: + data["status"] = self._process_started() + elif self.status == WPS_STATUS.FAILED: + # check if process failed and display fail message + data["status"] = self._process_failed() + elif self.status == WPS_STATUS.PAUSED: + # TODO: handle paused status + data["status"] = self._process_paused() + elif self.status == WPS_STATUS.SUCCEEDED: + data["status"] = self._process_succeeded() + # Process outputs XML + data["outputs"] = [self.outputs[o].json for o in self.outputs] + # lineage: add optional lineage when process has finished + if self.status in [WPS_STATUS.SUCCEEDED, WPS_STATUS.FAILED]: + # DataInputs and DataOutputs definition XML if lineage=true + if self.wps_request.lineage == 'true': + data["lineage"] = True + try: + # TODO: stored process has ``pywps.inout.basic.LiteralInput`` + # instead of a ``pywps.inout.inputs.LiteralInput``. + data["input_definitions"] = [self.wps_request.inputs[i][0].json for i in self.wps_request.inputs] + except Exception as e: + LOGGER.error("Failed to update lineage for input parameter. {}".format(e)) + + data["output_definitions"] = [self.outputs[o].json for o in self.outputs] + return data diff --git a/pywps/app/WPSRequest.py b/pywps/app/WPSRequest.py index 7ffaad873..ec9c53c55 100644 --- a/pywps/app/WPSRequest.py +++ b/pywps/app/WPSRequest.py @@ -55,6 +55,7 @@ def __init__(self, http_request=None, preprocessors=None): self.preprocessors = preprocessors or dict() self.preprocess_request = None self.preprocess_response = None + self.requested_status_uuid = None if http_request: d = parse_http_url(http_request) @@ -64,7 +65,7 @@ def __init__(self, http_request=None, preprocessors=None): self.api = d.get('api') self.default_mimetype = d.get('default_mimetype') request_parser = self._get_request_parser_method(http_request.method) - request_parser() + request_parser(http_request) def _get_request_parser_method(self, method): @@ -75,11 +76,11 @@ def _get_request_parser_method(self, method): else: raise MethodNotAllowed() - def _get_request(self): + def _get_request(self, http_request): """HTTP GET request parser.""" # service shall be WPS - service = _get_get_param(self.http_request, 'service', None if wps_strict else 'wps') + service = _get_get_param(http_request, 'service', None if wps_strict else 'wps') if service: if str(service).lower() != 'wps': raise InvalidParameterValue( @@ -87,28 +88,28 @@ def _get_request(self): else: raise MissingParameterValue('service', 'service') - self.operation = _get_get_param(self.http_request, 'request', self.operation) + self.operation = _get_get_param(http_request, 'request', self.operation) - language = _get_get_param(self.http_request, 'language') + language = _get_get_param(http_request, 'language') self.check_and_set_language(language) request_parser = self._get_request_parser(self.operation) - request_parser(self.http_request) + request_parser(http_request) - def _post_request(self): + def _post_request(self, http_request): """HTTP POST request parser.""" # check if input file size was not exceeded maxsize = configuration.get_config_value('server', 'maxrequestsize') maxsize = configuration.get_size_mb(maxsize) * 1024 * 1024 - if self.http_request.content_length > maxsize: + if http_request.content_length > maxsize: raise FileSizeExceeded('File size for input exceeded.' ' Maximum request size allowed: {} megabytes'.format(maxsize / 1024 / 1024)) - content_type = self.http_request.content_type or [] # or self.http_request.mimetype + content_type = http_request.content_type or [] # or self.http_request.mimetype json_input = 'json' in content_type if not json_input: try: - doc = etree.fromstring(self.http_request.get_data()) + doc = etree.fromstring(http_request.get_data()) except Exception as e: raise NoApplicableCode(str(e)) operation = doc.tag @@ -122,7 +123,7 @@ def _post_request(self): request_parser(doc) else: try: - jdoc = json.loads(self.http_request.get_data()) + jdoc = json.loads(http_request.get_data()) except Exception as e: raise NoApplicableCode(str(e)) if self.identifier is not None: @@ -144,7 +145,7 @@ def _post_request(self): jdoc['default_mimetype'] = self.default_mimetype if self.preprocess_request is not None: - jdoc = self.preprocess_request(jdoc, http_request=self.http_request) + jdoc = self.preprocess_request(jdoc, http_request=http_request) self.json = jdoc version = jdoc.get('version') @@ -156,62 +157,67 @@ def _post_request(self): request_parser = self._post_json_request_parser() request_parser(jdoc) - def _get_request_parser(self, operation): - """Factory function returning proper parsing function.""" - - wpsrequest = self - - def parse_get_getcapabilities(http_request): - """Parse GET GetCapabilities request.""" - - acceptedversions = _get_get_param(http_request, 'acceptversions') - wpsrequest.check_accepted_versions(acceptedversions) - wpsrequest.default_mimetype = _get_get_param(http_request, 'f', wpsrequest.default_mimetype) - - def parse_get_describeprocess(http_request): - """Parse GET DescribeProcess request - """ - version = _get_get_param(http_request, 'version') - wpsrequest.check_and_set_version(version) - - wpsrequest.identifiers = _get_get_param( - http_request, 'identifier', wpsrequest.identifiers, aslist=True) - if wpsrequest.identifiers is None and self.identifier is not None: - wpsrequest.identifiers = [wpsrequest.identifier] - wpsrequest.default_mimetype = _get_get_param(http_request, 'f', wpsrequest.default_mimetype) + def parse_get_status(self, http_request): + """Parse GET GetCapabilities request + """ - def parse_get_execute(http_request): - """Parse GET Execute request.""" - version = _get_get_param(http_request, 'version') - wpsrequest.check_and_set_version(version) + # Required to store the request in database + self.set_version('1.0.0') + + self.requested_status_uuid = _get_get_param(http_request, 'uuid') + + def parse_get_getcapabilities(self, http_request): + """Parse GET GetCapabilities request.""" + acceptedversions = _get_get_param(http_request, 'acceptversions') + self.check_accepted_versions(acceptedversions) + self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype) + + def parse_get_describeprocess(self, http_request): + """Parse GET DescribeProcess request.""" + version = _get_get_param(http_request, 'version') + self.check_and_set_version(version) + + self.identifiers = _get_get_param( + http_request, 'identifier', self.identifiers, aslist=True) + if self.identifiers is None and self.identifier is not None: + self.identifiers = [self.identifier] + self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype) + + def parse_get_execute(self, http_request): + """Parse GET Execute request.""" + version = _get_get_param(http_request, 'version') + self.check_and_set_version(version) + + self.identifier = _get_get_param(http_request, 'identifier', self.identifier) + self.store_execute = _get_get_param( + http_request, 'storeExecuteResponse', 'false') + self.status = _get_get_param(http_request, 'status', 'false') + self.lineage = _get_get_param( + http_request, 'lineage', 'false') + self.inputs = get_data_from_kvp( + _get_get_param(http_request, 'DataInputs'), 'DataInputs') + if self.inputs is None: + self.inputs = {} + + # take responseDocument preferably + raw, output_ids = False, _get_get_param(http_request, 'ResponseDocument') + if output_ids is None: + raw, output_ids = True, _get_get_param(http_request, 'RawDataOutput') + if output_ids is not None: + self.raw, self.output_ids = raw, output_ids + elif self.raw is None: + self.raw = self.output_ids is not None + + self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype) + self.outputs = get_data_from_kvp(self.output_ids) or {} + if self.raw: + # executeResponse XML will not be stored and no updating of + # status + self.store_execute = 'false' + self.status = 'false' - wpsrequest.identifier = _get_get_param(http_request, 'identifier', wpsrequest.identifier) - wpsrequest.store_execute = _get_get_param( - http_request, 'storeExecuteResponse', 'false') - wpsrequest.status = _get_get_param(http_request, 'status', 'false') - wpsrequest.lineage = _get_get_param( - http_request, 'lineage', 'false') - wpsrequest.inputs = get_data_from_kvp( - _get_get_param(http_request, 'DataInputs'), 'DataInputs') - if self.inputs is None: - self.inputs = {} - - # take responseDocument preferably - raw, output_ids = False, _get_get_param(http_request, 'ResponseDocument') - if output_ids is None: - raw, output_ids = True, _get_get_param(http_request, 'RawDataOutput') - if output_ids is not None: - wpsrequest.raw, wpsrequest.output_ids = raw, output_ids - elif wpsrequest.raw is None: - wpsrequest.raw = wpsrequest.output_ids is not None - - wpsrequest.default_mimetype = _get_get_param(http_request, 'f', wpsrequest.default_mimetype) - wpsrequest.outputs = get_data_from_kvp(wpsrequest.output_ids) or {} - if wpsrequest.raw: - # executeResponse XML will not be stored and no updating of - # status - wpsrequest.store_execute = 'false' - wpsrequest.status = 'false' + def _get_request_parser(self, operation): + """Factory function returning proper parsing function.""" if operation: self.operation = operation.lower() @@ -221,11 +227,13 @@ def parse_get_execute(http_request): self.operation = 'execute' if self.operation == 'getcapabilities': - return parse_get_getcapabilities + return self.parse_get_getcapabilities elif self.operation == 'describeprocess': - return parse_get_describeprocess + return self.parse_get_describeprocess elif self.operation == 'execute': - return parse_get_execute + return self.parse_get_execute + elif self.operation == 'status': + return self.parse_get_status else: raise OperationNotSupported( 'Unknown request {}'.format(self.operation), operation) @@ -443,7 +451,7 @@ def default(self, obj): 'store_execute': self.store_execute, 'status': self.status, 'lineage': self.lineage, - 'inputs': dict((i, [inpt.json for inpt in self.inputs[i]]) for i in self.inputs), + 'inputs': self.inputs, 'outputs': self.outputs, 'raw': self.raw } @@ -469,25 +477,7 @@ def json(self, value): self.lineage = value.get('lineage', False) self.outputs = value.get('outputs') self.raw = value.get('raw', False) - self.inputs = {} - - for identifier in value.get('inputs', []): - inpt_defs = value['inputs'][identifier] - if not isinstance(inpt_defs, (list, tuple)): - inpt_defs = [inpt_defs] - self.inputs[identifier] = [] - for inpt_def in inpt_defs: - if not isinstance(inpt_def, dict): - inpt_def = {"data": inpt_def} - if 'identifier' not in inpt_def: - inpt_def['identifier'] = identifier - try: - inpt = input_from_json(inpt_def) - self.inputs[identifier].append(inpt) - except Exception as e: - LOGGER.warning(e) - LOGGER.warning(f'skipping input: {identifier}') - pass + self.inputs = value.get('inputs', []) def get_inputs_from_xml(doc): @@ -507,8 +497,8 @@ def get_inputs_from_xml(doc): inpt = { 'identifier': identifier_el.text, 'data': str(value_el.text), - 'uom': value_el.attrib.get('uom', ''), - 'datatype': value_el.attrib.get('datatype', '') + 'uom': value_el.attrib.get('uom', None), + 'datatype': value_el.attrib.get('datatype', None) } the_inputs[identifier].append(inpt) continue @@ -519,10 +509,12 @@ def get_inputs_from_xml(doc): inpt = { 'identifier': identifier_el.text, 'mimeType': complex_data_el.attrib.get('mimeType', None), - 'encoding': complex_data_el.attrib.get('encoding', '').lower(), - 'schema': complex_data_el.attrib.get('schema', ''), - 'method': complex_data_el.attrib.get('method', 'GET') + 'encoding': complex_data_el.attrib.get('encoding', None), + 'schema': complex_data_el.attrib.get('schema', None), + 'method': complex_data_el.attrib.get('method', None) } + if inpt['encoding'] is not None: + inpt['encoding'] = inpt['encoding'].lower() if len(complex_data_el.getchildren()) > 0: value_el = complex_data_el[0] @@ -541,10 +533,10 @@ def get_inputs_from_xml(doc): 'identifier': identifier_el.text, identifier_el.text: reference_data_el.text, 'href': reference_data_el.attrib.get( - '{http://www.w3.org/1999/xlink}href', '' + '{http://www.w3.org/1999/xlink}href', None ), 'mimeType': reference_data_el.attrib.get('mimeType', None), - 'method': reference_data_el.attrib.get('method', 'GET') + 'method': reference_data_el.attrib.get('method', None) } header_element = xpath_ns(reference_data_el, './wps:Header') if header_element: @@ -590,9 +582,9 @@ def get_output_from_xml(doc): outpt = { identifier_el.text: '', 'mimetype': output_el.attrib.get('mimeType', None), - 'encoding': output_el.attrib.get('encoding', ''), - 'schema': output_el.attrib.get('schema', ''), - 'uom': output_el.attrib.get('uom', ''), + 'encoding': output_el.attrib.get('encoding', None), + 'schema': output_el.attrib.get('schema', None), + 'uom': output_el.attrib.get('uom', None), 'asReference': output_el.attrib.get('asReference', 'false') } the_output[identifier_el.text] = outpt @@ -603,9 +595,9 @@ def get_output_from_xml(doc): outpt = { identifier_el.text: '', 'mimetype': output_el.attrib.get('mimeType', None), - 'encoding': output_el.attrib.get('encoding', ''), - 'schema': output_el.attrib.get('schema', ''), - 'uom': output_el.attrib.get('uom', '') + 'encoding': output_el.attrib.get('encoding', None), + 'schema': output_el.attrib.get('schema', None), + 'uom': output_el.attrib.get('uom', None) } the_output[identifier_el.text] = outpt diff --git a/pywps/app/__init__.py b/pywps/app/__init__.py index 3cefbd934..a33cccfd3 100644 --- a/pywps/app/__init__.py +++ b/pywps/app/__init__.py @@ -8,3 +8,4 @@ from pywps.app.WPSRequest import WPSRequest # noqa: F401 from pywps.app.WPSRequest import get_inputs_from_xml # noqa: F401 from pywps.app.WPSRequest import get_output_from_xml # noqa: F401 +from pywps.app.WPSExecuteRequest import WPSExecuteRequest diff --git a/pywps/configuration.py b/pywps/configuration.py index ec77e30be..b3ff8ecfe 100755 --- a/pywps/configuration.py +++ b/pywps/configuration.py @@ -118,6 +118,7 @@ def load_hardcoded_configuration(): CONFIG.set('server', 'storage_copy_function', 'copy') CONFIG.set("server", "default_mimetype", "text/xml") CONFIG.set("server", "json_indent", "2") + CONFIG.set("server", "keep_status_file", "false") CONFIG.add_section('processing') CONFIG.set('processing', 'mode', 'default') diff --git a/pywps/dblog.py b/pywps/dblog.py index 2882ea9f2..4b0468e18 100644 --- a/pywps/dblog.py +++ b/pywps/dblog.py @@ -11,6 +11,10 @@ import logging import os import sys +import json +import time + +from types import SimpleNamespace from multiprocessing import Lock import sqlalchemy @@ -40,6 +44,42 @@ lock = Lock() +class SessionManager: + """Implement ref counted session, and close session automaticaly + + This SessionManager allow to call all dblog function recursively + + Usage: + + >>> with current_session as session: + ... [...] + ... pass + + Nested with statement will reuse the session and only one session.close() + will be issued and it will be issued in any case, even on return or + exception within 'with' statement + """ + def __init__(self): + self._session = None + self._handler_count = 0 + + def __enter__(self): + self._handler_count += 1 + if self._session is None: + self._session = get_session() + return self._session + + def __exit__(self, exc_type, exc_val, exc_tb): + self._handler_count -= 1 + if self._handler_count <= 0: + self._session.close() + self._session = None + return False + + +current_session = SessionManager() + + class ProcessInstance(Base): __tablename__ = '{}requests'.format(_tableprefix) @@ -62,6 +102,17 @@ class RequestInstance(Base): request = Column(LargeBinary, nullable=False) +class StatusRecord(Base): + __tablename__ = '{}status_records'.format(_tableprefix) + + # Process uuid + uuid = Column(VARCHAR(255), primary_key=True, nullable=False) + # Time stamp for creation time + timestamp = Column(DateTime(), nullable=False) + # json data used in template + data = Column(LargeBinary, nullable=False) + + def log_request(uuid, request): """Write OGC WPS request (only the necessary parts) to database logging system @@ -73,14 +124,12 @@ def log_request(uuid, request): time_start = datetime.datetime.now() identifier = _get_identifier(request) - session = get_session() - request = ProcessInstance( - uuid=str(uuid), pid=pid, operation=operation, version=version, - time_start=time_start, identifier=identifier) - - session.add(request) - session.commit() - session.close() + with current_session as session: + request = ProcessInstance( + uuid=str(uuid), pid=pid, operation=operation, version=version, + time_start=time_start, identifier=identifier) + session.add(request) + session.commit() # NoApplicableCode("Could commit to database: {}".format(e.message)) @@ -88,63 +137,120 @@ def get_process_counts(): """Returns running and stored process counts and """ - session = get_session() - stored_query = session.query(RequestInstance.uuid) - running_count = ( - session.query(ProcessInstance) - .filter(ProcessInstance.percent_done < 100) - .filter(ProcessInstance.percent_done > -1) - .filter(~ProcessInstance.uuid.in_(stored_query)) - .count() - ) - stored_count = stored_query.count() - session.close() - return running_count, stored_count + with current_session as session: + stored_query = session.query(RequestInstance.uuid) + running_count = ( + session.query(ProcessInstance) + .filter(ProcessInstance.percent_done < 100) + .filter(ProcessInstance.percent_done > -1) + .filter(~ProcessInstance.uuid.in_(stored_query)) + .count() + ) + stored_count = stored_query.count() + return running_count, stored_count def pop_first_stored(): """Gets the first stored process and delete it from the stored_requests table """ - session = get_session() - request = session.query(RequestInstance).first() + with current_session as session: + request = session.query(RequestInstance).first() - if request: - delete_count = session.query(RequestInstance).filter_by(uuid=request.uuid).delete() - if delete_count == 0: - LOGGER.debug("Another thread or process took the same stored request") - request = None + if request: + delete_count = session.query(RequestInstance).filter_by(uuid=request.uuid).delete() + if delete_count == 0: + LOGGER.debug("Another thread or process took the same stored request") + request = None - session.commit() - return request + session.commit() + return request def store_status(uuid, wps_status, message=None, status_percentage=None): """Writes response to database """ - session = get_session() - - requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) - if requests.count(): - request = requests.one() - request.time_end = datetime.datetime.now() - request.message = str(message) - request.percent_done = status_percentage - request.status = wps_status + with current_session as session: + requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) + if requests.count(): + request = requests.one() + request.time_end = datetime.datetime.now() + request.message = str(message) + request.percent_done = status_percentage + request.status = wps_status + session.commit() + + +# Update or create a store instance +def update_status_record(uuid, data): + with current_session as session: + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count(): + status_record = r.one() + status_record.timestamp = datetime.datetime.now() + status_record.data = json.dumps(data).encode("utf-8") + else: + status_record = StatusRecord( + uuid=str(uuid), + timestamp=datetime.datetime.now(), + data=json.dumps(data).encode("utf-8") + ) + session.add(status_record) + session.commit() + + +def force_failed_status(uuid): + with current_session as session: + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count() < 1: + return + status_record = r.one() + data = json.loads(status_record.data.decode("utf-8")) + data['status'] = { + "status": "failed", + "time": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), + "message": "Process has crashed" + } + status_record.timestamp = datetime.datetime.now() + status_record.data = json.dumps(data).encode("utf-8") session.commit() - session.close() + + +# Get store instance data from uuid +def get_status_record(uuid): + with current_session as session: + if sys.platform == "linux": + # Check if the current process is fail + r = session.query(ProcessInstance).filter_by(uuid=str(uuid)) + # If no process instance is found then there is no status records + if r.count() < 1: + return None + process_record = r.one() + if process_record.status not in {WPS_STATUS.FAILED, WPS_STATUS.SUCCEEDED}: + if not os.path.exists(os.path.join("/proc", str(process_record.pid))): + store_status(process_record.uuid, WPS_STATUS.FAILED, "Process crashed", 100) + force_failed_status(process_record.uuid) + + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count() < 1: + return None + status_record = r.one() + # Ensure new item to avoid change in database + # FIXME: There is a better solution ? + attrs = ["uuid", "timestamp", "data"] + status_record = SimpleNamespace(**{k: getattr(status_record, k) for k in attrs}) + status_record.data = json.loads(status_record.data.decode("utf-8")) + return status_record def update_pid(uuid, pid): """Update actual pid for the uuid processing """ - session = get_session() - - requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) - if requests.count(): - request = requests.one() - request.pid = pid - session.commit() - session.close() + with current_session as session: + requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) + if requests.count(): + request = requests.one() + request.pid = pid + session.commit() def cleanup_crashed_process(): @@ -152,34 +258,32 @@ def cleanup_crashed_process(): if sys.platform != "linux": return - session = get_session() - stored_query = session.query(RequestInstance.uuid) - running_cur = ( - session.query(ProcessInstance) - .filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED])) - .filter(~ProcessInstance.uuid.in_(stored_query)) - ) - - failed = [] - running = [(p.uuid, p.pid) for p in running_cur] - for uuid, pid in running: - # No process with this pid, the process has crashed - if not os.path.exists(os.path.join("/proc", str(pid))): - failed.append(uuid) - continue - - # If we can't read the environ, that mean the process belong another user - # which mean that this is not our process, thus our process has crashed - # this not work because root is the user for the apache - # if not os.access(os.path.join("/proc", str(pid), "environ"), os.R_OK): - # failed.append(uuid) - # continue - pass - - for uuid in failed: - store_status(uuid, WPS_STATUS.FAILED, "Process crashed", 100) - - session.close() + with current_session as session: + stored_query = session.query(RequestInstance.uuid) + running_cur = ( + session.query(ProcessInstance) + .filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED])) + .filter(~ProcessInstance.uuid.in_(stored_query)) + ) + + failed = [] + running = [(p.uuid, p.pid) for p in running_cur] + for uuid, pid in running: + # No process with this pid, the process has crashed + if not os.path.exists(os.path.join("/proc", str(pid))): + failed.append(uuid) + continue + + # If we can't read the environ, that mean the process belong another user + # which mean that this is not our process, thus our process has crashed + # this not work because root is the user for the apache + # if not os.access(os.path.join("/proc", str(pid), "environ"), os.R_OK): + # failed.append(uuid) + # continue + pass + + for uuid in failed: + store_status(uuid, WPS_STATUS.FAILED, "Process crashed", 100) def _get_identifier(request): @@ -238,6 +342,7 @@ def get_session(): Session = sessionmaker(bind=engine) ProcessInstance.metadata.create_all(engine) RequestInstance.metadata.create_all(engine) + StatusRecord.metadata.create_all(engine) _SESSION_MAKER_DATABASE = database _SESSION_MAKER = Session @@ -249,11 +354,10 @@ def store_process(uuid, request): """Save given request under given UUID for later usage """ - session = get_session() - request_json = request.json - # the BLOB type requires bytes on Python 3 - request_json = request_json.encode('utf-8') - request = RequestInstance(uuid=str(uuid), request=request_json) - session.add(request) - session.commit() - session.close() + with current_session as session: + request_json = request.json + # the BLOB type requires bytes on Python 3 + request_json = request_json.encode('utf-8') + request = RequestInstance(uuid=str(uuid), request=request_json) + session.add(request) + session.commit() diff --git a/pywps/inout/outputs.py b/pywps/inout/outputs.py index cafacc1d5..8d6c05f91 100644 --- a/pywps/inout/outputs.py +++ b/pywps/inout/outputs.py @@ -295,13 +295,14 @@ def json(self): "title": self.title, "abstract": self.abstract, "keywords": self.keywords, - "data": self.data, "data_type": self.data_type, "type": "literal", "uoms": [u.json for u in self.uoms], "translations": self.translations, } + if self.data is not None: + data['data'] = str(self.data) if self.uom: data["uom"] = self.uom.json @@ -524,15 +525,8 @@ def url(self): def _load_template(self): from jinja2 import PackageLoader - - from pywps.response import RelEnvironment - - template_env = RelEnvironment( - loader=PackageLoader('pywps', 'templates'), - trim_blocks=True, lstrip_blocks=True, - autoescape=True, ) - - self._template = template_env.get_template(self._xml_template) + from pywps.response.basic import TEMPLATE_ENV + self._template = TEMPLATE_ENV.get_template(self._xml_template) class MetaLink4(MetaLink): diff --git a/pywps/processing/__init__.py b/pywps/processing/__init__.py index 78915f5ce..a132f6024 100644 --- a/pywps/processing/__init__.py +++ b/pywps/processing/__init__.py @@ -9,7 +9,7 @@ # api only from pywps.processing.basic import Processing # noqa: F401 -from pywps.processing.basic import DetachProcessing, MultiProcessing +from pywps.processing.basic import DetachProcessing, MultiProcessing, NoAsyncProcessing from pywps.processing.job import Job # noqa: F401 from pywps.processing.scheduler import Scheduler @@ -17,6 +17,7 @@ MULTIPROCESSING = 'multiprocessing' DETACHPROCESSING = 'detachprocessing' +NOASYNCPROCESSING = 'noasyncprocessing' SCHEDULER = 'scheduler' DEFAULT = MULTIPROCESSING @@ -34,6 +35,8 @@ def Process(process, wps_request, wps_response): process = Scheduler(process, wps_request, wps_response) elif mode == DETACHPROCESSING: process = DetachProcessing(process, wps_request, wps_response) + elif mode == NOASYNCPROCESSING: + process = NoAsyncProcessing(process, wps_request, wps_response) else: process = MultiProcessing(process, wps_request, wps_response) diff --git a/pywps/processing/basic.py b/pywps/processing/basic.py index 2bc17f7bd..b6164b671 100644 --- a/pywps/processing/basic.py +++ b/pywps/processing/basic.py @@ -64,3 +64,12 @@ def start(self): pass # Ensure to stop ourself here what ever append. os._exit(0) + + +class NoAsyncProcessing(Processing): + """ + :class:`NoAsyncProcessing` Faking multiprocessing for testing purpose + """ + + def start(self): + getattr(self.job.process, self.job.method)(self.job.wps_request, self.job.wps_response) diff --git a/pywps/processing/job.py b/pywps/processing/job.py index d6ec87c7a..f5b44e6a0 100644 --- a/pywps/processing/job.py +++ b/pywps/processing/job.py @@ -10,7 +10,8 @@ import pywps.configuration as config from pywps import Process, WPSRequest -from pywps.response.execute import ExecuteResponse +from pywps.app.WPSExecuteRequest import WPSExecuteRequest +from pywps.app.WPSExecuteResponse import WPSExecuteResponse LOGGER = logging.getLogger("PYWPS") @@ -57,10 +58,10 @@ def from_json(cls, value): process = Process.from_json(value['process']) wps_request = WPSRequest() wps_request.json = json.loads(value['wps_request']) - wps_response = ExecuteResponse( - wps_request=wps_request, - uuid=process.uuid, - process=process) + wps_response = WPSExecuteResponse( + process, + wps_request, + process.uuid) wps_response.store_status_file = True new_job = Job( process=Process.from_json(value['process']), @@ -84,7 +85,7 @@ def load(cls, filename): return job def run(self): - getattr(self.process, self.method)(self.wps_request, self.wps_response) + getattr(self.process, self.method)(WPSExecuteRequest(self.process, self.wps_request), self.wps_response) class JobLauncher(object): diff --git a/pywps/response/__init__.py b/pywps/response/__init__.py index e7211084b..d32bfbbe9 100644 --- a/pywps/response/__init__.py +++ b/pywps/response/__init__.py @@ -1,24 +1,3 @@ - -import os - -from jinja2 import Environment - - -class RelEnvironment(Environment): - """Override join_path() to enable relative template paths.""" - def join_path(self, template, parent): - return os.path.dirname(parent) + '/' + template - - -def get_response(operation): - - from .capabilities import CapabilitiesResponse - from .describe import DescribeResponse - from .execute import ExecuteResponse - - if operation == "capabilities": - return CapabilitiesResponse - elif operation == "describe": - return DescribeResponse - elif operation == "execute": - return ExecuteResponse +from pywps.response.capabilities import CapabilitiesResponse +from pywps.response.describe import DescribeResponse +from pywps.response.execute import ExecuteRawResponse, StatusResponse diff --git a/pywps/response/basic.py b/pywps/response/basic.py index 05b6d4b2d..cb2701c24 100644 --- a/pywps/response/basic.py +++ b/pywps/response/basic.py @@ -4,15 +4,25 @@ if TYPE_CHECKING: from pywps import WPSRequest +from pywps.response.status import WPS_STATUS import os from jinja2 import Environment, PackageLoader - -from pywps.dblog import store_status from pywps.translations import get_translation -from . import RelEnvironment -from .status import WPS_STATUS + +class RelEnvironment(Environment): + """Override join_path() to enable relative template paths.""" + def join_path(self, template, parent): + return os.path.dirname(parent) + '/' + template + + +TEMPLATE_ENV = RelEnvironment( + loader=PackageLoader('pywps', 'templates'), + trim_blocks=True, lstrip_blocks=True, + autoescape=True, +) +TEMPLATE_ENV.globals.update(get_translation=get_translation) class WPSResponse(object): @@ -27,12 +37,7 @@ def __init__(self, wps_request: 'WPSRequest', uuid=None, version="1.0.0"): self.doc = None self.content_type = None self.version = version - self.template_env = RelEnvironment( - loader=PackageLoader('pywps', 'templates'), - trim_blocks=True, lstrip_blocks=True, - autoescape=True, - ) - self.template_env.globals.update(get_translation=get_translation) + self.template_env = TEMPLATE_ENV def _update_status(self, status, message, status_percentage): """ @@ -43,6 +48,7 @@ def _update_status(self, status, message, status_percentage): :param pywps.response.status.WPS_STATUS status: process status - user should usually omit this parameter """ + from pywps.dblog import store_status self.message = message self.status = status self.status_percentage = status_percentage diff --git a/pywps/response/execute.py b/pywps/response/execute.py index 48c0afe9b..50ab46b98 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -21,6 +21,7 @@ from pywps.exceptions import NoApplicableCode from pywps.inout.array_encode import ArrayEncoder from pywps.inout.formats import FORMATS +from pywps.response.basic import TEMPLATE_ENV from pywps.response.status import WPS_STATUS from .basic import WPSResponse @@ -30,9 +31,9 @@ WPS, OWS = get_ElementMakerForVersion("1.0.0") -class ExecuteResponse(WPSResponse): +class ExecuteRawResponse(WPSResponse): - def __init__(self, wps_request, uuid, **kwargs): + def __init__(self, wps_execute_response): """constructor :param pywps.app.WPSRequest.WPSRequest wps_request: @@ -40,160 +41,81 @@ def __init__(self, wps_request, uuid, **kwargs): :param uuid: string this request uuid """ - super(ExecuteResponse, self).__init__(wps_request, uuid) + super(ExecuteRawResponse, self).__init__(wps_execute_response.wps_request, wps_execute_response.uuid) - self.process = kwargs["process"] - self.outputs = {o.identifier: o for o in self.process.outputs} - self.store_status_file = False + self.wps_execute_response = wps_execute_response - # override WPSResponse._update_status - def _update_status(self, status, message, status_percentage, clean=True): - """ - Updates status report of currently running process instance: + # Fallback to self.wps_execute_response attribute + def __getattr__(self, item): + return getattr(self.wps_execute_response, item) - * Updates the status document. - * Updates the status file (if requested). - * Cleans the working directory when process has finished. + @Request.application + def __call__(self, request): + accept_json_response, accepted_mimetype = get_response_type( + self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) + if self.status == WPS_STATUS.FAILED: + return NoApplicableCode(self.message) + else: + wps_output_identifier = next(iter(self.wps_request.outputs)) # get the first key only + wps_output_value = self.outputs[wps_output_identifier] + response = wps_output_value.data + if response is None: + return NoApplicableCode("Expected output was not generated") + suffix = '' + # if isinstance(wps_output_value, ComplexOutput): + data_format = None + if hasattr(wps_output_value, 'output_format'): + # this is set in the response, thus should be more precise + data_format = wps_output_value.output_format + elif hasattr(wps_output_value, 'data_format'): + # this is set in the process' response _handler function, thus could have a few supported formats + data_format = wps_output_value.data_format + if data_format is not None: + mimetype = data_format.mime_type + if data_format.extension is not None: + suffix = data_format.extension + else: + # like LitearlOutput + mimetype = self.wps_request.outputs[wps_output_identifier].get('mimetype', None) + if not isinstance(response, (str, bytes, bytearray)): + if not mimetype: + mimetype = accepted_mimetype + json_response = mimetype and 'json' in mimetype + if json_response: + mimetype = 'application/json' + suffix = '.json' + response = json.dumps(response, cls=ArrayEncoder, indent=get_json_indent()) + else: + response = str(response) + if not mimetype: + mimetype = None + return Response(response, mimetype=mimetype, + headers={'Content-Disposition': 'attachment; filename="{}"' + .format(wps_output_identifier + suffix)}) - This method is *only* called by pywps internally. - """ - super(ExecuteResponse, self)._update_status(status, message, status_percentage) - LOGGER.debug("_update_status: status={}, clean={}".format(status, clean)) - self._update_status_doc() - if self.store_status_file: - self._update_status_file() - if clean: - if self.status == WPS_STATUS.SUCCEEDED or self.status == WPS_STATUS.FAILED: - LOGGER.debug("clean workdir: status={}".format(status)) - self.process.clean() - def update_status(self, message, status_percentage=None): - """ - Update status report of currently running process instance. +class StatusResponse(Response): - This method is *only* called by the user provided process. - The status is handled internally in pywps. + def __init__(self, version, uuid, mimetype): + """constructor - :param str message: Message you need to share with the client - :param int status_percentage: Percent done (number betwen <0-100>) + :param pywps.app.WPSRequest.WPSRequest wps_request: + :param pywps.app.Process.Process process: + :param uuid: string this request uuid """ - if status_percentage is None: - status_percentage = self.status_percentage - self._update_status(self.status, message, status_percentage, False) - - def _update_status_doc(self): - try: - # rebuild the doc - self.doc, self.content_type = self._construct_doc() - except Exception as e: - raise NoApplicableCode('Building Response Document failed with : {}'.format(e)) - - def _update_status_file(self): - # TODO: check if file/directory is still present, maybe deleted in mean time - try: - # update the status xml file - self.process.status_store.write( - self.doc, - self.process.status_filename, - data_format=FORMATS.XML) - except Exception as e: - raise NoApplicableCode('Writing Response Document failed with : {}'.format(e)) - - def _process_accepted(self): - percent = int(self.status_percentage) - if percent > 99: - percent = 99 - return { - "status": "accepted", - "time": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime()), - "percent_done": str(percent), - "message": self.message - } - - def _process_started(self): - data = self._process_accepted() - data.update({ - "status": "started", - }) - return data - - def _process_paused(self): - data = self._process_accepted() - data.update({ - "status": "paused", - }) - return data - def _process_succeeded(self): - data = self._process_accepted() - data.update({ - "status": "succeeded", - "percent_done": "100" - }) - return data + from pywps.dblog import get_status_record + r = get_status_record(uuid) + if not r: + return NoApplicableCode("Output was not generated") - def _process_failed(self): - data = self._process_accepted() - data.update({ - "status": "failed", - "code": "NoApplicableCode", - "locator": "None", - }) - return data - - def _get_serviceinstance(self): - - url = config.get_config_value("server", "url") - params = {'request': 'GetCapabilities', 'service': 'WPS'} - - url_parts = list(urlparse.urlparse(url)) - query = dict(urlparse.parse_qsl(url_parts[4])) - query.update(params) - - url_parts[4] = urlencode(query) - return urlparse.urlunparse(url_parts).replace("&", "&") - - @property - def json(self): - data = { - "language": self.wps_request.language, - "service_instance": self._get_serviceinstance(), - "process": self.process.json - } - - if self.store_status_file: - if self.process.status_location: - data["status_location"] = self.process.status_url - - if self.status == WPS_STATUS.ACCEPTED: - self.message = 'PyWPS Process {} accepted'.format(self.process.identifier) - data["status"] = self._process_accepted() - elif self.status == WPS_STATUS.STARTED: - data["status"] = self._process_started() - elif self.status == WPS_STATUS.FAILED: - # check if process failed and display fail message - data["status"] = self._process_failed() - elif self.status == WPS_STATUS.PAUSED: - # TODO: handle paused status - data["status"] = self._process_paused() - elif self.status == WPS_STATUS.SUCCEEDED: - data["status"] = self._process_succeeded() - # Process outputs XML - data["outputs"] = [self.outputs[o].json for o in self.outputs] - # lineage: add optional lineage when process has finished - if self.status in [WPS_STATUS.SUCCEEDED, WPS_STATUS.FAILED]: - # DataInputs and DataOutputs definition XML if lineage=true - if self.wps_request.lineage == 'true': - data["lineage"] = True - try: - # TODO: stored process has ``pywps.inout.basic.LiteralInput`` - # instead of a ``pywps.inout.inputs.LiteralInput``. - data["input_definitions"] = [self.wps_request.inputs[i][0].json for i in self.wps_request.inputs] - except Exception as e: - LOGGER.error("Failed to update lineage for input parameter. {}".format(e)) + if 'json' in mimetype: + doc = json.dumps(self._render_json_response(r.data), cls=ArrayEncoder, indent=get_json_indent()) + else: + template = TEMPLATE_ENV.get_template(version + '/execute/main.xml') + doc = template.render(**r.data) - data["output_definitions"] = [self.outputs[o].json for o in self.outputs] - return data + super().__init__(doc, mimetype=mimetype) @staticmethod def _render_json_response(jdoc): @@ -211,73 +133,3 @@ def _render_json_response(jdoc): d[id] = val[key] response['outputs'] = d return response - - def _construct_doc(self): - if self.status == WPS_STATUS.SUCCEEDED and \ - hasattr(self.wps_request, 'preprocess_response') and \ - self.wps_request.preprocess_response: - self.outputs = self.wps_request.preprocess_response(self.outputs, - request=self.wps_request, - http_request=self.wps_request.http_request) - doc = self.json - try: - json_response, mimetype = get_response_type( - self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) - except Exception: - mimetype = get_default_response_mimetype() - json_response = 'json' in mimetype - if json_response: - doc = json.dumps(self._render_json_response(doc), cls=ArrayEncoder, indent=get_json_indent()) - else: - template = self.template_env.get_template(self.version + '/execute/main.xml') - doc = template.render(**doc) - return doc, mimetype - - @Request.application - def __call__(self, request): - accept_json_response, accepted_mimetype = get_response_type( - self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) - if self.wps_request.raw: - if self.status == WPS_STATUS.FAILED: - return NoApplicableCode(self.message) - else: - wps_output_identifier = next(iter(self.wps_request.outputs)) # get the first key only - wps_output_value = self.outputs[wps_output_identifier] - response = wps_output_value.data - if response is None: - return NoApplicableCode("Expected output was not generated") - suffix = '' - # if isinstance(wps_output_value, ComplexOutput): - data_format = None - if hasattr(wps_output_value, 'output_format'): - # this is set in the response, thus should be more precise - data_format = wps_output_value.output_format - elif hasattr(wps_output_value, 'data_format'): - # this is set in the process' response _handler function, thus could have a few supported formats - data_format = wps_output_value.data_format - if data_format is not None: - mimetype = data_format.mime_type - if data_format.extension is not None: - suffix = data_format.extension - else: - # like LitearlOutput - mimetype = self.wps_request.outputs[wps_output_identifier].get('mimetype', None) - if not isinstance(response, (str, bytes, bytearray)): - if not mimetype: - mimetype = accepted_mimetype - json_response = mimetype and 'json' in mimetype - if json_response: - mimetype = 'application/json' - suffix = '.json' - response = json.dumps(response, cls=ArrayEncoder, indent=get_json_indent()) - else: - response = str(response) - if not mimetype: - mimetype = None - return Response(response, mimetype=mimetype, - headers={'Content-Disposition': 'attachment; filename="{}"' - .format(wps_output_identifier + suffix)}) - else: - if not self.doc: - return NoApplicableCode("Output was not generated") - return Response(self.doc, mimetype=accepted_mimetype) diff --git a/pywps/templates/1.0.0/execute/main.xml b/pywps/templates/1.0.0/execute/main.xml index 047b39aab..8d83fb040 100644 --- a/pywps/templates/1.0.0/execute/main.xml +++ b/pywps/templates/1.0.0/execute/main.xml @@ -1,4 +1,7 @@ +{% macro safe_attr(name, value) -%} +{% if value %}{{ name }}="{{value}}"{% endif %} +{%- endmacro %} {{ process.identifier }} @@ -40,7 +43,7 @@ {{ get_translation(input, "abstract", language) }} {% if input.type == "complex" %} - {{ input.data | safe }} + {{ input.data | safe }} {% elif input.type == "literal" %} @@ -48,13 +51,13 @@ {% elif input.type == "bbox" %} - + {% for c in input.ll %} {{ c }} {% endfor %} {% for c in input.ur %} {{ c }} {% endfor %} {% elif input.type == "reference" %} - + {% endif %} {% endfor %} @@ -64,7 +67,7 @@ {% for output in output_definitions %} {% if output.type in ["complex", "reference"] %} - + {% else %} {% endif %} @@ -84,10 +87,10 @@ {{ get_translation(output, "title", language) }} {{ get_translation(output, "abstract", language) }} {% if output.type == "reference" %} - + {% elif output.type == "complex" %} - {{ output.data | safe }} + {{ output.data | safe }} {% elif output.type == "literal" %} @@ -95,7 +98,7 @@ {% elif output.type == "bbox" %} - + {% for c in output.ll %} {{ c }} {% endfor %} {% for c in output.ur %} {{ c }} {% endfor %} diff --git a/tests/processes/__init__.py b/tests/processes/__init__.py index 5882a9ef1..a2c5050e9 100644 --- a/tests/processes/__init__.py +++ b/tests/processes/__init__.py @@ -33,6 +33,8 @@ class Greeter(Process): def __init__(self): super(Greeter, self).__init__( self.greeter, + store_supported="true", + status_supported="true", identifier='greeter', title='Greeter', inputs=[LiteralInput('name', 'Input name', data_type='string')], diff --git a/tests/test_assync.py b/tests/test_assync.py index 296bc2c07..81b817460 100644 --- a/tests/test_assync.py +++ b/tests/test_assync.py @@ -11,9 +11,7 @@ from pywps.tests import client_for, assert_response_accepted, assert_response_success from processes import Sleep from owslib.wps import WPSExecution -from pathlib import Path -from tempfile import TemporaryDirectory -from pywps import dblog +from urllib.parse import urlparse VERSION = "1.0.0" @@ -50,13 +48,13 @@ def test_async(self): print(url) # OWSlib only reads from URLs, not local files. So we need to read the response manually. - p = Path(configuration.get_config_value('server', 'outputpath')) / url.split('/')[-1] + url = urlparse(url) # Poll the process until it completes total_time = 0 sleep_time = .01 while wps.status not in ["ProcessSucceeded", "ProcessFailed"]: - resp = p.read_bytes() + resp = client.open(base_url='/wps', path='/status', method='GET', query_string=url.query).data if resp: wps.checkStatus(response=resp, sleepSecs=0.01) else: diff --git a/tests/test_execute.py b/tests/test_execute.py index bcc2f0a30..67ba2c641 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -19,6 +19,7 @@ from pywps.app.basic import get_xpath_ns from pywps.tests import client_for, assert_response_success, assert_response_success_json from pywps import configuration +from pywps.app.WPSExecuteRequest import WPSExecuteRequest from io import StringIO @@ -319,7 +320,7 @@ class FakeRequest(): self.assertEqual(e.locator, 'mimeType') request.inputs['complex'][0]['mimeType'] = 'application/gml' - parsed_inputs = service.create_complex_inputs(my_process.inputs[0], + parsed_inputs = WPSExecuteRequest.create_complex_inputs(my_process.inputs[0], request.inputs['complex']) # TODO parse outputs and their validators too @@ -328,7 +329,7 @@ class FakeRequest(): request.inputs['complex'][0]['mimeType'] = 'application/xml+gml' try: - parsed_inputs = service.create_complex_inputs(my_process.inputs[0], + parsed_inputs = WPSExecuteRequest.create_complex_inputs(my_process.inputs[0], request.inputs['complex']) except InvalidParameterValue as e: self.assertEqual(e.locator, 'mimeType') @@ -343,7 +344,7 @@ class FakeRequest(): my_process.inputs[0].supported_formats = [frmt] my_process.inputs[0].data_format = Format(mime_type='application/xml+gml') - parsed_inputs = service.create_complex_inputs(my_process.inputs[0], + parsed_inputs = WPSExecuteRequest.create_complex_inputs(my_process.inputs[0], request.inputs['complex']) self.assertEqual(parsed_inputs[0].data_format.validate, validategml) @@ -369,7 +370,8 @@ class FakeRequest(): language = "en-US" request = FakeRequest() - response = service.execute('my_complex_process', request, 'fakeuuid') + request = WPSExecuteRequest(my_process, request) + response = my_process.execute(request, 'fakeuuid') self.assertEqual(response.outputs['complex'].data, 'DEFAULT COMPLEX DATA') def test_output_mimetype(self): @@ -400,13 +402,15 @@ def __init__(self, mimetype): # valid mimetype request = FakeRequest('text/plain+test') - response = service.execute('get_mimetype_process', request, 'fakeuuid') + request = WPSExecuteRequest(my_process, request) + response = my_process.execute(request, 'fakeuuid') self.assertEqual(response.outputs['mimetype'].data, 'text/plain+test') # non valid mimetype request = FakeRequest('text/xml') with self.assertRaises(InvalidParameterValue): - response = service.execute('get_mimetype_process', request, 'fakeuuid') + request = WPSExecuteRequest(my_process, request) + response = my_process.execute(request, 'fakeuuid') def test_metalink(self): client = client_for(Service(processes=[create_metalink_process()])) diff --git a/tests/test_processing.py b/tests/test_processing.py index c7fa2f046..cfce5184d 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -16,7 +16,7 @@ from pywps.processing.job import Job from pywps.processing.basic import MultiProcessing from pywps.app import WPSRequest -from pywps.response.execute import ExecuteResponse +from pywps.app.WPSExecuteResponse import WPSExecuteResponse from processes import Greeter, InOut, BBox @@ -34,8 +34,8 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() - self.wps_response = ExecuteResponse(self.wps_request, self.uuid, - process=self.dummy_process) + self.wps_response = WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, @@ -81,8 +81,8 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() - self.wps_response = ExecuteResponse(self.wps_request, self.uuid, - process=self.dummy_process) + self.wps_response = WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, @@ -118,8 +118,25 @@ def setUp(self): self.dummy_process._set_uuid(self.uuid) self.dummy_process.set_workdir(self.workdir) self.wps_request = WPSRequest() - self.wps_response = ExecuteResponse(self.wps_request, self.uuid, - process=self.dummy_process) + self.wps_request.json = { + 'operation': 'execute', + 'version': '1.0.0', + 'language': 'eng', + 'identifier': 'bbox_test', + 'store_execute': True, + 'status': True, + 'lineage': False, + 'inputs': { + 'area': [{ + 'identifier': 'area', + 'data': [0, 1, 2, 3] + }] + }, + 'outputs': { }, + 'raw': False + } + self.wps_response = WPSExecuteResponse(self.dummy_process, + self.wps_request, self.uuid) self.job = Job( process=self.dummy_process, wps_request=self.wps_request, diff --git a/tests/test_wps_status.py b/tests/test_wps_status.py new file mode 100644 index 000000000..6bf724e4d --- /dev/null +++ b/tests/test_wps_status.py @@ -0,0 +1,57 @@ +################################################################## +# Copyright 2018 Open Source Geospatial Foundation and others # +# licensed under MIT, Please consult LICENSE.txt for details # +################################################################## + +from basic import TestBase +from pywps import Service +from pywps import get_ElementMakerForVersion +from pywps.tests import client_for, assert_response_accepted, assert_response_success + +from pywps import configuration +from processes import Greeter +from urllib.parse import urlparse + +from time import sleep + +VERSION = "1.0.0" + +WPS, OWS = get_ElementMakerForVersion(VERSION) + + +class ExecuteTest(TestBase): + + def setUp(self) -> None: + super().setUp() + # Use fake async processing for this test + configuration.CONFIG.set('processing', 'mode', 'noasyncprocessing') + + def test_wps_status(self): + client = client_for(Service(processes=[Greeter()])) + request_doc = WPS.Execute( + OWS.Identifier('greeter'), + WPS.DataInputs( + WPS.Input( + OWS.Identifier('name'), + WPS.Data( + WPS.LiteralData( + "SomeName" + ) + ) + ) + ), + WPS.ResponseForm( + WPS.ResponseDocument(storeExecuteResponse='true', status='true') + ), + version="1.0.0" + ) + resp = client.post_xml(doc=request_doc) + assert_response_accepted(resp) + + url = resp.xml.xpath("//@statusLocation")[0] + + # Parse url because we do not have real server + url = urlparse(url) + resp = client.open(base_url='/wps', path='/status', method='GET', query_string=url.query) + assert_response_success(resp) + diff --git a/tests/test_wpsrequest.py b/tests/test_wpsrequest.py index cc5868295..011a54075 100644 --- a/tests/test_wpsrequest.py +++ b/tests/test_wpsrequest.py @@ -60,10 +60,7 @@ def test_json_in(self): self.request = WPSRequest() self.request.json = obj - self.assertEqual(self.request.inputs['myliteral'][0].data, 1, 'Data are in the file') - self.assertEqual(self.request.inputs['myin'][0].data, 'ahoj', 'Data are in the file') - self.assertListEqual(self.request.inputs['myliteral'][0].allowed_values, [AnyValue()], 'Any value not set') - self.assertTrue(self.request.inputs['myliteral'][0].any_value, 'Any value set') + self.assertEqual(self.request.inputs['myliteral'][0]['data'], 1, 'Data are in the file') def test_json_inout_datetime(self): obj = { @@ -105,17 +102,17 @@ def test_json_inout_datetime(self): self.request = WPSRequest() self.request.json = obj - self.assertEqual(self.request.inputs['datetime'][0].data, datetime.datetime(2017, 4, 20, 12), 'Datatime set') - self.assertEqual(self.request.inputs['date'][0].data, datetime.date(2017, 4, 20), 'Data set') - self.assertEqual(self.request.inputs['time'][0].data, datetime.time(9, 0, 0), 'Time set') + self.assertEqual(self.request.inputs['datetime'][0]['data'], '2017-04-20T12:00:00', 'Datatime set') + self.assertEqual(self.request.inputs['date'][0]['data'], '2017-04-20', 'Data set') + self.assertEqual(self.request.inputs['time'][0]['data'], '09:00:00', 'Time set') # dump to json and reload dump = self.request.json self.request.json = json.loads(dump) - self.assertEqual(self.request.inputs['datetime'][0].data, datetime.datetime(2017, 4, 20, 12), 'Datatime set') - self.assertEqual(self.request.inputs['date'][0].data, datetime.date(2017, 4, 20), 'Data set') - self.assertEqual(self.request.inputs['time'][0].data, datetime.time(9, 0, 0), 'Time set') + self.assertEqual(self.request.inputs['datetime'][0]['data'], '2017-04-20T12:00:00', 'Datatime set') + self.assertEqual(self.request.inputs['date'][0]['data'], '2017-04-20', 'Data set') + self.assertEqual(self.request.inputs['time'][0]['data'], '09:00:00', 'Time set') def test_json_inout_bbox(self): obj = { @@ -145,17 +142,17 @@ def test_json_inout_bbox(self): self.request = WPSRequest() self.request.json = obj - self.assertEqual(self.request.inputs['bbox'][0].data, [6.117602, 46.176194, 6.22283, 46.275832], 'BBox data set') - self.assertTrue(isinstance(self.request.inputs['bbox'][0].crs, str), 'CRS is a string') - self.assertEqual(self.request.inputs['bbox'][0].crs, 'epsg:4326', 'CRS data correctly parsed') + self.assertEqual(self.request.inputs['bbox'][0]['bbox'], '6.117602,46.176194,6.22283,46.275832', 'BBox data set') + self.assertTrue(isinstance(self.request.inputs['bbox'][0]['crs'], str), 'CRS is a string') + self.assertEqual(self.request.inputs['bbox'][0]['crs'], 'urn:ogc:def:crs:EPSG::4326', 'CRS data correctly parsed') # dump to json and reload dump = self.request.json self.request.json = json.loads(dump) - self.assertEqual(self.request.inputs['bbox'][0].data, [6.117602, 46.176194, 6.22283, 46.275832], 'BBox data set') - self.assertTrue(isinstance(self.request.inputs['bbox'][0].crs, str), 'CRS is a string') - self.assertEqual(self.request.inputs['bbox'][0].crs, 'epsg:4326', 'CRS data correctly parsed') + self.assertEqual(self.request.inputs['bbox'][0]['bbox'], '6.117602,46.176194,6.22283,46.275832', 'BBox data set') + self.assertTrue(isinstance(self.request.inputs['bbox'][0]['crs'], str), 'CRS is a string') + self.assertEqual(self.request.inputs['bbox'][0]['crs'], 'urn:ogc:def:crs:EPSG::4326', 'CRS data correctly parsed') def load_tests(loader=None, tests=None, pattern=None):