Skip to content

Commit

Permalink
Spam filter config adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Dec 31, 2024
1 parent 30cf967 commit 8eda225
Show file tree
Hide file tree
Showing 19 changed files with 303 additions and 230 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ biscuit = "0.7.0"
rsa = "0.9.2"
p256 = { version = "0.13", features = ["ecdh"] }
p384 = { version = "0.13", features = ["ecdh"] }
num_cpus = "1.13.1"

[target.'cfg(unix)'.dependencies]
privdrop = "0.5.3"
Expand Down
14 changes: 7 additions & 7 deletions crates/common/src/config/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ impl Data {

// Parse capacities
let shard_amount = config
.property::<u64>("cache.shard")
.unwrap_or(32)
.property::<u64>("limiter.shard")
.unwrap_or_else(|| (num_cpus::get() * 2) as u64)
.next_power_of_two() as usize;
let capacity = config.property("cache.capacity").unwrap_or(100);
let capacity = config.property("limiter.capacity").unwrap_or(100);

// Parse id generator
let id_generator = config
Expand Down Expand Up @@ -113,7 +113,7 @@ impl Caches {
Caches {
access_tokens: CacheWithTtl::from_config(
config,
"access-tokens",
"access-token",
MB_10,
(std::mem::size_of::<AccessToken>() + 255) as u64,
),
Expand All @@ -125,8 +125,8 @@ impl Caches {
),
permissions: Cache::from_config(
config,
"permissions",
MB_10,
"permission",
MB_5,
std::mem::size_of::<RolePermissions>() as u64,
),
permissions_version: 0.into(),
Expand All @@ -149,7 +149,7 @@ impl Caches {
),
threads: Cache::from_config(
config,
"threads",
"thread",
MB_10,
(std::mem::size_of::<Threads>() + (500 * std::mem::size_of::<u64>())) as u64,
),
Expand Down
57 changes: 18 additions & 39 deletions crates/common/src/config/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ pub enum AsnGeoLookupConfig {
expires: Duration,
timeout: Duration,
max_size: usize,
resources: Vec<AsnGeoLookupResource>,
headers: HeaderMap,
asn_resources: Vec<String>,
geo_resources: Vec<String>,
},
Dns {
zone_ipv4: String,
Expand All @@ -53,12 +55,6 @@ pub enum AsnGeoLookupConfig {
Disabled,
}

#[derive(Clone)]
pub enum AsnGeoLookupResource {
Asn { url: String, headers: HeaderMap },
Geo { url: String, headers: HeaderMap },
}

#[derive(Clone)]
pub struct FieldOrDefault {
pub field: Option<String>,
Expand Down Expand Up @@ -196,50 +192,33 @@ impl AsnGeoLookupConfig {
}
.into(),
"resource" => {
let mut resources = vec![];

for id in config
.sub_keys("server.asn.resource", ".url")
.map(|k| k.to_string())
.collect::<Vec<_>>()
{
let id = id.as_str();
let url = config
.value_require_non_empty(("server.asn.resource", id, "url"))?
.to_string();
let headers = parse_http_headers(config, ("server.asn.resource", id));

resources.push(
match config.value_require(("server.asn.resource", id, "type"))? {
"asn" => AsnGeoLookupResource::Asn { url, headers },
"geo" => AsnGeoLookupResource::Geo { url, headers },
_ => {
config.new_build_error(
("server.asn.resource", id),
"Invalid resource",
);
continue;
}
},
);
}

if resources.is_empty() {
config.new_build_error("server.asn.resource", "No resources found");
let asn_resources = config
.values("server.asn.urls.asn")
.map(|(_, v)| v.to_string())
.collect::<Vec<_>>();
let geo_resources = config
.values("server.asn.urls.geo")
.map(|(_, v)| v.to_string())
.collect::<Vec<_>>();

if asn_resources.is_empty() && geo_resources.is_empty() {
config.new_build_error("server.asn.urls", "No resources found");
return None;
}

AsnGeoLookupConfig::Resource {
headers: parse_http_headers(config, "server.asn"),
expires: config.property_or_default::<Duration>("server.asn.expires", "1d")?,
timeout: config.property_or_default::<Duration>("server.asn.timeout", "5m")?,
max_size: config
.property("server.asn.max-size")
.unwrap_or(100 * 1024 * 1024),
resources,
asn_resources,
geo_resources,
}
.into()
}
"disabled" | "none" | "false" => AsnGeoLookupConfig::Disabled.into(),
"disable" | "disabled" | "none" | "false" => AsnGeoLookupConfig::Disabled.into(),
_ => {
config.new_build_error("server.asn.type", "Invalid value");
None
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/config/smtp/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ impl Default for SessionConfig {
is_allowed: IfBlock::new::<()>(
"session.mail.is-allowed",
[],
"!is_empty(authenticated_as) || !key_exists('spam-block', sender_domain)",
"!is_empty(authenticated_as) || !key_exists('blocked-domains', sender_domain)",
),
},
rcpt: Rcpt {
Expand Down
63 changes: 60 additions & 3 deletions crates/common/src/config/spamfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

use std::{net::SocketAddr, time::Duration};
use std::{
net::{IpAddr, SocketAddr},
time::Duration,
};

use ahash::AHashSet;
use mail_auth::common::resolver::ToReverseName;
use nlp::bayes::BayesClassifier;
use tokio::net::lookup_host;
use utils::{
cache::CacheItemWeight,
config::{utils::ParseValue, Config},
glob::GlobMap,
};

use super::{if_block::IfBlock, tokenizer::TokenMap};
use super::{functions::ResolveVariable, if_block::IfBlock, tokenizer::TokenMap, Variable};

#[derive(Debug, Clone, Default)]
pub struct SpamFilterConfig {
Expand Down Expand Up @@ -545,7 +550,7 @@ impl BayesConfig {
.property_or_default("spam-filter.bayes.auto-learn.threshold.spam", "6.0")
.unwrap_or(6.0),
auto_learn_ham_threshold: config
.property_or_default("spam-filter.bayes.auto-learn.threshold.ham", "-2.0")
.property_or_default("spam-filter.bayes.auto-learn.threshold.ham", "-1.0")
.unwrap_or(-2.0),
score_spam: config
.property_or_default("spam-filter.bayes.score.spam", "0.7")
Expand Down Expand Up @@ -809,3 +814,55 @@ impl Element {
}
}
}

pub struct IpResolver {
ip: IpAddr,
ip_string: String,
reverse: String,
octets: Variable<'static>,
}

impl ResolveVariable for IpResolver {
fn resolve_variable(&self, variable: u32) -> Variable<'_> {
match variable {
V_IP => Variable::String(self.ip_string.as_str().into()),
V_IP_REVERSE => Variable::String(self.reverse.as_str().into()),
V_IP_OCTETS => self.octets.clone(),
V_IP_IS_V4 => Variable::Integer(self.ip.is_ipv4() as _),
V_IP_IS_V6 => Variable::Integer(self.ip.is_ipv6() as _),
_ => Variable::Integer(0),
}
}

fn resolve_global(&self, _: &str) -> Variable<'_> {
Variable::Integer(0)
}
}

impl IpResolver {
pub fn new(ip: IpAddr) -> Self {
Self {
ip_string: ip.to_string(),
reverse: ip.to_reverse_name(),
octets: Variable::Array(match ip {
IpAddr::V4(ipv4_addr) => ipv4_addr
.octets()
.iter()
.map(|o| Variable::Integer(*o as _))
.collect(),
IpAddr::V6(ipv6_addr) => ipv6_addr
.octets()
.iter()
.map(|o| Variable::Integer(*o as _))
.collect(),
}),
ip,
}
}
}

impl CacheItemWeight for IpResolver {
fn weight(&self) -> u64 {
(std::mem::size_of::<IpResolver>() + self.ip_string.len() + self.reverse.len()) as u64
}
}
4 changes: 2 additions & 2 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use config::{
resolver::{Policy, Tlsa},
SmtpConfig,
},
spamfilter::SpamFilterConfig,
spamfilter::{IpResolver, SpamFilterConfig},
storage::Storage,
telemetry::Metrics,
};
Expand Down Expand Up @@ -152,7 +152,7 @@ pub struct Caches {
pub dns_ipv6: CacheWithTtl<String, Arc<Vec<Ipv6Addr>>>,
pub dns_tlsa: CacheWithTtl<String, Arc<Tlsa>>,
pub dbs_mta_sts: CacheWithTtl<String, Arc<Policy>>,
pub dns_rbl: CacheWithTtl<String, Option<Arc<Vec<Ipv4Addr>>>>,
pub dns_rbl: CacheWithTtl<String, Option<Arc<IpResolver>>>,
}

pub struct Ipc {
Expand Down
21 changes: 9 additions & 12 deletions crates/common/src/listener/asn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ use mail_auth::common::resolver::ToReverseName;
use store::write::now;
use tokio::sync::Semaphore;

use crate::{
config::network::{AsnGeoLookupConfig, AsnGeoLookupResource},
manager::fetch_resource,
Server,
};
use crate::{config::network::AsnGeoLookupConfig, manager::fetch_resource, Server};

pub struct AsnGeoLookupData {
pub lock: Semaphore,
Expand Down Expand Up @@ -153,18 +149,19 @@ impl Server {
expires,
timeout,
max_size,
resources,
asn_resources,
geo_resources,
headers,
} = &server.core.network.asn_geo_lookup
{
let mut asn_data = Data::new();
let mut country_data = Data::new();

for lookup in resources {
let (url, headers, is_asn) = match lookup {
AsnGeoLookupResource::Asn { url, headers } => (url, headers, true),
AsnGeoLookupResource::Geo { url, headers } => (url, headers, false),
};

for (is_asn, url) in asn_resources
.iter()
.map(|url| (true, url))
.chain(geo_resources.iter().map(|url| (false, url)))
{
let time = Instant::now();
match fetch_resource(url, headers.clone().into(), *timeout, *max_size)
.await
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ pub const WEBADMIN_KEY: &[u8] = "STALWART_WEBADMIN".as_bytes();
impl ConfigManager {
pub async fn fetch_resource(&self, resource_id: &str) -> Result<Vec<u8>, String> {
if let Some(url) = self
.get(&format!("config.resource.{resource_id}"))
.get(&format!("{resource_id}.resource"))
.await
.map_err(|err| {
format!("Failed to fetch configuration key 'resource.{resource_id}': {err}",)
format!("Failed to fetch configuration key '{resource_id}.resource': {err}",)
})?
{
fetch_resource(&url, None, Duration::from_secs(60), MAX_SIZE).await
Expand Down
8 changes: 7 additions & 1 deletion crates/jmap/src/api/management/enterprise/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,13 @@ impl TelemetryApi for Server {
.enterprise
.as_ref()
.and_then(|e| e.metrics_store.as_ref())
.ok_or_else(|| manage::unsupported("No metrics store has been configured"))?
.ok_or_else(|| {
manage::error(
"No metrics store has been defined",
"You need to configure a metrics store in order to use this feature."
.into(),
)
})?
.store
.query_metrics(after, before)
.await?;
Expand Down
Loading

0 comments on commit 8eda225

Please sign in to comment.