-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
89 lines (63 loc) · 3.52 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType
from spark import SparkSessionBuilder
from spark import SchemaBuilder
from util import AnalysisOutputSaver
from data_preparation import DataLoader
from data_preparation import DataCleaner
from data_preparation import DataAnalysisExplorer
from data_analysis import AccidentGeoAnalysis
from data_analysis import AccidentAnalysis
from graph_analysis import GraphAnalysis
from graph_analysis import AccidentPeopleGraph
from graph_analysis import AccidentCrossroadsGraph
from graph_analysis import AccidentRoadsGraph
from graph_analysis import AccidentTypeVehicleGraph
def main():
# Crea una sessione di Spark
sparkSession = SparkSessionBuilder().get_or_create()
# Costruisce lo schema del dataset.
schema = SchemaBuilder().build_schema()
# Carica i dati utilizzando lo schema e crea un DataFrame Spark.
dataset_df = DataLoader(sparkSession, schema).load_data()
# Pulisce il dataset.
cleaned_dataset_df = DataCleaner().clean_data(dataset_df)
# Memorizza il DataFrame in memoria.
cleaned_dataset_df = cleaned_dataset_df.cache()
# Esplora i dati puliti per ottenere informazioni preliminari e statistiche descrittive.
DataAnalysisExplorer().explore_data(cleaned_dataset_df)
# Esegue l'analisi del dataset pulito e restituisce i risultati.
results = AccidentAnalysis().analyze(cleaned_dataset_df)
# Stampa i risultati dell'analisi.
for key, result in results.items():
print(f" ----------------- {key} -----------------")
result.show()
# Salva i plot basati sui risultati dell'analisi.
AnalysisOutputSaver().save_plots(results)
# Crea e salva le mappe geografiche degli incidenti utilizzando il dataset pulito.
AccidentGeoAnalysis().plot_maps(cleaned_dataset_df)
# Ottiene le combinazioni demografiche da analizzare.
demographic_combinations = AccidentAnalysis().get_demographic_combinations_for_analysis()
# Salva la distribuzione dei dati demografici nel dataset pulito.
AnalysisOutputSaver().save_plot_distribution(cleaned_dataset_df, demographic_combinations)
# Crea il grafo di incroci stradali con incidenti.
accidentCrossroadsGraph, nameAccidentCrossroadsGraph = AccidentCrossroadsGraph(cleaned_dataset_df).graph()
# Crea il grafo di persone coinvolte in incidenti.
accidentPeopleGraph, nameAccidentPeopleGraph = AccidentPeopleGraph(cleaned_dataset_df).graph()
# Crea il grafo che rappresenta i tipi di veicoli e i relativi incidenti.
accidentTypeVehicleGraph, nameAccidentTypeVehicleGraph = AccidentTypeVehicleGraph(cleaned_dataset_df).graph()
# Crea il grafo delle strade con incidenti.
accidentRoadsGraph, nameAccidentRoadsGraph = AccidentRoadsGraph(cleaned_dataset_df).graph()
# Analizza il grafo di incidenti avvenuti agli incroci stradali.
GraphAnalysis.analyze(accidentCrossroadsGraph, nameAccidentCrossroadsGraph)
# Analizza il grafo di persone coinvolte in incidenti.
GraphAnalysis.analyze(accidentPeopleGraph, nameAccidentPeopleGraph, sampleForAdvancedAnalysis=True)
# Analizza il grafo che rappresenta i tipi di veicoli e i relativi incidenti.
GraphAnalysis.analyze(accidentTypeVehicleGraph, nameAccidentTypeVehicleGraph, sampleForAdvancedAnalysis=True)
# Analizza il grafo delle strade con incidenti.
GraphAnalysis.analyze(accidentRoadsGraph, nameAccidentRoadsGraph, sampleForAdvancedAnalysis=True)
# Arresta la sessione Spark
sparkSession.stop()
if __name__ == '__main__':
main()