Commit a2dde61

Add new content
Christian Herrera committed Aug 5, 2024
1 parent c00a090 commit a2dde61
Showing 52 changed files with 414 additions and 319 deletions.
.github/workflows/ci.yml (2 changes: 1 addition & 1 deletion)

@@ -17,7 +17,7 @@ jobs:
      - uses: actions/setup-java@v4
        with:
          distribution: 'zulu'
-          java-version: '21'
+          java-version: '11'
          cache: 'sbt'
      - name: 👌 Run "pre-push" tasks (compile and style-check)
        run: sbt prep
.gitignore (1 change: 1 addition & 0 deletions)

@@ -8,3 +8,4 @@ project/plugins/project/
/docker/spark/data/
/docker/volume/
/docker/spark/apps/
+/derby.log
docker/docker-compose.yml (13 changes: 6 additions & 7 deletions)

@@ -24,6 +24,7 @@ services:
    environment:
      - SPARK_LOCAL_IP=172.19.0.10
      - SPARK_WORKLOAD=master
+      - SPARK_DAEMON_MEMORY=3G
    networks:
      spark-network:
        ipv4_address: 172.19.0.10
@@ -37,9 +38,8 @@
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
-      - SPARK_WORKER_CORES=2
-      - SPARK_WORKER_MEMORY=2G
-      - SPARK_DRIVER_MEMORY=2G
+      - SPARK_WORKER_CORES=3
+      - SPARK_WORKER_MEMORY=3G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=172.19.0.2
@@ -62,9 +62,8 @@
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
-      - SPARK_WORKER_CORES=2
-      - SPARK_WORKER_MEMORY=2G
-      - SPARK_DRIVER_MEMORY=2G
+      - SPARK_WORKER_CORES=3
+      - SPARK_WORKER_MEMORY=3G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=172.19.0.3
@@ -208,7 +207,7 @@ services:
      --deploy-mode client
      --executor-memory 1G
      --driver-memory 1G
-      --total-executor-cores 1
+      --total-executor-cores 2
      --conf spark.sql.hive.metastore.version=2.3.9
      --conf spark.sql.uris=thrift://hive-metastore:9083
      --conf spark.hadoop.hive.metastore.uris=thrift://hive-metastore:9083
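A quick way to check that the resource bumps above actually reach a job is to open a shell against the composed cluster and read back the effective configuration. A minimal sketch, assuming flags matching the spark-submit invocation in this compose file (container name is illustrative):

  // docker exec -it spark-master bash
  // ./bin/spark-shell --master spark://spark-master:7077 --total-executor-cores 2 --executor-memory 1G
  println(spark.conf.get("spark.executor.memory")) // expected: 1G
  println(spark.conf.get("spark.cores.max"))       // expected: 2, set by --total-executor-cores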
docker/hive/conf/hive-site.xml (2 changes: 1 addition & 1 deletion)

@@ -5,7 +5,7 @@
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
-    <value>jdbc:postgresql://172.18.0.8:5432/metastore</value>
+    <value>jdbc:postgresql://postgres:5432/metastore</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
project/Settings.scala (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@ object Settings {
  val settings = Seq(
    name := "spark-best_practises_and_deploy-course",
    version := "0.1.0-SNAPSHOT",
-    scalaVersion := "2.12.12",
+    scalaVersion := "2.12.19",
    organization := "com.codely",
    organizationName := "com.codely, Inc.",
    organizationHomepage := Some(url("https://com.codely")),
SparkApp.scala (new file, package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.app)

@@ -0,0 +1,12 @@
+package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.app
+
+import org.apache.spark.sql.SparkSession
+
+trait SparkApp extends App {
+
+  implicit val spark: SparkSession = SparkSession
+    .builder()
+    .enableHiveSupport()
+    .getOrCreate()
+
+}
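Apps in this lesson can now mix in the trait instead of wiring their own session. A hypothetical minimal consumer (object name and action invented for illustration); note the trait sets no master, so it relies on spark-submit to supply one:

  package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.app

  // Hypothetical example: `spark` is provided by the SparkApp trait above.
  object ExampleApp extends SparkApp {
    spark.range(10).count() // any action proves the session came up
  }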
TotalSpendingApp.scala (AvgSpendingApp renamed, package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.app)

@@ -1,22 +1,18 @@
package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.app

import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config.AppContext
-import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.job.AvgSpendingJob
+import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.job.TotalSpendingJob
import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.service.{Reader, Writer}

-object AvgSpendingApp extends SparkApp {
-
-  private val appName = "avg-spending-app"
+object TotalSpendingApp extends SparkApp {

  private val context = AppContext.load(args)

-  spark.conf.set("spark.app.name", appName)
-
  private val reader = Reader()

  private val writer = Writer()

-  private val job = AvgSpendingJob(context, reader, writer)
+  private val job = TotalSpendingJob(context, reader, writer)

  job.run()
}
AppContext.scala (AppConfig renamed, package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config)

@@ -4,7 +4,7 @@ import com.typesafe.config.ConfigFactory

import java.io.File

-case class AppConfig(
+case class AppContext(
  spark: SparkConfig,
  source: SourceConfig,
  sink: SinkConfig
@@ -14,8 +14,8 @@ case class SparkConfig(appName: String)
case class SourceConfig(format: String, options: Map[String, String])
case class SinkConfig(format: String, mode: String, path: String)

-object AppConfig {
-  def load(args: Array[String]): AppConfig = {
+object AppContext {
+  def load(args: Array[String]): AppContext = {

    val cmdArgs = ArgumentsParser.parse(args).getOrElse(CmdArgs())
    val configFile = new File(cmdArgs.configFile.get)
@@ -29,7 +29,7 @@ object AppConfig {
      "driver" -> config.getString("source.options.driver")
    )

-    AppConfig(
+    AppContext(
      spark = SparkConfig(
        appName = config.getString("spark.appName")
      ),
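A hypothetical call site for the loader; the command-line flag is assumed to match CmdArgs.configFile, and the HOCON keys mirror the ones read above (source options besides "driver" are collapsed in this view):

  // app.conf (illustrative contents):
  //   spark { appName = "total-spending-app" }
  //   source { format = "jdbc", options { driver = "org.postgresql.Driver", ... } }
  //   sink { format = "delta", mode = "overwrite", path = "/tmp/out" }
  val context = AppContext.load(Array("--configFile", "app.conf"))
  println(context.spark.appName) // "total-spending-app"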
ArgumentsParser.scala (com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config)

@@ -3,8 +3,8 @@ package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config
import scopt.OParser

object ArgumentsParser {
-  val builder = OParser.builder[CmdArgs]
-  val argsParser = {
+  private val builder = OParser.builder[CmdArgs]
+  private val argsParser = {
    import builder._
    OParser.sequence(
      programName("Scala Application"),
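The body of OParser.sequence is collapsed past this point. A plausible completion under scopt 4, with the option name assumed to line up with CmdArgs.configFile (the parse method itself is confirmed by AppContext calling ArgumentsParser.parse):

  import scopt.OParser

  object ArgumentsParser {
    private val builder = OParser.builder[CmdArgs]
    private val argsParser = {
      import builder._
      OParser.sequence(
        programName("Scala Application"),
        // Assumed option: feeds CmdArgs.configFile.
        opt[String]("configFile")
          .required()
          .action((value, cmdArgs) => cmdArgs.copy(configFile = Some(value)))
          .text("path to the HOCON configuration file")
      )
    }

    def parse(args: Array[String]): Option[CmdArgs] =
      OParser.parse(argsParser, args, CmdArgs())
  }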
CmdArgs.scala (com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config)

@@ -1,3 +1,3 @@
package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config

-case class CmdArgs(configFile: Option[String] = None)
\ No newline at end of file
+case class CmdArgs(configFile: Option[String] = None)
TotalSpendingJob.scala (AvgSpendingJob renamed, package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.job)

@@ -1,15 +1,14 @@
package com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.job

import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.config.AppContext
-import org.apache.spark.sql.SparkSession
import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.extensions.DataFrameExtensions._
import com.codely.lesson_02_tests_in_spark.video_01__end_to_end_testing.service.{Reader, Writer}

-case class AvgSpendingJob(
+case class TotalSpendingJob(
  config: AppContext,
  reader: Reader,
  writer: Writer
-)(implicit spark: SparkSession) {
+) {

  def run(): Unit = {

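The run() body is collapsed in this view. Based on the services the job receives and the batch sink config (which has a path rather than options), a plausible shape; the read signature follows the streaming Reader seen later in this commit, and the extension-method name is a guess:

  def run(): Unit = {
    val data = reader.read(config.source.format, config.source.options)
    val totalSpendingPerUserDF = data.calculateTotalSpending // hypothetical DataFrameExtensions helper
    writer.write(totalSpendingPerUserDF, config.sink.mode, config.sink.format, config.sink.path)
  }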
AvgSpendingApp.scala (com.codely.lesson_02_tests_in_spark.video_02__unit_testing.app)

@@ -1,24 +1,18 @@
package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.app

-import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config.AppConfig
+import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config.AppContext
import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.job.AvgSpendingJob
-import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service.{Reader, StreamWriter}
-import org.apache.spark.sql.SparkSession
+import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service.{StreamReader, StreamWriter}

-object AvgSpendingApp extends App {
+object AvgSpendingApp extends SparkApp {

-  private val context = AppConfig.load(args)
+  private val context = AppContext.load(args)

-  implicit val spark: SparkSession = SparkSession
-    .builder()
-    .appName(context.spark.appName)
-    .enableHiveSupport()
-    .getOrCreate()
+  private val reader = StreamReader()

-  private val reader = Reader()
-  private val deltaWriter = StreamWriter()
+  private val streamWriter = StreamWriter()

-  val job = AvgSpendingJob(context, reader, deltaWriter)
+  private val job = AvgSpendingJob(context, reader, streamWriter)

  job.run()

SparkApp.scala (new file, package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.app)

@@ -0,0 +1,12 @@
+package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.app
+
+import org.apache.spark.sql.SparkSession
+
+trait SparkApp extends App {
+
+  implicit val spark: SparkSession = SparkSession
+    .builder()
+    .enableHiveSupport()
+    .getOrCreate()
+
+}
AppContext.scala (AppConfig renamed, package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config)

@@ -4,7 +4,7 @@ import com.typesafe.config.ConfigFactory

import java.io.File

-case class AppConfig(
+case class AppContext(
  spark: SparkConfig,
  source: SourceConfig,
  sink: SinkConfig
@@ -14,8 +14,8 @@ case class SparkConfig(appName: String)
case class SourceConfig(format: String, options: Map[String, String])
case class SinkConfig(format: String, mode: String, options: Map[String, String])

-object AppConfig {
-  def load(args: Array[String]): AppConfig = {
+object AppContext {
+  def load(args: Array[String]): AppContext = {

    val cmdArgs = ArgumentsParser.parse(args).getOrElse(CmdArgs())
    val configFile = new File(cmdArgs.configFile.get)
@@ -32,7 +32,7 @@ object AppConfig {
      "checkpoint" -> config.getString("sink.options.checkpoint")
    )

-    AppConfig(
+    AppContext(
      spark = SparkConfig(
        appName = config.getString("spark.appName")
      ),
CmdArgs.scala (com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config)

@@ -1,3 +1,3 @@
package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config

-case class CmdArgs(configFile: Option[String] = None)
\ No newline at end of file
+case class CmdArgs(configFile: Option[String] = None)
AvgSpendingJob.scala (com.codely.lesson_02_tests_in_spark.video_02__unit_testing.job)

@@ -1,24 +1,23 @@
package com.codely.lesson_02_tests_in_spark.video_02__unit_testing.job

-import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config.AppConfig
+import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.config.AppContext
import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.extensions.DataFrameExtensions._
-import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service.{Reader, StreamWriter}
-import org.apache.spark.sql.SparkSession
+import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service.{StreamReader, StreamWriter}

case class AvgSpendingJob(
-  context: AppConfig,
-  reader: Reader,
-  writer: StreamWriter
-)(implicit spark: SparkSession) {
+  context: AppContext,
+  streamReader: StreamReader,
+  streamWriter: StreamWriter
+) {

-  def run(): Unit = {
+  def run() = {

-    val data = reader.read(context.source.format, context.source.options)
+    val data = streamReader.read(context.source.format, context.source.options)

    val avgSpendingPerUserDF =
      data.parseJson.addDateColum.explodeProducts.transformForAggregation.calculateAvgSpending

-    val query = writer.write(avgSpendingPerUserDF, context.sink.mode, context.sink.format, context.sink.options)
+    val query = streamWriter.write(avgSpendingPerUserDF, context.sink.mode, context.sink.format, context.sink.options)

    query.awaitTermination()
  }
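Dropping the implicit SparkSession from the constructor is what makes this job unit-testable: the reader and writer become plain constructor arguments a test can stub, and the transformation chain can run against a local session. A sketch of the idea (ScalaTest and the event shape are assumptions, since the tests themselves are not part of this diff):

  import com.codely.lesson_02_tests_in_spark.video_02__unit_testing.extensions.DataFrameExtensions._
  import org.apache.spark.sql.SparkSession
  import org.scalatest.funsuite.AnyFunSuite

  class AvgSpendingJobTest extends AnyFunSuite {

    private implicit val spark: SparkSession =
      SparkSession.builder().master("local[*]").appName("unit-test").getOrCreate()

    import spark.implicits._

    test("transformation chain computes avg spending per user") {
      // Hypothetical event shape; the real one depends on DataFrameExtensions.
      val raw = Seq("""{"userId":"u1","products":[{"price":10.0}]}""").toDF("value")
      val result = raw.parseJson.addDateColum.explodeProducts.transformForAggregation.calculateAvgSpending
      assert(result.columns.nonEmpty) // placeholder assertion
    }
  }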
StreamWriter.scala (com.codely.lesson_02_tests_in_spark.video_02__unit_testing.service, parameter indentation change)

@@ -5,11 +5,11 @@ import org.apache.spark.sql.DataFrame

case class StreamWriter() {
  def write(
-    df: DataFrame,
-    mode: String,
-    format: String,
-    options: Map[String, String]
-  ): StreamingQuery = {
+      df: DataFrame,
+      mode: String,
+      format: String,
+      options: Map[String, String]
+  ): StreamingQuery = {
    df.writeStream.outputMode(mode).format(format).options(options).start()
  }
}
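A hypothetical call site for write; the format and option key are illustrative rather than taken from this diff, and df must be a streaming DataFrame (e.g. built with spark.readStream):

  val query = StreamWriter().write(
    df = df,
    mode = "append",
    format = "console",
    options = Map("checkpointLocation" -> "/tmp/checkpoints/avg-spending")
  )
  query.awaitTermination()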
AvgSpendingApp.scala (com.codely.lesson_02_tests_in_spark.z_practical_exercise.app)

@@ -5,7 +5,6 @@ import com.codely.lesson_02_tests_in_spark.z_practical_exercise.job.AvgSpendingJob
import com.codely.lesson_02_tests_in_spark.z_practical_exercise.service.{Reader, StreamWriter}
import org.apache.spark.sql.SparkSession

-
object AvgSpendingApp extends App {

  private val context = AppConfig.load(args)
@@ -16,7 +15,7 @@ object AvgSpendingApp extends App {
    .enableHiveSupport()
    .getOrCreate()

-  private val reader      = Reader()
+  private val reader = Reader()
  private val deltaWriter = StreamWriter()

  val job = AvgSpendingJob(context, reader, deltaWriter)
CmdArgs.scala (com.codely.lesson_02_tests_in_spark.z_practical_exercise.config)

@@ -1,3 +1,3 @@
package com.codely.lesson_02_tests_in_spark.z_practical_exercise.config

-case class CmdArgs(configFile: Option[String] = None)
\ No newline at end of file
+case class CmdArgs(configFile: Option[String] = None)
AvgSpendingJob.scala (com.codely.lesson_02_tests_in_spark.z_practical_exercise.job)

@@ -3,13 +3,12 @@ package com.codely.lesson_02_tests_in_spark.z_practical_exercise.job
import com.codely.lesson_02_tests_in_spark.z_practical_exercise.extensions.DataFrameExtensions._
import com.codely.lesson_02_tests_in_spark.z_practical_exercise.config.AppConfig
import com.codely.lesson_02_tests_in_spark.z_practical_exercise.service.{Reader, StreamWriter}
-import org.apache.spark.sql.SparkSession

case class AvgSpendingJob(
  context: AppConfig,
  reader: Reader,
  writer: StreamWriter
-)(implicit spark: SparkSession) {
+) {

  def run(): Unit = {

StreamWriter.scala (com.codely.lesson_02_tests_in_spark.z_practical_exercise.service, parameter indentation change)

@@ -5,11 +5,11 @@ import org.apache.spark.sql.DataFrame

case class StreamWriter() {
  def write(
-    df: DataFrame,
-    mode: String,
-    format: String,
-    options: Map[String, String]
-  ): StreamingQuery = {
+      df: DataFrame,
+      mode: String,
+      format: String,
+      options: Map[String, String]
+  ): StreamingQuery = {
    df.writeStream.outputMode(mode).format(format).options(options).start()
  }
}
HowSparkWorks.scala (com.codely.lesson_03_basics_spark_execution_model.video_01__how_spark_works)

@@ -2,20 +2,16 @@ package com.codely.lesson_03_basics_spark_execution_model.video_01__how_spark_works

object HowSparkWorks extends App {

-  // 1. docker exec -it spark-ecosystem-cluster-spark-master-1 bash
-  // 2. ./bin/spark-shell --master spark://spark-master:7077 --total-executor-cores 2 --executor-memory 1024m
-
  val spark = org.apache.spark.sql.SparkSession.builder
    .master("local")
    .appName("Spark Example")
    .getOrCreate()

-  val sc      = spark.sparkContext
+  val sc = spark.sparkContext

  val numbers = sc.parallelize(1 to 1000)
  numbers.count()

  // localhost:4040

  val doubledNumbers = numbers.map(_ * 2)
  doubledNumbers.count()

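Each count() above submits a separate job to the UI at localhost:4040, while map alone does not. A short continuation making the lazy/eager split visible:

  // Transformations are lazy: declaring the filter runs nothing.
  val evens = numbers.filter(_ % 2 == 0)
  println(evens.toDebugString) // prints the RDD lineage without executing it

  // Actions are eager: this line submits a job you can watch in the UI.
  println(evens.count())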