Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Herrera committed Aug 5, 2024
1 parent 06bff27 commit 5bdbf44
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config

/**
 * Parsed command-line arguments for the application.
 *
 * @param configFile optional path to an external configuration file;
 *                   `None` means the default configuration is used.
 */
final case class CmdArgs(configFile: Option[String] = None)
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ case class AvgSpendingJob(
val data = streamReader.read(context.source.format, context.source.options)

val avgSpendingPerUserDF =
data
.parseJson
.addDateColum
.explodeProducts
.transformForAggregation
.calculateAvgSpending
data.parseJson.addDateColum.explodeProducts.transformForAggregation.calculateAvgSpending

val query = streamWriter.write(avgSpendingPerUserDF, context.sink.mode, context.sink.format, context.sink.options)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import org.apache.spark.sql.DataFrame

/** Starts streaming writes of DataFrames to a configurable sink. */
case class StreamWriter() {

  /**
   * Starts a streaming write of `df` and returns the running query handle.
   *
   * @param df      streaming DataFrame to write
   * @param mode    output mode (e.g. "append", "complete")
   * @param format  sink format (e.g. "delta", "console")
   * @param options sink-specific options passed through to the writer
   * @return the started [[StreamingQuery]]; the caller owns its lifecycle
   */
  def write(
      df: DataFrame,
      mode: String,
      format: String,
      options: Map[String, String]
  ): StreamingQuery =
    df.writeStream.outputMode(mode).format(format).options(options).start()
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.codely.lesson_02_tests_in_spark.z_practical_exercise.job.AvgSpendingJ
import com.codely.lesson_02_tests_in_spark.z_practical_exercise.service.{Reader, StreamWriter}
import org.apache.spark.sql.SparkSession


object AvgSpendingApp extends App {

private val context = AppConfig.load(args)
Expand All @@ -16,7 +15,7 @@ object AvgSpendingApp extends App {
.enableHiveSupport()
.getOrCreate()

private val reader = Reader()
private val reader = Reader()
private val deltaWriter = StreamWriter()

val job = AvgSpendingJob(context, reader, deltaWriter)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package com.codely.lesson_02_tests_in_spark.z_practical_exercise.config

/**
 * Parsed command-line arguments for the application.
 *
 * @param configFile optional path to an external configuration file;
 *                   `None` means the default configuration is used.
 */
final case class CmdArgs(configFile: Option[String] = None)
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import org.apache.spark.sql.DataFrame

/** Starts streaming writes of DataFrames to a configurable sink. */
case class StreamWriter() {

  /**
   * Starts a streaming write of `df` and returns the running query handle.
   *
   * @param df      streaming DataFrame to write
   * @param mode    output mode (e.g. "append", "complete")
   * @param format  sink format (e.g. "delta", "console")
   * @param options sink-specific options passed through to the writer
   * @return the started [[StreamingQuery]]; the caller owns its lifecycle
   */
  def write(
      df: DataFrame,
      mode: String,
      format: String,
      options: Map[String, String]
  ): StreamingQuery =
    df.writeStream.outputMode(mode).format(format).options(options).start()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ object HowSparkWorks extends App {
.appName("Spark Example")
.getOrCreate()

val sc = spark.sparkContext
val sc = spark.sparkContext

val numbers = sc.parallelize(1 to 1000)
numbers.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ trait SparkApp extends App {
val spark = org.apache.spark.sql.SparkSession.builder
.master("local[*]")
.appName("Spark Broadcast Join")
//.config("spark.sql.autoBroadcastJoinThreshold", -1) descomentar primera vez
//.config("spark.sql.adaptive.enabled", "false") descomentar primera vez
// .config("spark.sql.autoBroadcastJoinThreshold", -1) descomentar primera vez
// .config("spark.sql.adaptive.enabled", "false") descomentar primera vez
.getOrCreate()
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ import java.io.File
import java.nio.file.Files
import scala.reflect.io.Directory

trait SparkTestHelper
extends AnyFlatSpec
with BeforeAndAfterEach
with BeforeAndAfterAll {
trait SparkTestHelper extends AnyFlatSpec with BeforeAndAfterEach with BeforeAndAfterAll {

private val sparkSession = SparkSession
.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ import java.io.File
import java.nio.file.Files
import scala.reflect.io.Directory

trait SparkTestHelper
extends AnyFlatSpec
with BeforeAndAfterEach
with BeforeAndAfterAll {
trait SparkTestHelper extends AnyFlatSpec with BeforeAndAfterEach with BeforeAndAfterAll {

private val sparkSession = SparkSession
.builder()
Expand Down

0 comments on commit 5bdbf44

Please sign in to comment.