Create rule S7194: PySpark broadcasting should be used when joining a small DataFrame to a larger DataFrame.
joke1196 committed Jan 30, 2025
1 parent f9500f5 commit 6916af0
Showing 2 changed files with 50 additions and 24 deletions.
10 changes: 5 additions & 5 deletions rules/S7194/python/metadata.json
@@ -1,12 +1,14 @@
{
"title": "FIXME",
"title": "PySpark broadcasting should be used when joining a small DataFrame to a larger DataFrame",
"type": "CODE_SMELL",
"status": "ready",
"remediation": {
"func": "Constant\/Issue",
"constantCost": "5min"
},
"tags": [
"data-science",
"pyspark"
],
"defaultSeverity": "Major",
"ruleSpecification": "RSPEC-7194",
@@ -16,10 +18,8 @@
"quickfix": "unknown",
"code": {
"impacts": {
"MAINTAINABILITY": "HIGH",
"RELIABILITY": "MEDIUM",
"SECURITY": "LOW"
"RELIABILITY": "LOW"
},
"attribute": "CONVENTIONAL"
"attribute": "EFFICIENT"
}
}
64 changes: 45 additions & 19 deletions rules/S7194/python/rule.adoc
@@ -1,44 +1,70 @@
FIXME: add a description

// If you want to factorize the description uncomment the following line and create the file.
//include::../description.adoc[]
This rule raises an issue when a small DataFrame is joined to another DataFrame without the use of the broadcast operation.

== Why is this an issue?

FIXME: remove the unused optional headers (that are commented out)
In PySpark, shuffling refers to the process of transferring data between worker nodes within a cluster.
This data movement, while necessary for operations such as joins and aggregations on DataFrames, can be resource-intensive.
Although Spark handles shuffling automatically, there are strategies to minimize it and thereby improve the performance of these operations.
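
As a brief illustration (a sketch that is not part of the rule examples, assuming a local `SparkSession`), a wide transformation such as `groupBy` forces Spark to move rows sharing a key onto the same worker, which appears as an `Exchange` step in the physical plan:

[source,python]
----
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('shuffle-demo').getOrCreate()

df = spark.createDataFrame([(1, 10), (1, 20), (2, 30)], ["key", "value"])

# groupBy is a wide transformation: rows sharing a key must be gathered on the
# same worker node, which Spark performs as a shuffle (an Exchange in the plan).
aggregated = df.groupBy("key").sum("value")
aggregated.explain()
----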

//=== What is the potential impact?
When performing join operations with multiple DataFrames in PySpark, it is crucial to consider the size of the DataFrames involved.
If a small DataFrame is being joined to a larger one, utilizing the `broadcast` function to distribute the small DataFrame across all worker nodes can be beneficial.
This approach significantly reduces the volume of data shuffled between nodes, thereby improving the efficiency of the join operation.

== How to fix it
//== How to fix it in FRAMEWORK NAME

To fix this issue, use the `broadcast` function on the small DataFrame before performing the join operation.

=== Code examples

==== Noncompliant code example

[source,python,diff-id=1,diff-type=noncompliant]
----
FIXME
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myspark').getOrCreate()
data = [
(1, "Alice"),
(2, "Bob"),
(2, "Charlie"),
(1, "Dan"),
(2, "Elsa")
]
large_df = spark.createDataFrame(data, ["department_id", "name"])
small_df = spark.createDataFrame([(1, 'HR'), (2, 'Finance')], ["department_id", "department"])
joined_df = large_df.join(small_df, on="department_id", how="left") # Noncompliant: the small DataFrame is not broadcast
----

==== Compliant solution

[source,python,diff-id=1,diff-type=compliant]
----
FIXME
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName('myspark').getOrCreate()
data = [
(1, "Alice"),
(2, "Bob"),
(2, "Charlie"),
(1, "Dan"),
(2, "Elsa")
]
large_df = spark.createDataFrame(data, ["department_id", "name"])
small_df = spark.createDataFrame([(1, 'HR'), (2, 'Finance')], ["department_id", "department"])
joined_df = large_df.join(broadcast(small_df), on="department_id", how="left") # Compliant
----
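
As a quick sanity check (a sketch reusing the `spark` and `joined_df` names from the example above, not part of the rule itself), the physical plan can be inspected to confirm that the hint was honored: Spark should plan a `BroadcastHashJoin` instead of a shuffle-based `SortMergeJoin`. Spark can also broadcast small tables automatically below the `spark.sql.autoBroadcastJoinThreshold` size.

[source,python]
----
# Sketch: verify that the broadcast hint was picked up.
joined_df.explain()
# The physical plan should contain a BroadcastHashJoin node rather than a
# SortMergeJoin preceded by an Exchange (shuffle).

# Spark can also broadcast automatically below a size threshold (in bytes);
# the explicit hint remains useful when the estimated table size is misleading.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
----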

//=== How does this work?
== Resources
=== Documentation

//=== Pitfalls
* PySpark Documentation - https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.broadcast.html[pyspark.sql.functions.broadcast]

//=== Going the extra mile
=== Articles & blog posts

* Medium Article - https://aspinfo.medium.com/what-is-broadcast-join-how-to-perform-broadcast-in-pyspark-699aef2eff5a[What is broadcast join, how to perform broadcast in pyspark?]

//== Resources
//=== Documentation
//=== Articles & blog posts
//=== Conference presentations
//=== Standards
//=== External coding guidelines
//=== Benchmarks
