Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt pysteps to allow for postprocessing plugins #405

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pysteps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,4 @@ def load_config_file(params_file=None, verbose=False, dryrun=False):

# After the sub-modules are loaded, register the discovered importers plugin.
io.interface.discover_importers()
postprocessing.interface.discover_postprocessors()
15 changes: 6 additions & 9 deletions pysteps/io/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
"""
import importlib

from pkg_resources import iter_entry_points

from pysteps import io
from pysteps.decorators import postprocess_import
from pysteps.io import importers, exporters
from pysteps.io import importers, exporters, interface
from pprint import pprint

_importer_methods = dict(
Expand Down Expand Up @@ -58,7 +55,7 @@ def discover_importers():
importlib.reload(pkg_resources)

for entry_point in pkg_resources.iter_entry_points(
group="pysteps.plugins.importers", name=None
group="pysteps.plugins.importer", name=None
):
_importer = entry_point.load()

Expand Down Expand Up @@ -91,22 +88,22 @@ def importers_info():

# Importers available in the `io.importers` module
available_importers = [
attr for attr in dir(io.importers) if attr.startswith("import_")
attr for attr in dir(importers) if attr.startswith("import_")
]

print("\nImporters available in the pysteps.io.importers module")
pprint(available_importers)

# Importers declared in the pysteps.io.get_method interface
importers_in_the_interface = [
f.__name__ for f in io.interface._importer_methods.values()
f.__name__ for f in interface._importer_methods.values()
]

print("\nImporters available in the pysteps.io.get_method interface")
pprint(
[
(short_name, f.__name__)
for short_name, f in io.interface._importer_methods.items()
for short_name, f in interface._importer_methods.items()
]
)

Expand All @@ -117,7 +114,7 @@ def importers_info():

difference = available_importers ^ importers_in_the_interface
if len(difference) > 0:
print("\nIMPORTANT:")
#print("\nIMPORTANT:")
_diff = available_importers - importers_in_the_interface
if len(_diff) > 0:
print(
Expand Down
3 changes: 3 additions & 0 deletions pysteps/postprocessing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
"""Methods for post-processing of forecasts."""

from . import ensemblestats
from .diagnostics import *
from .interface import *
from .ensemblestats import *
27 changes: 27 additions & 0 deletions pysteps/postprocessing/diagnostics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
pysteps.postprocessing.diagnostics
====================

Methods for applying diagnostics postprocessing.

The methods in this module implement the following interface::

diagnostic_xxx(optional arguments)

where **xxx** is the name of the diagnostic to be applied.

Available Diagnostics Postprocessors
------------------------

.. autosummary::
:toctree: ../generated/

"""


def diagnostic_example1(filename, **kwargs):
return "Hello, I am an example diagnostics postprocessor."


def diagnostic_example2(filename, **kwargs):
return [[42, 42], [42, 42]]
8 changes: 8 additions & 0 deletions pysteps/postprocessing/ensemblestats.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,11 @@ def banddepth(X, thr=None, norm=False):
depth = (depth - depth.min()) / (depth.max() - depth.min())

return depth


def ensemblestat_example1(filename, **kwargs):
return "Hello, I am an example of postprocessing ensemble statistics."


def ensemblestat_example2(filename, **kwargs):
return [[42, 42], [42, 42]]
278 changes: 278 additions & 0 deletions pysteps/postprocessing/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
# -*- coding: utf-8 -*-
"""
pysteps.postprocessing.interface
====================

Interface for the postprocessing module.

.. currentmodule:: pysteps.postprocessing.interface

.. autosummary::
:toctree: ../generated/

get_method
"""
import importlib

from pysteps.postprocessing import diagnostics, ensemblestats
from pprint import pprint

_diagnostics_methods = dict(
)

_ensemblestats_methods = dict(
mean=ensemblestats.mean,
excprob=ensemblestats.excprob,
banddepth=ensemblestats.banddepth,
)

def add_postprocessor(
postprocessors_function_name,
_postprocessors,
methods_dict,
module,
attributes
):
"""
Add the postprocessor to the appropriate _methods dictionary and to the module.
Parameters
----------

postprocessors_function_name: str
for example, e.g. diagnostic_example1
_postprocessors: function
the function to be added
@param methods_dict: the dictionary where the function is added
@param module: the module where the function is added, e.g. 'diagnostics'
@param attributes: the existing functions in the selected module
"""

# get funtion name without mo
short_name = postprocessors_function_name.replace(f"{module}_", "")
if short_name not in methods_dict:
methods_dict[short_name] = _postprocessors
else:
RuntimeWarning(
f"The {module} identifier '{short_name}' is already available in "
f"'pysteps.postprocessing.interface_{module}_methods'.\n"
f"Skipping {module}:{'.'.join(attributes)}"
)

if hasattr(globals()[module], postprocessors_function_name):
RuntimeWarning(
f"The {module} function '{short_name}' is already an attribute"
f"of 'pysteps.postprocessing.{module}'.\n"
f"Skipping {module}:{'.'.join(attributes)}"
)
else:
setattr(globals()[module], postprocessors_function_name, _postprocessors)


def discover_postprocessors():
"""
Search for installed postprocessing plugins in the entrypoint 'pysteps.plugins.postprocessors'

The postprocessors found are added to the appropriate `_methods`
dictionary in 'pysteps.postprocessing.interface' containing the available postprocessors.
"""

# The pkg resources needs to be reloaded to detect new packages installed during
# the execution of the python application. For example, when the plugins are
# installed during the tests
import pkg_resources

importlib.reload(pkg_resources)

