diff --git a/_posts/2023-08-12-messaging-systems.md b/_posts/2023-08-12-messaging-systems.md index ffa3f37..b00e231 100644 --- a/_posts/2023-08-12-messaging-systems.md +++ b/_posts/2023-08-12-messaging-systems.md @@ -138,6 +138,71 @@ title: Messaging Systems - if we set cleanup policy to be compact - a new segment is created, and only the values for the latest keys for a topic is retained, and others are discarded. so e.g. segment 1 has value a for key x and value b for key y, and segment 2 has value c for key y, the newly created segment would have value a for key x and value c for key y. this behavior also makes sense for the consumer offsets topic if i think about it - for very large messages, either tweak configuration parameters to increase maximum limits, or better, use something like sqs extended client of aws is possible +## Quick Kafka Revision, To Delete + +- what is kafka - open source, distributed event streaming platform +- typically, there are three actors - **kafka broker**, **producer** and **consumer** +- **data collection** / **data ingestion** - kafka can sit at the edge of the data engineering platform - + - producers can somehow get data to kafka. this way, a data engineering team will have to maintain, scale, and connect to only one platform i.e. kafka + - we can perform stream or batch processing, generate the gold layer tables, and finally put the data into kafka for downstream systems to read from +- **stream processing** - we can build microservices that consume data from kafka, perform some processing using the **kafka streams** library and then produce data back to kafka / some other platform +- **confluent** - commercial, cloud kafka +- **topic details** - invoices, partitions - 4, replication - 2 +- additionally, we can also configure **infinite retention** for the topic. for our use case, we would generate the bronze layer and then there is no need of preserving the data. data would be purged automatically after 7 days, which works great for our use case +- parts of a **message** - key, value and timestamp +- **topic** - one topic for per kind of message (think of it like a table) +- so, a kafka cluster can have multiple topics +- **partitioning** of a topic helps scale it +- kafka will generate a hash of the key and perform a modulo with the number of partitions to decide which partition the message should go to +- so, all the messages with the same key will go into the same partition +- if we do not specify a key, partition would be chosen in a round robin fashion +- **partition offset** - each message when it reaches a partition is given an offset (incremental, 0, 1, 2 and so on) +- **timestamp** - it can be either the time when the cluster receives the message, or the time when the message is actually produced. 
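- aside - a tiny sketch of the "hash the key, then modulo the partition count" idea from the partitioning bullets above. the key and the hash function are stand-ins - kafka's java client actually uses murmur2 in its default partitioner, crc32 here only illustrates the modulo step
  ```py
  import zlib

  def choose_partition(key: bytes, num_partitions: int) -> int:
      # stand-in hash - the real default partitioner uses murmur2
      return zlib.crc32(key) % num_partitions

  # every message with the same key lands on the same partition
  print(choose_partition(b'store-42', 4))
  ```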
the later is what we typically want, and is also called **event time** + +### Example + +program to read from a json file - each line in the file is a json object representing an invoice + +```py +import json + +from kafka import KafkaProducer + + +class InvoicesProducer: + def __init__(self): + self.producer = None + self.topic = 'invoices' + self.client_id = 'shameek-laptop' + + def initialize(self): + self.producer = KafkaProducer( + bootstrap_servers='kafka-383439a7-spark-streaming-56548.f.aivencloud.com:19202', + sasl_mechanism='SCRAM-SHA-256', + sasl_plain_username='avnadmin', + sasl_plain_password='<>', + security_protocol='SASL_SSL', + ssl_cafile='kafka-ca.pem', + api_version=(3, 7, 1) + ) + + def produce_invoices(self): + + with open('data/invoices/invoices-1.json') as lines: + for line in lines: + invoice = json.loads(line) + store_id = invoice['StoreID'] + self.producer.send(self.topic, key=store_id.encode('utf-8'), value=line.encode('utf-8')) + + self.producer.flush() + + +if __name__ == '__main__': + invoices_producer = InvoicesProducer() + invoices_producer.initialize() + invoices_producer.produce_invoices() +``` + ## RabbitMQ - messaging systems - diff --git a/_posts/2023-12-27-spark.md b/_posts/2023-12-27-spark.md index 3261db3..d976b99 100644 --- a/_posts/2023-12-27-spark.md +++ b/_posts/2023-12-27-spark.md @@ -456,6 +456,7 @@ print(counts_by_genre.collect()) - generate a bunch of **physical plans**, and associate a cost with each of them. e.g. one plan uses shuffle join, another uses broadcast join - finally, a **cost model** evaluates the most optimal physical plan - **wholestage code generation** - generate the bytecode to run on each executor +- note - i thing wherever **dag** (directed acyclic graph) is mentioned, it refers to this entire process ![execution plan](/assets/img/spark/execution-plan.jpg) @@ -1190,240 +1191,3 @@ print(counts_by_genre.collect()) - e.g. if there are data skews or out of memory issues in our application, spark would still run copies of this task (which too will run slow or maybe fail) without realizing that the root cause is actually the data / faulty configuration itself ![](/assets/img/spark/spark-speculation.png) - -## Streaming Introduction - -- earlier convention was batch processing - data first comes and sits in the lake -- then, there would be jobs that can be run for e.g. daily to perform the processing -- however, with time, jobs started demanding for smaller and quicker batches -- the idea is not to schedule the jobs in smaller intervals -- instead, we start viewing data as a stream that is in motion and not at rest -- spark streaming is an extension of the dataframe apis -- spark uses **micro batches** for achieving stream processing -- spark automatically takes care of lot of challenges like start and end time of batches, intermediate state management, etc -- initially, spark used **dstreams** - built on top of rdd -- now, sparks offers **structured streaming apis** - built on top of dataframe apis i.e. supports sql -- additionally, **event time semantics** are supported by structured streaming apis as well, which were not available in the d stream apis -- word count example using netcat - notice how for reading data, `read()` changed to `readStream()`, but otherwise, everything else stays the same. 
`readStream()` returns a `DataStreamReader` (recall read used to return `DataFrameReader`) - ```java - SparkSession spark = SparkSession.builder() - .master("local[*]") - .appName("Streaming Demo") - .getOrCreate(); - - Dataset lines = spark.readStream() - .format("socket") - .option("host", "localhost") - .option("port", "9999") - .load(); - ``` -- data from the socket comes in a column `value`. we want to split each line into its constituent words, and create a separate row for each word - ```java - Dataset wordCount = lines.select(explode(split(col("value"), " ")).alias("word")) - .groupBy("word") - .count(); - ``` -- finally, we try writing it to the console. again, `write()` changes to `writeStream()`. writeStream returns a `DataStreamWriter` (recall write used to return a `DataFrameWriter`) - ```java - StreamingQuery streamingQuery = wordCount.writeStream() - .format("console") - .option("checkpointLocation", "checkpoint") - .outputMode("complete") - .start(); - streamingQuery.awaitTermination(); - ``` -- note - we used `streamingQuery.awaitTermination()` above to simulate running an application indefinitely, and we got streamingQuery from the result of writing to a streaming sink -- note - sinks terminate when application is stopped / due to some error condition -- however, what if were writing to multiple sinks? - - we can use `spark.streams().awaitAnyTermination()`, when any of the streaming sinks terminate - - remember to have multiple checkpoint locations - do not use the same checkpoint location for multiple streaming sinks -- start the netcat utility using `nc -lk 9999`, and run the app to see the streaming output in the console -- working - first, spark creates an optimized logical plan, just like it did in case of dataframes -- now, it would create a job that reads from the source, processes it and finally writes it to the sink -- underneath, spark runs a background thread -- based on our trigger configuration, a new spark job is created. so, a spark job will not be created at every interval, it would only be created based on our trigger configuration, and all this is taken care of us by a background thread - ![spark streaming jobs](/assets/img/spark/spark-streaming-jobs.png) -- **trigger** determines how often to trigger the micro batch -- the default is **unspecified**. trigger a micro batch immediately, but stall this current micro batch until there is some input in the source -- trigger can also be based on for e.g. **time interval** - if the previous micro batch exceeds the time limit, the new batch starts after the previous batch finishes. however, if the previous micro batch finishes before the specified time limit, the new batch would wait till the mark reaches the time. for this, just chain the below to the `writeStream()` - ```java - .trigger(Trigger.ProcessingTime("1 minute")) - ``` -- finally, trigger can also be **continuous** - this is an experimental feature, where the performance is even faster than the current micro batch approach -- some popular streaming sources / sinks - netcat (already seen above), file and kafka -- the file source is capable of monitoring the path for new files. it can also use archival i.e. move the processed files to a different directory / delete the processed files altogether -- so, only sinks available are kafka, file and console for streaming requirements. how to for e.g. use jdbc? we can use `forEachBatch`, which is maybe called for every micro batch? 
- - ```java - outputDf.writeStream().foreachBatch((df, batchId) -> { - df.write() - .format("xyz") - // ... - .save(); - }); - ``` -- output modes - - - **append** - like insert only. used when previous outputs are not affected - - **update** - like upsert i.e. either new records are added or old records are updated - - **complete** - overwrite the complete result every time -- update vs complete example - - - input -
- ![streaming input](/assets/img/spark/streaming-input.png) - - complete -
- ![streaming output complete](/assets/img/spark/streaming-output-complete.png) - - update -(look at batch 2 in particular)
- ![streaming output update](/assets/img/spark/streaming-output-update.png) -- append does not make sense with aggregations like count, so it would throw an error like this - `Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;`. the why - this is because append means immutable - the other two output modes - complete and update have some way of reflecting updates made to previous groups, but append cannot allow for updating of existing groups, only creating of new groups. now maybe how aggregations work in spark streaming - spark receives a record, decides which group this record should belong to, and updates that group. this updating is not allowed in append mode, hence append mode does not support aggregations -- a spark streaming application is like a web server i.e. keeps running unlike when submitting batch jobs to spark -- even a streaming application will stop at least at some point due to reasons like some failure, some maintenance, etc -- so, we need to be able to handle this stopping and restarting gracefully -- **gracefully** = **exactly once processing** -- exactly once processing basically means neither should we end up reading an input twice, nor missing an input record -- this is what **checkpoint location** helps achieve -- checkpoint location maintains things like - - - what was the input boundaries of the last micro batch - - state information (e.g. running total of the word count) -- we just saw how checkpoints helps spark achieve exactly once processing. however, exactly once processing also depends on sources and sinks - e.g. source should be replayable i.e. allow reading of old messages. e.g. using kafka / files as streaming sources allows for this. similarly, sinks should be idempotent i.e. it should recognize duplicates instead of adding duplicates to the data -- what if our application has a bug? - we fix the spark code, we rerun spark-submit. now, can we rely on check pointing to continue the job from where it left off after the job was stopped and restarted? - - yes, if our fix was something like filter out malformed records - - no, if our fix changed the aggregation strategy etc, since maybe it messes up the checkpoint state altogether - -## Streaming Using Kafka - -- add the following dependency - - ```xml - - org.apache.spark - spark-sql-kafka-0-10_${scala.version} - ${spark.version} - - ``` -- use the following to establish a connection - - ```java - Dataset kafkaSourceDf = spark.readStream() - .format("kafka") - .option("kafka.bootstrap.servers", "localhost:9092") - .option("subscribe", "invoices") - .load(); - ``` -- when we try printing the schema - `kafkaSourceDf.printSchema();`, we get the following - - ``` - |-- key: binary (nullable = true) - |-- value: binary (nullable = true) - |-- topic: string (nullable = true) - |-- partition: integer (nullable = true) - |-- offset: long (nullable = true) - |-- timestamp: timestamp (nullable = true) - |-- timestampType: integer (nullable = true) - ``` -- the value is in binary format. 
here is how to extract all fields into dataframe friendly format - - assume we create the schema of the payload somewhere - - then, we can cast the value field to a string - - then, call from_json on it, which also needs the schema - - this means all our data would be available as a struct type under the attribute value - - finally, based on [this](https://stackoverflow.com/a/54433013/11885333), i chained a `.select`, so that i do not have to access fields using value.attribute, but just using attribute - - - ```java - Dataset flattenedDf = kafkaSourceDf - .select(from_json(col("value").cast("string"), schema).alias("value")) - .select("value.*") - ``` -- [this doc](https://kafka.apache.org/quickstart) is great for debugging when writing kafka related code - creating topics, publishing to topics using kafka-producer, consuming from kafka-consumer, etc -- now, when we try `flattenedDf.printSchema();`, we get the right schema which we can use in our transformations -- to understand - how does kafka + spark actually work i.e. does spark rely on offset committing logic of kafka, or does spark itself maintain the offset inside the checkpoint directory -- writing to kafka - while reading from kafka, we deserialized the value attribute. while writing to kafka, we need to convert our dataframe into two fields of key and value - - combine all fields into a struct - - convert this field to json - - rename this condensed field to value - - pick any other attribute to act as key - - ```java - .select( - to_json(struct("*")).alias("value"), - col("InvoiceNumber").alias("key")); - ``` - -## Streaming Transformations - -- **stateless transformations** - do not need to maintain state across micro batches. e.g. filter, map, flatMap, explode, etc -- **stateful transformations** - need to maintain state across micro batches. e.g. for computing totals etc as we process new records, the state needs to be stored as a part of the checkpoint. e.g. grouping, aggregations -- now, stateless transformations do not support complete output mode. think why - - - if our streaming transformations are only stateless, 10 input records would contain 10 output records - - this means we will have to include input records as a part of the output every time - - this means all records need to be stored in the state, which is not efficient for spark -- so, as a side effect - we can run into out of memory issues when using spark streaming due to excessive state. spark stores all this state inside memory for efficiency -- it also stores it in the checkpoint location so that for e.g. when the application dies / is stopped due to some reason, it can resume from where it left off -- so, we have two concepts - **time bound state** and **unbounded state** -- **time bound state** - e.g. we calculate a weekly running total. spark knows that it can get rid of records older than a week,since they do not contribute to the total. this is also called **managed state**, since spark can manage this state -- **unbounded state** - there is no time bounds we can specify for the state. therefore, we ourselves need to specify some kind of cleanup logic for the state, so that our application does not encounter out of memory issues. 
this is also called **unmanaged state**, since the cleanup logic is on us to implement - -## Window Aggregations - -- this is the time bound state / managed state that we talked about above -- **trigger time** - determines when a micro batch starts and ends -- **event time** - the actual time when the event occurred -- important - the bounds of the **window** we specify has nothing to do with the trigger time -- the window we specify uses the event time to decide which window the record should be a part of -- spark also handles **late events** - e.g. we get an event for 10.00-10.15 when we have already performed processing for 10.15-10.30 and 10.30-10.45 -- e.g. we create a window of 15 minutes - - - this basically means a new column called window of type struct would be added to our dataset, with two fields - start and end - - spark will automatically decide for us which of these groups a record belongs to, based on the column name we specify. this column acts as the event time - e.g. created time in this example - - since this is basically inside a group, we can specify more columns to group on. e.g. we specify type column in the group by clause. then, we get windows for each of the type separately - - finally, we perform an aggregation - all records where type is buy, have their amount attribute added to total buy, all records where type is sell, have their amount added to total sell - - so basically think about whats in state of spark - for all groups i.e. windows, spark is storing the computed aggregate and updating it as and when new records arrive - - confusion, note - remember how this window is so much more different than the windowing aggregation we saw earlier - there, there was no grouping or aggregation involved - based on our specification, we were automatically able to add a new column for running total - - ```java - Dataset outputDf = stockSourceDf - .groupBy(window(col("CreatedTime"), "15 minute")) - .agg( - sum(when(col("Type").equalTo("BUY"), col("Amount")).otherwise(lit("0"))).alias("TotalBuy"), - sum(when(col("Type").equalTo("SELL"), col("Amount")).otherwise(lit("0"))).alias("TotalSell")); - ``` - -- remember - spark had to maintain old windows inside its state as well, to help it with late events -- **watermark** - helps expire old window state, so that out of memory etc exceptions are not caused. remember how this is the biggest advantage of using managed state -- so, we need to decide how late can an event be, post which - - - we can simply ignore the event - - we can clean up the state for that window -- for this, we simply need to chain the `withWatermark`. note - - - chain it before the group by clause - - column name used for windowing and column name specified inside watermark should be the same - - ```java - .withWatermark("CreatedTime", "30 minutes") - .groupBy(window(col("CreatedTime"), "15 minute")) - ``` -- how should the cleanup happen? - all windows with end_time < (max_event_time - watermark) can be ejected from state (note - max_event_time i think means event with maximum time in the micro batch). e.g. say our watermark is 30 minutes, and we receive a record with event time = 10.48. all windows with end time before 10.48 - 30 = 10.18 would be ejected from the spark state. 
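- the same watermark + window mechanism in a short pyspark sketch - the dataframe, the checkpoint path and the simplified aggregation are assumptions (column names are borrowed from the java snippet above). once the watermark is in place, append output mode also becomes usable, as discussed below
  ```py
  from pyspark.sql.functions import col, window
  from pyspark.sql.functions import sum as sum_

  # stock_source_df is assumed to be a streaming dataframe with CreatedTime and Amount columns
  windowed_totals = (
      stock_source_df
      .withWatermark('CreatedTime', '30 minutes')
      .groupBy(window(col('CreatedTime'), '15 minutes'))
      .agg(sum_('Amount').alias('TotalAmount'))
  )

  query = (
      windowed_totals.writeStream
      .format('console')
      .option('checkpointLocation', 'checkpoint-windowed')  # hypothetical checkpoint location
      .outputMode('append')  # a window is emitted once the watermark moves past its end
      .start()
  )
  ```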
this is the managed state / automatic cleanup that we were talking about in time bound state -- watermark and complete output mode do not make sense together - spark cannot cleanup state if it has to output all the records for every micro batch -- recall how we had talked about append mode not working when we have group by etc in our streaming jobs, because append cannot update groups. however, think about watermarks - when the max_event_time - watermark moves, all windows with ends below this line can be closed. hence, when we introduce watermarks and windows with aggregations, spark supports append mode. all windows which have been declared closed by spark are output after the micro batch gets over -- summary of the difference between output modes when using watermark + windowing - - - complete - output all windows, ignore watermark concept - - update - output all windows which were updated by the micro batch, eject all windows from state which are declared stale by spark via watermark concept - - append - eject all windows from state and only output windows which have been declared stale by spark via watermark concept, do not output all windows that were updated like update output mode -- **tumbling windows** vs **sliding windows** - - - tumbling windows do not overlap, while sliding windows can have an overlap - - my understanding - in tumbling windows, window duration = sliding interval, whereas in sliding windows, both are unequal - - in tumbling windows, an event can be a part of only one window. in sliding windows, an event can be a part of multiple windows, e.g. 10.18 can be a part of 10.10-10.20 and 10.15-10.25 - - so, the only difference in syntax is we now pass two parameters - window duration and sliding window size - - ```java - .groupBy(window(col("CreatedTime"), "15 minute", "5 minute")) - ``` - -## Streaming Joins - -### Streaming to Static - -- commonly used for stream enrichment -- stateless - spark does not have to maintain any state - this is because every time we get an event, we can simply compute the rows it produces as a result of the join and output these results, since they would not change / the event would not be needed for computing future joins anymore -- for each micro batch, spark is smart enough to refresh the static dataframe i.e. imagine when the application is already running, we insert new data into the static dataframe underlying source, e.g. jdbc. spark will reload the static dataframe with the new data when a new event comes in for the streaming dataframe -- inner join is supported -- left outer join is possible when the streaming dataframe is on the left. why - assume right outer join was allowed. spark would have to predict for the static dataframe's record whether or not a row is present in the streaming dataframe. this cannot be concluded, since streams grow infinitely. this is why right (and full) outer joins are not supported - -### Streaming to Streaming - -- stateful - we need to maintain both sides of data forever in the state, unlike when joining streaming dataframe to static dataframe. remember how this is stateful, but streaming to static can be stateless -- we can solve this problem using 🥁 `withWatermark`. 
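- a minimal pyspark sketch of such a stream to stream join - the surrounding examples are java, and the dataframes and columns here (impressions / clicks) are hypothetical and assumed to be already parsed. each side gets its own watermark, and the join condition bounds the event time range, so spark can eventually drop old rows from the join state
  ```py
  from pyspark.sql.functions import expr

  impressions_with_wm = impressions.withWatermark('impressionTime', '30 minutes')
  clicks_with_wm = clicks.withWatermark('clickTime', '30 minutes')

  joined = impressions_with_wm.join(
      clicks_with_wm,
      expr("""
          clickAdId = impressionAdId and
          clickTime >= impressionTime and
          clickTime <= impressionTime + interval 1 hour
      """)
  )
  ```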
specify a watermark on both streams being joined, so that spark can remove events that are stale -- inner join is supported -- left outer join is possible but with some limitations, TODO -- TODO: spark interview question of memory diff --git a/_posts/2024-08-25-spark-advanced.md b/_posts/2024-08-25-spark-advanced.md new file mode 100644 index 0000000..a881b78 --- /dev/null +++ b/_posts/2024-08-25-spark-advanced.md @@ -0,0 +1,242 @@ +--- +title: Spark Advanced +--- + +## Stream Processing Motivation + +- popular spark cloud platform - databricks +- popular spark on prem platform - cloudera +- setup for databricks and local is already covered [here](/posts/spark) +- when our iterations are done on a weekly, daily or even an hourly basis, we are fine with using **batch processing** +- issue 1 - **backpressure problem** + - it means data is coming at a faster rate than we can process it + - if we use batch processing, we need to do the following every time within the small window + - starting of the on demand cluster / resources + - processing the entire volume of data every time + - cleanup of the resources +- issue 2 - **scheduling** + - with batch processing, we can set a schedule / cron + - but that will not work if we want to process data in realtime +- issue 3 - **incremental data processing** + - we should process only the new data + - we can achieve this using **checkpoints** + - we will know using previous checkpoint what we have already processed and what we are yet to process +- issue 4 - **handling failures** + - what if during the processing, there is a failure? + - this is why we only commit our checkpoint after the entire processing is complete + - so, we read the previous checkpoint, perform the processing and if successful, commit the current checkpoint + - small issue here as well - we have two different transactions - committing of checkpoint and processing of data + - so, it can happen that processing of data is successful, while the committing of checkpoint fails +- issue 5 - **late events** + - e.g. 
we have 15 minute windows to display the stock prices + - we show the output according to the data we have at 10am + - then, when processing the data for 10.15am, we receive an event for 9.58am as well + - this means our previously computed result for 10am needs to be updated as well +- **spark streaming** can address all these 5 issues for us + +## Stream Processing Databricks Example + +- initial cleanup cell - delete table, cleanup checkpoint directory, cleanup and recreate the text directory + ```py + spark.sql('drop table if exists word_counts') + + dbutils.fs.rm(f'dbfs:/FileStore/learning/chekpoint', True) + + dbutils.fs.rm(f'dbfs:/FileStore/learning/text', True) + dbutils.fs.mkdirs(f'dbfs:/FileStore/learning/text') + ``` +- processing logic - + ```py + from pyspark.sql.functions import * + + lines = ( + spark.readStream + .format('text') + .load('dbfs:/FileStore/learning/text') + ) + + words = ( + lines + .select(explode(split('value', '[ ,]')).alias('word')) # slit by ' ' and ',' + .select(lower('word').alias('word')) # lowercase the words + .where(regexp_like('word', lit('\\w'))) # discard special characters etc + ) + + word_counts = ( + words + .groupBy('word') + .count() + ) + + query = ( + word_counts.writeStream + .option('checkpointLocation', 'dbfs:/FileStore/learning/chekpoint') + .outputMode('complete') + .toTable('word_count') + ) + ``` +- some minor api changes for stream and batch processing - + - `spark.read` to `spark.readStream` + - `spark.write` to `spark.writeStream` + - `saveAsTable` to `toTable` + - `mode('overwrite')` to `outputMode('complete')` +- notice how the code for processing however still stays the same like batch processing +- TODO - apparently, writing stream only works in databricks, not in local. my understanding, to validate - + - `.format('delta')` is used by default in databricks + - `.format('parquet')` is used in local + - so, local throws this error - `Data source parquet does not support Complete output mode.` +- note, my understanding - since this was text, we were good. but if it was for e.g. json, we would have to specify the schema upfront. 
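- for illustration, a hedged sketch of what that could look like for a json source - the fields and the directory below are made up, and `spark` is the usual databricks session
  ```py
  from pyspark.sql.types import StructType, StructField, StringType, DoubleType

  # hypothetical invoice schema - a streaming source cannot sample the data to infer it
  invoice_schema = StructType([
      StructField('InvoiceNumber', StringType()),
      StructField('StoreID', StringType()),
      StructField('TotalAmount', DoubleType()),
  ])

  invoices = (
      spark.readStream
      .format('json')
      .schema(invoice_schema)  # schema declared upfront instead of inferSchema
      .load('dbfs:/FileStore/learning/invoices')  # hypothetical directory
  )
  ```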
also, remember that we cannot rely on `inferSchema` or something, now that this is a streaming source and we might not have any readily available data +- copying files slowly to the target location to simulate streaming data - + ```py + import time + + for i in range(1, 4): + time.sleep(10) + print(f'processing {i}th iteration') + dbutils.fs.cp(f'dbfs:/FileStore/learning/text_data_{i}.txt', f'dbfs:/FileStore/learning/text') + ``` +- stopping the query - + ```py + query.stop() + ``` + +## Stream Processing Model + +- spark will start a **background thread** called **streaming query** +- the streaming query will start monitoring the **streaming source** - text directory in our case +- the streaming query will also initialize the **checkpoint** location +- each execution of the streaming plan is called a **micro batch** +- micro batch is basically responsible for the small piece of data in the stream +- a micro batch is triggered every time a new file is added to the text directory +- our data is processed and written to the **streaming sink** +- finally, the streaming query will commit to the checkpoint location +- recall how all the [issues](#stream-processing-motivation) we discussed - scheduling, incremental data processing, handling checkpoints, are all being done by the streaming query + +## Reading / Writing + +- sources supported - + - file / directory + - delta table + - kafka +- sinks supported - + - file / directory + - delta table (what we used in the [databricks example](#stream-processing-databricks-example) above) + - kafka + - for each - useful if an external connector is not available +- other external connectors are available for both source and sinks, but we need to configure them separately and they are not shipped with spark by itself +- we saw how streaming query triggers the micro batches for our processing flow using checkpoints +- we have three options for **triggers** types + - **unspecified** (the default) - trigger the next micro batch once the previous micro batch execution is over + - **fixed interval / processing time** - trigger the next micro batch once the specified interval is over + - if previous micro batch takes less time than the interval, wait till the interval is over + - if previous micro batch takes more time than the interval, kick off the next micro batch immediately + - **available now** - one micro batch to process all the available data, and then stop on its own + - analogy - it is like batch processing but with only incremental data processing unlike the traditional batch processing jobs + + ```py + query = ( + word_counts.writeStream + .option('checkpointLocation', 'checkpoint') + .outputMode('update') + .trigger(availableNow=True) + .toTable('word_count') + ) + ``` +- miscellaneous option - **max files per trigger** - helps keep micro batches small in case a lot of files get queued up, and thus produce outputs faster + ```py + word_counts.writeStream + .option('maxFilesPerTrigger', 3) + # ... + ``` +- so, if we have max files per trigger set to 1, available now set to true and we have 10 files in the queue, our job stops after 10 micro batches are processed successfully +- files that have been read can be deleted by setting the **clean source** property to **delete**. we can also archive it, but we would need to specify the archive location as well using **source archive dir** + ```py + spark.readStream + .format('text') + .option('cleanSource', 'archive') + .option('sourceArchiveDir', 'dbfs:/FileStore/learning/invoices_archive') + # ... 
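    # cleanSource can also be set to 'delete' to remove processed files outright -
    # no sourceArchiveDir is needed in that case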
+ ``` +- recall what [**streaming query**](#stream-processing-model) is. identifying what streaming query belongs to what job can be difficult. e.g. now, we can have a streaming query for gold layer, a streaming query for silver layer and so on. we can therefore name it in the ui as follows - + ```py + word_counts.writeStream + .queryName('bronze-layer') + # ... + ``` +- reading from a table - e.g. bronze layer reads from the sources using the right delimiter etc and writes it to tables. now, the silver layer can read from these tables as follows - + ```py + spark.readStream + .table('bronze-table') + ``` + +## Working with Kafka + +- for databricks, in the **libraries** tab of the **compute**, we can add new libraries +- based on the [documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) for integrating with kafka, and the version of spark cluster of my compute, i added `org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0` + ![kafka installation](/assets/img/spark/kafka-installation.png) +- to do the same thing via spark conf, if for e.g. running locally - + ```py + conf = SparkConf() + conf.set('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') + + spark = ( + SparkSession.builder + .appName('kafka consumer') + .master('local[*]') + .config(conf=conf) + .getOrCreate() + ) + ``` +- because my kafka had ssl, i had to use **key tool** to create a **trust store** with the certificate as input +- step 1 - copying the certificate to the cluster - + ```py + %%sh + + cat << 'EOF' > kafka_ca.pem + -----BEGIN CERTIFICATE----- + <> + -----END CERTIFICATE----- + EOF + ``` +- step 2 - generating the trust store - + ```py + %%sh + + keytool -import -storepass spark_streaming -noprompt -file kafka_ca.pem -alias CA -keystore client.truststore.jks + ``` +- finally, using it via the trust store - + ```py + + jaas_config = ( + "org.apache.kafka.common.security.scram.ScramLoginModule required " + + f'username="{kafka_connect_opt["user"]}" password="{kafka_connect_opt["password"]}";' + ) + + df = ( + spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", kafka_connect_opt["bootstrap_server"]) + .option("kafka.security.protocol", "SASL_SSL") + .option("kafka.sasl.mechanism", "SCRAM-SHA-256") + .option("kafka.sasl.jaas.config", jaas_config) + .option("subscribe", "invoices") + .option("kafka.ssl.truststore.type", "jks") + .option("kafka.ssl.endpoint.identification.algorithm", "") + .option("kafka.ssl.truststore.location", "client.truststore.jks") + .option("kafka.ssl.truststore.password", "spark_streaming") + .load() + ) + + display(df) + ``` +- spark will process data using micro batches. some micro batches can be huge, others very small. so, just like we saw the option **max files per trigger** in [reading / writing](#reading--writing), to stay in control over the records being processed, we can use **max offsets per trigger** + ```py + .option("maxOffsetsPerTrigger", 10) + ``` +- we might also want to specify the starting timestamp when our job starts. we can do that via **starting timestamp**. below will start pretty much from the beginning - + ```py + .option("startingTimestamp", 0) + ``` +- issues when restarting / creating newer versions of spark streaming jobs - + - case 1 - we use checkpoint directory. our spark streaming job continues from where it had left off + - case 2 - we had an issue in our previous code. we merge the fix and deploy our job again. 
we rely on **starting timestamp** and an empty checkpoint directory this time around to be able to reprocess events since the faulty deployment was made +- so, we need to implement **idempotence** in our sink diff --git a/_posts/2024-08-31-dbt.md b/_posts/2024-08-31-dbt.md new file mode 100644 index 0000000..9c56fa8 --- /dev/null +++ b/_posts/2024-08-31-dbt.md @@ -0,0 +1,218 @@ +--- +title: DBT +--- + +## DBT Introduction + +- initial architecture was to load the transformed data into warehouses using [**etl**](/posts/data-engineering/) +- the recent addition of **cloud data warehouses** led to promotion of [elt](/posts/data-engineering/) - we just blast the data into the warehouse, and then can use the warehouse compute to perform transformations on top of that +- the different roles in a data team - + - **data engineers** - build and maintain infrastructure, overall pipeline orchestration, integrations for ingesting the data, etc + - **analytics engineers** - generate cleaned, transformed data for analysis + - **data analysts** - work with business to understand the requirements. use dashboards, sql, etc to query the transformed data +- dbt sits on top of the cloud warehouses. snowflake / redshift / big query / databricks. it is the t of elt, i.e. it is meant to be used by the analytics engineers +- we can **manage**, **test** and **document** transformations from one place +- can deploy the dbt project on a **schedule** using **environments** +- dbt builds a **dag** (**directed acyclic graph**) to represent the flow of data between different tables +- **sources** - where our data actually comes from. we create sources using yaml +- **models** - the intermediate layer(s) we create +- we use **macros** e.g. **source** to reference **sources**, **ref** to reference **models**, etc. we create models using sql +- based on our sql, the dag is created for us / the order of creation of models are determined for us +- **jinja** is used, which is a pythonic language. everything inside double curly braces is jinja, and the rest of it is regular sql. note - replaced curly with round braces, because it was not showing up otherwise + ```sql + ((% for i in range(5) %)) + select (( i )) as number (% if not loop.last %) union all (% endif %) + ((% endfor %)) + ``` +- this results in the following table containing one column number, with values 0-4 +- there is also a **compiled code** tab, where we can see the actual code that our dbt code compiles down to +- we can also look at the **lineage** tab, which shows us the **dag** (directed acyclic graph). it tells us the order the dbt models should be built in
+ ![](/assets/img/dbt/lineage.png) +- the graph is interactive - double click on the node to open the corresponding model file automatically +- `dbt run` - find and create the models in the warehouse for us +- `dbt test` - test the models for us +- `dbt build` - a combination of run and test +- `dbt docs generate` - generate the documentation for us +- ide - edit models and make changes to the dbt project + +## Getting Started + +- create a **connection**. we need to specify - + - the **snowflake account url** + - the **database** where the models should be created + - the **warehouse** to use + + ![](/assets/img/dbt/connection.png) +- then, we can create a **project**. we specify + - **name** of project + - **development credentials** - our credentials, used by dbt to connect to snowflake on our behalf + + ![](/assets/img/dbt/development-credentials.png) +- in the development credentials, we specify a schema. each dbt developer should use their own specific **target schema** to be able to work simultaneously +- we then hit **test connection** for dbt to test the connection, and hit next +- can leverage git to version control the code. for this, we can either use **managed repositories** or one of the supported git providers like github, gitlab, etc + +## Models + +- **models** are like layer(s) of transformations +- important - they are typically just sql select statements, and the corresponding ddl / dml statements are generated for us automatically bts +- in the **models** directory, create a file called customers.sql with the following content - + ```sql + with customers as ( + select * from dbt_raw.jaffle_shop.customers + ), + + customer_orders as ( + select + user_id, + min(order_date) as first_order_date, + max(order_date) as last_order_date, + count(*) as number_of_orders, + from + dbt_raw.jaffle_shop.orders + group by + user_id + ) + + select + customers.*, + customer_orders.first_order_date, + customer_orders.last_order_date, + coalesce(customer_orders.number_of_orders, 0) as number_of_orders + from + customers + left join customer_orders on customers.id = customer_orders.user_id + ``` +- important note - this file is using **cte** (**common table expressions**), and can be run using snowflake as well +- now, when we run the command `dbt run`, it creates a view with the name customers in the database / schema we specify when creating the project +- we can see the corresponding ddl for a particular model in the logs
+ ![](/assets/img/dbt/model-logs.png) +- use the **preview** tab in the ide to see the actual data in table format +- we see that a view has been created for inside at the schema we had specified in the **development credentials** +- we can configure the way our model is **materialized** in one of the following ways - + - configure dbt_project.yml present at the root. everything in `jaffle_shop` will be materialized as a table, but everything inside example as a view + ```yml + models: + jaffle_shop: + +materialized: table + example: + +materialized: view + ``` + - to make it specific for a model, we can specify the following snippet at the top of the sql file for the model - + ```sql + {{ + config( + materialized='view' + ) + }} + ``` +- when we delete models, it does not delete them from snowflake. so, we might have to delete them manually + +### Modularity + +- breaking things down into separate models. it allows us to for e.g. reuse the smaller models in multiple combined models. these smaller models are called **staging models** (like the staging area in warehouses?) + - stg_customers.sql + ```sql + select + * + from + dbt_raw.jaffle_shop.customers + ``` + - stg_customer_orders.sql + ```sql + select + user_id as customer_id, + min(order_date) as first_order_date, + max(order_date) as last_order_date, + count(*) as number_of_orders, + from + dbt_raw.jaffle_shop.orders + group by + customer_id + ``` +- **ref** function - allows us to reference staging models in our actual models. dbt can infer the order to build these models in using the **dag** +- customers.sql now is changed and looks as follows - + ```sql + with stg_customers as ( + select * from (( ref('stg_customers') )) + ), + + stg_customer_orders as ( + select * from (( ref('stg_customer_orders') )) + ) + + select + stg_customers.*, + stg_customer_orders.first_order_date, + stg_customer_orders.last_order_date, + coalesce(stg_customer_orders.number_of_orders, 0) as number_of_orders + from + stg_customers + left join stg_customer_orders on stg_customers.id = stg_customer_orders.customer_id + ``` +- when we go to the **compile** tab, we see the jinja being replaced with the actual sql that is generated + +## Sources + +- **sources** - helps describe data load by extract load / by data engineers +- advantages - + - helps track **lineage** better when we use **source** function instead of table names directly + - helps test assumptions on source data + - calculate freshness of source data +- create a file called sources.yml under the models directory +- put the following content inside it - + ```yml + version: 2 + + sources: + - name: jaffle_shop + database: dbt_raw + schema: jaffle_shop + tables: + - name: customers + - name: orders + ``` +- finally, change the stg_customers.sql / stg_customer_orders.sql as follows - + ```sql + select + * + from + (( source('jaffle_shop', 'customers') )) + ``` + +## Tests and Documentation + +- for each of our tests, a query is created that returns the number of rows that fail the test. 
this should be zero for the test to pass +- to add tests, create a file schema.yml under the models directory - + ```yml + version: 2 + + models: + - name: stg_customers + columns: + - name: id + tests: [unique, not_null] + + - name: stg_customer_orders + columns: + - name: customer_id + tests: + - unique + - not_null + - relationships: + field: id + to: ref('stg_customers') + ``` +- my understanding of **relationship** test - check if all values of `stg_customer_orders#customer_id` is present in `stg_customers#id` +- for all **models** and their individual fields in the above yaml, for all tables in [**sources**](#sources), etc we can include a description +- this way, when we run `dbt docs generate`, it generates rich documentation for our project using json + +## Deployment + +- click deploy and click create environment to create a new **environment**. similar to what we did when creating a [project](#working-with-snowflake), we need to enter + - **connection settings** + - **deployment credentials** +- then, we can create a **job** + - specify the **environment** we created + - in the **execution settings**, specify the commands to run, e.g. `dbt build`, `dbt docs generate`, etc + - we can set a **schedule** for this job to run on, or run it manually whenever we would like to diff --git a/assets/img/dbt/connection.png b/assets/img/dbt/connection.png new file mode 100644 index 0000000..4984210 Binary files /dev/null and b/assets/img/dbt/connection.png differ diff --git a/assets/img/dbt/development-credentials.png b/assets/img/dbt/development-credentials.png new file mode 100644 index 0000000..0aee63c Binary files /dev/null and b/assets/img/dbt/development-credentials.png differ diff --git a/assets/img/dbt/lineage.png b/assets/img/dbt/lineage.png new file mode 100644 index 0000000..db2a51e Binary files /dev/null and b/assets/img/dbt/lineage.png differ diff --git a/assets/img/dbt/model-logs.png b/assets/img/dbt/model-logs.png new file mode 100644 index 0000000..9bb46dc Binary files /dev/null and b/assets/img/dbt/model-logs.png differ diff --git a/assets/img/spark/kafka-installation.png b/assets/img/spark/kafka-installation.png new file mode 100644 index 0000000..b5fdc84 Binary files /dev/null and b/assets/img/spark/kafka-installation.png differ