Skip to content

Commit

Permalink
chore: improve report performance
Browse files Browse the repository at this point in the history
Added `time_from_last_event` and `time_to_next_event` fields to events
that are precomputed when the event is created.

Signed-off-by: Henry Gressmann <mail@henrygressmann.de>
  • Loading branch information
explodingcamera committed Nov 23, 2024
1 parent 8380bf6 commit 95d95d0
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 124 deletions.
53 changes: 52 additions & 1 deletion src/app/core/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl LiwanEvents {
}

/// Get the daily salt, generating a new one if the current one is older than 24 hours
pub async fn get_salt(&self) -> Result<String> {
pub fn get_salt(&self) -> Result<String> {
let (salt, updated_at) = {
let salt = self.daily_salt.read().map_err(|_| eyre::eyre!("Failed to acquire read lock"))?;
salt.clone()
Expand Down Expand Up @@ -55,10 +55,15 @@ impl LiwanEvents {
pub fn append(&self, events: impl Iterator<Item = Event>) -> Result<()> {
let conn = self.duckdb.get()?;
let mut appender = conn.appender("events")?;
let mut first_event_time = OffsetDateTime::now_utc();
for event in events {
appender.append_row(event_params![event])?;
if first_event_time > event.created_at {
first_event_time = event.created_at;
}
}
appender.flush()?;
update_event_times(&conn, first_event_time)?;
Ok(())
}

Expand All @@ -69,15 +74,26 @@ impl LiwanEvents {
Ok(event) => {
let conn = self.duckdb.get()?;
let mut appender = conn.appender("events")?;
let mut first_event_time = event.created_at;
appender.append_row(event_params![event])?;

// Non-blockingly drain the remaining events in the queue if there are any
let mut count = 1;
for event in events.try_iter() {
appender.append_row(event_params![event])?;
count += 1;

if first_event_time > event.created_at {
first_event_time = event.created_at;
}

// always flush after 5000 events
if count >= 5000 {
break;
}
}
appender.flush()?;
update_event_times(&conn, first_event_time)?;
tracing::debug!("Processed {} events", count);

// Sleep to allow more events to be received before the next batch
Expand All @@ -88,3 +104,38 @@ impl LiwanEvents {
}
}
}

use duckdb::{params, Connection, Result as DuckResult};

/// Recompute the precomputed session-interval columns
/// (`time_from_last_event` / `time_to_next_event`) for all events created at
/// or after `from_time`, plus any potentially affected earlier rows.
///
/// `from_time` should be the earliest `created_at` of a freshly appended
/// batch. Rows older than `from_time` are re-included only when the same
/// visitor has an "open" session tail — an event in the 24 hours before
/// `from_time` whose `time_to_next_event` is still null — since a new event
/// may close that tail. Window functions (`lag`/`lead`) cannot appear in an
/// UPDATE's WHERE clause, hence the CTE-then-join shape.
///
/// Returns any error raised by DuckDB while executing the statement.
pub fn update_event_times(conn: &Connection, from_time: OffsetDateTime) -> DuckResult<()> {
    // this can probably be simplified, sadly the where clause can't contain window functions
    let sql = "--sql
    with
        filtered_events as (
            select *
            from events
            where created_at >= ?::timestamp or visitor_id in (
                select visitor_id
                from events
                where created_at >= now()::timestamp - interval '24 hours' and created_at < ?::timestamp and time_to_next_event is null
            )
        ),
        cte as (
            select
                visitor_id,
                created_at,
                created_at - lag(created_at) over (partition by visitor_id order by created_at) as time_from_last_event,
                lead(created_at) over (partition by visitor_id order by created_at) - created_at as time_to_next_event
            from filtered_events
        )
    update events
    set
        time_from_last_event = cte.time_from_last_event,
        time_to_next_event = cte.time_to_next_event
    from cte
    where events.visitor_id = cte.visitor_id and events.created_at = cte.created_at;
    ";

    // Both positional placeholders bind the same `from_time` value.
    conn.execute(sql, params![&from_time, &from_time])?;
    Ok(())
}
51 changes: 25 additions & 26 deletions src/app/core/reports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,18 @@ fn metric_sql(metric: Metric) -> String {
}
Metric::BounceRate => {
// total sessions: no time_to_next_event / time_to_next_event is null
// bounce sessions: time to next / time to prev are both null or both > 1800
// bounce sessions: time to next / time to prev are both null or both > interval '30 minutes'
"--sql
count(distinct sd.visitor_id)
filter (where (sd.time_until_next_event is null or sd.time_until_next_event > 1800) and
(sd.time_since_previous_event is null or sd.time_since_previous_event > 1800)) /
count(distinct sd.visitor_id) filter (where sd.time_until_next_event is null or sd.time_until_next_event > 1800)
filter (where (sd.time_to_next_event is null or sd.time_to_next_event > interval '30 minutes') and
(sd.time_from_last_event is null or sd.time_from_last_event > interval '30 minutes')) /
count(distinct sd.visitor_id) filter (where sd.time_to_next_event is null or sd.time_to_next_event > interval '30 minutes')
"
}
Metric::AvgTimeOnSite => {
// avg time_until_next_event where time_until_next_event <= 1800 and time_until_next_event is not null
// avg time_to_next_event where time_to_next_event <= 1800 and time_to_next_event is not null
"--sql
coalesce(avg(sd.time_until_next_event) filter (where sd.time_until_next_event is not null and sd.time_until_next_event <= 1800), 0)"
coalesce(avg(extract(epoch from sd.time_to_next_event)) filter (where sd.time_to_next_event is not null and sd.time_to_next_event <= interval '30 minutes'), 0)"
}
}
.to_owned()
Expand Down Expand Up @@ -278,7 +278,8 @@ pub fn overall_report(
params.extend_from_params(filters_params);
params.push(range.end);

let query = format!("--sql
let query = format!(
"--sql
with
params as (
select
Expand All @@ -296,10 +297,8 @@ pub fn overall_report(
select
visitor_id,
created_at,
-- the time to the next event for the same visitor
extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
-- the time to the previous event for the same visitor
extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
time_from_last_event,
time_to_next_event,
from events, params
where
event = ?::text and
Expand All @@ -326,7 +325,8 @@ pub fn overall_report(
left join event_bins eb on tb.bin_start = eb.bin_start
order by
tb.bin_start;
");
"
);

let mut stmt = conn.prepare_cached(&query)?;

Expand Down Expand Up @@ -371,21 +371,20 @@ pub fn overall_stats(
params.extend(entities);
params.extend_from_params(filters_params);

let query = format!("--sql
let query = format!(
"--sql
with
params as (
select
?::timestamp as start_time,
?::timestamp as end_time
?::timestamp as end_time,
),
session_data as (
select
visitor_id,
created_at,
-- the time to the next event for the same visitor
extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
-- the time to the previous event for the same visitor
extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
time_from_last_event,
time_to_next_event,
from events, params
where
event = ?::text and
Expand All @@ -400,7 +399,8 @@ pub fn overall_stats(
{metric_avg_time_on_site} as avg_time_on_site
from
session_data sd;
");
"
);

let mut stmt = conn.prepare_cached(&query)?;
let result = stmt.query_row(duckdb::params_from_iter(params), |row| {
Expand Down Expand Up @@ -456,30 +456,29 @@ pub fn dimension_report(
params.extend(entities);
params.extend_from_params(filters_params);

let query = format!("--sql
let query = format!(
"--sql
with
params as (
select
?::timestamp as start_time,
?::timestamp as end_time
?::timestamp as end_time,
),
session_data as (
select
coalesce({dimension_column}, 'Unknown') as dimension_value,
visitor_id,
created_at,
-- the time to the next event for the same visitor
extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
-- the time to the previous event for the same visitor
extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
time_from_last_event,
time_to_next_event,
from events sd, params
where
event = ?::text and
created_at between params.start_time and params.end_time and
entity_id in ({entity_vars})
{filters_sql}
group by
{group_by_columns}, visitor_id, created_at
{group_by_columns}, visitor_id, created_at, time_from_last_event, time_to_next_event
)
select
dimension_value,
Expand Down
4 changes: 3 additions & 1 deletion src/app/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ macro_rules! event_params {
$event.utm_medium,
$event.utm_campaign,
$event.utm_content,
$event.utm_term
$event.utm_term,
None::<std::time::Duration>,
None::<std::time::Duration>,
]
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub fn handle_command(mut config: Config, cmd: Command) -> Result<()> {
#[cfg(any(debug_assertions, test, feature = "_enable_seeding"))]
Command::SeedDatabase(_) => {
let app = Liwan::try_new(config)?;
app.seed_database(100000)?;
app.seed_database(1000000)?;
println!("Database seeded with test data");
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/migrations/events/V4__session_intervals.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- V4: precompute per-visitor session intervals on the events table, so report
-- queries can read them directly instead of evaluating window functions.
alter table events add column time_from_last_event interval;
alter table events add column time_to_next_event interval;

-- Backfill both columns for all existing rows: for each visitor, order their
-- events by creation time and record the gap to the previous / next event.
-- The first and last event of each visitor's history keep null values.
with cte as (
    select
        visitor_id,
        created_at,
        created_at - lag(created_at) over (partition by visitor_id order by created_at) as time_from_last_event,
        lead(created_at) over (partition by visitor_id order by created_at) - created_at as time_to_next_event
    from events
)
update events
set
    time_from_last_event = cte.time_from_last_event,
    time_to_next_event = cte.time_to_next_event
from cte
where events.visitor_id = cte.visitor_id and events.created_at = cte.created_at;
30 changes: 15 additions & 15 deletions src/utils/seed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ pub fn random_events(
}
generated += 1;

let created_at = random_date(time_range.0, time_range.1, 0.5);
// let created_at = random_date(time_range.0, time_range.1, 0.5);

// let time_slice = time_range.1 - time_range.0;
// let skew_factor = 2.0;
// let normalized = 1.0 - (1.0 - (generated as f64 / count as f64)).powf(skew_factor);
// let created_at = time_range.0 + time_slice * normalized;
let time_slice = time_range.1 - time_range.0;
let skew_factor = 2.0;
let normalized = 1.0 - (1.0 - (generated as f64 / count as f64)).powf(skew_factor);
let created_at = time_range.0 + time_slice * normalized;

let path = random_el(PATHS, 0.5);
let referrer = random_el(REFERRERS, 0.5);
Expand Down Expand Up @@ -78,16 +78,16 @@ pub fn random_events(
})
}

// Pick a random timestamp in [min, max], skewed toward `max` as `scale`
// approaches 1 (scale = 0 is uniform). The exponent `1 - scale` biases the
// unit-interval sample; `.min(1.0)` guards against a scale above 1 pushing
// the weight past the end of the range.
fn random_date(min: OffsetDateTime, max: OffsetDateTime, scale: f64) -> OffsetDateTime {
    let sample: f64 = rand::thread_rng().gen();
    let bias = sample.powf(1.0 - scale).min(1.0);
    let span_seconds = (max - min).as_seconds_f64();
    let offset = time::Duration::seconds((span_seconds * bias) as i64);
    min + offset
}
// fn random_date(min: OffsetDateTime, max: OffsetDateTime, scale: f64) -> OffsetDateTime {
// let mut rng = rand::thread_rng();
// let uniform_random: f64 = rng.gen();
// let weighted_random = (uniform_random.powf(1.0 - scale)).min(1.0);
// let duration = max - min;
// let duration_seconds = duration.as_seconds_f64();
// let weighted_duration_seconds = duration_seconds * weighted_random;
// let weighted_duration = time::Duration::seconds(weighted_duration_seconds as i64);
// min + weighted_duration
// }

fn random_el<T>(slice: &[T], scale: f64) -> &T {
let mut rng = rand::thread_rng();
Expand Down
Loading

0 comments on commit 95d95d0

Please sign in to comment.