Your task is to build an automated process to ingest data on an on-demand basis. The data represents trips taken by different vehicles and includes a city, a point of origin, and a destination.
● There must be an automated process to ingest and store the data.
● Trips with similar origin, destination, and time of day should be grouped together.
● Develop a way to obtain the weekly average number of trips for an area, defined by a bounding box (given by coordinates) or by a region.
● Develop a way to inform the user about the status of the data ingestion without using a polling solution.
● The solution should be scalable to 100 million entries. You are encouraged to simplify the data with a data model. Please provide proof that the solution is scalable.
● Use a SQL database.
├── LICENSE
├── README.md <- The top-level README for developers using this project.
├── data
│ ├── stress_test <- The big CSVs generated by stress_local/stress_test.
│ └── raw <- The original, immutable data dump.
│
├── requirements.txt <- The requirements file for reproducing the analysis environment, e.g.
│ generated with `pip freeze > requirements.txt`
│
├── setup.py <- makes project pip installable (pip install -e .) so src can be imported
│
├── api_main.py <- Main source code for running the API
│
├── stress_local <- Scripts for generating CSVs for stress testing
│
├── Dockerfile <- Creates the Python application image
│
├── docker-compose.yml <- Runs the application in Docker
│
├── api.log <- Log generated during execution
│
│
├── src <- Source code for use in this project.
│ │
│ ├── data <- Scripts to download or generate data
│ │
│ │
│ ├── api_routes <- Scripts that define API routes
│ │
│ ├── conf <- Scripts that help in setting up the environment
│ │
│ ├── models <- Scripts with the database schema used by the API
│ │
│ └── utils <- Scripts that help in building the API
│
│
└── tests <- Test scripts for the source code
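The src/models directory holds the database schema used by the API. Below is a minimal sketch of what such a model could look like, assuming SQLAlchemy and the column names that appear in the sample outputs further down; the class and table names are assumptions, and the real schema in src/models may differ.

```python
# A minimal sketch of the trip table, assuming SQLAlchemy.
# Class and table names are illustrative assumptions.
from sqlalchemy import Column, DateTime, Float, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class VehicleTrip(Base):
    __tablename__ = "vehicles"  # hypothetical table name

    id = Column(Integer, primary_key=True)
    region = Column(String, nullable=False, index=True)
    origin_coord_point_x = Column(Float, nullable=False)
    origin_coord_point_y = Column(Float, nullable=False)
    destination_coord_point_x = Column(Float, nullable=False)
    destination_coord_point_y = Column(Float, nullable=False)
    datetime = Column(DateTime, nullable=False, index=True)
    datasource = Column(String, nullable=False)
```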
Get all vehicle records in the dataset.
/vehicles
Available query params in this route:
- region : Get vehicle records for a specific region
- origin_coord_x : Filter vehicle records by origin x coordinate
- origin_coord_y : Filter vehicle records by origin y coordinate
- destination_coord_x : Filter vehicle records by destination x coordinate
- destination_coord_y : Filter vehicle records by destination y coordinate
- datasource : Filter vehicle records by the datasource column
- date : Filter vehicle records by the datetime column (considering only the date part)
- limit: Maximum number of records returned (default: 10)
Example:
/vehicles?region=Turin
Sample Output:
{
"data": [
{
"datasource": "baba_car",
"datetime": "Mon, 21 May 2018 02:54:04 GMT",
"destination_coord_point_x": "7.72036863753512",
"destination_coord_point_y": "45.0678238539384",
"id": 1,
"origin_coord_point_x": "7.67283791328688",
"origin_coord_point_y": "44.995710924205",
"region": "Turin"
},
{
"datasource": "bad_diesel_vehicles",
"datetime": "Sun, 06 May 2018 09:49:16 GMT",
"destination_coord_point_x": "7.7452865344197",
"destination_coord_point_y": "45.0262859834150",
"id": 3,
"origin_coord_point_x": "7.54150918911443",
"origin_coord_point_y": "45.0916050382774",
"region": "Turin"
}
]
}
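As a usage sketch, the same route can be queried from Python with the requests library. This assumes the app is running locally on port 5000, as described in the run instructions further down; the parameter combination is purely illustrative.

```python
# Hypothetical client call combining query parameters; assumes the API
# is reachable at http://localhost:5000.
import requests

resp = requests.get(
    "http://localhost:5000/vehicles",
    params={"region": "Turin", "datasource": "baba_car", "limit": 5},
    timeout=10,
)
resp.raise_for_status()
for trip in resp.json()["data"]:
    print(trip["id"], trip["region"], trip["datetime"])
```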
Get a specific vehicle record by its (id).
/vehicles/(id)
Example:
/vehicles/1
Sample Output:
{
"datasource": "baba_car",
"datetime": "Mon, 21 May 2018 02:54:04 GMT",
"destination_coord_point_x": "7.72036863753512",
"destination_coord_point_y": "45.0678238539384",
"id": 1,
"origin_coord_point_x": "7.67283791328688",
"origin_coord_point_y": "44 995710924205",
"region": "Turin"
}
Get the count of records grouped by a specific (column) in the dataset.
/vehicles/(column)/count
Example:
/vehicles/region/count
Note: This operation is valid only for the columns datasource and region.
Sample Output:
{
"data": [
{
"count": 28,
"region": "Hamburg"
},
{
"count": 34,
"region": "Prague"
},
{
"count": 38,
"region": "Turin"
}
]
}
Get the count of trips per week (using the datetime column) and per region.
/vehicles/weekly_trips/region
Sample Output:
{
"data": [
{
"freq_trip": 5,
"region": "Hamburg",
"week_number": 18
},
{
"freq_trip": 10,
"region": "Prague",
"week_number": 18
},
{
"freq_trip": 8,
"region": "Turin",
"week_number": 18
}
]
}
Get the average number of trips per week (using the datetime column) per region.
/vehicles/weekly_avg_trips/region
Sample Output:
{
"data": [
{
"freq_avg_weekly_trips": "7.6000000000000000",
"region": "Turin"
},
{
"freq_avg_weekly_trips": "5.6000000000000000",
"region": "Hamburg"
},
{
"freq_avg_weekly_trips": "6.8000000000000000",
"region": "Prague"
}
]
}
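For reference, here is a sketch of the kind of query that could back this route, assuming a PostgreSQL backend and the hypothetical vehicles table from the model sketch above (the actual implementation in src/api_routes may differ): count trips per region and calendar week, then average those weekly counts.

```python
# Sketch of the weekly-average aggregation; DSN and table name are hypothetical.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:password@localhost:5432/vehicles_db")  # hypothetical DSN

WEEKLY_AVG_SQL = text(
    """
    SELECT region, AVG(freq_trip) AS freq_avg_weekly_trips
    FROM (
        SELECT region,
               EXTRACT(WEEK FROM datetime) AS week_number,
               COUNT(*) AS freq_trip
        FROM vehicles
        GROUP BY region, EXTRACT(WEEK FROM datetime)
    ) AS weekly
    GROUP BY region;
    """
)

with engine.connect() as conn:
    for region, avg_trips in conn.execute(WEEKLY_AVG_SQL):
        print(region, float(avg_trips))
```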
Get the status of the data ingestion process.
/vehicles/ingest-data/status
Sample Output (1) -> Nothing is being processed:
{
"review": "Nothing is being processed"
}
Sample Output (2) -> Something in progress... :
{
"details": [
{
"file": "data/stress_test\\trips_big_dataset.csv",
"lead_time": "0:01:22.754150",
"num_rows": 102400,
"result": "Successful Data Ingestion"
}
],
"review": "data/stress_test\\trips_big_dataset_1.csv: In progress..",
"start_time": "Sat, 03 Sep 2022 20:37:37 GMT"
}
Sample Output (3) -> Process Finished:
{
"details": [
{
"file": "data/raw\\trips copy.csv",
"lead_time": "0:00:00.172140",
"num_rows": 100,
"result": "Successful Data Ingestion"
},
{
"file": "data/raw\\trips.csv",
"lead_time": "0:00:00.141916",
"num_rows": 100,
"result": "Successful Data Ingestion"
}
],
"end_time": "09/03/2022, 20:45:15",
"lead_time": "0:00:00.315056",
"review": "Process Finished",
"start_time": "09/03/2022, 20:45:14"
}
Start the ingestion process with the CSV files in data/raw. This process appends new data to the dataset. You can check the status of the process with a GET request to the endpoint /vehicles/ingest-data/status.
/vehicles
Output:
Ingestion Started
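To avoid blocking the HTTP response while the CSVs are processed, the ingestion can be kicked off in a background thread. Below is a minimal sketch, assuming Flask and a hypothetical ingest_all_csvs() helper; it is not the exact code in api_main.py.

```python
# Minimal sketch: the POST route returns immediately while the ingestion
# runs in a background thread. ingest_all_csvs() is a hypothetical placeholder.
import threading

from flask import Flask

app = Flask(__name__)


def ingest_all_csvs() -> None:
    """Hypothetical placeholder for the real ingestion routine."""


@app.route("/vehicles", methods=["POST"])
def start_ingestion():
    worker = threading.Thread(target=ingest_all_csvs, daemon=True)
    worker.start()  # the request does not wait for the ingestion to finish
    return "Ingestion Started", 202
```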
- Clone the Git repository.
- Put your CSV files in the data/raw directory.
- Open a terminal in the project root folder.
- Run
docker-compose up
in the terminal.
- Now, just make requests against the base URL http://localhost:5000.
The app has some mechanisms that help the solution scale.
- In the POST route that starts the ingestion process, the program runs the task in the background (as sketched under the POST endpoint above). This avoids the risk of a timeout response on this route.
- During the ingestion process, the program uses batch processing with threading and chunking, which helps performance when you have a big CSV file (a sketch of this approach follows this list).
- For an input of 100 million lines, I could not test on my local machine, but I believe the mechanisms described above would help in this situation. In that case, it is recommended to host the app on a virtual machine with more memory, such as a memory-optimized AWS EC2 instance.
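Here is a sketch of the chunked, threaded ingestion approach described above. The chunk size, worker count, to_sql parameters, and connection string are assumptions, not the exact code.

```python
# Sketch of chunked, threaded CSV ingestion; values and DSN are assumptions.
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:password@localhost:5432/vehicles_db")  # hypothetical DSN


def insert_chunk(chunk: pd.DataFrame) -> int:
    # Append one parsed chunk to the (hypothetical) vehicles table.
    chunk.to_sql("vehicles", engine, if_exists="append", index=False, method="multi")
    return len(chunk)


def ingest_csv(path: str, chunksize: int = 100_000) -> int:
    rows = 0
    with ThreadPoolExecutor(max_workers=4) as pool:
        # pandas yields DataFrame chunks lazily; worker threads insert them in batches.
        for inserted in pool.map(insert_chunk, pd.read_csv(path, chunksize=chunksize)):
            rows += inserted
    return rows
```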
Using the script tests/stress_test.py, I built 10 CSVs with 1 million records each. I changed the ingestion process directory to data/stress_test and ran the data ingestion through the API locally.
A sample of the result of this process is shown below; a sketch of such a CSV generator follows the sample:
{
"details": [
{
"file": "data/stress_test\\trips_big_dataset.csv",
"lead_time": "0:01:27.034481",
"num_rows": 102400,
"result": "Successful Data Ingestion"
},
{
"file": "data/stress_test\\trips_big_dataset_1.csv",
"lead_time": "0:06:51.535297",
"num_rows": 1024000,
"result": "Successful Data Ingestion"
},
{
"file": "data/stress_test\\trips_big_dataset_10.csv",
"lead_time": "0:06:48.626142",
"num_rows": 1024000,
"result": "Successful Data Ingestion"
},
{
"file": "data/stress_test\\trips_big_dataset_2.csv",
"lead_time": "0:06:50.248607",
"num_rows": 1024000,
"result": "Successful Data Ingestion"
}
],
"review": "data/stress_test\\trips_big_dataset_3.csv: In progress..",
"start_time": "Sun, 04 Sep 2022 11:28:16 GMT"
}
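For reference, this is a sketch of how such stress-test CSVs could be generated. The column layout and value ranges are assumptions and may not match the real script or the raw CSV format.

```python
# Sketch of a stress-test CSV generator; column names and value ranges are assumptions.
import csv
import random
from datetime import datetime, timedelta

REGIONS = ["Turin", "Hamburg", "Prague"]


def generate_csv(path: str, num_rows: int = 1_000_000) -> None:
    start = datetime(2018, 5, 1)
    with open(path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(
            ["region", "origin_coord_point_x", "origin_coord_point_y",
             "destination_coord_point_x", "destination_coord_point_y",
             "datetime", "datasource"]
        )
        for _ in range(num_rows):
            writer.writerow([
                random.choice(REGIONS),
                random.uniform(7.0, 8.0), random.uniform(44.0, 46.0),
                random.uniform(7.0, 8.0), random.uniform(44.0, 46.0),
                (start + timedelta(seconds=random.randint(0, 30 * 86400))).isoformat(),
                "stress_datasource",
            ])
```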
To deploy this application to the cloud, I would host the application on a memory-optimized AWS EC2 instance or on AWS ECS. The database could be hosted in RDS, and the CSVs could be read from an S3 bucket. However, this would require some adaptations to the source code.
I did not have time to write tests with 100% coverage, but I took the liberty of writing some to demonstrate my knowledge of, and concern for, testable code.
You can run the following command to run them: pytest .\tests\ -vv
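As an illustration of the style of these tests, here is a hypothetical sketch; it assumes api_main exposes a Flask app object named app, which may not match the actual test setup.

```python
# Hypothetical test sketch; assumes api_main exposes a Flask `app` object.
import pytest

from api_main import app


@pytest.fixture
def client():
    app.config["TESTING"] = True
    with app.test_client() as test_client:
        yield test_client


def test_vehicles_route_returns_data_key(client):
    response = client.get("/vehicles?limit=1")
    assert response.status_code == 200
    assert "data" in response.get_json()
```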
The code is standardized according to flake8 rules. You can run the following command to check this: flake8
The commits in this project follow this Git commit pattern to facilitate maintenance and knowledge management:
🔰 type (scope): subject
- test: Indicates any type of creation or alteration of test codes
- feat: Indicates the development of a new feature to the project
- refactor: Refactoring that does not impact business logic/rules
- style: Used when there are formatting and code style changes that do not change the system in any way.
- fix: bug fix.
- chore: Changes that do not affect source code or test files
- docs: Change regarding files, directories and documentation
- ci: CI configuration changes
- local: Changes to the project's local run configuration
Log file: api.log
Log format:
%(asctime)s :: %(levelname)s :: %(filename)s :: %(lineno)d :: %(message)s
- asctime: Human-readable time when the LogRecord was created. By default this is of the form ‘2003-07-08 16:49:45,896’ (the numbers after the comma are millisecond portion of the time).
- levelname: Text logging level for the message ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL').
- filename: Filename portion of pathname.
- lineno: Source line number where the logging call was issued (if available).
- message: The logged message, computed as msg % args.
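A minimal sketch of a logger configured with this format, writing to api.log:

```python
# Configure a logger using the format documented above.
import logging

LOG_FORMAT = "%(asctime)s :: %(levelname)s :: %(filename)s :: %(lineno)d :: %(message)s"

logging.basicConfig(filename="api.log", level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
logger.info("Application started")
```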
Project based on the cookiecutter data science project template. #cookiecutterdatascience