From 75d208a82a1a83486811a411d7689d9323e80c0d Mon Sep 17 00:00:00 2001 From: Jeremie Pardou-Piquemal <571533+jrmi@users.noreply.github.com> Date: Wed, 8 Sep 2021 18:00:49 +0200 Subject: [PATCH] Change way periodic tasks are launched --- README.md | 25 ++- .../commands/auto_refresh_sources.py | 9 + .../migrations/0021_source_last_refresh.py | 19 +++ django_geosource/mixins.py | 23 ++- django_geosource/models.py | 34 +++- django_geosource/periodics.py | 18 ++ django_geosource/schedulers.py | 155 ------------------ django_geosource/tasks.py | 7 + django_geosource/tests/test_periodics.py | 99 +++++++++++ django_geosource/tests/test_scheduler.py | 131 --------------- docker-compose.yml | 4 +- 11 files changed, 221 insertions(+), 303 deletions(-) create mode 100644 django_geosource/management/commands/auto_refresh_sources.py create mode 100644 django_geosource/migrations/0021_source_last_refresh.py create mode 100644 django_geosource/periodics.py delete mode 100644 django_geosource/schedulers.py create mode 100644 django_geosource/tests/test_periodics.py delete mode 100644 django_geosource/tests/test_scheduler.py diff --git a/README.md b/README.md index 0740a16..1bc50e9 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,29 @@ You also need to create the celery app following this [documentation](https://do Then to run the celery worker: `$ celery worker -A django_geosource -l info` -To run the celery beat worker that allow to synchronize periodically sources, launch this command: -`$ celery beat --scheduler django_geosource.celery.schedulers.GeosourceScheduler -A django_geosource -l info` +To run periodic tasks, use celery beat: + +```python + +from celery import Celery +from celery.schedules import crontab + +app = Celery() + +@app.after_finalize.connect +def setup_periodic_tasks(sender, **kwargs): + # Calls refresh check every 30 minutes + sender.add_periodic_task(60.0 * 30, run_auto_refresh_source.s()) + +@app.task +def run_auto_refresh_source(): + from 
django_geosource.periodics import auto_refresh_source + + auto_refresh_source() +``` + +Then, to run the celery beat worker that periodically synchronizes sources, launch this command: +`$ celery beat -A django_geosource -l info` ## Configure data destination diff --git a/django_geosource/management/commands/auto_refresh_sources.py b/django_geosource/management/commands/auto_refresh_sources.py new file mode 100644 index 0000000..2349d27 --- /dev/null +++ b/django_geosource/management/commands/auto_refresh_sources.py @@ -0,0 +1,9 @@ +from django.core.management import BaseCommand +from django_geosource.periodics import auto_refresh_source + + +class Command(BaseCommand): + help = "Launch resync of all sources if needed" + + def handle(self, *args, **options): + auto_refresh_source() diff --git a/django_geosource/migrations/0021_source_last_refresh.py b/django_geosource/migrations/0021_source_last_refresh.py new file mode 100644 index 0000000..91c5849 --- /dev/null +++ b/django_geosource/migrations/0021_source_last_refresh.py @@ -0,0 +1,19 @@ +# Generated by Django 3.1.4 on 2021-09-08 15:29 + +from django.db import migrations, models +import django.utils.timezone + + +class Migration(migrations.Migration): + + dependencies = [ + ("django_geosource", "0020_auto_20201008_0851"), + ] + + operations = [ + migrations.AddField( + model_name="source", + name="last_refresh", + field=models.DateTimeField(default=django.utils.timezone.now), + ), + ] diff --git a/django_geosource/mixins.py b/django_geosource/mixins.py index cadff2c..84f6e22 100644 --- a/django_geosource/mixins.py +++ b/django_geosource/mixins.py @@ -28,18 +28,27 @@ def can_sync(self): and self.task_date < now() - timedelta(hours=MAX_TASK_RUNTIME) ) - def run_async_method(self, method, success_state=states.SUCCESS, force=False): + def run_async_method( + self, + method, + success_state=states.SUCCESS, + force=False, + countdown=None, + ): """Schedule an async task that will be runned by celery. 
Raises an error if a task is already running or scheduled, can be forced with `force` argument. """ if self.can_sync or force: - task_job = run_model_object_method.delay( - self._meta.app_label, - self.__class__.__name__, - self.pk, - method, - success_state, + task_job = run_model_object_method.apply_async( + ( + self._meta.app_label, + self.__class__.__name__, + self.pk, + method, + success_state, + ), + countdown=countdown, ) self.update_status(task_job) diff --git a/django_geosource/models.py b/django_geosource/models.py index b68f10e..d85695f 100644 --- a/django_geosource/models.py +++ b/django_geosource/models.py @@ -1,5 +1,5 @@ import json -from datetime import datetime +from datetime import datetime, timedelta from enum import Enum, IntEnum, auto import fiona @@ -9,11 +9,17 @@ from django.conf import settings from django.contrib.gis.gdal.error import GDALException from django.contrib.gis.geos import GEOSGeometry -from django.contrib.postgres.fields import JSONField + +try: + from django.db.models import JSONField +except ImportError: # TODO Remove when dropping Django releases < 3.1 + from django.contrib.postgres.fields import JSONField + from django.core.management import call_command from django.core.validators import RegexValidator, URLValidator from django.db import models, transaction from django.utils.text import slugify +from django.utils import timezone from polymorphic.models import PolymorphicModel from psycopg2 import sql @@ -95,6 +101,7 @@ class Source(PolymorphicModel, CeleryCallMethodsMixin): task_date = models.DateTimeField(null=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) + last_refresh = models.DateTimeField(default=timezone.now) SOURCE_GEOM_ATTRIBUTE = "_geom_" MAX_SAMPLE_DATA = 5 @@ -121,7 +128,26 @@ def save(self, *args, **kwargs): self.slug = slugify(self.name) return super().save(*args, **kwargs) + def should_refresh(self): + now = timezone.now() + if not getattr(self, 
"refresh", None) or self.refresh < 1: + return False + next_run = self.last_refresh + timedelta(minutes=self.refresh) + return next_run < now + def refresh_data(self): + try: + return self._refresh_data() + finally: + self.last_refresh = timezone.now() + self.save() + layer = self.get_layer() + refresh_data_done.send_robust( + sender=self.__class__, + layer=layer.pk, + ) + + def _refresh_data(self): report = {} with transaction.atomic(): layer = self.get_layer() @@ -150,10 +176,6 @@ def refresh_data(self): self.save(update_fields=["report"]) raise Exception("Failed to refresh data") - refresh_data_done.send_robust( - sender=self.__class__, - layer=layer.pk, - ) if row_count == total: self.report["status"] = "success" self.save(update_fields=["report"]) diff --git a/django_geosource/periodics.py b/django_geosource/periodics.py new file mode 100644 index 0000000..86d7663 --- /dev/null +++ b/django_geosource/periodics.py @@ -0,0 +1,18 @@ +import logging +from django_geosource.models import Source + +logger = logging.getLogger(__name__) + + +def auto_refresh_source(): + countdown = 0 + for source in Source.objects.all(): + logger.info(f"Is refresh for {source}<{source.id}> needed?") + if source.should_refresh(): + logger.info(f"Schedule refresh for source {source}<{source.id}>...") + # Delay execution by some minutes to avoid struggling + try: + source.run_async_method("refresh_data", countdown=countdown, force=True) + countdown += 60 * 3 + except Exception: + logger.exception("Failed to refresh source!") diff --git a/django_geosource/schedulers.py b/django_geosource/schedulers.py deleted file mode 100644 index ca4e808..0000000 --- a/django_geosource/schedulers.py +++ /dev/null @@ -1,155 +0,0 @@ -import copy -import heapq -import logging -from datetime import datetime, timedelta - -from celery import schedules -from celery.beat import ScheduleEntry, Scheduler, event_t -from django.utils import timezone -from django_geosource.models import PostGISSource - -logger = 
logging.getLogger(__name__) - - -class SourceEntry(ScheduleEntry): - def __init__(self, source, *args, **kwargs): - self.source = source - self.name = source.slug - super().__init__(*args, **kwargs) - - def is_due(self): - logger.info(f"Is {self.source} due to refresh ?") - - # Refresh from db to update refresh time - self.source.refresh_from_db() - - if self.source.refresh > 0: - next_run = self.last_run_at + timedelta(minutes=self.source.refresh) - logger.debug( - f"Refresh : {self.source.refresh} | Next run : {next_run} | Now : {timezone.now()}" - ) - if next_run < timezone.now(): - logger.info("Source is due to refresh") - return schedules.schedstate(True, 10.0) - else: - logger.debug("Source is NOT due to refresh, let's wait") - return schedules.schedstate(False, 10.0) - - logger.info("The refresh is disabled for this source, check again later") - return schedules.schedstate(False, 600.0) - - def run_task(self): - logger.info(f"Launch refresh_data for source {self.source}") - try: - self.source.run_async_method("refresh_data") - except Exception as e: - logger.error(f"Refresh data couldn't be launched: {e}") - - def __next__(self, last_run_at=None): - """Return new instance, with date and count fields updated.""" - return self.__class__( - **dict( - self, - source=self.source, - last_run_at=last_run_at or self.default_now(), - total_run_count=self.total_run_count + 1, - ) - ) - - next = __next__ - - -class GeosourceScheduler(Scheduler): - """Database-backed Beat Scheduler.""" - - Entry = SourceEntry - _initial = True - _last_sync = None - - TICK_DELAY = 60 - - def all_entries(self): - s = {} - for source in PostGISSource.objects.exclude(refresh__lte=0).order_by( - "-refresh" - ): - try: - s[source.slug] = self.Entry(source, app=self.app) - except ValueError: - pass - return s - - def reserve(self, entry): - new_entry = self.schedule[entry.source.slug] = next(entry) - return new_entry - - @property - def schedule(self): - if self.should_sync() or not 
getattr(self, "_schedule", None): - logger.debug("Resync schedule entries") - self.sync() - self._schedule = self.all_entries() - - return self._schedule - - def sync(self): - self._last_sync = timezone.now().timestamp() - super().sync() - - def should_sync(self): - last_update = PostGISSource.objects.order_by("-updated_at").first() - if not self._last_sync or ( - last_update - and ( - timezone.make_aware( - datetime.fromtimestamp(self._last_sync), - timezone.get_default_timezone(), - ) - < last_update.updated_at - ) - ): - return True - return False - - def apply_entry(self, entry, producer=None): - logger.info(f"Scheduler: Sending due task {entry.source.name}") - entry.run_task() - - def tick( - self, event_t=event_t, min=min, heappop=heapq.heappop, heappush=heapq.heappush - ): - - logger.debug("Ticking") - - max_interval = self.max_interval - - if self._heap is None or not self.schedules_equal( - self.old_schedulers, self.schedule - ): - self.old_schedulers = copy.copy(self.schedule) - self.populate_heap() - - H = self._heap - - if not H: - logger.info("There is no source to synchronize") - return max_interval - - event = H[0] - time_to_run, priority, entry = event - time_to_run = datetime.fromtimestamp(time_to_run) - - if time_to_run < datetime.now(): - is_due, next_time_to_run = self.is_due(entry) - verify = heappop(H) - - if is_due and verify is event: - self.apply_entry(entry, producer=self.producer) - - next_entry = self.reserve(entry) - heappush( - H, - event_t(self._when(next_entry, next_time_to_run), priority, next_entry), - ) - - return self.TICK_DELAY / len(H) diff --git a/django_geosource/tasks.py b/django_geosource/tasks.py index 618f646..907b3f9 100644 --- a/django_geosource/tasks.py +++ b/django_geosource/tasks.py @@ -49,3 +49,10 @@ def run_model_object_method(self, app, model, pk, method, success_state=states.S logger.error(e, exc_info=True) raise Ignore() + + +@shared_task  # unbound on purpose: the task function takes no `self` argument +def run_auto_refresh_source(): + from 
django_geosource.periodics import auto_refresh_source + + auto_refresh_source() diff --git a/django_geosource/tests/test_periodics.py b/django_geosource/tests/test_periodics.py new file mode 100644 index 0000000..6f65950 --- /dev/null +++ b/django_geosource/tests/test_periodics.py @@ -0,0 +1,99 @@ +from datetime import datetime +from unittest import mock + +from django.test import TestCase +from django.utils import timezone +from django_geosource.models import GeometryTypes, PostGISSource, GeoJSONSource +from django_geosource.periodics import auto_refresh_source +import os + + +class PeriodicsTestCase(TestCase): + def setUp(self): + self.geosource = GeoJSONSource.objects.create( + name="test", + geom_type=GeometryTypes.Point.value, + file=os.path.join(os.path.dirname(__file__), "data", "test.geojson"), + ) + self.source = PostGISSource.objects.create( + name="First Source", + db_host="localhost", + db_name="dbname", + db_username="username", + query="SELECT 1", + geom_field="geom", + refresh=-1, + last_refresh=datetime(2020, 1, 1, tzinfo=timezone.utc), + geom_type=GeometryTypes.LineString.value, + ) + self.source2 = PostGISSource.objects.create( + name="Second Source", + db_host="localhost", + db_name="dbname", + db_username="username", + query="SELECT 1", + geom_field="geom", + refresh=60 * 24 * 3, + last_refresh=datetime(2020, 1, 1, tzinfo=timezone.utc), + geom_type=GeometryTypes.LineString.value, + ) + + @mock.patch("django.utils.timezone.now") + def test_should_refresh(self, mock_timezone): + dt = datetime(2099, 1, 1, tzinfo=timezone.utc) + mock_timezone.return_value = dt + + self.assertEqual(self.source.should_refresh(), False) + self.assertEqual(self.geosource.should_refresh(), False) + + dt = datetime(2020, 1, 2, tzinfo=timezone.utc) + mock_timezone.return_value = dt + + self.assertEqual(self.source2.should_refresh(), False) + + dt = datetime(2020, 1, 10, tzinfo=timezone.utc) + mock_timezone.return_value = dt + + 
self.assertEqual(self.source2.should_refresh(), True) + + @mock.patch("django.utils.timezone.now") + def test_auto_refresh(self, mock_timezone): + + with mock.patch( + "django_geosource.models.Source._refresh_data" + ) as mocked, mock.patch( + "django_geosource.mixins.CeleryCallMethodsMixin.update_status", + return_value=False, + ): + + dt = datetime(2020, 1, 2, tzinfo=timezone.utc) + mock_timezone.return_value = dt + auto_refresh_source() + + mocked.assert_not_called() + + with mock.patch( + "django_geosource.models.Source._refresh_data" + ) as mocked2, mock.patch( + "django_geosource.mixins.CeleryCallMethodsMixin.update_status", + return_value=False, + ): + + dt = datetime(2020, 1, 10, tzinfo=timezone.utc) + mock_timezone.return_value = dt + auto_refresh_source() + + mocked2.assert_called_once() + + with mock.patch( + "django_geosource.models.Source._refresh_data" + ) as mocked2, mock.patch( + "django_geosource.mixins.CeleryCallMethodsMixin.update_status", + return_value=False, + ): + + dt = datetime(2020, 1, 10, tzinfo=timezone.utc) + mock_timezone.return_value = dt + auto_refresh_source() + + mocked2.assert_not_called() diff --git a/django_geosource/tests/test_scheduler.py b/django_geosource/tests/test_scheduler.py deleted file mode 100644 index 1463253..0000000 --- a/django_geosource/tests/test_scheduler.py +++ /dev/null @@ -1,131 +0,0 @@ -import logging -from datetime import datetime, timedelta -from unittest import mock - -from django.test import TestCase -from django.utils import timezone -from django_geosource.models import GeometryTypes, PostGISSource -from django_geosource.schedulers import GeosourceScheduler, SourceEntry -from rest_framework.exceptions import MethodNotAllowed -from test_geosource.celery import app as celery_app - - -class SourceEntrySchedulerTestCase(TestCase): - def setUp(self): - self.source = PostGISSource.objects.create( - name="First Source", - db_host="localhost", - db_name="dbname", - db_username="username", - query="SELECT 1", - 
geom_field="geom", - refresh=-1, - geom_type=GeometryTypes.LineString.value, - ) - - def test_celery_source_entry_is_due_refresh_infinite(self): - entry = SourceEntry(self.source, app=celery_app) - self.assertEqual(entry.is_due().is_due, False) - self.assertEqual(entry.is_due().next, 600) - - def test_celery_source_entry_is_due_refresh(self): - self.source.refresh = 1 - self.source.save() - entry = SourceEntry(self.source, app=celery_app) - self.assertEqual(entry.is_due().is_due, False) - self.assertEqual(entry.is_due().next, 10) - - @mock.patch("django.utils.timezone.now") - def test_celery_source_entry_is_due_refresh_before_now(self, mock_timezone): - dt = datetime(2099, 1, 1, tzinfo=timezone.utc) - mock_timezone.return_value = dt - self.source.refresh = 1 - self.source.save() - entry = SourceEntry(self.source, app=celery_app) - self.assertEqual(entry.is_due().is_due, True) - self.assertEqual(entry.is_due().next, 10) - - def test_celery_source_entry_next(self): - entry = SourceEntry(self.source, app=celery_app) - self.assertEqual( - str(entry.__next__(datetime(2099, 1, 1, tzinfo=timezone.utc)).source), - "First Source - PostGISSource", - ) - - def test_celery_source_entry_run_task(self): - entry = SourceEntry(self.source, app=celery_app) - logging.disable(logging.ERROR) - with mock.patch( - "django_geosource.mixins.CeleryCallMethodsMixin.can_sync", - new_callable=mock.PropertyMock, - return_value=False, - ): - with mock.patch( - "django_geosource.mixins.CeleryCallMethodsMixin.run_async_method", - side_effect=MethodNotAllowed("Test"), - ) as mocked: - entry.run_task() - mocked.assert_called_once() - - -class GeoSourceSchedulerTestCase(TestCase): - def setUp(self): - self.source = PostGISSource.objects.create( - name="First Source", - db_host="localhost", - db_name="dbname", - db_username="username", - query="SELECT 1", - geom_field="geom", - refresh=1, - geom_type=GeometryTypes.LineString.value, - ) - self.geosource_scheduler = GeosourceScheduler(celery_app, 
lazy=True) - - def test_all_entries(self): - self.assertEqual( - ["first-source"], list(self.geosource_scheduler.all_entries().keys()) - ) - - def test_reserve(self): - entry = SourceEntry(self.source, app=celery_app) - self.assertEqual(entry, self.geosource_scheduler.reserve(entry)) - - def test_schedule_with_source_should_sync(self): - entry = SourceEntry(self.source, app=celery_app) - self.assertEqual({"first-source": entry}, self.geosource_scheduler.schedule) - - def test_schedule_with_source_should_not_sync(self): - time_s = timezone.now() + timedelta(days=1) - self.geosource_scheduler._last_sync = time_s.timestamp() - self.geosource_scheduler._schedule = "Test" - self.assertEqual("Test", self.geosource_scheduler.schedule) - - def test_schedule_apply_entry(self): - entry = SourceEntry(self.source, app=celery_app) - with mock.patch( - "django_geosource.mixins.CeleryCallMethodsMixin.run_async_method", - return_value=False, - ) as mocked: - self.geosource_scheduler.apply_entry(entry) - mocked.assert_called_once() - - def test_tick_without_entry(self): - self.source.delete() - self.geosource_scheduler.max_interval = 300 - self.assertEqual(self.geosource_scheduler.tick(), 300) - - def test_tick_with_entry(self): - PostGISSource.objects.create( - name="Second Source", - db_host="localhost", - db_name="dbname", - db_username="username", - query="SELECT 1", - geom_field="geom", - refresh=1, - geom_type=GeometryTypes.LineString.value, - ) - self.assertEqual( - self.geosource_scheduler.tick(), 30 - ) # TICK_DELAY = 60 / Number of source diff --git a/docker-compose.yml b/docker-compose.yml index ac80804..57e38d9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: "3" +version: '3' services: postgres: image: makinacorpus/pgrouting:10-2.5-2.6 @@ -18,7 +18,7 @@ services: volumes: - .:/code/src ports: - - "8000:8000" + - '8089:8000' command: ./manage.py runserver 0.0.0.0:8000 volumes: