
Commit 5ff62eb
OpenConceptLab/ocl_issues#1923 Persist partial progress for new bulk import

rkorytkowski committed Jan 23, 2025
1 parent 93a51d7 commit 5ff62eb
Showing 1 changed file with 35 additions and 35 deletions.
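
In short: rather than reconstructing progress by walking Celery's in-memory result graph (`result.parent` / `result.children`), the importer now pre-generates each subtask's id with kombu's `uuid()`, pins it to the task signature, and persists the id list on the `ImportTask`, so a summary can be recomputed from the ids alone. A minimal sketch of that pattern, assuming a local Celery app (`app` and `import_file` are illustrative names, not from the commit):

    from celery import Celery
    from celery.result import AsyncResult
    from kombu import uuid

    app = Celery(broker='memory://', backend='cache+memory://')  # illustrative config, not from the commit

    @app.task
    def import_file(path):  # hypothetical stand-in for bulk_import_subtask
        return 'created'

    # Generate the subtask id up front, pin it to the signature, and persist it somewhere durable.
    subtask_id = uuid()
    import_file.si('package/concepts.json').set(task_id=subtask_id).apply_async()

    # Later, even from another process after a restart, the bare id is enough
    # to rebuild a result handle and inspect progress:
    result = AsyncResult(subtask_id, app=app)
    if result.ready():
        print(result.result)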
70 changes: 35 additions & 35 deletions core/importers/importer.py
@@ -10,11 +10,12 @@
 import zipfile
 from zipfile import ZipFile
 from celery.result import AsyncResult, result_from_tuple
-from celery import group, chain
+from celery import group, chain, chord
 
 import ijson
 import requests
 from ijson import JSONError
+from kombu import uuid
 from packaging.version import Version
 from pydantic import BaseModel, computed_field, PrivateAttr

@@ -83,6 +84,7 @@ class ImportTask(BaseModel):
     time_started: datetime = timezone.now()
     _time_finished: datetime = PrivateAttr(default=None)
     dependencies: list = []
+    subtask_ids: list = []
     initial_summary: ImportTaskSummary = ImportTaskSummary()
     final_summary: ImportTaskSummary = None

@@ -144,36 +146,30 @@ def summary(self) -> ImportTaskSummary:  # pylint: disable=too-many-branches
         if not self.import_async_result:
             return summary
 
-        import_group = self.import_async_result
-        import_groups = []
-        while import_group.parent is not None:
-            import_group = import_group.parent
-            import_groups = [import_group] + import_groups
-
-        for import_group in import_groups:
-            for child in import_group.children:
-                if child.ready():
-                    results = child.result
-                    if not isinstance(results, list):
-                        results = [child.result]
-
-                    for result in results:
-                        summary.processed += 1
-                        if result == CREATED:
-                            summary.created += 1
-                        elif result == UPDATED:
-                            summary.updated += 1
-                        elif result == DELETED:
-                            summary.deleted += 1
-                        elif result == PERMISSION_DENIED:
-                            summary.permission_denied += 1
-                        elif result == UNCHANGED:
-                            summary.unchanged += 1
-                        else:
-                            summary.failed += 1
-                            summary.failures.append(result)
-            if not import_group.ready():
-                break  # inspect next group only if the current one is ready
+        for task_id in self.subtask_ids:
+            child = AsyncResult(task_id)
+            if child.ready():
+                results = child.result
+                if not isinstance(results, list):
+                    results = [child.result]
+
+                for result in results:
+                    summary.processed += 1
+                    if result == CREATED:
+                        summary.created += 1
+                    elif result == UPDATED:
+                        summary.updated += 1
+                    elif result == DELETED:
+                        summary.deleted += 1
+                    elif result == PERMISSION_DENIED:
+                        summary.permission_denied += 1
+                    elif result == UNCHANGED:
+                        summary.unchanged += 1
+                    else:
+                        summary.failed += 1
+                        summary.failures.append(result)
+            else:
+                break  # inspect further only if the current one is ready
         return summary
 
     @computed_field
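
Editorial note on the rewritten loop above: polling stops at the first unfinished subtask, so the summary always describes a prefix of the scheduled subtask list rather than a scattered sample. A hypothetical helper (not part of the commit) showing the same shape:

    from celery.result import AsyncResult

    def count_finished_prefix(subtask_ids):
        """Hypothetical helper: count subtasks, in scheduling order, that have completed."""
        finished = 0
        for task_id in subtask_ids:
            if not AsyncResult(task_id).ready():
                break  # later groups in the chain cannot start before this one finishes
            finished += 1
        return finished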
@@ -275,13 +271,14 @@ def run(self):  # pylint: disable=too-many-locals
         tasks = self.prepare_tasks(resource_types, dependencies, resources)
         if tasks:
             # In the future we will let the user approve the import before scheduling tasks.
-            task = self.schedule_tasks(tasks)
+            task, subtask_ids = self.schedule_tasks(tasks)
 
             # Return the task id of the chain to track the end of execution.
             # We do not wait for the end of execution of tasks here to free up worker and memory.
             # It is also to be able to pick up running tasks in the event of restart and not having to handle restarting
             # the main task.
-            result = ImportTask(import_task=task.as_tuple(), time_started=time_started, dependencies=dependencies)
+            result = ImportTask(import_task=task.as_tuple(), subtask_ids=subtask_ids, time_started=time_started,
+                                dependencies=dependencies)
 
             for _, files in resources.items():
                 for _, count in files.items():
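
The `ImportTask` constructed here persists both the serialized chain result (`task.as_tuple()`) and the new `subtask_ids`. Celery's `as_tuple()` / `result_from_tuple()` pair (the latter already imported at the top of this file) round-trips a result handle through plain data, which is what allows a restarted service to pick up a running import. A hedged sketch of that round-trip, with an illustrative `add` task:

    from celery import Celery
    from celery.result import result_from_tuple

    app = Celery(broker='memory://', backend='cache+memory://')  # illustrative

    @app.task
    def add(x, y):
        return x + y

    async_result = add.si(2, 3).apply_async()
    stored = async_result.as_tuple()               # plain nested tuples of ids, safe to persist
    restored = result_from_tuple(stored, app=app)  # equivalent AsyncResult rebuilt from stored data
    assert restored.id == async_result.id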
@@ -432,23 +429,26 @@ def calculate_batch_size(self, resources):
         return task_batch_size
 
     def schedule_tasks(self, tasks):
+        subtask_ids = []
         chained_tasks = chain()
         for task in tasks:
             group_tasks = []
             for group_task in task:
                 # TODO: create 2 queues for new bulk import subtasks: bulk_import_subtask and bulk_import_subtask_root
+                subtask_id = uuid()
+                subtask_ids.append(subtask_id)
                 group_tasks.append(bulk_import_subtask.si(group_task['path'], group_task['username'],
                                                           group_task['owner_type'], group_task['owner'],
                                                           group_task['resource_type'], group_task['files'])
-                                   .set(queue='concurrent'))
+                                   .set(queue='concurrent', task_id=subtask_id))
             if len(group_tasks) == 1:  # Prevent celery from converting group to a single task
                 group_tasks.append(bulk_import_subtask_empty.si().set(queue='concurrent'))
 
             chained_tasks |= group(group_tasks)
         chained_tasks |= import_finisher.si(self.task_id).set(queue='concurrent')
 
         final_task = chained_tasks.apply_async(queue='concurrent')
-        return final_task
+        return final_task, subtask_ids
 
     def is_importable_file(self, file_name):
         return file_name.endswith('.json') and ((self.is_npm_import() and file_name.startswith('package/')
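
For orientation, `schedule_tasks()` builds a chain of groups capped by a finisher; Celery runs each group to completion before the next link starts (a group chained into a following task is upgraded to a chord, which is likely why this commit adds the `chord` import). The `bulk_import_subtask_empty` padding above keeps a one-element group from collapsing into a bare task, which would break that structure. A self-contained sketch of the shape, with illustrative task names:

    from celery import Celery, chain, group

    app = Celery(broker='memory://', backend='cache+memory://')  # illustrative

    @app.task
    def step(n):
        return n

    @app.task
    def finish():
        return 'done'

    # One group per batch; each group must complete before the next link runs,
    # and the finisher fires only after the last group.
    workflow = chain()
    for batch in [[1, 2], [3, 4]]:
        workflow |= group(step.si(n) for n in batch)
    workflow |= finish.si()
    workflow.apply_async()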
