diff --git a/src/app/core/events.rs b/src/app/core/events.rs
index 78ac90b..c956a61 100644
--- a/src/app/core/events.rs
+++ b/src/app/core/events.rs
@@ -27,7 +27,7 @@ impl LiwanEvents {
     }
 
     /// Get the daily salt, generating a new one if the current one is older than 24 hours
-    pub async fn get_salt(&self) -> Result<String> {
+    pub fn get_salt(&self) -> Result<String> {
         let (salt, updated_at) = {
             let salt = self.daily_salt.read().map_err(|_| eyre::eyre!("Failed to acquire read lock"))?;
             salt.clone()
@@ -55,10 +55,15 @@
     pub fn append(&self, events: impl Iterator<Item = Event>) -> Result<()> {
         let conn = self.duckdb.get()?;
         let mut appender = conn.appender("events")?;
+        let mut first_event_time = OffsetDateTime::now_utc();
         for event in events {
             appender.append_row(event_params![event])?;
+            if first_event_time > event.created_at {
+                first_event_time = event.created_at;
+            }
         }
         appender.flush()?;
+        update_event_times(&conn, first_event_time)?;
         Ok(())
     }
 
@@ -69,6 +74,7 @@
             Ok(event) => {
                 let conn = self.duckdb.get()?;
                 let mut appender = conn.appender("events")?;
+                let mut first_event_time = event.created_at;
                 appender.append_row(event_params![event])?;
 
                 // Non-blockingly drain the remaining events in the queue if there are any
@@ -76,8 +82,18 @@
                 for event in events.try_iter() {
                     appender.append_row(event_params![event])?;
                     count += 1;
+
+                    if first_event_time > event.created_at {
+                        first_event_time = event.created_at;
+                    }
+
+                    // always flush after 5000 events
+                    if count >= 5000 {
+                        break;
+                    }
                 }
                 appender.flush()?;
+                update_event_times(&conn, first_event_time)?;
                 tracing::debug!("Processed {} events", count);
 
                 // Sleep to allow more events to be received before the next batch
@@ -88,3 +104,38 @@
         }
     }
 }
+
+use duckdb::{params, Connection, Result as DuckResult};
+
+pub fn update_event_times(conn: &Connection, from_time: OffsetDateTime) -> DuckResult<()> {
+    // this can probably be simplified, sadly the where clause can't contain window functions
+    let sql = "--sql
+        with
+        filtered_events as (
+            select *
+            from events
+            where created_at >= ?::timestamp or visitor_id in (
+                select visitor_id
+                from events
+                where created_at >= now()::timestamp - interval '24 hours' and created_at < ?::timestamp and time_to_next_event is null
+            )
+        ),
+        cte as (
+            select
+                visitor_id,
+                created_at,
+                created_at - lag(created_at) over (partition by visitor_id order by created_at) as time_from_last_event,
+                lead(created_at) over (partition by visitor_id order by created_at) - created_at as time_to_next_event
+            from filtered_events
+        )
+        update events
+        set
+            time_from_last_event = cte.time_from_last_event,
+            time_to_next_event = cte.time_to_next_event
+        from cte
+        where events.visitor_id = cte.visitor_id and events.created_at = cte.created_at;
+    ";
+
+    conn.execute(sql, params![&from_time, &from_time])?;
+    Ok(())
+}
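
Note (not part of the patch): a quick way to sanity-check `update_event_times` is an in-memory DuckDB connection. This is a sketch under assumptions; the reduced `events` schema and sample rows are illustrative (the real table has many more columns), and it relies on the duckdb crate's `OffsetDateTime` parameter support that the codebase already uses:

    use duckdb::Connection;
    use time::OffsetDateTime;

    fn main() -> duckdb::Result<()> {
        let conn = Connection::open_in_memory()?;
        // Reduced schema: only the columns the window query touches
        conn.execute_batch(
            "create table events (visitor_id text, created_at timestamp,
                                  time_from_last_event interval, time_to_next_event interval);
             insert into events (visitor_id, created_at) values
                 ('v1', '2024-01-01 10:00:00'), ('v1', '2024-01-01 10:05:00');",
        )?;
        // from_time = epoch, so every row is re-linked
        update_event_times(&conn, OffsetDateTime::UNIX_EPOCH)?;
        let gap: f64 = conn.query_row(
            "select extract(epoch from time_to_next_event)::double from events order by created_at limit 1",
            [],
            |row| row.get(0),
        )?;
        assert_eq!(gap, 300.0); // the first event is followed five minutes later
        Ok(())
    }
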
diff --git a/src/app/core/reports.rs b/src/app/core/reports.rs
index c69df0e..5ebb35d 100644
--- a/src/app/core/reports.rs
+++ b/src/app/core/reports.rs
@@ -192,18 +192,18 @@ fn metric_sql(metric: Metric) -> String {
         }
         Metric::BounceRate => {
             // total sessions: no time_to_next_event / time_to_next_event is null
-            // bounce sessions: time to next / time to prev are both null or both > 1800
+            // bounce sessions: time to next / time to prev are both null or both > interval '30 minutes'
             "--sql
             count(distinct sd.visitor_id)
-            filter (where (sd.time_until_next_event is null or sd.time_until_next_event > 1800) and
-                   (sd.time_since_previous_event is null or sd.time_since_previous_event > 1800)) /
-            count(distinct sd.visitor_id) filter (where sd.time_until_next_event is null or sd.time_until_next_event > 1800)
+            filter (where (sd.time_to_next_event is null or sd.time_to_next_event > interval '30 minutes') and
+                   (sd.time_from_last_event is null or sd.time_from_last_event > interval '30 minutes')) /
+            count(distinct sd.visitor_id) filter (where sd.time_to_next_event is null or sd.time_to_next_event > interval '30 minutes')
             "
         }
         Metric::AvgTimeOnSite => {
-            // avg time_until_next_event where time_until_next_event <= 1800 and time_until_next_event is not null
+            // avg time_to_next_event where time_to_next_event is not null and time_to_next_event <= interval '30 minutes'
             "--sql
-            coalesce(avg(sd.time_until_next_event) filter (where sd.time_until_next_event is not null and sd.time_until_next_event <= 1800), 0)"
+            coalesce(avg(extract(epoch from sd.time_to_next_event)) filter (where sd.time_to_next_event is not null and sd.time_to_next_event <= interval '30 minutes'), 0)"
         }
     }
     .to_owned()
@@ -278,7 +278,8 @@ pub fn overall_report(
     params.extend_from_params(filters_params);
     params.push(range.end);
 
-    let query = format!("--sql
+    let query = format!(
+        "--sql
         with
             params as (
                 select
@@ -296,10 +297,8 @@
                 select
                     visitor_id,
                     created_at,
-                    -- the time to the next event for the same visitor
-                    extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
-                    -- the time to the previous event for the same visitor
-                    extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
+                    time_from_last_event,
+                    time_to_next_event,
                 from events, params
                 where
                     event = ?::text and
@@ -326,7 +325,8 @@
         left join event_bins eb on tb.bin_start = eb.bin_start
         order by tb.bin_start;
-    ");
+        "
+    );
 
     let mut stmt = conn.prepare_cached(&query)?;
 
@@ -371,21 +371,20 @@ pub fn overall_stats(
     params.extend(entities);
     params.extend_from_params(filters_params);
 
-    let query = format!("--sql
+    let query = format!(
+        "--sql
         with
             params as (
                 select
                     ?::timestamp as start_time,
-                    ?::timestamp as end_time
+                    ?::timestamp as end_time,
             ),
             session_data as (
                 select
                     visitor_id,
                     created_at,
-                    -- the time to the next event for the same visitor
-                    extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
-                    -- the time to the previous event for the same visitor
-                    extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
+                    time_from_last_event,
+                    time_to_next_event,
                 from events, params
                 where
                     event = ?::text and
@@ -400,7 +399,8 @@
             {metric_avg_time_on_site} as avg_time_on_site
         from session_data sd;
-    ");
+        "
+    );
 
     let mut stmt = conn.prepare_cached(&query)?;
     let result = stmt.query_row(duckdb::params_from_iter(params), |row| {
@@ -456,22 +456,21 @@ pub fn dimension_report(
     params.extend(entities);
     params.extend_from_params(filters_params);
 
-    let query = format!("--sql
+    let query = format!(
+        "--sql
         with
             params as (
                 select
                     ?::timestamp as start_time,
-                    ?::timestamp as end_time
+                    ?::timestamp as end_time,
             ),
             session_data as (
                 select
                     coalesce({dimension_column}, 'Unknown') as dimension_value,
                     visitor_id,
                     created_at,
-                    -- the time to the next event for the same visitor
-                    extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
-                    -- the time to the previous event for the same visitor
-                    extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
+                    time_from_last_event,
+                    time_to_next_event,
                 from events sd, params
                 where
                     event = ?::text and
@@ -479,7 +478,7 @@
                     entity_id in ({entity_vars})
                     {filters_sql}
                 group by
-                    {group_by_columns}, visitor_id, created_at
+                    {group_by_columns}, visitor_id, created_at, time_from_last_event, time_to_next_event
             )
         select
             dimension_value,
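
The bounce-rate expression is easier to see in plain Rust. A hedged restatement of the predicate (names illustrative, not from the patch): an event counts as a bounce when no neighbouring event from the same visitor falls within the 30-minute session window on either side.

    use std::time::Duration;

    const SESSION_GAP: Duration = Duration::from_secs(30 * 60);

    /// Mirrors the SQL filter: both neighbours missing or both outside the window.
    fn is_bounce(time_from_last: Option<Duration>, time_to_next: Option<Duration>) -> bool {
        time_from_last.map_or(true, |d| d > SESSION_GAP)
            && time_to_next.map_or(true, |d| d > SESSION_GAP)
    }

    fn main() {
        assert!(is_bounce(None, None)); // single-event visit
        assert!(!is_bounce(None, Some(Duration::from_secs(60)))); // next event a minute later
    }
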
diff --git a/src/app/models.rs b/src/app/models.rs
index 6cb7c40..49d96f0 100644
--- a/src/app/models.rs
+++ b/src/app/models.rs
@@ -92,7 +92,9 @@ macro_rules! event_params {
             $event.utm_medium,
             $event.utm_campaign,
             $event.utm_content,
-            $event.utm_term
+            $event.utm_term,
+            None::<i64>,
+            None::<i64>,
         ]
     };
 }
diff --git a/src/cli.rs b/src/cli.rs
index 8a01011..25e2662 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -133,7 +133,7 @@ pub fn handle_command(mut config: Config, cmd: Command) -> Result<()> {
         #[cfg(any(debug_assertions, test, feature = "_enable_seeding"))]
         Command::SeedDatabase(_) => {
             let app = Liwan::try_new(config)?;
-            app.seed_database(100000)?;
+            app.seed_database(1000000)?;
             println!("Database seeded with test data");
         }
     }
diff --git a/src/migrations/events/V4__session_intervals.sql b/src/migrations/events/V4__session_intervals.sql
new file mode 100644
index 0000000..86a3b95
--- /dev/null
+++ b/src/migrations/events/V4__session_intervals.sql
@@ -0,0 +1,17 @@
+alter table events add column time_from_last_event interval;
+alter table events add column time_to_next_event interval;
+
+with cte as (
+    select
+        visitor_id,
+        created_at,
+        created_at - lag(created_at) over (partition by visitor_id order by created_at) as time_from_last_event,
+        lead(created_at) over (partition by visitor_id order by created_at) - created_at as time_to_next_event
+    from events
+)
+update events
+set
+    time_from_last_event = cte.time_from_last_event,
+    time_to_next_event = cte.time_to_next_event
+from cte
+where events.visitor_id = cte.visitor_id and events.created_at = cte.created_at;
\ No newline at end of file
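
For intuition, the backfill above fills the two interval columns per visitor roughly like this (timestamps illustrative): `lag` leaves the first event's `time_from_last_event` null and `lead` leaves the last event's `time_to_next_event` null.

    visitor_id  created_at           time_from_last_event  time_to_next_event
    v1          2024-01-01 10:00:00  null                  00:05:00
    v1          2024-01-01 10:05:00  00:05:00              01:00:00
    v1          2024-01-01 11:05:00  01:00:00              null
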
diff --git a/src/utils/seed.rs b/src/utils/seed.rs
index 8283e48..fa4c7ec 100644
--- a/src/utils/seed.rs
+++ b/src/utils/seed.rs
@@ -42,12 +42,12 @@ pub fn random_events(
         }
         generated += 1;
 
-        let created_at = random_date(time_range.0, time_range.1, 0.5);
+        // let created_at = random_date(time_range.0, time_range.1, 0.5);
 
-        // let time_slice = time_range.1 - time_range.0;
-        // let skew_factor = 2.0;
-        // let normalized = 1.0 - (1.0 - (generated as f64 / count as f64)).powf(skew_factor);
-        // let created_at = time_range.0 + time_slice * normalized;
+        let time_slice = time_range.1 - time_range.0;
+        let skew_factor = 2.0;
+        let normalized = 1.0 - (1.0 - (generated as f64 / count as f64)).powf(skew_factor);
+        let created_at = time_range.0 + time_slice * normalized;
 
         let path = random_el(PATHS, 0.5);
         let referrer = random_el(REFERRERS, 0.5);
@@ -78,16 +78,16 @@
     })
 }
 
-fn random_date(min: OffsetDateTime, max: OffsetDateTime, scale: f64) -> OffsetDateTime {
-    let mut rng = rand::thread_rng();
-    let uniform_random: f64 = rng.gen();
-    let weighted_random = (uniform_random.powf(1.0 - scale)).min(1.0);
-    let duration = max - min;
-    let duration_seconds = duration.as_seconds_f64();
-    let weighted_duration_seconds = duration_seconds * weighted_random;
-    let weighted_duration = time::Duration::seconds(weighted_duration_seconds as i64);
-    min + weighted_duration
-}
+// fn random_date(min: OffsetDateTime, max: OffsetDateTime, scale: f64) -> OffsetDateTime {
+//     let mut rng = rand::thread_rng();
+//     let uniform_random: f64 = rng.gen();
+//     let weighted_random = (uniform_random.powf(1.0 - scale)).min(1.0);
+//     let duration = max - min;
+//     let duration_seconds = duration.as_seconds_f64();
+//     let weighted_duration_seconds = duration_seconds * weighted_random;
+//     let weighted_duration = time::Duration::seconds(weighted_duration_seconds as i64);
+//     min + weighted_duration
+// }
 
 fn random_el<T>(slice: &[T], scale: f64) -> &T {
     let mut rng = rand::thread_rng();
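
The inlined skew replaces the random `random_date` weighting with a deterministic curve. A small sketch of the arithmetic (assumed names, not from the patch): with `skew_factor = 2.0`, the halfway point of generation already sits 75% of the way through the time range, so seeded events cluster toward the present.

    fn skewed_fraction(generated: u64, count: u64, skew_factor: f64) -> f64 {
        1.0 - (1.0 - generated as f64 / count as f64).powf(skew_factor)
    }

    fn main() {
        assert_eq!(skewed_fraction(50, 100, 2.0), 0.75); // midpoint maps to 75%
        assert_eq!(skewed_fraction(100, 100, 2.0), 1.0); // last event lands at the range end
    }
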
diff --git a/src/web/routes/event.rs b/src/web/routes/event.rs
index 53580e5..c72ca1b 100644
--- a/src/web/routes/event.rs
+++ b/src/web/routes/event.rs
@@ -6,12 +6,14 @@
 use crate::web::webext::{ApiResult, EmptyResponse, PoemErrExt};
 use cached::{Cached, TimedCache};
 use crossbeam::channel::Sender;
-use eyre::Context;
+use eyre::{Context, Result};
 use poem::http::{StatusCode, Uri};
+use poem::web::headers::UserAgent;
 use poem::web::{headers, Data, RealIp, TypedHeader};
 use poem_openapi::payload::Json;
 use poem_openapi::{Object, OpenApi};
 use std::cell::RefCell;
+use std::net::IpAddr;
 use std::str::FromStr;
 use time::OffsetDateTime;
 
@@ -50,68 +52,87 @@ impl EventApi {
         Json(event): Json<EventRequest>,
         TypedHeader(user_agent): TypedHeader<headers::UserAgent>,
     ) -> ApiResult<EmptyResponse> {
-        let client = useragent::parse(user_agent.as_str());
-        if useragent::is_bot(&client) {
-            return EmptyResponse::ok();
-        }
-
-        let referrer = match process_referer(event.referrer.as_deref()) {
-            Referrer::Fqdn(fqdn) => Some(fqdn),
-            Referrer::Unknown(r) => r,
-            Referrer::Spammer => return EmptyResponse::ok(),
-        };
-
-        let referrer = referrer.map(|r| r.trim_start_matches("www.").to_string()); // remove www. prefix
-        let referrer = referrer.filter(|r| r.trim().len() > 3); // ignore empty or short referrers
+        let url = Uri::from_str(&event.url).wrap_err("invalid url").http_err("invalid url", StatusCode::BAD_REQUEST)?;
+        let app = app.clone();
+        let events = events.clone();
 
-        if !EXISTING_ENTITIES.with(|cache| cache.borrow_mut().cache_get(&event.entity_id).is_some()) {
-            if !app.entities.exists(&event.entity_id).http_status(StatusCode::INTERNAL_SERVER_ERROR)? {
-                return EmptyResponse::ok();
+        // run the event processing in the background
+        let _ = tokio::task::spawn_blocking(move || {
+            if let Err(e) = process_event(app, events, event, url, ip, user_agent) {
+                tracing::error!("Failed to process event: {:?}", e);
             }
-            EXISTING_ENTITIES
-                .with(|cache| cache.borrow_mut().cache_set(event.entity_id.clone(), event.entity_id.clone()));
-        }
+        });
 
-        let url = Uri::from_str(&event.url).wrap_err("invalid url").http_err("invalid url", StatusCode::BAD_REQUEST)?;
-        let daily_salt = app.events.get_salt().await.http_status(StatusCode::INTERNAL_SERVER_ERROR)?;
-        let visitor_id = match ip {
-            Some(ip) => hash_ip(&ip, user_agent.as_str(), &daily_salt, &event.entity_id),
-            None => visitor_id(),
-        };
-
-        let (country, city) = match (&app.geoip, ip) {
-            (Some(geoip), Some(ip)) => match geoip.lookup(&ip) {
-                Ok(lookup) => (lookup.country_code, lookup.city),
-                Err(_) => (None, None),
-            },
-            _ => (None, None),
-        };
-
-        let path = url.path().to_string();
-        let path = if path.len() > 1 && path.ends_with('/') { path.trim_end_matches('/').to_string() } else { path };
-        let fqdn = url.host().unwrap_or_default().to_string();
-
-        let event = Event {
-            visitor_id,
-            referrer,
-            country,
-            city,
-            browser: client.user_agent.family.to_string().into(),
-            created_at: OffsetDateTime::now_utc(),
-            entity_id: event.entity_id,
-            event: event.name,
-            fqdn: fqdn.into(),
-            path: path.into(),
-            mobile: Some(useragent::is_mobile(&client)),
-            platform: client.os.family.to_string().into(),
-            utm_campaign: event.utm.as_ref().and_then(|u| u.campaign.clone()),
-            utm_content: event.utm.as_ref().and_then(|u| u.content.clone()),
-            utm_medium: event.utm.as_ref().and_then(|u| u.medium.clone()),
-            utm_source: event.utm.as_ref().and_then(|u| u.source.clone()),
-            utm_term: event.utm.as_ref().and_then(|u| u.term.clone()),
-        };
-
-        let _ = events.try_send(event);
         EmptyResponse::ok()
     }
 }
+
+fn process_event(
+    app: Liwan,
+    events: Sender<Event>,
+    event: EventRequest,
+    url: Uri,
+    ip: Option<IpAddr>,
+    user_agent: UserAgent,
+) -> Result<()> {
+    let client = useragent::parse(user_agent.as_str());
+    if useragent::is_bot(&client) {
+        return Ok(());
+    }
+
+    let referrer = match process_referer(event.referrer.as_deref()) {
+        Referrer::Fqdn(fqdn) => Some(fqdn),
+        Referrer::Unknown(r) => r,
+        Referrer::Spammer => return Ok(()),
+    };
+
+    let referrer = referrer.map(|r| r.trim_start_matches("www.").to_string()); // remove www. prefix
+    let referrer = referrer.filter(|r| r.trim().len() > 3); // ignore empty or short referrers
+
+    if !EXISTING_ENTITIES.with(|cache| cache.borrow_mut().cache_get(&event.entity_id).is_some()) {
+        if !app.entities.exists(&event.entity_id).unwrap_or(false) {
+            return Ok(());
+        }
+        EXISTING_ENTITIES.with(|cache| cache.borrow_mut().cache_set(event.entity_id.clone(), event.entity_id.clone()));
+    }
+
+    let visitor_id = match ip {
+        Some(ip) => hash_ip(&ip, user_agent.as_str(), &app.events.get_salt()?, &event.entity_id),
+        None => visitor_id(),
+    };
+
+    let (country, city) = match (&app.geoip, ip) {
+        (Some(geoip), Some(ip)) => match geoip.lookup(&ip) {
+            Ok(lookup) => (lookup.country_code, lookup.city),
+            Err(_) => (None, None),
+        },
+        _ => (None, None),
+    };
+
+    let path = url.path().to_string();
+    let path = if path.len() > 1 && path.ends_with('/') { path.trim_end_matches('/').to_string() } else { path };
+    let fqdn = url.host().unwrap_or_default().to_string();
+
+    let event = Event {
+        visitor_id,
+        referrer,
+        country,
+        city,
+        browser: client.user_agent.family.to_string().into(),
+        created_at: OffsetDateTime::now_utc(),
+        entity_id: event.entity_id,
+        event: event.name,
+        fqdn: fqdn.into(),
+        path: path.into(),
+        mobile: Some(useragent::is_mobile(&client)),
+        platform: client.os.family.to_string().into(),
+        utm_campaign: event.utm.as_ref().and_then(|u| u.campaign.clone()),
+        utm_content: event.utm.as_ref().and_then(|u| u.content.clone()),
+        utm_medium: event.utm.as_ref().and_then(|u| u.medium.clone()),
+        utm_source: event.utm.as_ref().and_then(|u| u.source.clone()),
+        utm_term: event.utm.as_ref().and_then(|u| u.term.clone()),
+    };
+
+    let _ = events.try_send(event);
+    Ok(())
+}
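
The handler now acknowledges the request before processing finishes, so failures inside `process_event` surface only in the logs. A minimal standalone sketch of the same fire-and-forget shape (tokio assumed, as in the route above; `do_work` is a stand-in):

    #[tokio::main]
    async fn main() {
        let handle = tokio::task::spawn_blocking(|| {
            if let Err(e) = do_work() {
                eprintln!("failed to process: {e:?}");
            }
        });
        // Respond to the client here without awaiting the task.
        // Dropping the JoinHandle detaches the task instead of cancelling it.
        drop(handle);
    }

    fn do_work() -> Result<(), std::io::Error> {
        Ok(())
    }
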
diff --git a/web/src/api/client.ts b/web/src/api/client.ts
new file mode 100644
index 0000000..75ce064
--- /dev/null
+++ b/web/src/api/client.ts
@@ -0,0 +1,17 @@
+import { createClient } from "fets";
+import type { DashboardSpec } from "./types";
+
+export const api = createClient<DashboardSpec>({
+    globalParams: { credentials: "same-origin" },
+    fetchFn(input, init) {
+        return fetch(input, init).then((res) => {
+            if (!res.ok) {
+                return res
+                    .json()
+                    .catch((_) => Promise.reject({ status: res.status, message: res.statusText }))
+                    .then((body) => Promise.reject({ status: res.status, message: body?.message ?? res.statusText }));
+            }
+            return res;
+        });
+    },
+});
diff --git a/web/src/api/index.ts b/web/src/api/index.ts
index 21bfc66..e67ba3e 100644
--- a/web/src/api/index.ts
+++ b/web/src/api/index.ts
@@ -1,22 +1,5 @@
-import { createClient } from "fets";
-import type { DashboardSpec } from "./types";
-
 export * from "./query";
 export * from "./constants";
 export * from "./types";
 export * from "./hooks";
-
-export const api = createClient<DashboardSpec>({
-    globalParams: { credentials: "same-origin" },
-    fetchFn(input, init) {
-        return fetch(input, init).then((res) => {
-            if (!res.ok) {
-                return res
-                    .json()
-                    .catch((_) => Promise.reject({ status: res.status, message: res.statusText }))
-                    .then((body) => Promise.reject({ status: res.status, message: body?.message ?? res.statusText }));
-            }
-            return res;
-        });
-    },
-});
+export * from "./client";
diff --git a/web/src/components/project/range.tsx b/web/src/components/project/range.tsx
index 75dab43..eefc4e9 100644
--- a/web/src/components/project/range.tsx
+++ b/web/src/components/project/range.tsx
@@ -38,6 +38,7 @@ export const SelectRange = ({
         const range = new DateRange({ start: startOfDay(from), end: endOfDay(new Date()) });
         range.variant = "allTime";
         onSelect(range);
+        if (detailsRef.current) detailsRef.current.open = false;
     };
 
     return (
diff --git a/web/src/pages/login.astro b/web/src/pages/login.astro
index e400420..961daab 100644
--- a/web/src/pages/login.astro
+++ b/web/src/pages/login.astro
@@ -38,7 +38,7 @@
 import Layout from "../layouts/Base.astro";