Skip to content

Commit

Permalink
Add ETL for persons (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
a1d4r authored Mar 18, 2024
1 parent 589ed8a commit 8f82b8c
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 83 deletions.
2 changes: 2 additions & 0 deletions etl/etl/dto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from etl.dto.elasticsearch import *
from etl.dto.postgres import *
63 changes: 63 additions & 0 deletions etl/etl/dto/elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dataclasses import dataclass
from uuid import UUID

__all__ = [
"BaseElasticsearchRecord",
"PersonMinimalElasticsearchRecord",
"FilmWorkMinimalElasticsearchRecord",
"GenreElasticsearchRecord",
"PersonElasticsearchRecord",
"FilmWorkElasticsearchRecord",
]


@dataclass
class BaseElasticsearchRecord:
"""Базовая модель для хранения информации в индексе Elasticsearch."""

id: UUID


@dataclass
class PersonMinimalElasticsearchRecord(BaseElasticsearchRecord):
"""Модель для хранения краткой информации о персоне в индексе Elasticsearch."""

name: str


@dataclass
class FilmWorkMinimalElasticsearchRecord(BaseElasticsearchRecord):
"""Модель для хранения краткой информации о кинопроизведении в индексе Elasticsearch."""

roles: list[str]


@dataclass
class GenreElasticsearchRecord(BaseElasticsearchRecord):
"""Модель для хранения информации о жанре в индексе Elasticsearch."""

name: str
description: str


@dataclass
class PersonElasticsearchRecord(BaseElasticsearchRecord):
"""Модель для хранения информации о персоне в индексе Elasticsearch."""

full_name: str
films: list[FilmWorkMinimalElasticsearchRecord]


@dataclass
class FilmWorkElasticsearchRecord(BaseElasticsearchRecord):
"""Модель для хранения информации о кинопроизведении в индексе Elasticsearch."""

imdb_rating: float
genre: list[str]
title: str
description: str
director: list[str]
actors_names: list[str]
writers_names: list[str]
actors: list[PersonMinimalElasticsearchRecord]
writers: list[PersonMinimalElasticsearchRecord]
72 changes: 35 additions & 37 deletions etl/etl/dto.py → etl/etl/dto/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@
from datetime import datetime
from uuid import UUID

__all__ = [
"PersonIdModified",
"GenreIdModified",
"FilmWorkIdModified",
"GenreInfo",
"PersonInfo",
"PersonFilmWorkRecord",
"FilmWorkInfo",
"FilmWorkGenreRecord",
"FilmWorkPersonRecord",
]


@dataclass
class IdModified:
Expand All @@ -27,7 +39,7 @@ class FilmWorkIdModified(IdModified):

@dataclass
class GenreInfo:
"""Информация о жанре."""
"""Информация о жанре фильма из БД."""

id: UUID
name: str
Expand All @@ -36,9 +48,28 @@ class GenreInfo:
modified: datetime


@dataclass
class PersonInfo:
"""Информация о персоне из БД."""

id: UUID
full_name: str
created: datetime
modified: datetime


@dataclass
class PersonFilmWorkRecord:
"""Информация о фильме и ролях, в котором участвовала персона."""

person_id: UUID
film_work_id: UUID
roles: list[str]


@dataclass
class FilmWorkInfo:
"""Информация о кинопроизведении."""
"""Информация о кинопроизведении из БД."""

id: UUID
title: str
Expand All @@ -52,7 +83,7 @@ class FilmWorkInfo:

@dataclass
class FilmWorkGenreRecord:
"""Связь между кинопроизведением и жанром."""
"""Связь между кинопроизведением и жанром из БД."""

film_work_id: UUID
genre_id: UUID
Expand All @@ -61,42 +92,9 @@ class FilmWorkGenreRecord:

@dataclass
class FilmWorkPersonRecord:
"""Связь между кинопроизведением и персоной."""
"""Информация о персоне, участвовавшей в кинопроизведении."""

film_work_id: UUID
person_id: UUID
person_full_name: str
role: str


@dataclass
class PersonElasticsearchRecord:
"""Объект для хранения информации о персоне в индексе Elasticsearch."""

id: UUID
name: str


@dataclass
class GenreElasticsearchRecord:
"""Объект для хранения информации о жанре в индексе Elasticsearch."""

id: UUID
name: str
description: str


@dataclass
class FilmWorkElasticsearchRecord:
"""Объект для хранения информации о кинопроизведении в индексе Elasticsearch."""

id: UUID
imdb_rating: float
genre: list[str]
title: str
description: str
director: list[str]
actors_names: list[str]
writers_names: list[str]
actors: list[PersonElasticsearchRecord]
writers: list[PersonElasticsearchRecord]
46 changes: 45 additions & 1 deletion etl/etl/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,27 @@ def fetch_genres_info(
)
return [dto.GenreInfo(**row) for row in cursor.fetchall()]

@retry
def fetch_persons_info(
self,
persons: list[dto.PersonIdModified],
) -> list[dto.PersonInfo]:
"""Получить информацию о заданных персонах."""
with self._connection.cursor() as cursor:
cursor.execute(
"""
SELECT
id,
full_name,
created,
modified
FROM content.person
WHERE id = ANY(%(persons_ids)s)
""",
({"persons_ids": [person.id for person in persons]}),
)
return [dto.PersonInfo(**row) for row in cursor.fetchall()]

@retry
def fetch_film_works_info(
self,
Expand Down Expand Up @@ -202,7 +223,9 @@ def fetch_film_works_persons(
self,
film_works: list[dto.FilmWorkIdModified],
) -> list[dto.FilmWorkPersonRecord]:
"""Получить персон, которые участвовали в создании заданных кинопроизведений."""
"""Получить информацию о персонах, которые участвовали
в создании заданных кинопроизведений.
"""
with self._connection.cursor() as cursor:
cursor.execute(
"""
Expand All @@ -219,3 +242,24 @@ def fetch_film_works_persons(
({"film_works_ids": [film_work.id for film_work in film_works]}),
)
return [dto.FilmWorkPersonRecord(**row) for row in cursor.fetchall()]

@retry
def fetch_persons_film_works(
self,
persons: list[dto.PersonIdModified],
) -> list[dto.PersonFilmWorkRecord]:
"""Получить кинопроизведения и роли, в которых участвовала заданная персона."""
with self._connection.cursor() as cursor:
cursor.execute(
"""
SELECT
person_id,
film_work_id,
array_agg(role) AS roles
FROM content.person_film_work
WHERE person_id = ANY(%(persons_ids)s)
GROUP BY (person_id, film_work_id)
""",
({"persons_ids": [person.id for person in persons]}),
)
return [dto.PersonFilmWorkRecord(**row) for row in cursor.fetchall()]
42 changes: 28 additions & 14 deletions etl/etl/loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import dataclasses

from collections.abc import Sequence

import elastic_transport
import tenacity

Expand All @@ -22,26 +24,38 @@ def __init__(self, client: Elasticsearch) -> None:
self._client = client

@retry
def load_film_works_records(
self,
film_works_records: list[dto.FilmWorkElasticsearchRecord],
) -> None:
"""Загружает в индекс данные о фильмах."""
def _load_records(self, index: str, records: Sequence[dto.BaseElasticsearchRecord]) -> None:
"""Загружает в данные в заданный индекс."""
logger.info(
"Going to insert {} documents into Elasticsearch",
len(records),
)
helpers.bulk(
self._client,
[
{"_index": "movies", "_id": str(record.id), "_source": dataclasses.asdict(record)}
for record in film_works_records
{"_index": index, "_id": str(record.id), "_source": dataclasses.asdict(record)}
for record in records
],
)
logger.info(
"Inserted {} documents into Elasticsearch",
len(records),
)

@retry
def load_film_works_records(
self,
film_works_records: list[dto.FilmWorkElasticsearchRecord],
) -> None:
"""Загружает в индекс данные о фильмах."""
self._load_records("movies", film_works_records)

@retry
def load_genres_records(self, genres_records: list[dto.GenreElasticsearchRecord]) -> None:
"""Загружает в индекс данные о жанрах."""
helpers.bulk(
self._client,
[
{"_index": "genres", "_id": str(record.id), "_source": dataclasses.asdict(record)}
for record in genres_records
],
)
self._load_records("genres", genres_records)

@retry
def load_persons_records(self, persons_records: list[dto.PersonElasticsearchRecord]) -> None:
"""Загружает в индекс данные о персонах."""
self._load_records("persons", persons_records)
53 changes: 24 additions & 29 deletions etl/etl/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ def synchronize_person_updates(self) -> None:
state.persons_modified_cursor,
)

self._update_persons_in_elasticsearch(persons)

for film_works in self.extractor.fetch_film_works_with_persons_in_batches(persons):
self._update_film_works_in_elasticsearch(film_works=film_works)
self._update_film_works_in_elasticsearch(film_works)

state.persons_modified_cursor = persons[-1].modified
self.state_manager.save_state(state)
Expand All @@ -66,10 +68,10 @@ def synchronize_genre_updates(self) -> None:
logger.info("No genres were updated since {}", state.genres_modified_cursor)
return

self._update_genres_in_elasticsearch(genres=genres)
self._update_genres_in_elasticsearch(genres)

for film_works in self.extractor.fetch_film_works_with_genres_in_batches(genres=genres):
self._update_film_works_in_elasticsearch(film_works=film_works)
for film_works in self.extractor.fetch_film_works_with_genres_in_batches(genres):
self._update_film_works_in_elasticsearch(film_works)

state.genres_modified_cursor = genres[-1].modified
self.state_manager.save_state(state)
Expand All @@ -94,7 +96,7 @@ def synchronize_film_work_updates(self) -> None:
state.film_works_modified_cursor,
)

self._update_film_works_in_elasticsearch(film_works=film_works)
self._update_film_works_in_elasticsearch(film_works)

state.film_works_modified_cursor = film_works[-1].modified
self.state_manager.save_state(state)
Expand All @@ -105,43 +107,36 @@ def _update_genres_in_elasticsearch(self, genres: list[dto.GenreIdModified]) ->
"""Обновляет данные о жанрах в Elasticsearch."""
logger.info("Retrieved {} genres", len(genres))

genres_info = self.extractor.fetch_genres_info(genres=genres)
genres_info = self.extractor.fetch_genres_info(genres)
genres_elasticsearch_records = transformer.build_genres_elasticsearch_records(
genres_info=genres_info,
)

logger.info(
"Going to insert {} documents into Elasticsearch",
len(genres_elasticsearch_records),
genres_info,
)
self.loader.load_genres_records(genres_elasticsearch_records)
logger.info(
"Inserted {} documents into Elasticsearch",
len(genres_elasticsearch_records),

def _update_persons_in_elasticsearch(self, persons: list[dto.PersonIdModified]) -> None:
"""Обновляет данные о персонах в Elasticsearch."""
logger.info("Retrieved {} persons", len(persons))

persons_info = self.extractor.fetch_persons_info(persons)
persons_film_works = self.extractor.fetch_persons_film_works(persons)

persons_elasticsearch_records = transformer.build_persons_elasticsearch_records(
persons_info,
persons_film_works,
)
self.loader.load_persons_records(persons_elasticsearch_records)

def _update_film_works_in_elasticsearch(self, film_works: list[dto.FilmWorkIdModified]) -> None:
"""Обновляет данные о фильмах в Elasticsearch."""
logger.info("Retrieved {} film works with updated persons", len(film_works))

logger.info("Collecting film works info")

film_works_info = self.extractor.fetch_film_works_info(film_works=film_works)
film_works_genres = self.extractor.fetch_film_works_genres(film_works=film_works)
film_works_persons = self.extractor.fetch_film_works_persons(film_works=film_works)
film_works_info = self.extractor.fetch_film_works_info(film_works)
film_works_genres = self.extractor.fetch_film_works_genres(film_works)
film_works_persons = self.extractor.fetch_film_works_persons(film_works)

film_works_elasticsearch_records = transformer.build_film_works_elasticsearch_records(
film_works_info=film_works_info,
film_works_genres=film_works_genres,
film_works_persons=film_works_persons,
)
logger.info(
"Going to insert {} documents into Elasticsearch",
len(film_works_elasticsearch_records),
)

self.loader.load_film_works_records(film_works_elasticsearch_records)
logger.info(
"Inserted {} documents into Elasticsearch",
len(film_works_elasticsearch_records),
)
Loading

0 comments on commit 8f82b8c

Please sign in to comment.