Skip to content

Commit

Permalink
add materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Shameek Agarwal committed Sep 20, 2024
1 parent ddc0d60 commit af9bae5
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 36 deletions.
22 changes: 0 additions & 22 deletions _posts/2023-08-12-messaging-systems.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,27 +138,6 @@ title: Messaging Systems
- if we set cleanup policy to be compact - a new segment is created, and only the values for the latest keys for a topic is retained, and others are discarded. so e.g. segment 1 has value a for key x and value b for key y, and segment 2 has value c for key y, the newly created segment would have value a for key x and value c for key y. this behavior also makes sense for the consumer offsets topic if i think about it
- for very large messages, either tweak configuration parameters to increase maximum limits, or better, use something like sqs extended client of aws is possible

## Quick Kafka Revision, To Delete

- what is kafka - open source, distributed event streaming platform
- typically, there are three actors - **kafka broker**, **producer** and **consumer**
- **data collection** / **data ingestion** - kafka can sit at the edge of the data engineering platform -
- producers can somehow get data to kafka. this way, a data engineering team will have to maintain, scale, and connect to only one platform i.e. kafka
- we can perform stream or batch processing, generate the gold layer tables, and finally put the data into kafka for downstream systems to read from
- **stream processing** - we can build microservices that consume data from kafka, perform some processing using the **kafka streams** library and then produce data back to kafka / some other platform
- **confluent** - commercial, cloud kafka
- **topic details** - invoices, partitions - 4, replication - 2
- additionally, we can also configure **infinite retention** for the topic. for our use case, we would generate the bronze layer and then there is no need of preserving the data. data would be purged automatically after 7 days, which works great for our use case
- parts of a **message** - key, value and timestamp
- **topic** - one topic for per kind of message (think of it like a table)
- so, a kafka cluster can have multiple topics
- **partitioning** of a topic helps scale it
- kafka will generate a hash of the key and perform a modulo with the number of partitions to decide which partition the message should go to
- so, all the messages with the same key will go into the same partition
- if we do not specify a key, partition would be chosen in a round robin fashion
- **partition offset** - each message when it reaches a partition is given an offset (incremental, 0, 1, 2 and so on)
- **timestamp** - it can be either the time when the cluster receives the message, or the time when the message is actually produced. the later is what we typically want, and is also called **event time**

### Example