# Discover the postprocessors available in the plugins
for plugintype in ["diagnostic", "ensemblestat"]:
for entry_point in pkg_resources.iter_entry_points(
group=f"pysteps.plugins.{plugintype}", name=None
):
_postprocessors = entry_point.load()

postprocessors_function_name = _postprocessors.__name__


if "diagnostic" in entry_point.module_name:
add_postprocessor(
postprocessors_function_name,
_postprocessors,
_diagnostics_methods,
"diagnostics",
entry_point.attrs,
)
elif "ensemblestat" in entry_point.module_name:
add_postprocessor(
postprocessors_function_name,
_postprocessors,
_ensemblestats_methods,
"ensemblestats",
entry_point.attrs,
)
else:
raise ValueError(
f"Unknown module {entry_point.module_name} in the entrypoint {entry_point.name}"
)


def print_postprocessors_info(module_name, interface_methods, module_methods):
"""
Helper function to print the postprocessors available in the module and in the interface.

Parameters
----------
module_name: str
Name of the module, for example 'pysteps.postprocessing.diagnostics'.
interface_methods: dict
Dictionary of the postprocessors declared in the interface, for example _diagnostics_methods.
module_methods: list
List of the postprocessors available in the module, for example 'diagnostic_example1'.

"""
print(f"\npostprocessors available in the {module_name} module")
pprint(module_methods)

print(
f"\npostprocessors available in the pysteps.postprocessing.get_method interface"
)
pprint([(short_name, f.__name__) for short_name, f in interface_methods.items()])

module_methods_set = set(module_methods)
interface_methods_set = set(interface_methods.keys())

difference = module_methods_set ^ interface_methods_set
if len(difference) > 0:
#print("\nIMPORTANT:")
_diff = module_methods_set - interface_methods_set
if len(_diff) > 0:
print(
f"\nIMPORTANT:\nThe following postprocessors are available in {module_name} module but not in the pysteps.postprocessing.get_method interface"
)
pprint(_diff)
_diff = interface_methods_set - module_methods_set
if len(_diff) > 0:
print(
"\nWARNING:\n"
f"The following postprocessors are available in the pysteps.postprocessing.get_method interface but not in the {module_name} module"
)
pprint(_diff)


def postprocessors_info():
"""Print all the available postprocessors."""

available_postprocessors = set()
postprocessors_in_the_interface = set()
# List the plugins that have been added to the postprocessing.[plugintype] module
for plugintype in ["diagnostics", "ensemblestats"]:
# in the dictionary and found by get_methods() function
interface_methods = (
_diagnostics_methods
if plugintype == "diagnostics"
else _ensemblestats_methods
)
# in the pysteps.postprocessing module
module_name = f"pysteps.postprocessing.{plugintype}"
available_module_methods = [
attr
for attr in dir(importlib.import_module(module_name))
if attr.startswith(plugintype[:-1])
]
# add the pre-existing ensemblestats functions (see _ensemblestats_methods above)
# that do not follow the convention to start with "ensemblestat_" as the plugins
if "ensemblestats" in plugintype:
available_module_methods += [em for em in _ensemblestats_methods.keys() if not em.startswith('ensemblestat_')]
print_postprocessors_info(
module_name, interface_methods, available_module_methods
)
available_postprocessors = available_postprocessors.union(
available_module_methods
)
postprocessors_in_the_interface = postprocessors_in_the_interface.union(
interface_methods.keys()
)

return available_postprocessors, postprocessors_in_the_interface


def get_method(name, method_type):
"""
Return a callable function for the method corresponding to the given
name.

Parameters
----------
name: str
Name of the method. The available options are:\n

diagnostics:

.. tabularcolumns:: |p{2cm}|L|

+---------------+-------------------------------------------------------+
| Name | Description |
+===============+=======================================================+
| Diagnostic | Example that returns a string |
| Example1 | |
+---------------+-------------------------------------------------------+
| Diagnostic | Example that returns an array |
| Example3 | |
+---------------+-------------------------------------------------------+

ensemblestats:

.. tabularcolumns:: |p{2cm}|L|

+---------------+-------------------------------------------------------+
| Name | Description |
+===============+=======================================================+
| EnsembleStat | Example that returns a string |
| Example1 | |
+---------------+-------------------------------------------------------+
| EnsembleStat | Example that returns an array |
| Example3 | |
+---------------+-------------------------------------------------------+

method_type: {'diagnostics', 'ensemblestats'}
Type of the method (see tables above).

"""

if isinstance(method_type, str):
method_type = method_type.lower()
else:
raise TypeError(
"Only strings supported for for the method_type"
+ " argument\n"
+ "The available types are: 'diagnostics', 'ensemblestats'"
) from None

if isinstance(name, str):
name = name.lower()
else:
raise TypeError(
"Only strings supported for the method's names.\n"
+ "\nAvailable diagnostics names:"
+ str(list(_diagnostics_methods.keys()))
+ "\nAvailable ensemblestats names:"
+ str(list(_ensemblestats_methods.keys()))
) from None

if method_type == "diagnostics":
methods_dict = _diagnostics_methods
elif method_type == "ensemblestats":
methods_dict = _ensemblestats_methods
else:
raise ValueError(
"Unknown method type {}\n".format(method_type)
+ "The available types are: 'diagnostics', 'ensemblestats'"
) from None

try:
return methods_dict[name]
except KeyError:
raise ValueError(
"Unknown {} method {}\n".format(method_type, name)
+ "The available methods are:"
+ str(list(methods_dict.keys()))
) from None
Loading
Loading