-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimple_spark_job.py
55 lines (42 loc) · 1.74 KB
/
simple_spark_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
import random
import time
# Start measuring time
start_time = time.time()
# Configuration de Spark
spark = SparkSession.builder \
.appName("SimpleSparkJob") \
.master("local[2]") \
.getOrCreate()
# Création d'un DataFrame à partir d'une liste de tuples
data = [(random.randint(0, 10), random.randint(10, 10000)) for _ in range(1000000)]
df = spark.createDataFrame(data, schema=["key", "value"])
# Repartition the DataFrame into 4 partitions
df = df.repartition(4)
# Check the number of partitions
print(f"Number of partitions: {df.rdd.getNumPartitions()}")
# Partition sizes
partition_sizes = df.rdd.glom().map(len).collect()
print(f"Size of each partition: {partition_sizes}")
# Transformation étroite : Filtrer les paires avec des clés paires
filtered_df = df.filter(col("key") % 2 == 0)
# Transformation large : Réduire par clé (nécessite un shuffle)
# Ici, nous utilisons groupBy et agg pour effectuer une agrégation
reduced_df = filtered_df.groupBy("key").agg(sum("value").alias("total_value"))
# Action : Collecter les résultats
result = reduced_df.collect()
# Afficher les résultats
print("Résultats :")
for row in result:
print(f"Clé : {row['key']}, Valeur totale : {row['total_value']}")
# End measuring time
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")
# Affichage du lien vers l'interface web de Spark pour visualiser le DAG
print("Ouvrez votre navigateur et accédez à http://localhost:4040 pour visualiser le DAG.")
# Gardez la session Spark active pour explorer l'interface web
input("Appuyez sur Entrée pour arrêter la session Spark...")
# Arrêt de la session Spark
spark.stop()