Data ingestion API project using Python, Docker and PostgreSQL

iuryrosal/automated-ingestion-data

automated-ingestion-data

Your task is to build an automatic process to ingest data on an on-demand basis. The data represents trips taken by different vehicles and includes a city, a point of origin, and a destination.

Mandatory Features

● There must be an automated process to ingest and store the data.

● Trips with similar origin, destination, and time of day should be grouped together.

● Develop a way to obtain the weekly average number of trips for an area, defined by a bounding box (given by coordinates) or by a region.

● Develop a way to inform the user about the status of the data ingestion without using a polling solution.

● The solution should be scalable to 100 million entries. It is encouraged to simplify the data by a data model. Please add proof that the solution is scalable.

● Use a SQL database.
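
One way to read the grouping requirement is to bucket each trip by a rounded origin, a rounded destination, and the hour of day. The sketch below assumes that interpretation. The `Trip` type, the `group_key` helper, and the two-decimal rounding are illustrative choices, not taken from the project source.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Trip:
    # Hypothetical record shape, loosely following the API's field names.
    region: str
    origin_x: float
    origin_y: float
    destination_x: float
    destination_y: float
    when: datetime


def group_key(trip, precision=2):
    """Bucket a trip by rounded origin, rounded destination, and hour of day."""
    return (
        round(trip.origin_x, precision),
        round(trip.origin_y, precision),
        round(trip.destination_x, precision),
        round(trip.destination_y, precision),
        trip.when.hour,
    )


def group_trips(trips, precision=2):
    """Return a dict mapping each bucket key to the list of trips in that bucket."""
    groups = defaultdict(list)
    for trip in trips:
        groups[group_key(trip, precision)].append(trip)
    return dict(groups)
```

A coarser `precision` merges more trips into each group. In a SQL database the same key could be computed at ingestion time and stored in an indexed column, which is one way to simplify the data model.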

Project Organization


├── LICENSE
├── README.md          <- The top-level README for developers using this project.
├── data
│   ├── stress_test    <- The big csvs generated by stress_local/stress_test.
│   └── raw            <- The original, immutable data dump.
│
├── requirements.txt   <- The requirements file for reproducing the analysis environment, e.g.
│                         generated with `pip freeze > requirements.txt`
│
├── setup.py           <- makes project pip installable (pip install -e .) so src can be imported
│
├── api_main.py        <- Main source code for running the api
│
├── stress_local       <- Scripts for generating csv for stress testing
│
├── Dockerfile         <- Create python application image
│
├── docker-compose.yml <- Run application in docker
│
├── api.log            <- Log generated during execution
│
│
├── src                <- Source code for use in this project.
│   │
│   ├── data           <- Scripts to download or generate data
│   │   
│   │
│   ├── api_routes     <- Scripts that define API routes
│   │   
│   ├── conf           <- Scripts that help in setting up the environment
│   │
│   ├── models         <- Scripts with Database schema that allows connection to the api
│   │
│   └── utils          <- Scripts that help in building the api
│       
│
└── tests              <- Test scripts for the source code

API Contract

List of Trips (GET)

Get all vehicle records in the dataset.

/vehicles

Query parameters available on this route:

  • region : Get vehicle records for a specific region
  • origin_coord_x : Get vehicle records with a specific origin x coordinate
  • origin_coord_y : Get vehicle records with a specific origin y coordinate
  • destination_coord_x : Get vehicle records with a specific destination x coordinate
  • destination_coord_y : Get vehicle records with a specific destination y coordinate
  • datasource : Get vehicle records filtered by the datasource column
  • date : Get vehicle records filtered by the datetime column (date part only)
  • limit : Maximum number of records returned (Default: 10)

Example:

/vehicles?region=Turin

Sample Output:

{
  "data": [
    {
        "datasource": "baba_car",
        "datetime": "Mon, 21 May 2018 02:54:04 GMT",
        "destination_coord_point_x": "7.72036863753512",
        "destination_coord_point_y": "45.0678238539384",
        "id": 1,
        "origin_coord_point_x": "7.67283791328688",
        "origin_coord_point_y": "44.995710924205",
        "region": "Turin"
    },
    {
        "datasource": "bad_diesel_vehicles",
        "datetime": "Sun, 06 May 2018 09:49:16 GMT",
        "destination_coord_point_x": "7.7452865344197",
        "destination_coord_point_y": "45.0262859834150",
        "id": 3,
        "origin_coord_point_x": "7.54150918911443",
        "origin_coord_point_y": "45.0916050382774",
        "region": "Turin"
    }
  ]
}
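
A minimal sketch of building a request URL for this route from Python. The `vehicles_url` helper and the `ALLOWED_FILTERS` set are hypothetical names introduced for illustration; the base URL is the one given under Local Execution.

```python
from urllib.parse import urlencode

BASE_URL = "http://localhost:5000"  # base URL from the Local Execution section

# Query parameters documented for GET /vehicles.
ALLOWED_FILTERS = {
    "region", "origin_coord_x", "origin_coord_y",
    "destination_coord_x", "destination_coord_y",
    "datasource", "date", "limit",
}


def vehicles_url(base_url=BASE_URL, **filters):
    """Build a GET /vehicles URL, rejecting undocumented parameters."""
    unknown = set(filters) - ALLOWED_FILTERS
    if unknown:
        raise ValueError(f"Unsupported query parameters: {sorted(unknown)}")
    query = urlencode(filters)  # also percent-encodes values where needed
    return f"{base_url}/vehicles" + (f"?{query}" if query else "")


print(vehicles_url(region="Turin", limit=5))
# http://localhost:5000/vehicles?region=Turin&limit=5
```

The resulting string can be passed to any HTTP client, for example `urllib.request.urlopen`.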

Specific Vehicle Record (GET)

Get a specific vehicle record by its (id).

/vehicles/(id)

Example:

/vehicles/1

Sample Output:

{
    "datasource": "baba_car",
    "datetime": "Mon, 21 May 2018 02:54:04 GMT",
    "destination_coord_point_x": "7.72036863753512",
    "destination_coord_point_y": "45.0678238539384",
    "id": 1,
    "origin_coord_point_x": "7.67283791328688",
    "origin_coord_point_y": "44.995710924205",
    "region": "Turin"
}

Count by (GET)

Get the count of elements by a specific (column) in the dataset.

/vehicles/(column)/count

Example:

/vehicles/region/count

Note: This operation is valid only for the columns datasource and region.

Sample Output:

{
 "data": [
    {
        "count": 28,
        "region": "Hamburg"
    },
    {
        "count": 34,
        "region": "Prague"
    },
    {
        "count": 38,
        "region": "Turin"
    }
  ]
}
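
The count route amounts to a `GROUP BY` on a column checked against a whitelist. The sketch below illustrates that idea with an in-memory SQLite database as a stand-in for PostgreSQL. The table name `vehicles`, its reduced schema, and the `count_by` helper are assumptions, not the project's actual code.

```python
import sqlite3

# Only these columns are valid for the count route.
COUNTABLE_COLUMNS = {"datasource", "region"}


def count_by(conn, column):
    """Return [{column: value, "count": n}, ...] ordered by the column value."""
    if column not in COUNTABLE_COLUMNS:
        raise ValueError(f"Cannot count by {column!r}")
    # Interpolating the name is safe here: it was checked against the whitelist.
    rows = conn.execute(
        f"SELECT {column}, COUNT(*) FROM vehicles GROUP BY {column} ORDER BY {column}"
    ).fetchall()
    return [{column: value, "count": n} for value, n in rows]


# Hypothetical, reduced schema used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE vehicles (id INTEGER PRIMARY KEY, region TEXT, datasource TEXT)")
conn.executemany(
    "INSERT INTO vehicles (region, datasource) VALUES (?, ?)",
    [("Turin", "baba_car"), ("Turin", "bad_diesel_vehicles"), ("Prague", "baba_car")],
)
print(count_by(conn, "region"))
# [{'region': 'Prague', 'count': 1}, {'region': 'Turin', 'count': 2}]
```

Validating the column name before building the statement matters because column names cannot be bound as query parameters.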

Weekly Trips (GET)

Get the count of elements per week (using the datetime column) for each region.

/vehicles/weekly_trips/region

Sample Output:

{
  "data": [
    {
        "freq_trip": 5,
        "region": "Hamburg",
        "week_number": 18
    },
    {
        "freq_trip": 10,
        "region": "Prague",
        "week_number": 18
    },
    {
        "freq_trip": 8,
        "region": "Turin",
        "week_number": 18
    }
  ]
}

Weekly Average Trips (GET)

Get the average number of elements per week (using the datetime column) for each region.

/vehicles/weekly_avg_trips/region

Sample Output:

{
  "data": [
    {
        "freq_avg_weekly_trips": "7.6000000000000000",
        "region": "Turin"
    },
    {
        "freq_avg_weekly_trips": "5.6000000000000000",
        "region": "Hamburg"
    },
    {
        "freq_avg_weekly_trips": "6.8000000000000000",
        "region": "Prague"
    }
  ]
}
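
A minimal sketch of how a weekly average can be derived from the weekly counts, in plain Python. It assumes ISO week numbering and averages only over weeks that contain at least one trip; whether the API does the same is not stated here. The `weekly_avg_trips` helper is a hypothetical name.

```python
from collections import Counter, defaultdict
from datetime import datetime
from statistics import mean


def weekly_avg_trips(trips):
    """trips: iterable of (region, datetime). Returns {region: average trips per week}."""
    per_week = Counter()
    for region, when in trips:
        iso = when.isocalendar()
        per_week[(region, iso[0], iso[1])] += 1  # key: (region, ISO year, ISO week)

    counts_by_region = defaultdict(list)
    for (region, _year, _week), n in per_week.items():
        counts_by_region[region].append(n)
    # Weeks without any trip are absent, so they do not lower the average.
    return {region: mean(counts) for region, counts in counts_by_region.items()}


trips = [
    ("Turin", datetime(2018, 5, 6, 9, 49)),   # ISO week 18
    ("Turin", datetime(2018, 5, 21, 2, 54)),  # ISO week 21
    ("Turin", datetime(2018, 5, 22, 8, 0)),   # ISO week 21
]
print(weekly_avg_trips(trips))
# {'Turin': 1.5}
```

To average over a bounding box instead of a region, the same function can be fed trips that were first filtered by coordinate range.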

Data Ingestion Status (GET)

Get the status of the data ingestion process.

/vehicles/ingest-data/status

Sample Output (1) -> Nothing is being processed:

{
    "review": "Nothing is being processed"
}

Sample Output (2) -> Something in progress... :

{
  "details": [
    {
        "file": "data/stress_test\\trips_big_dataset.csv",
        "lead_time": "0:01:22.754150",
        "num_rows": 102400,
        "result": "Successful Data Ingestion"
    }
  ],
  "review": "data/stress_test\\trips_big_dataset_1.csv: In progress..",
  "start_time": "Sat, 03 Sep 2022 20:37:37 GMT"
}

Sample Output (3) -> Process Finished:

{
  "details": [
    {
        "file": "data/raw\\trips copy.csv",
        "lead_time": "0:00:00.172140",
        "num_rows": 100,
        "result": "Successful Data Ingestion"
    },
    {
        "file": "data/raw\\trips.csv",
        "lead_time": "0:00:00.141916",
        "num_rows": 100,
        "result": "Successful Data Ingestion"
    }
  ],
  "end_time": "09/03/2022, 20:45:15",
  "lead_time": "0:00:00.315056",
  "review": "Process Finished",
  "start_time": "09/03/2022, 20:45:14"
}

Start Data Ingestion (POST)

Start the ingestion process with the CSV files in data/raw. This process appends new data to the dataset. You can check the status of the process by making a GET request to the endpoint /vehicles/ingest-data/status.

/vehicles

Output:

Ingestion Started

Local Execution

  1. Clone the Git repository.
  2. Put your CSV files in the data/raw directory.
  3. Open a terminal in the project root folder.
  4. Run docker-compose up in the terminal.
  5. Make requests against the base URL http://localhost:5000.

Scalability

The app has some mechanisms that help the solution scale.

  • In the POST route that starts the ingestion process, the program puts the task in the background using a timeout. This avoids the risk of the response timing out on this route.
  • In the ingestion process, the program uses batch processing with threading and chunking. This can help performance when you have a big CSV file.
  • I could not test an input of 100 million rows on my local machine, but I believe the mechanisms described above would help in that situation. In that case, it is recommended to host the app on a virtual machine with more memory, such as an AWS EC2 memory-optimized instance.
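
The two mechanisms described above, background execution and chunked reading, can be sketched with the standard library alone. This is an illustration under assumptions, not the project's implementation: the function names, the single worker thread, and the `status` dictionary are hypothetical, and `store` stands in for a bulk insert into the database.

```python
import csv
import io
import threading
from itertools import islice


def iter_chunks(reader, chunk_size):
    """Yield lists of at most chunk_size rows from a csv reader."""
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


def ingest(csv_file, store, status, chunk_size=1000):
    """Read the CSV chunk by chunk and hand each chunk to store()."""
    reader = csv.DictReader(csv_file)
    for chunk in iter_chunks(reader, chunk_size):
        store(chunk)  # e.g. one bulk INSERT per chunk
        status["num_rows"] += len(chunk)
    status["review"] = "Process Finished"


def start_ingestion(csv_file, store, chunk_size=1000):
    """Start ingestion in a background thread and return without waiting for it."""
    status = {"review": "In progress..", "num_rows": 0}
    worker = threading.Thread(
        target=ingest, args=(csv_file, store, status, chunk_size), daemon=True
    )
    worker.start()
    return worker, status


rows = []
data = io.StringIO(
    "region,datasource\nTurin,baba_car\nPrague,baba_car\nHamburg,bad_diesel_vehicles\n"
)
worker, status = start_ingestion(data, rows.extend, chunk_size=2)
worker.join()  # a web route would return right away instead of joining
print(status)
# {'review': 'Process Finished', 'num_rows': 3}
```

Reading in chunks keeps memory use bounded by `chunk_size` rather than by the file size, which is the property that matters for very large CSV files.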

Local Stress Test

Using the script tests/stress_test.py, I built 10 CSVs with 1 million records. I pointed the ingestion process at the data/stress_test directory and ingested the data through the API in a local execution.

A sample of the result of this process is shown below:

{
  "details": [
    {
        "file": "data/stress_test\\trips_big_dataset.csv",
        "lead_time": "0:01:27.034481",
        "num_rows": 102400,
        "result": "Successful Data Ingestion"
    },
    {
        "file": "data/stress_test\\trips_big_dataset_1.csv",
        "lead_time": "0:06:51.535297",
        "num_rows": 1024000,
        "result": "Successful Data Ingestion"
    },
    {
        "file": "data/stress_test\\trips_big_dataset_10.csv",
        "lead_time": "0:06:48.626142",
        "num_rows": 1024000,
        "result": "Successful Data Ingestion"
    },
    {
        "file": "data/stress_test\\trips_big_dataset_2.csv",
        "lead_time": "0:06:50.248607",
        "num_rows": 1024000,
        "result": "Successful Data Ingestion"
    }
  ],
  "review": "data/stress_test\\trips_big_dataset_3.csv: In progress..",
  "start_time": "Sun, 04 Sep 2022 11:28:16 GMT"
}
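
The lead times in the sample above give a rough throughput figure. The sketch below computes it from the three full-size files and extrapolates linearly to 100 million rows. The linear extrapolation is an assumption: it ignores index growth, disk limits, and any other effect that changes with table size.

```python
from datetime import timedelta


def parse_lead_time(text):
    """Parse a lead_time string such as '0:06:51.535297' (H:MM:SS.ffffff)."""
    hours, minutes, seconds = text.split(":")
    return timedelta(hours=int(hours), minutes=int(minutes), seconds=float(seconds))


# (num_rows, lead_time) for the three full-size files in the sample above.
results = [
    (1024000, "0:06:51.535297"),
    (1024000, "0:06:48.626142"),
    (1024000, "0:06:50.248607"),
]

total_rows = sum(rows for rows, _ in results)
total_seconds = sum(parse_lead_time(t).total_seconds() for _, t in results)
rows_per_second = total_rows / total_seconds

# Naive linear extrapolation to 100 million rows, expressed in hours.
hours_for_100m = 100_000_000 / rows_per_second / 3600
print(f"{rows_per_second:.0f} rows/s, about {hours_for_100m:.1f} h for 100 million rows")
```

On these numbers the local run sustains roughly 2,500 rows per second, which puts 100 million rows on the order of 11 hours on the same machine if throughput stays constant.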

Moving to the cloud (AWS)

To move this application to the cloud, I believe it could be hosted on a memory-optimized AWS EC2 instance or on AWS ECS. The database could run on RDS, and the CSVs could be read from an S3 bucket. However, this would require some adaptations to the source code.

About tests

I didn't have time to do tests with 100% coverage, but I took the liberty of writing some to show off some of my knowledge and concern for having testable code.

You can run the following command to test this: pytest .\tests\ -vv

Flake8

The code follows flake8 rules. You can run the following command to check this: flake8

Git Commit Convention

The commits carried out in this project seek to follow this Git Commit pattern to facilitate maintenance and knowledge management:

🔰 type (scope): subject

Symbols (🔰)

https://gitmoji.dev

Types

  • test: Indicates any type of creation or alteration of test codes
  • feat: Indicates the development of a new feature to the project
  • refactor: Refactoring that does not impact business logic/rules
  • style: Used when there are formatting and code style changes that do not change the system in any way.
  • fix: bug fix.
  • chore: Changes that do not affect source code or test files
  • docs: Change regarding files, directories and documentation
  • ci: CI configuration changes
  • local: Changes to the project's local run configuration
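
A minimal sketch of checking a commit message against the `🔰 type (scope): subject` pattern and the types listed above. The regular expression and the `parse_commit` helper are illustrative and not part of the project; the sample message is made up.

```python
import re

# Commit types from the list above.
COMMIT_TYPES = {"test", "feat", "refactor", "style", "fix", "chore", "docs", "ci", "local"}

# symbol, space, type, space, (scope), colon, space, subject
COMMIT_PATTERN = re.compile(
    r"^(?P<symbol>\S+) (?P<type>[a-z]+) \((?P<scope>[^)]+)\): (?P<subject>.+)$"
)


def parse_commit(message):
    """Return the parts of a conforming commit message, or None if it does not conform."""
    match = COMMIT_PATTERN.match(message)
    if match is None or match.group("type") not in COMMIT_TYPES:
        return None
    return match.groupdict()


print(parse_commit("✨ feat (api): add weekly average route"))
# {'symbol': '✨', 'type': 'feat', 'scope': 'api', 'subject': 'add weekly average route'}
```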

Logging

Log file: api.log

Log format:

%(asctime)s :: %(levelname)s :: %(filename)s :: %(lineno)d :: %(message)s

  • asctime: Human-readable time when the LogRecord was created. By default this is of the form ‘2003-07-08 16:49:45,896’ (the numbers after the comma are millisecond portion of the time).
  • levelname: Text logging level for the message ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL').
  • filename: Filename portion of pathname.
  • lineno: Source line number where the logging call was issued (if available).
  • message: The logged message, computed as msg % args.
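
A minimal sketch of configuring Python's standard `logging` module with this format and log file. The `build_logger` helper, the logger name `api`, and the INFO level are assumptions for illustration.

```python
import logging

LOG_FORMAT = "%(asctime)s :: %(levelname)s :: %(filename)s :: %(lineno)d :: %(message)s"


def build_logger(name="api", log_file="api.log"):
    """Return a logger that appends to log_file using the documented format."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)  # assumed level; not stated in this README
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter(LOG_FORMAT))
        logger.addHandler(handler)
    return logger
```

Calling `build_logger().info("Ingestion Started")` would append one line to api.log with the timestamp, level, file name, line number, and message separated by `::`.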

Project based on the cookiecutter data science project template. #cookiecutterdatascience
