-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathspark_utils.py
36 lines (27 loc) · 1.1 KB
/
spark_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pyspark.sql import DataFrame
from pyspark.rdd import RDD
from loguru import logger as Logger
def get_partition_info(df: DataFrame, logger: Logger) -> None:
    """
    Log partition-count and per-partition-size statistics for a DataFrame.

    Helps determine an optimal partition count before calling
    ``repartition()`` / ``coalesce()``.

    :param df: DataFrame whose underlying RDD partitions are inspected
    :param logger: loguru-style logger; all results go through ``logger.info``
    :return: None. Results are written to the logger.
    """
    import statistics

    def process_marker(logger: Logger, msg: str) -> None:
        # Visual separator so the stats section stands out in the log.
        logger.info("\n" + "==" * 15 + msg + "==" * 15 + "\n")

    def get_partition_len(iterator):
        # Yields exactly one value per partition: its row count.
        # Runs on the executors via mapPartitions.
        yield sum(1 for _ in iterator)

    rdd: RDD = df.rdd
    count = rdd.getNumPartitions()
    # preservesPartitioning=True: counting rows does not change partitioning.
    lengths = rdd.mapPartitions(get_partition_len, True).collect()
    logger.info(f"{count} partition(s) total.")

    if not lengths:
        # Empty RDD (no partitions): nothing to aggregate — avoid
        # min()/max()/division errors below.
        logger.info("No partitions to analyze.")
        return

    # Bug fix: the helper's parameter is named `msg`, so the previous
    # keyword call `message=...` raised TypeError unconditionally.
    process_marker(logger, "PARTITION SIZE STATS")
    logger.info(f"\tmin: {min(lengths)}")
    logger.info(f"\tmax: {max(lengths)}")
    logger.info(f"\tavg: {sum(lengths) / len(lengths)}")
    # stdev() requires at least two data points; report 0.0 for a
    # single-partition DataFrame instead of raising StatisticsError.
    stddev = statistics.stdev(lengths) if len(lengths) > 1 else 0.0
    logger.info(f"\tstddev: {stddev}")
    logger.info("\tdetailed info")
    for i, pl in enumerate(lengths):
        logger.info(f"{i}. {pl}")