diff --git a/Cargo.lock b/Cargo.lock index 69adf0f9..28d683a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common", + "generic-array", +] + [[package]] name = "ahash" version = "0.7.8" @@ -749,10 +759,12 @@ dependencies = [ "cala-ledger-core-types", "cala-ledger-outbox-client", "cala-tracing", + "chacha20poly1305", "chrono", "clap", "derive_builder", "futures", + "hex", "rust_decimal", "serde", "serde_json", @@ -803,6 +815,30 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +[[package]] +name = "chacha20" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "chacha20poly1305" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" +dependencies = [ + "aead", + "chacha20", + "cipher", + "poly1305", + "zeroize", +] + [[package]] name = "chrono" version = "0.4.38" @@ -816,6 +852,17 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", + "zeroize", +] + [[package]] name = "clap" version = "4.5.7" @@ -954,6 +1001,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", + "rand_core", "typenum", ] @@ -1697,6 +1745,15 @@ dependencies = [ "serde", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -2122,6 +2179,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + [[package]] name = "opentelemetry" version = "0.22.0" @@ -2365,6 +2428,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "poly1305" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -3991,6 +4065,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common", + "subtle", +] + [[package]] name = "unsafe-libyaml" version = "0.2.11" diff --git a/Cargo.toml b/Cargo.toml index 6428a005..d5c3ddc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,8 @@ lalrpop = { version = "0.20", features = ["lexer"] } rust_decimal_macros = "1.34.2" rust_decimal = "1.35.0" rusty-money = { version = "0.4", features = ["iso", "crypto"] } +chacha20poly1305 = "0.10.1" +hex = "0.4.3" [profile.release] lto = true diff --git a/cala-server/.sqlx/query-17635a222bf17f0f5054dd3e3feba0fd52068e2fab93b6b693aa6e1d24d1e7f8.json b/cala-server/.sqlx/query-17635a222bf17f0f5054dd3e3feba0fd52068e2fab93b6b693aa6e1d24d1e7f8.json deleted file mode 100644 index b75282dc..00000000 --- a/cala-server/.sqlx/query-17635a222bf17f0f5054dd3e3feba0fd52068e2fab93b6b693aa6e1d24d1e7f8.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO integrations (id, name, data)\n VALUES ($1, $2, $3)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Varchar", - "Jsonb" - ] - }, - "nullable": [] - }, - "hash": "17635a222bf17f0f5054dd3e3feba0fd52068e2fab93b6b693aa6e1d24d1e7f8" -} diff --git a/cala-server/.sqlx/query-275299ad68fad92f335ef47f78acb1ca155adde9f71a4941337306b418aab46c.json b/cala-server/.sqlx/query-8789249d94a4a5d693ca44989524def14cc74abff78657b8f30ed8ff45d48229.json similarity index 53% rename from cala-server/.sqlx/query-275299ad68fad92f335ef47f78acb1ca155adde9f71a4941337306b418aab46c.json rename to cala-server/.sqlx/query-8789249d94a4a5d693ca44989524def14cc74abff78657b8f30ed8ff45d48229.json index 7927592c..6556d5d6 100644 --- a/cala-server/.sqlx/query-275299ad68fad92f335ef47f78acb1ca155adde9f71a4941337306b418aab46c.json +++ b/cala-server/.sqlx/query-8789249d94a4a5d693ca44989524def14cc74abff78657b8f30ed8ff45d48229.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, name, data\n FROM integrations\n WHERE id = $1", + "query": "SELECT id, name, cipher, nonce \n FROM integrations\n WHERE id = $1", "describe": { "columns": [ { @@ -15,8 +15,13 @@ }, { "ordinal": 2, - "name": "data", - "type_info": "Jsonb" + "name": "cipher", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "nonce", + "type_info": "Bytea" } ], "parameters": { @@ -27,8 +32,9 @@ "nullable": [ false, false, - true + false, + false ] }, - "hash": "275299ad68fad92f335ef47f78acb1ca155adde9f71a4941337306b418aab46c" + "hash": "8789249d94a4a5d693ca44989524def14cc74abff78657b8f30ed8ff45d48229" } diff --git a/cala-server/.sqlx/query-89bf3e34accb31d1163ba4914cbea7170e6e163dc0988e08b1c2248459273143.json b/cala-server/.sqlx/query-89bf3e34accb31d1163ba4914cbea7170e6e163dc0988e08b1c2248459273143.json new file mode 100644 index 00000000..456b24e9 --- /dev/null +++ b/cala-server/.sqlx/query-89bf3e34accb31d1163ba4914cbea7170e6e163dc0988e08b1c2248459273143.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO integrations (id, name, cipher, nonce)\n VALUES ($1, $2, $3, $4)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Varchar", + "Bytea", + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "89bf3e34accb31d1163ba4914cbea7170e6e163dc0988e08b1c2248459273143" +} diff --git a/cala-server/Cargo.toml b/cala-server/Cargo.toml index 45178cb5..ac314568 100644 --- a/cala-server/Cargo.toml +++ b/cala-server/Cargo.toml @@ -38,3 +38,5 @@ serde_with = { workspace = true } uuid = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +chacha20poly1305 = { workspace = true } +hex = "0.4.3" diff --git a/cala-server/migrations/20231211151809_cala_server_setup.sql b/cala-server/migrations/20231211151809_cala_server_setup.sql index 8ca53902..84585236 100644 --- a/cala-server/migrations/20231211151809_cala_server_setup.sql +++ b/cala-server/migrations/20231211151809_cala_server_setup.sql @@ -28,7 +28,8 @@ CREATE TABLE job_executions ( CREATE TABLE integrations ( id UUID PRIMARY KEY, name VARCHAR NOT NULL, - data JSONB, + cipher BYTEA NOT NULL, + nonce BYTEA NOT NULL, modified_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); diff --git a/cala-server/src/app/config.rs b/cala-server/src/app/config.rs index 27ae8fce..d60e1f9a 100644 --- a/cala-server/src/app/config.rs +++ b/cala-server/src/app/config.rs @@ -2,8 +2,12 @@ use serde::{Deserialize, Serialize}; use crate::job::JobExecutorConfig; +use super::EncryptionConfig; + #[derive(Clone, Default, Debug, Deserialize, Serialize)] pub struct AppConfig { #[serde(default)] pub job_execution: JobExecutorConfig, + #[serde(default)] + pub encryption: EncryptionConfig, } diff --git a/cala-server/src/app/mod.rs b/cala-server/src/app/mod.rs index c63bdc2a..bd818888 100644 --- a/cala-server/src/app/mod.rs +++ b/cala-server/src/app/mod.rs @@ -14,6 +14,7 @@ pub struct CalaApp { pool: PgPool, ledger: CalaLedger, jobs: Jobs, + config: AppConfig, } impl CalaApp { @@ -23,13 +24,23 @@ impl CalaApp { ledger: CalaLedger, registry: JobRegistry, ) -> Result { - let mut jobs = Jobs::new(&pool, config.job_execution, registry); + let mut jobs = Jobs::new( + &pool, + config.job_execution.clone(), + registry, + config.encryption.clone(), + ); jobs.start_poll().await?; - Ok(Self { pool, ledger, jobs }) + Ok(Self { + pool, + ledger, + jobs, + config, + }) } pub fn integrations(&self) -> Integrations { - Integrations::new(&self.pool) + Integrations::new(&self.pool, &self.config.encryption) } pub fn ledger(&self) -> &CalaLedger { diff --git a/cala-server/src/cli/config.rs b/cala-server/src/cli/config.rs index 5f60b2d0..f901567a 100644 --- a/cala-server/src/cli/config.rs +++ b/cala-server/src/cli/config.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use std::path::Path; use super::db::*; -use crate::{app::AppConfig, server::ServerConfig}; +use crate::{app::AppConfig, integration::EncryptionKey, server::ServerConfig}; #[derive(Clone, Default, Serialize, Deserialize)] pub struct Config { @@ -21,6 +21,7 @@ pub struct Config { pub struct EnvOverride { pub db_con: String, + pub encryption_key: String, } impl Config { @@ -37,11 +38,28 @@ impl Config { Config::default() }; - config.apply_env_override(env_override); + config.apply_env_override(env_override)?; Ok(config) } - fn apply_env_override(&mut self, EnvOverride { db_con }: EnvOverride) { + fn apply_env_override( + &mut self, + EnvOverride { + db_con, + encryption_key, + }: EnvOverride, + ) -> anyhow::Result<()> { self.db.pg_con = db_con; + + let key_bytes = hex::decode(encryption_key)?; + if key_bytes.len() != 32 { + return Err(anyhow::anyhow!( + "Signer encryption key must be 32 bytes, got {}", + key_bytes.len() + )); + } + + self.app.encryption.key = EncryptionKey::clone_from_slice(key_bytes.as_ref()); + Ok(()) } } diff --git a/cala-server/src/cli/mod.rs b/cala-server/src/cli/mod.rs index bd7fe1cc..db35e79c 100644 --- a/cala-server/src/cli/mod.rs +++ b/cala-server/src/cli/mod.rs @@ -22,6 +22,8 @@ struct Cli { cala_home: String, #[clap(env = "PG_CON")] pg_con: String, + #[clap(env = "ENCRYPTION_KEY")] + encryption_key: String, } pub async fn run( @@ -29,7 +31,13 @@ pub async fn run( ) -> anyhow::Result<()> { let cli = Cli::parse(); - let config = Config::load_config(cli.config, EnvOverride { db_con: cli.pg_con })?; + let config = Config::load_config( + cli.config, + EnvOverride { + db_con: cli.pg_con, + encryption_key: cli.encryption_key, + }, + )?; run_cmd::(&cli.cala_home, config, job_registration).await?; diff --git a/cala-server/src/integration/encryption_config.rs b/cala-server/src/integration/encryption_config.rs new file mode 100644 index 00000000..b0e971d5 --- /dev/null +++ b/cala-server/src/integration/encryption_config.rs @@ -0,0 +1,124 @@ +use chacha20poly1305::{ + aead::{Aead, AeadCore, KeyInit, OsRng}, + ChaCha20Poly1305, +}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +use super::{error::IntegrationError, Data}; + +pub type EncryptionKey = chacha20poly1305::Key; +#[derive(Clone)] +pub(super) struct ConfigCipher(pub(super) Vec); +#[derive(Clone)] +pub(super) struct Nonce(pub(super) Vec); + +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(into = "RawEncryptionConfig")] +#[serde(try_from = "RawEncryptionConfig")] +pub struct EncryptionConfig { + pub key: EncryptionKey, +} + +impl Data { + pub(super) fn encrypt( + &self, + key: &EncryptionKey, + ) -> Result<(ConfigCipher, Nonce), IntegrationError> { + let cipher = ChaCha20Poly1305::new(key); + let nonce = ChaCha20Poly1305::generate_nonce(&mut OsRng); + let encrypted_config = cipher + .encrypt(&nonce, serde_json::to_vec(&self.0)?.as_slice()) + .expect("should always encrypt"); + Ok((ConfigCipher(encrypted_config), Nonce(nonce.to_vec()))) + } + + pub(super) fn decrypt( + encrypted_data: &ConfigCipher, + nonce: &Nonce, + key: &EncryptionKey, + ) -> Result { + let cipher = ChaCha20Poly1305::new(key); + let decrypted_data = cipher + .decrypt( + chacha20poly1305::Nonce::from_slice(nonce.0.as_slice()), + encrypted_data.0.as_slice(), + ) + .map_err(IntegrationError::DecryptionError)?; + let data = serde_json::from_slice(&decrypted_data)?; + Ok(data) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +struct RawEncryptionConfig { + pub key: String, +} +impl From for RawEncryptionConfig { + fn from(config: EncryptionConfig) -> Self { + Self { + key: hex::encode(config.key), + } + } +} + +impl TryFrom for EncryptionConfig { + type Error = IntegrationError; + + fn try_from(raw: RawEncryptionConfig) -> Result { + let key_vec = hex::decode(raw.key)?; + let key_bytes = key_vec.as_slice(); + Ok(Self { + key: EncryptionKey::clone_from_slice(key_bytes), + }) + } +} + +impl std::fmt::Debug for EncryptionConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "EncryptionConfig {{ key: *******Redacted******* }}") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] + struct Dummy { + pub name: String, + pub secret: String, + } + + impl Default for Dummy { + fn default() -> Self { + Self { + name: "Alice".to_string(), + secret: "Secret".to_string(), + } + } + } + + fn gen_encryption_key() -> EncryptionKey { + ChaCha20Poly1305::generate_key(&mut OsRng) + } + + #[test] + fn encrypt_decrypt() { + let key = gen_encryption_key(); + let data = Data::new(Dummy::default()); + let (encrypted, nonce) = data.encrypt(&key).expect("Failed to encrypt"); + let decrypted: Dummy = Data::decrypt(&encrypted, &nonce, &key).expect("Failed to decrypt"); + + assert_eq!(data.0, serde_json::to_value(&decrypted).unwrap()); + } + + #[test] + fn serialize_deserialize() { + let key = gen_encryption_key(); + let encryption_config = EncryptionConfig { key }; + let serialized = serde_json::to_string(&encryption_config).unwrap(); + let deserialized: EncryptionConfig = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized.key, key); + assert_eq!(encryption_config, deserialized) + } +} diff --git a/cala-server/src/integration/error.rs b/cala-server/src/integration/error.rs new file mode 100644 index 00000000..42f5c02c --- /dev/null +++ b/cala-server/src/integration/error.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum IntegrationError { + #[error("IntegrationError - Sqlx: {0}")] + Sqlx(#[from] sqlx::Error), + #[error("XPubError - Serde: {0}")] + Serde(#[from] serde_json::Error), + #[error("XPubError - FromHex: {0}")] + FromHex(#[from] hex::FromHexError), + #[error("Could not decrypt signer config: {0}")] + DecryptionError(chacha20poly1305::Error), +} diff --git a/cala-server/src/integration/mod.rs b/cala-server/src/integration/mod.rs index 78931fb7..59fbfbbb 100644 --- a/cala-server/src/integration/mod.rs +++ b/cala-server/src/integration/mod.rs @@ -1,13 +1,34 @@ +mod encryption_config; +pub mod error; + +use serde::{Deserialize, Serialize}; use sqlx::PgPool; use cala_ledger::AtomicOperation; - cala_types::entity_id! { IntegrationId } +pub use encryption_config::*; +use error::IntegrationError; + pub struct Integration { pub id: IntegrationId, pub name: String, - data: serde_json::Value, + data: Data, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +struct Data(serde_json::Value); + +impl Data { + fn new(data: impl serde::Serialize) -> Self { + Self(serde_json::to_value(data).unwrap()) + } +} + +impl AsRef for Data { + fn as_ref(&self) -> &serde_json::Value { + &self.0 + } } impl Integration { @@ -15,21 +36,25 @@ impl Integration { Self { id, name, - data: serde_json::to_value(data).expect("Could not serialize data"), + data: Data::new(data), } } pub fn data(&self) -> Result { - serde_json::from_value(self.data.clone()) + serde_json::from_value(self.data.as_ref().clone()) } } pub struct Integrations { pool: PgPool, + encryption_config: EncryptionConfig, } impl Integrations { - pub(crate) fn new(pool: &PgPool) -> Self { - Self { pool: pool.clone() } + pub(crate) fn new(pool: &PgPool, encryption_config: &EncryptionConfig) -> Self { + Self { + pool: pool.clone(), + encryption_config: encryption_config.clone(), + } } pub async fn create_in_op( @@ -38,14 +63,17 @@ impl Integrations { id: impl Into + std::fmt::Debug, name: String, data: impl serde::Serialize, - ) -> Result { + ) -> Result { let integration = Integration::new(id.into(), name, data); + let (cipher, nonce) = integration.data.encrypt(&self.encryption_config.key)?; + sqlx::query!( - r#"INSERT INTO integrations (id, name, data) - VALUES ($1, $2, $3)"#, + r#"INSERT INTO integrations (id, name, cipher, nonce) + VALUES ($1, $2, $3, $4)"#, integration.id as IntegrationId, integration.name, - integration.data + &cipher.0, + &nonce.0 ) .execute(&mut **op.tx()) .await?; @@ -55,17 +83,26 @@ impl Integrations { pub async fn find_by_id( &self, id: impl Into, - ) -> Result { + ) -> Result { let id = id.into(); - let row = sqlx::query_as!( - Integration, - r#"SELECT id, name, data + let row = sqlx::query!( + r#"SELECT id, name, cipher, nonce FROM integrations WHERE id = $1"#, id as IntegrationId ) .fetch_one(&self.pool) .await?; - Ok(row) + + let data = Data::decrypt( + &ConfigCipher(row.cipher), + &Nonce(row.nonce), + &self.encryption_config.key, + )?; + Ok(Integration { + id, + name: row.name, + data, + }) } } diff --git a/cala-server/src/job/current.rs b/cala-server/src/job/current.rs index 68ff60fd..aad4b0bf 100644 --- a/cala-server/src/job/current.rs +++ b/cala-server/src/job/current.rs @@ -8,14 +8,21 @@ pub struct CurrentJob { id: JobId, pool: PgPool, state_json: Option, + encryption_config: crate::integration::EncryptionConfig, } impl CurrentJob { - pub(super) fn new(id: JobId, pool: PgPool, state: Option) -> Self { + pub(super) fn new( + id: JobId, + pool: PgPool, + state: Option, + encryption_config: EncryptionConfig, + ) -> Self { Self { id, pool, state_json: state, + encryption_config, } } @@ -64,7 +71,10 @@ impl CurrentJob { JobRepo::new(self.pool()).persist_in_tx(db, entity).await } - pub async fn integration(&self, id: IntegrationId) -> Result { - Integrations::new(self.pool()).find_by_id(id).await + pub async fn integration(&self, id: IntegrationId) -> Result { + let integration = Integrations::new(self.pool(), &self.encryption_config) + .find_by_id(id) + .await?; + Ok(integration) } } diff --git a/cala-server/src/job/error.rs b/cala-server/src/job/error.rs index e38411ca..75bfbb7b 100644 --- a/cala-server/src/job/error.rs +++ b/cala-server/src/job/error.rs @@ -22,6 +22,8 @@ pub enum JobError { NoInitializerPresent, #[error("JobError - JobExecutionError: {0}")] JobExecutionError(String), + #[error("JobError - IntegrationError: {0}")] + IntegrationError(#[from] crate::integration::error::IntegrationError), } impl From> for JobError { diff --git a/cala-server/src/job/executor.rs b/cala-server/src/job/executor.rs index d4edcf15..50679c16 100644 --- a/cala-server/src/job/executor.rs +++ b/cala-server/src/job/executor.rs @@ -16,6 +16,7 @@ pub struct JobExecutor { poller_handle: Option>>, running_jobs: Arc>>, jobs: JobRepo, + encryption: crate::integration::EncryptionConfig, } impl JobExecutor { @@ -24,6 +25,7 @@ impl JobExecutor { config: JobExecutorConfig, registry: JobRegistry, jobs: &JobRepo, + encryption: crate::integration::EncryptionConfig, ) -> Self { Self { pool: pool.clone(), @@ -32,6 +34,7 @@ impl JobExecutor { registry: Arc::new(registry), running_jobs: Arc::new(RwLock::new(HashMap::new())), jobs: jobs.clone(), + encryption, } } @@ -67,6 +70,7 @@ impl JobExecutor { pub async fn start_poll(&mut self) -> Result<(), JobError> { let pool = self.pool.clone(); let poll_interval = self.config.poll_interval; + let encryption = self.encryption.clone(); let pg_interval = PgInterval::try_from(poll_interval * 4) .map_err(|e| JobError::InvalidPollInterval(e.to_string()))?; let running_jobs = Arc::clone(&self.running_jobs); @@ -84,6 +88,7 @@ impl JobExecutor { pg_interval.clone(), &running_jobs, &jobs, + &encryption, ) .await; tokio::time::sleep(poll_interval).await; @@ -109,6 +114,7 @@ impl JobExecutor { pg_interval: PgInterval, running_jobs: &Arc>>, jobs: &JobRepo, + encryption: &crate::integration::EncryptionConfig, ) -> Result<(), JobError> { let span = tracing::Span::current(); span.record("keep_alive", *keep_alive); @@ -174,6 +180,7 @@ impl JobExecutor { job, row.state_json, jobs.clone(), + encryption.clone(), ) .await; } @@ -193,6 +200,7 @@ impl JobExecutor { job: Job, job_payload: Option, repo: JobRepo, + encryption: crate::integration::EncryptionConfig, ) -> Result<(), JobError> { let id = job.id; let runner = registry.init_job(job)?; @@ -200,7 +208,7 @@ impl JobExecutor { let pool = pool.clone(); let handle = tokio::spawn(async move { { - let _ = Self::execute_job(id, pool, job_payload, runner, repo).await; + let _ = Self::execute_job(id, pool, job_payload, runner, repo, encryption).await; } all_jobs.write().await.remove(&id); }); @@ -217,9 +225,10 @@ impl JobExecutor { payload: Option, mut runner: Box, repo: JobRepo, + encryption: crate::integration::EncryptionConfig, ) -> Result<(), JobError> { let current_job_pool = pool.clone(); - let current_job = CurrentJob::new(id, current_job_pool, payload); + let current_job = CurrentJob::new(id, current_job_pool, payload, encryption); match runner .run(current_job) .await diff --git a/cala-server/src/job/mod.rs b/cala-server/src/job/mod.rs index 4c56726c..84620999 100644 --- a/cala-server/src/job/mod.rs +++ b/cala-server/src/job/mod.rs @@ -25,6 +25,8 @@ use error::*; use executor::*; use repo::*; +use crate::integration::EncryptionConfig; + #[derive(Clone)] pub struct Jobs { _pool: PgPool, @@ -33,9 +35,14 @@ pub struct Jobs { } impl Jobs { - pub fn new(pool: &PgPool, config: JobExecutorConfig, registry: JobRegistry) -> Self { + pub fn new( + pool: &PgPool, + config: JobExecutorConfig, + registry: JobRegistry, + encryption: EncryptionConfig, + ) -> Self { let repo = JobRepo::new(pool); - let executor = JobExecutor::new(pool, config, registry, &repo); + let executor = JobExecutor::new(pool, config, registry, &repo, encryption); Self { _pool: pool.clone(), repo,