ETL for genres (#16)
* Add mapping for genres
* Add ETL for genres
a1d4r authored Mar 16, 2024
1 parent 7e6dbf7 commit eb990b0
Showing 11 changed files with 162 additions and 16 deletions.
2 changes: 1 addition & 1 deletion etl/Dockerfile
@@ -81,7 +81,7 @@ RUN --mount=type=cache,target=/root/.cache \
# will become mountpoint of our code
WORKDIR /app

CMD ["python", "-m", "etl.example"]
CMD ["python", "-m", "etl.main"]


################################
5 changes: 5 additions & 0 deletions etl/Makefile
@@ -10,6 +10,11 @@ VERSION := latest
CODE = etl tests
TESTS = tests

#* Application
.PHONY: up
up:
python -m etl.main

#* Poetry
.PHONY: poetry-download
poetry-download:
10 changes: 4 additions & 6 deletions etl/docker-compose.yml
@@ -23,6 +23,8 @@ services:
volumes:
- ./infra/postgres/pg_dump.sql:/docker-entrypoint-initdb.d/pg_dump.sql
- postgres-data:/data/postgres
ports:
- "5432:5432"
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U app -d movies_database" ]
interval: 5s
@@ -63,13 +65,9 @@ services:

init-elasticsearch:
image: curlimages/curl:8.6.0
command: >-
curl -s
-XPUT -H 'Content-Type: application/json'
http://elasticsearch:9200/movies
-d @/data/index.json
command: /bin/sh "/data/create_indexes.sh"
volumes:
- ./infra/elasticsearch/index.json:/data/index.json
- ./infra/elasticsearch/:/data/
depends_on:
elasticsearch:
condition: service_healthy
20 changes: 20 additions & 0 deletions etl/etl/dto.py
@@ -25,6 +25,17 @@ class FilmWorkIdModified(IdModified):
pass


@dataclass
class GenreInfo:
"""Информация о жанре."""

id: UUID
name: str
description: str
created: datetime
modified: datetime


@dataclass
class FilmWorkInfo:
"""Информация о кинопроизведении."""
@@ -66,6 +77,15 @@ class PersonElasticsearchRecord:
name: str


@dataclass
class GenreElasticsearchRecord:
"""Объект для хранения информации о жанре в индексе Elasticsearch."""

id: UUID
name: str
description: str


@dataclass
class FilmWorkElasticsearchRecord:
"""Объект для хранения информации о кинопроизведении в индексе Elasticsearch."""
22 changes: 22 additions & 0 deletions etl/etl/extractor.py
@@ -128,6 +128,28 @@ def fetch_film_works_with_genres_in_batches(
for batch in iter(lambda: cursor.fetchmany(self._batch_size), []):
yield [dto.FilmWorkIdModified(**row) for row in batch]

@retry
def fetch_genres_info(
self,
genres: list[dto.GenreIdModified],
) -> list[dto.GenreInfo]:
"""Получить информацию о заданных жанрах."""
with self._connection.cursor() as cursor:
cursor.execute(
"""
SELECT
id,
name,
description,
created,
modified
FROM content.genre
WHERE id = ANY(%(genres_ids)s)
""",
({"genres_ids": [genre.id for genre in genres]}),
)
return [dto.GenreInfo(**row) for row in cursor.fetchall()]

@retry
def fetch_film_works_info(
self,
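As a point of reference, here is a minimal standalone sketch of the same genre query run outside the extractor, assuming psycopg 3 with a dict_row row factory (the extractor's actual connection setup is not part of this diff); the host, password, and sample UUID are placeholders, while the user and database names come from the compose healthcheck above.

import uuid

import psycopg
from psycopg.rows import dict_row

# Placeholder DSN: host and password are assumptions; user/database match the compose healthcheck.
dsn = "host=localhost dbname=movies_database user=app password=secret"

with psycopg.connect(dsn, row_factory=dict_row) as conn:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT id, name, description, created, modified
            FROM content.genre
            WHERE id = ANY(%(genres_ids)s)
            """,
            # psycopg adapts a list of UUID objects to a Postgres uuid[] array.
            {"genres_ids": [uuid.UUID("00000000-0000-0000-0000-000000000001")]},
        )
        rows = cur.fetchall()  # with dict_row, each row is a dict keyed by column name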
11 changes: 11 additions & 0 deletions etl/etl/loader.py
@@ -34,3 +34,14 @@ def load_film_works_records(
for record in film_works_records
],
)

@retry
def load_genres_records(self, genres_records: list[dto.GenreElasticsearchRecord]) -> None:
"""Загружает в индекс данные о жанрах."""
helpers.bulk(
self._client,
[
{"_index": "genres", "_id": str(record.id), "_source": dataclasses.asdict(record)}
for record in genres_records
],
)
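To illustrate what load_genres_records sends to Elasticsearch, a self-contained sketch of the same bulk actions follows, mirroring the loader's document shape; it assumes the elasticsearch-py client with helpers.bulk, a node at a placeholder address, and made-up record values.

import dataclasses
import uuid
from dataclasses import dataclass

from elasticsearch import Elasticsearch, helpers


@dataclass
class GenreElasticsearchRecord:
    id: uuid.UUID
    name: str
    description: str


client = Elasticsearch("http://localhost:9200")  # placeholder address
records = [
    GenreElasticsearchRecord(id=uuid.uuid4(), name="Comedy", description="Light-hearted films"),
]

# One bulk action per record: the document id mirrors the genre id,
# and the body is the dataclass converted to a plain dict.
helpers.bulk(
    client,
    [
        {"_index": "genres", "_id": str(record.id), "_source": dataclasses.asdict(record)}
        for record in records
    ],
)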
32 changes: 23 additions & 9 deletions etl/etl/pipeline.py
@@ -1,11 +1,10 @@
from loguru import logger

from etl import dto
from etl import dto, transformer
from etl.extractor import PostgresExtractor
from etl.loader import ElasticsearchLoader
from etl.settings import Settings
from etl.state import ETLState, StateManager
from etl.transformer import build_film_works_elasticsearch_records


class ETL:
@@ -66,12 +65,8 @@ def synchronize_genre_updates(self) -> None:
if not genres:
logger.info("No genres were updated since {}", state.genres_modified_cursor)
return
logger.debug("Genres IDs: {}", genres)
logger.info(
"Retrieved {} genres updated since {}",
len(genres),
state.genres_modified_cursor,
)

self._update_genres_in_elasticsearch(genres=genres)

for film_works in self.extractor.fetch_film_works_with_genres_in_batches(genres=genres):
self._update_film_works_in_elasticsearch(film_works=film_works)
Expand Down Expand Up @@ -106,6 +101,25 @@ def synchronize_film_work_updates(self) -> None:

logger.info("Moved film works cursor to {}", state.film_works_modified_cursor)

def _update_genres_in_elasticsearch(self, genres: list[dto.GenreIdModified]) -> None:
"""Обновляет данные о жанрах в Elasticsearch."""
logger.info("Retrieved {} genres", len(genres))

genres_info = self.extractor.fetch_genres_info(genres=genres)
genres_elasticsearch_records = transformer.build_genres_elasticsearch_records(
genres_info=genres_info,
)

logger.info(
"Going to insert {} documents into Elasticsearch",
len(genres_elasticsearch_records),
)
self.loader.load_genres_records(genres_elasticsearch_records)
logger.info(
"Inserted {} documents into Elasticsearch",
len(genres_elasticsearch_records),
)

def _update_film_works_in_elasticsearch(self, film_works: list[dto.FilmWorkIdModified]) -> None:
"""Обновляет данные о фильмах в Elasticsearch."""
logger.info("Retrieved {} film works with updated persons", len(film_works))
@@ -116,7 +130,7 @@ def _update_film_works_in_elasticsearch(self, film_works: list[dto.FilmWorkIdModified]) -> None:
film_works_genres = self.extractor.fetch_film_works_genres(film_works=film_works)
film_works_persons = self.extractor.fetch_film_works_persons(film_works=film_works)

film_works_elasticsearch_records = build_film_works_elasticsearch_records(
film_works_elasticsearch_records = transformer.build_film_works_elasticsearch_records(
film_works_info=film_works_info,
film_works_genres=film_works_genres,
film_works_persons=film_works_persons,
14 changes: 14 additions & 0 deletions etl/etl/transformer.py
@@ -55,3 +55,17 @@ def build_film_works_elasticsearch_records(
)
for film_work_info in film_works_info
]


def build_genres_elasticsearch_records(
genres_info: list[dto.GenreInfo],
) -> list[dto.GenreElasticsearchRecord]:
"""Преобразовывает данные о жанрах в формат, пригодный для индекса в Elasticsearch."""
return [
dto.GenreElasticsearchRecord(
id=genre_info.id,
name=genre_info.name,
description=genre_info.description,
)
for genre_info in genres_info
]
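A short usage sketch of the new build_genres_elasticsearch_records function, assuming it runs inside the project so the etl package is importable; the GenreInfo field values are made up for illustration.

import uuid
from datetime import datetime, timezone

from etl import dto, transformer

now = datetime.now(tz=timezone.utc)
genre_info = dto.GenreInfo(
    id=uuid.uuid4(),
    name="Comedy",
    description="Light-hearted films",
    created=now,
    modified=now,
)

records = transformer.build_genres_elasticsearch_records(genres_info=[genre_info])
# Only id, name, and description survive the transformation; the timestamps are dropped.
assert records[0].name == "Comedy"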
5 changes: 5 additions & 0 deletions etl/infra/elasticsearch/create_indexes.sh
@@ -0,0 +1,5 @@
#!/bin/bash

echo "Creating indexes"
curl -s -XPUT -H 'Content-Type: application/json' -d @/data/indexes/movies.json http://elasticsearch:9200/movies
curl -s -XPUT -H 'Content-Type: application/json' -d @/data/indexes/genres.json http://elasticsearch:9200/genres
57 changes: 57 additions & 0 deletions etl/infra/elasticsearch/indexes/genres.json
@@ -0,0 +1,57 @@
{
"settings": {
"refresh_interval": "1s",
"analysis": {
"filter": {
"english_stop": {
"type": "stop",
"stopwords": "_english_"
},
"english_stemmer": {
"type": "stemmer",
"language": "english"
},
"english_possessive_stemmer": {
"type": "stemmer",
"language": "possessive_english"
},
"russian_stop": {
"type": "stop",
"stopwords": "_russian_"
},
"russian_stemmer": {
"type": "stemmer",
"language": "russian"
}
},
"analyzer": {
"ru_en": {
"tokenizer": "standard",
"filter": [
"lowercase",
"english_stop",
"english_stemmer",
"english_possessive_stemmer",
"russian_stop",
"russian_stemmer"
]
}
}
}
},
"mappings": {
"dynamic": "strict",
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"description": {
"type": "text",
"analyzer": "ru_en"
}
}
}
}
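Because "name" is mapped as keyword and "description" as ru_en-analyzed text, exact-match and full-text queries behave differently against this index. A hedged query sketch follows, assuming the elasticsearch-py 8.x client (where search() accepts query= directly) and a reachable node; the address and sample values are placeholders.

from elasticsearch import Elasticsearch

# Placeholder address; inside the compose network the host would be elasticsearch:9200.
client = Elasticsearch("http://localhost:9200")

# keyword field: only exact, unanalyzed values match.
by_name = client.search(index="genres", query={"term": {"name": "Comedy"}})

# text field with the ru_en analyzer: stemmed full-text search in Russian and English.
by_description = client.search(index="genres", query={"match": {"description": "комедии"}})

print(by_name["hits"]["total"]["value"], by_description["hits"]["total"]["value"])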
etl/infra/elasticsearch/{index.json → indexes/movies.json} — File renamed without changes.
