Skip to content

Commit

Permalink
Change way periodic tasks are launched
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmi committed Sep 14, 2021
1 parent df775fc commit 75d208a
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 303 deletions.
25 changes: 23 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,29 @@ You also need to create the celery app following this [documentation](https://do
Then to run the celery worker:
`$ celery worker -A django_geosource -l info`

To run the celery beat worker that allow to synchronize periodically sources, launch this command:
`$ celery beat --scheduler django_geosource.celery.schedulers.GeosourceScheduler -A django_geosource -l info`
To run periodic tasks, use celery beat:

```python

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls refresh check every 30 minutes
sender.add_periodic_task(60.0 * 30, run_auto_refresh_source.s())

@app.task
def run_auto_refresh_source():
from django_geosource.periodics import auto_refresh_source

auto_refresh_source()
```

Then run celery beat worker that allow to synchronize periodically sources, launch this command:
`$ celery beat -A django_geosource -l info`

## Configure data destination

Expand Down
9 changes: 9 additions & 0 deletions django_geosource/management/commands/auto_refresh_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from django.core.management import BaseCommand
from django_geosource.periodics import auto_refresh_source


class Command(BaseCommand):
help = "Launch resync of all sources if needed"

def handle(self, *args, **options):
auto_refresh_source()
19 changes: 19 additions & 0 deletions django_geosource/migrations/0021_source_last_refresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 3.1.4 on 2021-09-08 15:29

from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):

dependencies = [
("django_geosource", "0020_auto_20201008_0851"),
]

operations = [
migrations.AddField(
model_name="source",
name="last_refresh",
field=models.DateTimeField(default=django.utils.timezone.now),
),
]
23 changes: 16 additions & 7 deletions django_geosource/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,27 @@ def can_sync(self):
and self.task_date < now() - timedelta(hours=MAX_TASK_RUNTIME)
)

def run_async_method(self, method, success_state=states.SUCCESS, force=False):
def run_async_method(
self,
method,
success_state=states.SUCCESS,
force=False,
countdown=None,
):
"""Schedule an async task that will be runned by celery.
Raises an error if a task is already running or scheduled, can be forced with
`force` argument.
"""
if self.can_sync or force:
task_job = run_model_object_method.delay(
self._meta.app_label,
self.__class__.__name__,
self.pk,
method,
success_state,
task_job = run_model_object_method.apply_async(
(
self._meta.app_label,
self.__class__.__name__,
self.pk,
method,
success_state,
),
countdown=countdown,
)

self.update_status(task_job)
Expand Down
34 changes: 28 additions & 6 deletions django_geosource/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum, IntEnum, auto

import fiona
Expand All @@ -9,11 +9,17 @@
from django.conf import settings
from django.contrib.gis.gdal.error import GDALException
from django.contrib.gis.geos import GEOSGeometry
from django.contrib.postgres.fields import JSONField

try:
from django.db.models import JSONField
except ImportError: # TODO Remove when dropping Django releases < 3.1
from django.contrib.postgres.fields import JSONField

from django.core.management import call_command
from django.core.validators import RegexValidator, URLValidator
from django.db import models, transaction
from django.utils.text import slugify
from django.utils import timezone
from polymorphic.models import PolymorphicModel
from psycopg2 import sql

Expand Down Expand Up @@ -95,6 +101,7 @@ class Source(PolymorphicModel, CeleryCallMethodsMixin):
task_date = models.DateTimeField(null=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
last_refresh = models.DateTimeField(default=timezone.now)

SOURCE_GEOM_ATTRIBUTE = "_geom_"
MAX_SAMPLE_DATA = 5
Expand All @@ -121,7 +128,26 @@ def save(self, *args, **kwargs):
self.slug = slugify(self.name)
return super().save(*args, **kwargs)

def should_refresh(self):
now = timezone.now()
if not getattr(self, "refresh", None) or self.refresh < 1:
return False
next_run = self.last_refresh + timedelta(minutes=self.refresh)
return next_run < now

def refresh_data(self):
try:
return self._refresh_data()
finally:
self.last_refresh = timezone.now()
self.save()
layer = self.get_layer()
refresh_data_done.send_robust(
sender=self.__class__,
layer=layer.pk,
)

def _refresh_data(self):
report = {}
with transaction.atomic():
layer = self.get_layer()
Expand Down Expand Up @@ -150,10 +176,6 @@ def refresh_data(self):
self.save(update_fields=["report"])
raise Exception("Failed to refresh data")

refresh_data_done.send_robust(
sender=self.__class__,
layer=layer.pk,
)
if row_count == total:
self.report["status"] = "success"
self.save(update_fields=["report"])
Expand Down
18 changes: 18 additions & 0 deletions django_geosource/periodics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging
from django_geosource.models import Source

logger = logging.getLogger(__name__)


def auto_refresh_source():
countdown = 0
for source in Source.objects.all():
logger.info(f"Is refresh for {source}<{source.id}> needed?")
if source.should_refresh():
logger.info(f"Schedule refresh for source {source}<{source.id}>...")
# Delay execution by some minutes to avoid struggling
try:
source.run_async_method("refresh_data", countdown=countdown, force=True)
countdown += 60 * 3
except Exception:
logger.exception("Failed to refresh source!")
155 changes: 0 additions & 155 deletions django_geosource/schedulers.py

This file was deleted.

7 changes: 7 additions & 0 deletions django_geosource/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,10 @@ def run_model_object_method(self, app, model, pk, method, success_state=states.S
logger.error(e, exc_info=True)

raise Ignore()


@shared_task(bind=True)
def run_auto_refresh_source():
from django_geosource.periodics import auto_refresh_source

auto_refresh_source()
Loading

0 comments on commit 75d208a

Please sign in to comment.