Support Google Cloud Storage
aykut-bozkurt committed Jan 30, 2025
1 parent 3ff46d5 commit ec5c082
Showing 11 changed files with 242 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .devcontainer/.env
@@ -18,5 +18,10 @@ AZURE_TEST_CONTAINER_NAME=testcontainer
AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D"
AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"

# GCS tests
GOOGLE_TEST_BUCKET=testbucket
GOOGLE_SERVICE_ACCOUNT_KEY='{"gcs_base_url": "http://localhost:4443","disable_oauth": true,"client_email": "","private_key_id": "","private_key": ""}'
GOOGLE_SERVICE_ENDPOINT=http://localhost:4443

# Others
RUST_TEST_THREADS=1
15 changes: 15 additions & 0 deletions .devcontainer/docker-compose.yml
@@ -12,6 +12,7 @@ services:
- ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro
- ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw
- ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw
- ${USERPROFILE}${HOME}/.config/gcloud:/home/rust/.config/gcloud:rw
- ./entrypoint.sh:/entrypoint.sh
env_file:
- .env
@@ -20,6 +21,7 @@
depends_on:
- minio
- azurite
- fake-gcs-server

minio:
image: minio/minio
@@ -47,3 +49,16 @@
interval: 6s
timeout: 2s
retries: 3

fake-gcs-server:
image: tustvold/fake-gcs-server
env_file:
- .env
network_mode: host
command: -scheme http -public-host localhost:4443
restart: unless-stopped
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "4443"]
interval: 6s
timeout: 2s
retries: 3
4 changes: 4 additions & 0 deletions .devcontainer/entrypoint.sh
@@ -6,4 +6,8 @@ trap "echo 'Caught termination signal. Exiting...'; exit 0" SIGINT SIGTERM
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
az storage container create -n ${AZURE_TEST_CONTAINER_NAME}2 --connection-string $AZURE_STORAGE_CONNECTION_STRING

# create fake-gcs bucket
curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"
curl -v -X POST --data-binary "{\"name\":\"${GOOGLE_TEST_BUCKET}2\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"

sleep infinity
16 changes: 16 additions & 0 deletions .github/workflows/ci.yml
@@ -140,6 +140,22 @@ jobs:
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
az storage container create -n ${AZURE_TEST_CONTAINER_NAME}2 --connection-string $AZURE_STORAGE_CONNECTION_STRING
- name: Start fake-gcs-server for Google Cloud Storage emulator tests
run: |
docker run -d \
--env-file .devcontainer/.env \
-p 4443:4443 \
tustvold/fake-gcs-server -scheme http -public-host localhost:4443
while ! curl $GOOGLE_SERVICE_ENDPOINT; do
echo "Waiting for $GOOGLE_SERVICE_ENDPOINT..."
sleep 1
done
# create bucket
curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"
curl -v -X POST --data-binary "{\"name\":\"${GOOGLE_TEST_BUCKET}2\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"
- name: Run tests
run: |
# Run tests with coverage tool
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -28,7 +28,7 @@ aws-credential-types = {version = "1", default-features = false}
azure_storage = {version = "0.21", default-features = false}
futures = "0.3"
home = "0.5"
-object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]}
+object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "gcp"]}
once_cell = "1"
parquet = {version = "53", default-features = false, features = [
"arrow",
24 changes: 23 additions & 1 deletion README.md
@@ -156,7 +156,7 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM
```

## Object Store Support
-`pg_parquet` supports reading and writing Parquet files from/to `S3` and `Azure Blob Storage` object stores.
+`pg_parquet` supports reading and writing Parquet files from/to `S3`, `Azure Blob Storage` and `Google Cloud Storage` object stores.

> [!NOTE]
> To be able to write into an object store location, you need to grant the `parquet_object_store_write` role to your current postgres user.
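
For example (the user name here is hypothetical):

```sql
GRANT parquet_object_store_write TO my_pg_user;
```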
@@ -239,6 +239,28 @@ Supported authorization methods' priority order is shown below:
2. Sas token,
3. Storage key.

#### Google Cloud Storage

The simplest way to configure object storage is by creating a JSON config file like `/tmp/gcs.json`:

```bash
$ cat /tmp/gcs.json
{
"gcs_base_url": "http://localhost:4443",
"disable_oauth": true,
"client_email": "",
"private_key_id": "",
"private_key": ""
}
```

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
- `GOOGLE_SERVICE_ACCOUNT_KEY`: JSON-serialized service account key **(only via environment variables)**
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file **(only via environment variables)**

Supported Google Cloud Storage uri formats are shown below:
- gs:// \<bucket\> / \<path\>

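With the configuration above in place, a `gs://` uri can be used wherever a path is accepted. A minimal sketch (table name hypothetical; `testbucket` matches the dev environment's test bucket):

```sql
-- write a table out to GCS as Parquet
COPY my_table TO 'gs://testbucket/my_table.parquet';

-- read it back
COPY my_table FROM 'gs://testbucket/my_table.parquet';
```
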
## Copy Options
`pg_parquet` supports the following options in the `COPY TO` command:
- `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,
1 change: 1 addition & 0 deletions src/object_store.rs
@@ -9,5 +9,6 @@ use crate::{

pub(crate) mod aws;
pub(crate) mod azure;
pub(crate) mod gcs;
pub(crate) mod local_file;
pub(crate) mod object_store_cache;
69 changes: 69 additions & 0 deletions src/object_store/gcs.rs
@@ -0,0 +1,69 @@
use std::sync::Arc;

use object_store::gcp::GoogleCloudStorageBuilder;
use url::Url;

use super::object_store_cache::ObjectStoreWithExpiration;

// create_gcs_object_store creates a GoogleCloudStorage object store from the given uri.
// It is configured by environment variables. Currently, we only support
// following environment variables:
// - GOOGLE_SERVICE_ACCOUNT_KEY
// - GOOGLE_SERVICE_ACCOUNT_PATH
pub(crate) fn create_gcs_object_store(uri: &Url) -> ObjectStoreWithExpiration {
let bucket_name = parse_gcs_bucket(uri).unwrap_or_else(|| {
panic!("unsupported gcs uri: {}", uri);

Check warning on line 15 in src/object_store/gcs.rs

View check run for this annotation

Codecov / codecov/patch

src/object_store/gcs.rs#L15

Added line #L15 was not covered by tests
});

let mut gcs_builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket_name);

let gcs_config = GoogleStorageConfig::load();

// service account key
if let Some(service_account_key) = gcs_config.service_account_key {
gcs_builder = gcs_builder.with_service_account_key(&service_account_key);
}

// service account path
if let Some(service_account_path) = gcs_config.service_account_path {
gcs_builder = gcs_builder.with_service_account_path(&service_account_path);
}

let object_store = gcs_builder.build().unwrap_or_else(|e| panic!("{}", e));

// object store handles refreshing bearer token, so we do not need to handle expiry here
let expire_at = None;

ObjectStoreWithExpiration {
object_store: Arc::new(object_store),
expire_at,
}
}

pub(crate) fn parse_gcs_bucket(uri: &Url) -> Option<String> {
let host = uri.host_str()?;

// gs://{bucket}/key
if uri.scheme() == "gs" {
return Some(host.to_string());
}

None
}

// GoogleStorageConfig holds the configuration for the Google Storage object store.
struct GoogleStorageConfig {
service_account_key: Option<String>,
service_account_path: Option<String>,
}

impl GoogleStorageConfig {
// load loads the Google Storage configuration from the environment.
fn load() -> Self {
Self {
service_account_key: std::env::var("GOOGLE_SERVICE_ACCOUNT_KEY").ok(),
service_account_path: std::env::var("GOOGLE_SERVICE_ACCOUNT_PATH").ok(),
}
}
}
16 changes: 10 additions & 6 deletions src/object_store/object_store_cache.rs
@@ -11,8 +11,10 @@ use pgrx::{ereport, PgLogLevel, PgSqlErrorCode};
use url::Url;

use super::{
-aws::parse_s3_bucket, azure::parse_azure_blob_container, create_azure_object_store,
-create_local_file_object_store, create_s3_object_store,
+aws::parse_s3_bucket,
+azure::parse_azure_blob_container,
+create_azure_object_store, create_local_file_object_store, create_s3_object_store,
+gcs::{create_gcs_object_store, parse_gcs_bucket},
};

// OBJECT_STORE_CACHE is a global cache for object stores per Postgres session.
@@ -44,7 +46,7 @@ impl ObjectStoreCache {
fn get_or_create(&mut self, uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
let (scheme, path) = ObjectStoreScheme::parse(uri).unwrap_or_else(|_| {
panic!(
"unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
"unrecognized uri {}. pg_parquet supports local paths, s3://, azure:// or gs:// schemes.",
uri
)
});
@@ -74,13 +76,14 @@ impl ObjectStoreCache {

fn create(scheme: ObjectStoreScheme, uri: &Url, copy_from: bool) -> ObjectStoreWithExpiration {
// object_store crate can recognize a bunch of different schemes and paths, but we only support
-// local, azure, and s3 schemes with a subset of all supported paths.
+// local, s3, azure and gs schemes with a subset of all supported paths.
match scheme {
ObjectStoreScheme::AmazonS3 => create_s3_object_store(uri),
ObjectStoreScheme::MicrosoftAzure => create_azure_object_store(uri),
ObjectStoreScheme::GoogleCloudStorage => create_gcs_object_store(uri),
ObjectStoreScheme::Local => create_local_file_object_store(uri, copy_from),
_ => panic!(
"unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
"unsupported scheme {} in uri {}. pg_parquet supports local paths, s3://, azure:// or gs:// schemes.",

Check warning on line 86 in src/object_store/object_store_cache.rs

View check run for this annotation

Codecov / codecov/patch

src/object_store/object_store_cache.rs#L86

Added line #L86 was not covered by tests
uri.scheme(),
uri
),
@@ -131,9 +134,10 @@ impl ObjectStoreCacheKey {
let bucket = match scheme {
ObjectStoreScheme::AmazonS3 => parse_s3_bucket(uri).unwrap_or_else(|| panic!("unsupported s3 uri: {uri}")),
ObjectStoreScheme::MicrosoftAzure => parse_azure_blob_container(uri).unwrap_or_else(|| panic!("unsupported azure blob storage uri: {uri}")),
ObjectStoreScheme::GoogleCloudStorage => parse_gcs_bucket(uri).unwrap_or_else(|| panic!("unsupported gs uri: {uri}")),
ObjectStoreScheme::Local => panic!("local paths should not be cached"),
_ => panic!(
"unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
"unsupported scheme {} in uri {}. pg_parquet supports local paths, s3://, azure:// or gs:// schemes.",
uri.scheme(),
uri
),