diff --git a/database_adapters/__init__.py b/database_adapters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/database_adapters/db_adapters.py b/database_adapters/db_adapters.py index d8ed661a..cf343afd 100644 --- a/database_adapters/db_adapters.py +++ b/database_adapters/db_adapters.py @@ -1,68 +1,131 @@ +import logging +import threading from abc import ABCMeta, abstractmethod import pymongo +import pyodbc from bson.objectid import ObjectId from pymongo.errors import ConnectionFailure, PyMongoError +logger = logging.getLogger(__name__) + class InsertionResponse: def __init__(self, ok, exception=None, need_upsert=False): self.ok = ok self.need_upsert = need_upsert - self.error = None if (ok or exception is None) else exception.__class__.__name__ + self.error = ( + None + if (ok or exception is None) + else f"{exception.__class__.__name__}: {str(exception)}" + ) + +class DatabaseReaderInterface(metaclass=ABCMeta): + """ + An abstract base class defining the interface for database read operations. + + This interface is used to adapt various types of databases to a common set of read operations, + enabling the retrieval of data and metadata from the database. + """ -class DatabaseInterface(metaclass=ABCMeta): @abstractmethod def get_connection(self): - pass + """Establishes a connection to the database.""" @abstractmethod - def delete_collection_data(self): - pass + def get_all_dataset_data(self): + """Retrieves all data from a dataset within the database.""" @abstractmethod - def get_all_collection_data(self): - pass + def get_chunked_dataset_data(self): + """ + Retrieves data from a dataset in chunks. + + This method is particularly useful for handling large datasets that should not be loaded into memory all at once. + """ @abstractmethod - def get_chunked_collection_data(self): - pass + def get_paginated_dataset_data(self): + """ + Retrieves data from a dataset in a paginated manner. + + This method is useful for web applications or services where data needs to be displayed in smaller portions. + """ @abstractmethod - def get_paginated_collection_data(self): - pass + def get_estimated_item_count(self): + """ + Provides an estimated count of items in a dataset. + + Note that this might not always be an exact count but should be fast and efficient, suitable for large datasets. + """ @abstractmethod - def get_estimated_document_count(self): - pass + def get_estimated_item_size(self): + """ + Provides an estimated size of each item in the dataset. + + This can be useful for understanding data storage requirements and planning data load operations. + """ @abstractmethod - def get_estimated_document_size(self): - pass + def get_database_size(self): + """ + Retrieves the total size of the database. + + This includes the size of all datasets and any additional overhead the database might have. + """ + + +class DatabaseWriterInterface(metaclass=ABCMeta): + """ + An abstract base class defining the interface for database write operations. + + This interface is used to adapt various types of databases to a common set of write operations, + enabling the modification of data within the database. + """ @abstractmethod - def insert_one_to_unique_collection(self): - pass + def get_connection(self): + """Establishes a connection to the database.""" @abstractmethod - def insert_one_to_collection(self): - pass + def delete_dataset_data(self): + """ + Deletes data from a dataset. + + This method should ensure that data is removed according to specified criteria or conditions. + """ @abstractmethod - def insert_many_to_collection(self): - pass + def insert_one_to_unique_dataset(self): + """ + Inserts a single data item into a unique dataset. + + This method ensures that the data item is added to a dataset belonging to a scheduled job with + the field unique_collection set to True. This means, all jobs save the data to the same dataset, + instead of creating a new dataset for each job. + """ @abstractmethod - def get_database_size(self): - pass + def insert_one_to_dataset(self): + """ + Inserts a single data item into a dataset. + + This method is for adding individual items to a dataset where uniqueness is not enforced. + """ @abstractmethod - def get_collection_size(self): - pass + def insert_many_to_dataset(self): + """ + Inserts multiple data items into a dataset. + + This method is optimized for bulk operations, allowing efficient insertion of large numbers of data items. + """ -class MongoAdapter(DatabaseInterface): +class MongoAdapter(DatabaseWriterInterface, DatabaseReaderInterface): def __init__(self, mongo_connection, mongo_production, mongo_certificate_path): self.mongo_connection = mongo_connection self.mongo_production = mongo_production @@ -85,7 +148,7 @@ def get_connection(self): self.client = client return True - def delete_collection_data(self, database_name, collection_name): + def delete_dataset_data(self, database_name, collection_name): collection = self.client[database_name][collection_name] try: collection.drop() @@ -94,17 +157,17 @@ def delete_collection_data(self, database_name, collection_name): print(ex) return False - def get_collection_data(self, database_name, collection_name, limit=10000): + def get_dataset_data(self, database_name, collection_name, limit=10000): collection = self.client[database_name][collection_name] result = collection.find({}, {"_id": False}).limit(limit) return list(result) - def get_all_collection_data(self, database_name, collection_name): + def get_all_dataset_data(self, database_name, collection_name): collection = self.client[database_name][collection_name] result = collection.find({}, {"_id": False}) return list(result) - def get_chunked_collection_data( + def get_chunked_dataset_data( self, database_name, collection_name, chunk_size, current_chunk=None ): collection = self.client[database_name][collection_name] @@ -131,7 +194,7 @@ def get_jobs_set_stats(self, database_name, jobs_ids): ) return list(result) - def get_paginated_collection_data( + def get_paginated_dataset_data( self, database_name, collection_name, page, page_size ): collection = self.client[database_name][collection_name] @@ -147,16 +210,16 @@ def update_document(self, database_name, collection_name, document_id, new_field result = collection.update_one({"_id": document_id}, {"$set": new_field}) return result.acknowledged - def get_estimated_document_count(self, database_name, collection_name): + def get_estimated_item_count(self, database_name, collection_name): collection = self.client[database_name][collection_name] return collection.estimated_document_count() - def get_estimated_document_size(self, database_name, collection_name): + def get_estimated_item_size(self, database_name, collection_name): database = self.client[database_name] document_size = database.command("collstats", collection_name)["avgObjSize"] return document_size - def insert_one_to_unique_collection(self, database_name, collection_name, item): + def insert_one_to_unique_dataset(self, database_name, collection_name, item): response = None try: self.client[database_name][collection_name].update_one( @@ -168,7 +231,7 @@ def insert_one_to_unique_collection(self, database_name, collection_name, item): finally: return response - def insert_one_to_collection(self, database_name, collection_name, item): + def insert_one_to_dataset(self, database_name, collection_name, item): response = None try: self.client[database_name][collection_name].insert_one(item) @@ -178,7 +241,7 @@ def insert_one_to_collection(self, database_name, collection_name, item): finally: return response - def insert_many_to_collection( + def insert_many_to_dataset( self, database_name, collection_name, items, ordered=False ): response = None @@ -198,17 +261,95 @@ def get_database_size(self, database_name, data_type): total_size_bytes = 0 for collection in collections: if data_type in collection: - total_size_bytes += self.get_collection_size(database_name, collection) + total_size_bytes += self.get_dataset_size(database_name, collection) return total_size_bytes - def get_collection_size(self, database_name, collection_name): + def get_dataset_size(self, database_name, collection_name): database = self.client[database_name] collection_size = database.command("collstats", collection_name)["size"] return collection_size +class SqlServerWriterAdapter(DatabaseWriterInterface): + + def __init__(self, connection_string): + self.connection_string = connection_string + self.local_storage = threading.local() + + def get_connection(self): + if not hasattr(self.local_storage, "connection"): + try: + self.local_storage.connection = pyodbc.connect(self.connection_string) + return True + except Exception as e: + print(f"Error connecting to SQL Server: {e}") + return False + return True + + def _execute_query(self, database_name, query, values=(), execute_many=False): + if not self.get_connection(): + return False, "Connection Error" + + try: + with self.local_storage.connection.cursor() as cursor: + logger.debug("Executing query: %s", query) + if not execute_many: + cursor.execute(f"USE {database_name}") + cursor.execute(query, values) + else: + cursor.execute(f"USE {database_name}") + cursor.executemany(query, values) + self.local_storage.connection.commit() + return True, None + except pyodbc.Error as e: + self.local_storage.connection.rollback() + logger.debug("Error executing query: %s", query) + return False, e + + def insert_one_to_dataset(self, database_name, table_name, item): + # It should 'transform' the item into a valid SQL item. + columns = ", ".join(item.keys()) + placeholders = ", ".join("?" * len(item)) + query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" + response, ex = self._execute_query( + database_name, query, values=list(item.values()) + ) + return InsertionResponse(response, ex) + + def insert_many_to_dataset(self, database_name, table_name, items): + columns = ", ".join(items[0].keys()) + placeholders = ", ".join("?" * len(items[0])) + logger.debug("items :%s", str(items)) + query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" + values_to_insert = [tuple(item.values()) for item in items] + logger.debug("values to insert: %s", str(values_to_insert)) + response, ex = self._execute_query( + database_name, query, values_to_insert, execute_many=True + ) + # No upsert needed as execute_many is atomic + return InsertionResponse(response, ex) + + def delete_dataset_data(self, database_name, table_name): + query = f"DELETE FROM {table_name}" + response, ex = self._execute_query(database_name, query) + return InsertionResponse(response, ex) + + def insert_one_to_unique_dataset( + self, database_name, table_name, item + ): # Needs more discussion. + return self.insert_one_to_dataset(database_name, table_name, item) + + def get_database_interface(engine, connection, production, certificate_path): database_interfaces = { "mongodb": MongoAdapter(connection, production, certificate_path), } return database_interfaces[engine] + + +def get_database_writer_interface(engine, connection, production, certificate_path): + database_interfaces = { + "mongodb": MongoAdapter(connection, production, certificate_path), + "sqlserver": SqlServerWriterAdapter(connection), + } + return database_interfaces[engine] diff --git a/database_adapters/tests/__init__.py b/database_adapters/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/database_adapters/tests/test_db_adapters.py b/database_adapters/tests/test_db_adapters.py new file mode 100644 index 00000000..29bb78a1 --- /dev/null +++ b/database_adapters/tests/test_db_adapters.py @@ -0,0 +1,95 @@ +from unittest.mock import Mock, patch + +import pyodbc +import pytest + +from database_adapters.db_adapters import InsertionResponse, SqlServerWriterAdapter + + +class TestSqlServerWriterAdapter: + @pytest.fixture + def mock_pyodbc_connect(self): + with patch("database_adapters.db_adapters.pyodbc.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value.execute = ( + Mock() + ) + mock_connect.return_value.cursor.return_value.__enter__.return_value.executemany = ( + Mock() + ) + yield mock_connect + + @pytest.fixture + def writer_adapter(self, mock_pyodbc_connect): + return SqlServerWriterAdapter("dummy_connection_string") + + def test_get_connection_success(self, writer_adapter): + assert writer_adapter.get_connection() == True + + def test_get_connection_failure(self, mock_pyodbc_connect, writer_adapter): + mock_pyodbc_connect.side_effect = Exception("Connection Error") + assert writer_adapter.get_connection() == False + + def test_execute_query_success(self, writer_adapter): + connection = writer_adapter.get_connection() + assert writer_adapter._execute_query("test_db", "SELECT * FROM test_table") == ( + True, + None, + ) + + def test_execute_query_failure(self, mock_pyodbc_connect, writer_adapter): + connection = writer_adapter.get_connection() + mock_pyodbc_connect.return_value.cursor.side_effect = pyodbc.DatabaseError( + "Error" + ) + + # Ejecutar la consulta y capturar el resultado + result, error = writer_adapter._execute_query( + "test_db", "SELECT * FROM test_table" + ) + assert result == False + assert isinstance(error, pyodbc.DatabaseError) + assert str(error) == "Error" + + def test_execute_query_with_execute_many(self, writer_adapter): + connection = writer_adapter.get_connection() + assert ( + writer_adapter._execute_query( + "test_db", + "INSERT INTO test_table VALUES (?)", + values=[(1,), (2,)], + execute_many=True, + ) + == (True, None) + ) + + def test_insert_one_to_dataset(self, writer_adapter): + item = {"col1": "val1", "col2": "val2"} + connection = writer_adapter.get_connection() + response = writer_adapter.insert_one_to_dataset("test_db", "test_table", item) + assert isinstance(response, InsertionResponse) + assert response.ok == True + + def test_insert_many_to_dataset(self, writer_adapter): + items = [{"col1": "val1", "col2": "val2"}, {"col1": "val3", "col2": "val4"}] + connection = writer_adapter.get_connection() + response = writer_adapter.insert_many_to_dataset("test_db", "test_table", items) + assert isinstance(response, InsertionResponse) + assert response.ok == True + assert not response.error + assert not response.need_upsert + + def test_delete_dataset_data(self, writer_adapter): + connection = writer_adapter.get_connection() + response = writer_adapter.delete_dataset_data("test_db", "test_table") + assert response.ok == True + assert not response.error + assert not response.need_upsert + + def test_insert_one_to_unique_dataset(self, writer_adapter): + connection = writer_adapter.get_connection() + item = {"col1": "val1", "col2": "val2"} + response = writer_adapter.insert_one_to_unique_dataset( + "test_db", "test_table", item + ) + assert isinstance(response, InsertionResponse) + assert response.ok == True diff --git a/development-helps/Makefile b/development-helps/Makefile index 263f1da5..8b470393 100644 --- a/development-helps/Makefile +++ b/development-helps/Makefile @@ -8,6 +8,7 @@ REGISTRY_HOST = localhost:5001 API_POD = $$(kubectl get pod -l app=estela-django-api -o jsonpath="{.items[0].metadata.name}") API_DOC = docs/api.yaml PLATFORM ?= linux/$(shell uname -m) +APPLE_SILICON ?= $$(uname -m | grep -q 'arm64' && echo "true" || echo "false") .PHONY: start start: @@ -22,15 +23,15 @@ stop: .PHONY: update-api-image update-api-image: -cd $(API_DIR) && \ - docker build .. --file docker-conf/Dockerfile-django-api --tag $(REGISTRY_HOST)/estela-django-api:latest --platform $(PLATFORM) + docker build .. --build-arg APPLE_SILICON=${APPLE_SILICON} --file docker-conf/Dockerfile-django-api --tag $(REGISTRY_HOST)/estela-django-api:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-django-api:latest .PHONY: update-celery-image update-celery-image: -cd $(API_DIR) && \ - docker build .. --file docker-conf/Dockerfile-celery-beat --tag $(REGISTRY_HOST)/estela-celery-beat:latest --platform $(PLATFORM) && \ - docker build .. --file docker-conf/Dockerfile-celery-worker --tag $(REGISTRY_HOST)/estela-celery-worker:latest --platform $(PLATFORM) + docker build .. --build-arg APPLE_SILICON=${APPLE_SILICON} --file docker-conf/Dockerfile-celery-beat --tag $(REGISTRY_HOST)/estela-celery-beat:latest --platform $(PLATFORM) && \ + docker build .. --build-arg APPLE_SILICON=${APPLE_SILICON} --file docker-conf/Dockerfile-celery-worker --tag $(REGISTRY_HOST)/estela-celery-worker:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-celery-beat:latest -docker push $(REGISTRY_HOST)/estela-celery-worker:latest @@ -38,21 +39,21 @@ update-celery-image: .PHONY: update-redis-image update-redis-image: -cd $(API_DIR) && \ - docker build . --file docker-conf/Dockerfile-redis --tag $(REGISTRY_HOST)/estela-redis:latest --platform $(PLATFORM) + docker build . --build-arg APPLE_SILICON=${APPLE_SILICON} --file docker-conf/Dockerfile-redis --tag $(REGISTRY_HOST)/estela-redis:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-redis:latest .PHONY: update-build-project-image update-build-project-image: -cd $(API_DIR) && \ - docker build .. --file docker-conf/Dockerfile-build-project --tag $(REGISTRY_HOST)/estela-build-project:latest --platform $(PLATFORM) + docker build .. --build-arg APPLE_SILICON=${APPLE_SILICON} --file docker-conf/Dockerfile-build-project --tag $(REGISTRY_HOST)/estela-build-project:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-build-project:latest .PHONY: update-consumer-image update-consumer-image: -cd $(QUEUING_DIR) && \ - docker build .. --file Dockerfile --tag $(REGISTRY_HOST)/estela-consumer:latest --platform $(PLATFORM) + docker build .. --build-arg APPLE_SILICON=${APPLE_SILICON} --file Dockerfile --tag $(REGISTRY_HOST)/estela-consumer:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-consumer:latest diff --git a/estela-api/api/utils.py b/estela-api/api/utils.py index 28e70049..802082d2 100644 --- a/estela-api/api/utils.py +++ b/estela-api/api/utils.py @@ -62,7 +62,7 @@ def update_stats_from_redis(job, save_to_database=False): job_collection_name = "{}-{}-job_stats".format(job.spider.sid, job.jid) job_stats["_id"] = job_collection_name - spiderdata_db_client.insert_one_to_collection( + spiderdata_db_client.insert_one_to_dataset( str(job.spider.project.pid), "job_stats", job_stats ) diff --git a/estela-api/api/views/job_data.py b/estela-api/api/views/job_data.py index b4681133..ebb71569 100644 --- a/estela-api/api/views/job_data.py +++ b/estela-api/api/views/job_data.py @@ -100,7 +100,7 @@ def list(self, request, *args, **kwargs): job = SpiderJob.objects.filter(jid=kwargs["jid"]).get() job_collection_name = self.get_collection_name(job, data_type) - count = spiderdata_db_client.get_estimated_document_count( + count = spiderdata_db_client.get_estimated_item_count( kwargs["pid"], job_collection_name ) @@ -119,12 +119,12 @@ def list(self, request, *args, **kwargs): chunk_size = max( 1, settings.MAX_CLI_DOWNLOAD_CHUNK_SIZE - // spiderdata_db_client.get_estimated_document_size( + // spiderdata_db_client.get_estimated_item_size( kwargs["pid"], job_collection_name ), ) current_chunk = request.query_params.get("current_chunk", None) - result, next_chunk = spiderdata_db_client.get_chunked_collection_data( + result, next_chunk = spiderdata_db_client.get_chunked_dataset_data( kwargs["pid"], job_collection_name, chunk_size, current_chunk ) response = {"count": count, "results": result} @@ -132,7 +132,7 @@ def list(self, request, *args, **kwargs): response["next_chunk"] = next_chunk return Response(response) else: - result = spiderdata_db_client.get_paginated_collection_data( + result = spiderdata_db_client.get_paginated_dataset_data( kwargs["pid"], job_collection_name, page, page_size ) @@ -207,11 +207,11 @@ def download(self, request, *args, **kwargs): docs_limit = max( 1, settings.MAX_WEB_DOWNLOAD_SIZE - // spiderdata_db_client.get_estimated_document_size( + // spiderdata_db_client.get_estimated_item_size( kwargs["pid"], job_collection_name ), ) - data = spiderdata_db_client.get_collection_data( + data = spiderdata_db_client.get_dataset_data( kwargs["pid"], job_collection_name, docs_limit ) @@ -243,7 +243,7 @@ def delete(self, request, *args, **kwargs): raise DataBaseError({"error": errors.UNABLE_CONNECT_DB}) job_collection_name = self.get_collection_name(job, data_type) - deleted_data = spiderdata_db_client.delete_collection_data( + deleted_data = spiderdata_db_client.delete_dataset_data( kwargs["pid"], job_collection_name ) chain_of_usage_process = get_chain_to_process_usage_data( diff --git a/estela-api/core/tasks.py b/estela-api/core/tasks.py index e479b3cd..3d3577dc 100644 --- a/estela-api/core/tasks.py +++ b/estela-api/core/tasks.py @@ -71,7 +71,7 @@ def delete_data(pid, sid, jid, data_type): else: job_collection_name = "{}-{}-job_{}".format(sid, jid, data_type) - spiderdata_db_client.delete_collection_data(pid, job_collection_name) + spiderdata_db_client.delete_dataset_data(pid, job_collection_name) @celery_app.task(name="core.tasks.launch_job") @@ -226,16 +226,16 @@ def record_project_usage_after_job_event(job_id): unique_collection = True else: items_collection_name = "{}-{}-job_items".format(job.spider.sid, job.jid) - items_data_size = spiderdata_db_client.get_collection_size( + items_data_size = spiderdata_db_client.get_dataset_size( str(project.pid), items_collection_name ) unique_collection = False requests_collection_name = "{}-{}-job_requests".format(job.spider.sid, job.jid) - requests_data_size = spiderdata_db_client.get_collection_size( + requests_data_size = spiderdata_db_client.get_dataset_size( str(project.pid), requests_collection_name ) logs_collection_name = "{}-{}-job_logs".format(job.spider.sid, job.jid) - logs_data_size = spiderdata_db_client.get_collection_size( + logs_data_size = spiderdata_db_client.get_dataset_size( str(project.pid), logs_collection_name ) # Tracking Proxy Usage @@ -309,7 +309,7 @@ def record_job_coverage_event(job_id): pid = job.spider.project.pid sid = job.spider.sid items_collection_name = f"{sid}-{job.jid}-job_items" - items: List[dict] = spiderdata_db_client.get_all_collection_data( + items: List[dict] = spiderdata_db_client.get_all_dataset_data( str(pid), items_collection_name ) total_items = len(items) diff --git a/estela-api/docker-conf/Dockerfile-build-project b/estela-api/docker-conf/Dockerfile-build-project index 3a2f996c..ec8e333b 100644 --- a/estela-api/docker-conf/Dockerfile-build-project +++ b/estela-api/docker-conf/Dockerfile-build-project @@ -2,6 +2,7 @@ FROM python:3.9 WORKDIR /home/estela +ARG APPLE_SILICON=false COPY estela-api/requirements ./requirements ENV PYTHONDONTWRITEBYTECODE 1 @@ -25,6 +26,13 @@ RUN apt-get update RUN apt-get install docker-ce docker-ce-cli containerd.io -y +# If an Apple Silicon chip is used, install pyodbc and unixodbc-dev +RUN if [ "$APPLE_SILICON" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/deploy.txt; \ + fi + RUN pip install -r requirements/deploy.txt RUN if test -f "requirements/externalApps.txt"; then pip install -r requirements/externalApps.txt; fi diff --git a/estela-api/docker-conf/Dockerfile-celery-beat b/estela-api/docker-conf/Dockerfile-celery-beat index dc32547e..35e41755 100644 --- a/estela-api/docker-conf/Dockerfile-celery-beat +++ b/estela-api/docker-conf/Dockerfile-celery-beat @@ -1,5 +1,7 @@ FROM python:3.9 +ARG APPLE_SILICON=false + WORKDIR /home/estela COPY estela-api/requirements ./requirements @@ -7,6 +9,13 @@ COPY estela-api/requirements ./requirements ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 +# If an Apple Silicon chip is used, install pyodbc and unixodbc-dev +RUN if [ "$APPLE_SILICON" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/test.txt; \ + fi + RUN pip install -r requirements/test.txt RUN if test -f "requirements/externalApps.txt"; then pip install -r requirements/externalApps.txt; fi diff --git a/estela-api/docker-conf/Dockerfile-celery-worker b/estela-api/docker-conf/Dockerfile-celery-worker index dc32547e..4f469fb9 100644 --- a/estela-api/docker-conf/Dockerfile-celery-worker +++ b/estela-api/docker-conf/Dockerfile-celery-worker @@ -1,5 +1,7 @@ FROM python:3.9 +ARG APPLE_SILICON=false + WORKDIR /home/estela COPY estela-api/requirements ./requirements @@ -7,6 +9,13 @@ COPY estela-api/requirements ./requirements ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 +# If an Apple Silicon chip is used, install pyodbc and unixodbc-dev +RUN if [ "$APPLE_SILICON" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/test.txt; \ + fi + RUN pip install -r requirements/test.txt RUN if test -f "requirements/externalApps.txt"; then pip install -r requirements/externalApps.txt; fi diff --git a/estela-api/docker-conf/Dockerfile-django-api b/estela-api/docker-conf/Dockerfile-django-api index 0530dd86..33dbb9ba 100644 --- a/estela-api/docker-conf/Dockerfile-django-api +++ b/estela-api/docker-conf/Dockerfile-django-api @@ -1,12 +1,20 @@ FROM python:3.9 WORKDIR /home/estela +ARG APPLE_SILICON=false COPY estela-api/requirements ./requirements ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 +# If an Apple Silicon chip is used, install pyodbc and unixodbc-dev +RUN if [ "$APPLE_SILICON" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/test.txt; \ + fi + RUN pip install -r requirements/test.txt RUN if test -f "requirements/externalApps.txt"; then pip install -r requirements/externalApps.txt; fi diff --git a/estela-api/requirements/base.in b/estela-api/requirements/base.in index 30521cb2..ef3830dd 100644 --- a/estela-api/requirements/base.in +++ b/estela-api/requirements/base.in @@ -16,6 +16,7 @@ google-cloud-storage mysqlclient minio pymongo[srv] +pyodbc redis gunicorn git+https://github.com/bitmakerla/estela-queue-adapter.git diff --git a/estela-api/requirements/base.txt b/estela-api/requirements/base.txt index 1202e2d4..182c15c8 100644 --- a/estela-api/requirements/base.txt +++ b/estela-api/requirements/base.txt @@ -163,6 +163,8 @@ pyasn1-modules==0.2.8 # via google-auth pymongo[srv]==3.12.0 # via -r base.in +pyodbc==5.0.1 + # via -r base.in pyparsing==2.4.7 # via packaging python-crontab==2.5.1 @@ -201,7 +203,7 @@ rsa==4.5 # google-auth ruamel-yaml==0.17.10 # via drf-yasg -ruamel-yaml-clib==0.2.6 +ruamel-yaml-clib==0.2.8 # via ruamel-yaml s3transfer==0.3.6 # via diff --git a/estela-api/requirements/deploy.txt b/estela-api/requirements/deploy.txt index c2b1d52a..efac4e50 100644 --- a/estela-api/requirements/deploy.txt +++ b/estela-api/requirements/deploy.txt @@ -241,6 +241,10 @@ pyasn1-modules==0.2.8 # -r base.txt # google-auth pymongo[srv]==3.12.0 + # via + # -r base.txt + # pymongo +pyodbc==5.0.1 # via -r base.txt pyparsing==2.4.7 # via @@ -296,7 +300,7 @@ ruamel-yaml==0.17.10 # via # -r base.txt # drf-yasg -ruamel-yaml-clib==0.2.6 +ruamel-yaml-clib==0.2.8 # via # -r base.txt # ruamel-yaml diff --git a/estela-api/requirements/test.txt b/estela-api/requirements/test.txt index a2d198b3..4a55859e 100644 --- a/estela-api/requirements/test.txt +++ b/estela-api/requirements/test.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.9 +# This file is autogenerated by pip-compile with Python 3.10 # by the following command: # # pip-compile test.in @@ -250,6 +250,8 @@ pyasn1-modules==0.2.8 # google-auth pymongo[srv]==3.12.0 # via -r base.txt +pyodbc==5.0.1 + # via -r base.txt pyparsing==2.4.7 # via # -r base.txt @@ -307,10 +309,8 @@ ruamel-yaml==0.17.10 # via # -r base.txt # drf-yasg -ruamel-yaml-clib==0.2.6 - # via - # -r base.txt - # ruamel-yaml +ruamel-yaml-clib==0.2.8 + # via -r base.txt s3transfer==0.3.6 # via # -r base.txt diff --git a/installation/Makefile b/installation/Makefile index a3b2bbf8..2b6b6813 100644 --- a/installation/Makefile +++ b/installation/Makefile @@ -7,6 +7,7 @@ LOCAL_API_IP = $$(kubectl get services -n $${NAMESPACE} estela-django-api-servic RESOURCES = db registry minio zookeeper kafka SERVICES ?= django-api celery-worker celery-beat redis build-project PLATFORM ?= linux/$(shell uname -m) +APPLE_SILICON ?= $$(uname -m | grep -q 'arm64' && echo "true" || echo "false") .PHONY: resources @@ -18,8 +19,8 @@ resources: -minikube delete -. ./local/.env && minikube start \ --insecure-registry=$${HOST_REGISTRY} \ - --cpus="2" \ - --memory="2500mb" \ + --cpus="4" \ + --memory="3500mb" \ --disk-size="20000mb" -minikube addons enable metrics-server @@ -33,15 +34,16 @@ delete-resources: .PHONY: build-all-images build-all-images: -. ./local/.env && for service in $(SERVICES); do \ - cd $(API_DIR) && docker build .. --file docker-conf/Dockerfile-$$service --tag $${LOCAL_REGISTRY}/estela-$$service:latest; \ + cd $(API_DIR) && docker build .. --build-arg APPLE_SILICON=${APPLE_SILICON} --file docker-conf/Dockerfile-$$service --tag $${LOCAL_REGISTRY}/estela-$$service:latest; \ done - -. ./local/.env && cd $(QUEUING_DIR) && docker build .. --file Dockerfile --tag $${LOCAL_REGISTRY}/estela-consumer:latest + -. ./local/.env && cd $(QUEUING_DIR) && docker build .. --build-arg APPLE_SILICON=${APPLE_SILICON} --file Dockerfile --tag $${LOCAL_REGISTRY}/estela-consumer:latest .PHONY: upload-all-images upload-all-images: -. ./local/.env && for image in $(SERVICES); do \ docker push $${LOCAL_REGISTRY}/estela-$$image:latest; \ done + -. ./local/.env && docker push $${LOCAL_REGISTRY}/estela-consumer:latest; .PHONY: images images: build-all-images upload-all-images diff --git a/queueing/Dockerfile b/queueing/Dockerfile index f384065d..72142360 100644 --- a/queueing/Dockerfile +++ b/queueing/Dockerfile @@ -1,11 +1,17 @@ -FROM python:3.6 +FROM python:3.9 +ARG APPLE_SILICON=false ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 WORKDIR /home/estela COPY queueing/requirements requirements +RUN if [ "$APPLE_SILICON" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/consumer.txt; \ + fi RUN pip install -r requirements/consumer.txt COPY queueing/consumer.py . diff --git a/queueing/config/__init__.py b/queueing/config/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/queueing/config/database_manager.py b/queueing/config/database_manager.py index df088ccb..eaad1d0f 100644 --- a/queueing/config/database_manager.py +++ b/queueing/config/database_manager.py @@ -1,6 +1,6 @@ import os -from database_adapters.db_adapters import get_database_interface +from database_adapters.db_adapters import get_database_writer_interface if os.getenv("PRODUCTION") == "False": db_production = False @@ -8,7 +8,7 @@ db_production = True db_certificate_path = os.getenv("DB_CERTIFICATE_PATH") -db_client = get_database_interface( +db_client = get_database_writer_interface( engine=os.getenv("DB_ENGINE"), connection=os.getenv("DB_CONNECTION"), production=db_production, diff --git a/queueing/consumer.py b/queueing/consumer.py index 02b510bd..ce39f726 100644 --- a/queueing/consumer.py +++ b/queueing/consumer.py @@ -1,15 +1,14 @@ +import logging import os import sys -import logging import threading import time - from queue import Queue + from config.database_manager import db_client +from estela_queue_adapter import get_consumer_interface from inserter import Inserter from utils import jsonify -from estela_queue_adapter import get_consumer_interface - WORKER_POOL = int(os.getenv("WORKER_POOL", "10")) HEARTBEAT_TICK = int(os.getenv("HEARTBEAT_TICK", "300")) @@ -19,6 +18,7 @@ item_queue = Queue() inserters = {} heartbeat_lock = threading.Lock() +logger = logging.getLogger(__name__) def read_from_queue(): @@ -58,7 +58,7 @@ def heartbeat(): time.sleep(HEARTBEAT_TICK) with heartbeat_lock: - logging.debug("Heartbeat: A new inspection has started.") + logger.debug("Heartbeat: A new inspection has started.") for worker in workers: worker.join() @@ -71,7 +71,33 @@ def heartbeat(): ): del inserters[identifier] - logging.debug("Heartbeat: {} alive inserters.".format(len(inserters))) + logger.debug("Heartbeat: {} alive inserters.".format(len(inserters))) + + +def split_jid(jid): + return jid.split(".") + + +def get_db_name(item): + if "db_name" in item: + logger.debug("Using custom database name: {}".format(item["db_name"])) + return item["db_name"] + # This should be deprecated. + if "jid" in item: + _, _, project = split_jid(item["jid"]) + logger.debug("Using generated database name: {}".format(project)) + return project + + +def get_dataset_name(item, topic_name): + if "dataset_name" in item: + logger.debug("Using custom dataset_name: {}".format(item["dataset_name"])) + return item["dataset_name"] + job, spider, _ = split_jid(item["jid"]) + logger.debug( + "Using generated dataset_name: {}-{}-{}".format(spider, job, topic_name) + ) + return "{}-{}-{}".format(spider, job, topic_name) def consume_from_queue_platform(topic_name): @@ -90,21 +116,19 @@ def consume_from_queue_platform(topic_name): _heartbeat = threading.Thread(target=heartbeat, daemon=True) _heartbeat.start() - for message in consumer: if heartbeat_lock.locked(): heartbeat_lock.acquire() heartbeat_lock.release() - job, spider, project = message.value["jid"].split(".") - - collection_name = "{}-{}-{}".format(spider, job, topic_name) - identifier = "{}/{}".format(project, collection_name) + db_name = get_db_name(message.value) + dataset_name = get_dataset_name(message.value, topic_name) + identifier = "{}/{}".format(db_name, dataset_name) unique = message.value.get("unique", "") == "True" if inserters.get(identifier) is None: inserters[identifier] = Inserter( - db_client, project, collection_name, unique, topic_name + db_client, db_name, dataset_name, unique, topic_name ) inserters[identifier].add_pending_item() diff --git a/queueing/inserter.py b/queueing/inserter.py index ecf8d40f..668e7102 100644 --- a/queueing/inserter.py +++ b/queueing/inserter.py @@ -1,15 +1,18 @@ -import os import logging +import os import sys import threading import time from estela_queue_adapter import get_producer_interface +logger = logging.getLogger("consumer.inserter") + BATCH_SIZE_THRESHOLD = int(os.getenv("BATCH_SIZE_THRESHOLD", "4096")) INSERT_TIME_THRESHOLD = int(os.getenv("INSERT_TIME_THRESHOLD", "5")) ACTIVITY_TIME_THRESHOLD = int(os.getenv("ACTIVITY_TIME_THRESHOLD", "600")) +MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) # In order to avoid infinite loops producer = get_producer_interface() producer.get_connection() @@ -31,40 +34,44 @@ def __init__(self, client, database_name, collection_name, unique, topic): self.__last_insertion = time.time() self.__pending_items_count = 0 - logging.info("New Inserter created for {}.".format(self.identifier)) + logger.info("New Inserter created for {}.".format(self.identifier)) - def is_job_stats(self, collection_name): - return "job_stats" == collection_name.split("-")[2] + def is_job_stats(self): + return "job_stats" == self.topic def __handle_insertion_error(self, response, items): - logging.warning( + logger.warning( "The exception [{}] occurred during the insertion of {} items in {}.".format( response.error, len(items), self.identifier ) ) for item in items: + if item.get("retries", 0) > MAX_RETRIES: + logger.error("Item: %s has reached maximum retries.", item) + continue if item["payload"].get("_id"): del item["payload"]["_id"] if response.need_upsert: item["need_upsert"] = "True" + item["retries"] = item.get("retries", 0) + 1 producer.send(self.topic, item) def __insert_items(self, reason): - if self.is_job_stats(self.collection_name): + if self.is_job_stats(): self.__items[0]["payload"]["_id"] = self.collection_name - response = self.__client.insert_one_to_collection( + response = self.__client.insert_one_to_dataset( self.database_name, "job_stats", self.__items[0]["payload"], ) else: - response = self.__client.insert_many_to_collection( + response = self.__client.insert_many_to_dataset( self.database_name, self.collection_name, [item["payload"] for item in self.__items], ) if response.ok: - logging.info( + logger.info( "{} documents inserted [{}] in {}.".format( len(self.__items), reason, self.identifier ) @@ -86,11 +93,15 @@ def has_pending_items(self): def insert(self, item): if self.unique or item.get("need_upsert"): - response = self.__client.insert_one_to_unique_collection( + response = self.__client.insert_one_to_unique_dataset( self.database_name, self.collection_name, item["payload"] ) if response.ok: - logging.debug("1 document inserted in {}.".format(self.identifier)) + logger.debug( + "1 document inserted in {}. Item: {}".format( + self.identifier, item["payload"] + ) + ) else: self.__handle_insertion_error(response, [item]) else: diff --git a/queueing/requirements/consumer.in b/queueing/requirements/consumer.in index 059598a2..200ee22e 100644 --- a/queueing/requirements/consumer.in +++ b/queueing/requirements/consumer.in @@ -1,4 +1,5 @@ pip-tools pymongo[srv] +pyodbc black git+https://github.com/bitmakerla/estela-queue-adapter.git diff --git a/queueing/requirements/consumer.txt b/queueing/requirements/consumer.txt index f76b00db..63f562a8 100644 --- a/queueing/requirements/consumer.txt +++ b/queueing/requirements/consumer.txt @@ -1,13 +1,13 @@ # -# This file is autogenerated by pip-compile with python 3.9 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: # -# pip-compile requirements/consumer.in +# pip-compile consumer.in # appdirs==1.4.4 # via black black==21.7b0 - # via -r requirements/consumer.in + # via -r consumer.in click==8.0.0 # via # black @@ -15,7 +15,7 @@ click==8.0.0 dnspython==1.16.0 # via pymongo estela-queue-adapter @ git+https://github.com/bitmakerla/estela-queue-adapter.git - # via -r requirements/consumer.in + # via -r consumer.in kafka-python==2.0.2 # via estela-queue-adapter mypy-extensions==0.4.3 @@ -25,9 +25,11 @@ pathspec==0.9.0 pep517==0.10.0 # via pip-tools pip-tools==6.1.0 - # via -r requirements/consumer.in + # via -r consumer.in pymongo[srv]==3.11.4 - # via -r requirements/consumer.in + # via -r consumer.in +pyodbc==5.0.1 + # via -r consumer.in regex==2021.8.3 # via black toml==0.10.2