program to read from a json file - each line in the file is a json object representing an invoice
Expand Down Expand Up @@ -321,4 +300,3 @@ if __name__ == '__main__':
spring.rabbitmq.listener.simple.retry.multiplier=2
```
- retry at 3s, then 6s (refer multiplier), and remaining 2 retries at 10s gaps
-
133 changes: 119 additions & 14 deletions _posts/2024-08-31-dbt.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ title: DBT
- by default, a **development** environment is already created for us
- go to deploy -> environments and create a new environment
- **environment type** can be **development** or **deployment**
- we select deployment here, since development is already there, and only development environment is allowed
- we select deployment here, since development is already there, and only deployment environment is allowed
- then, we select a deployment type - typically **production**
- just like we saw in [getting started](#getting-started), we need to set the **connection**, wherein we specify snowflake account url, database, warehouse
- we enter **deployment credentials** instead of **development credentials** like in [getting started](#getting-started)
Expand All @@ -338,7 +338,7 @@ title: DBT
- then, we can create a **job** of type **deploy job** for this environment
- here, we can enter a schedule for this job
- we can also configure the set of commands that we would want to run on that cadence
- by default, `dbt build` is present, and we are allows us to check options for running freshness checks on sources and generating docs
- by default, `dbt build` is present, and we are allowed to check options for running freshness checks on sources and generating docs
- after configuring all this, we can create our job
- we can trigger this job manually by hitting **run now** whenever we want, apart from relying on the schedule to trigger it
- dbt cloud easily lets us access our documentation, shows us a breakdown of how much time the models took to build, etc
Expand All @@ -357,7 +357,7 @@ title: DBT
## DBT Explorer Features

- feels like a superset of features available in [documentation](#documentation)
- **models** tab - shows us the status of models - success, error, skipped. it will also show us the last executed date of the model. we can click on a model to inspect its performance, the sql code for it, its lineage, etc
- **models tab** - shows us the status of models - success, error, skipped. it will also show us the last executed date of the model. we can click on a model to inspect its performance, the sql code for it, its lineage, etc
- **sources tab** - freshness of sources etc
- **tests tab** - status of test (pass or fail), what model and column it ran on, etc
- **performance tab** - which models took the longest time to run, which models were executed the most, which models fail the most
Expand All @@ -368,7 +368,7 @@ title: DBT
- `stg_customers+` - highlights `stg_customers` along with all of its downstream dependencies
- after entering our select statement, we hit the **update graph** button / hit enter key to show only nodes related to our query
<!-- new line for code block for right rendering -->
- **union** - we can extract dags for all models combined by separating using a space -
- **union** - we can extract dags for all models combined by separating using a space -
`+stg_customers+ +stg_orders+`
- **intersection** - we can extract the shared upstream between all models by separating using a comma - `+stg_customers+,+stg_orders+`
- to get only the sources of a model, we can use `+dim_customers,resource_type:source`
Expand Down Expand Up @@ -448,9 +448,9 @@ title: DBT
- by default, these will be materialized under `fct_orders_v1` and `fct_orders_v2`. to ensure that for e.g. v1 is materialized under `fct_orders` itself, we can do the following -

```yml
- v: 1
config:
alias: fct_orders
- v: 1
config:
alias: fct_orders
```
- downstream users can reference a specific version using `(( ref('fct_orders', v=1) ))`
- if downstream users do not specify a version i.e. they use `(( ref('fct_orders') ))`, the latest version will be used by default. to change this behavior, we can configure what version should be used by default using `latest_version`
Expand Down Expand Up @@ -550,11 +550,11 @@ title: DBT
- `(( ))` - pulling it out of the jinja context and actually outputting it
- we use the if condition so that union all is not added after the last select. this is what the compiled tab shows -
```sql
select 0 as number union all
select 1 as number union all
select 2 as number union all
select 3 as number union all
select 4 as number
select 0 as number union all
select 1 as number union all
select 2 as number union all
select 3 as number union all
select 4 as number
```
- we can use set to set a variable - the compiled output just has my name
```sql
Expand Down Expand Up @@ -670,7 +670,7 @@ the compiled sql looks as follows -
- package: gitlabhq/snowflake_spend
version: 1.3.0
```
- setup instructions can be found [here](https://github.com/gitlabhq/snowflake_spend), e.g. we need to [seed](#analyses-and-seeds) with effective rates (per credit price)
- setup instructions can be found [here](https://github.com/gitlabhq/snowflake_spend), e.g. we need to [seed](#analyses-and-seeds) with effective rates (per credit price, which depends on our choice of cloud provider, region, etc)

## Jinja, Macros and Packages Examples

Expand All @@ -682,7 +682,7 @@ the compiled sql looks as follows -
- we can run some sql using `run_query`. we can find the available macros in [dbt jinja functions](https://docs.getdbt.com/reference/dbt-jinja-functions)
- the `target` variable contains information about things like warehouse, database, etc in the form of a dictionary
- these are defined / derived from the settings we specify under the (development / deployment) environment, credentials, connection, etc
- to access its values, i entered `(( target ))` in a file and compiled it. the compilation output gave me all the things
- to access its values, i entered `(( target ))` in a file and compiled it. the compilation output gave me this
```json
{
"account": "tp279l2.ap-southeast-1",
Expand Down Expand Up @@ -865,3 +865,108 @@ the compiled sql looks as follows -
- **events** to trigger this webhook for - run started, run completed (any status), run completed (errored)
- **jobs** - trigger this webhook for all jobs or some specific job(s)
- finally, we enter the **endpoint** that dbt should call

## Materialization

- **materialization** - how dbt builds the models. remember - we just write select statements, and dbt generates the right ddl / dml on top of that for us
- five types of materialization
- can be configured inside dbt_project.yml to configure at a model level / inside the individual model itself using a config block at the top
- **table** - actual table is built
- **views** - we would be rerunning the query every time
- **ephemeral** - take our select statements and use it as a **cte** in the downstream models. so, nothing exists in our database. hard to debug as we cannot query or inspect them directly
- small note - if we change the materialization of a model for e.g. from table to a view, the table will be dropped and a new view would be created in its place. this is because dbt generates commands like this - `create or replace`. however, if we go for e.g. from a table to ephemeral, the table still stays there
- now, we discuss [incremental](#incremental) and [snapshot](#snapshot)

### Incremental

- we end up reprocessing years worth of data (which should not ideally change) when using materialization of type [table](#materialization)
- so, the idea is that we just add the new records and keep the old table as is, because historical data typically cannot / should not change
- we save on processing time, resources and money this way
- start with view. if querying it takes too long, switch to table. if building it takes too long, switch to incremental
```sql
(( config(materialized = 'incremental') ))
```
- we have the source table, and an already built table which we want to modify
- we use the way below to identify the new data in our source. note - this should only apply to subsequent runs. for the first run, this filtering should not happen
```sql
with events as (
select *
from (( source('snowplow', 'events') ))
(% if is_incremental() %)
where t_stamp >= (select max(t_stamp) from (( this )))
(% endif %)
)
```
- notice the use of `this` to reference the already existing and built model. we cannot use `ref(model)` in the model itself, since it will become a cyclic dependency
- the first time around, it will run the usual `create or replace table`. second time around though, it runs `create or replace temporary table` and inserts records from this temporary table into the model using `insert into`
- when we add the `--full-refresh` flag, it will build from scratch using the `create or replace table` command like the first time around
- issues - what if data showed up in our warehouse late? e.g. we always use the latest time as the cutoff, but events can appear out of order - we have already processed events up to 11.30, but in the next incremental run, we receive an event which happened at 10.30
- so, we can change the sql to following -
```sql
(% if is_incremental() %)
where t_stamp >= (select dateadd('day', -3, max(t_stamp)) from (( this )))
(% endif %)
```
- we move the cutoff to three days back
- now, the current issue - we can end up with duplicate records. solution - `unique_keys`. it helps update the existing records
```sql
((
config(
materialized = 'incremental',
unique_key = 'page_view_id'
)
))
```
- with this, the command that was doing `insert into` from the temporary table to the model now changes to `merge into`
- note - advantage of dbt - dbt automatically used `merge into`, since snowflake supports it. different warehouses might have different ways of achieving this, which dbt can handle for us automatically
- how to decide three days - set the cutoff based on the two parameters below -
- ask for tolerance for correctness
- perform analysis on 'how late' can say 99 percentile of data be
- prioritizing correctness can negate performance gains
- secondly, remember we can always run a `--full-refresh` on a weekly basis
- another issue - assume our 'unique key' has records in both - current table and the 3 day current window we are processing
- we end up overwriting the old data using our new batch
- my understanding of one potential fix - we change our filter. instead of filtering based on time, filter in this way i.e. retrieve all records having page view id in the batch being currently processed -
```sql
(% if is_incremental() %)
where page_view_id in (
select distinct page_view_id
from (( source('snowplow', 'events') ))
where t_stamp >= (select dateadd('day', -3, max(t_stamp)) from (( this )))
)
(% endif %)
```
- issue - this subquery for finding all the distinct page view ids can become very slow
- we should avoid such fixes if we have truly huge amounts of data, as these optimizations can become very slow
- good candidates for incremental models - immutable events, like page views on our browser
- if they are mutable - we need fields like updated at to be able to filter easily

### Snapshot

- think slowly changing dimensions (type 2)
- we run them using `dbt snapshot`, as they run separately from our models
- we now get four new columns -
- `dbt_scd_id` - a totally unique identifier
- `dbt_updated_at` - a mirror of the `updated_at` field
- `dbt_valid_from` and `dbt_valid_to` fields
- we use a different target database and schema, which is different from what we use in profiles / environments. this is because we want it to be independent from our models, since they cannot be reproduced. it is source data, which is not something we otherwise have access to
- just like [incremental](#incremental) models, we need to specify the unique key
- we can specify a strategy and the field for this strategy, which is timestamp and updated at in our case
```sql
(% snapshot snap_products %)
((
config(
target_database = 'xyz',
target_schema = 'pqr',
unique_key = 'id',
strategy = 'timestamp',
updated_at = 'updated_at'
)
))
select * from (( source('abc', 'def') ))
(% endsnapshot %)
```
- we pretend that this snapshot is actually available in the source itself, and use it like that
- sometimes however, we might have these snapshots at the end of our reported data, to view the changes easily
- nothing changes as such, we select from `ref` instead of `source`, thats all

0 comments on commit af9bae5

Please sign in to comment.