Adds Support for COPY TO/FROM Google Cloud Storage #61

Open. Wants to merge 1 commit into base: main.
5 changes: 5 additions & 0 deletions .devcontainer/.env
@@ -18,5 +18,10 @@ AZURE_TEST_CONTAINER_NAME=testcontainer
AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D"
AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"

# GCS tests
GOOGLE_TEST_BUCKET=testbucket
GOOGLE_SERVICE_ACCOUNT_KEY='{"gcs_base_url": "http://localhost:4443","disable_oauth": true,"client_email": "","private_key_id": "","private_key": ""}'
GOOGLE_SERVICE_ENDPOINT=http://localhost:4443

# Others
RUST_TEST_THREADS=1
15 changes: 15 additions & 0 deletions .devcontainer/docker-compose.yml
@@ -12,6 +12,7 @@ services:
      - ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro
      - ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw
      - ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw
      - ${USERPROFILE}${HOME}/.config/gcloud:/home/rust/.config/gcloud:rw
      - ./entrypoint.sh:/entrypoint.sh
    env_file:
      - .env
@@ -20,6 +21,7 @@
    depends_on:
      - minio
      - azurite
      - fake-gcs-server

  minio:
    image: minio/minio
@@ -47,3 +49,16 @@
      interval: 6s
      timeout: 2s
      retries: 3

  fake-gcs-server:
    image: tustvold/fake-gcs-server
    env_file:
      - .env
    network_mode: host
    command: -scheme http -public-host localhost:4443
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "4443"]
      interval: 6s
      timeout: 2s
      retries: 3
4 changes: 4 additions & 0 deletions .devcontainer/entrypoint.sh
@@ -6,4 +6,8 @@ trap "echo 'Caught termination signal. Exiting...'; exit 0" SIGINT SIGTERM
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
az storage container create -n ${AZURE_TEST_CONTAINER_NAME}2 --connection-string $AZURE_STORAGE_CONNECTION_STRING

# create fake-gcs bucket
curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"
curl -v -X POST --data-binary "{\"name\":\"${GOOGLE_TEST_BUCKET}2\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"

sleep infinity
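The entrypoint only creates the buckets. If setup needs to be verified by hand, the emulator's JSON API can be queried directly; a minimal sketch, assuming fake-gcs-server is reachable at `$GOOGLE_SERVICE_ENDPOINT` and serves the standard GCS JSON endpoints (these curl calls are illustrative, not part of this diff):

```bash
# List every bucket the emulator knows about; both test buckets should appear.
curl -s "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"

# Probe a single bucket; an HTTP 404 status here means creation failed.
curl -s -o /dev/null -w "%{http_code}\n" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b/$GOOGLE_TEST_BUCKET"
```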
16 changes: 16 additions & 0 deletions .github/workflows/ci.yml
@@ -140,6 +140,22 @@ jobs:
          az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
          az storage container create -n ${AZURE_TEST_CONTAINER_NAME}2 --connection-string $AZURE_STORAGE_CONNECTION_STRING

      - name: Start fake-gcs-server for Google Cloud Storage emulator tests
        run: |
          docker run -d \
            --env-file .devcontainer/.env \
            -p 4443:4443 \
            tustvold/fake-gcs-server -scheme http -public-host localhost:4443

          while ! curl $GOOGLE_SERVICE_ENDPOINT; do
            echo "Waiting for $GOOGLE_SERVICE_ENDPOINT..."
            sleep 1
          done

          # create bucket
          curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"
          curl -v -X POST --data-binary "{\"name\":\"${GOOGLE_TEST_BUCKET}2\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"

      - name: Run tests
        run: |
          # Run tests with coverage tool
1 change: 1 addition & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -28,7 +28,7 @@ aws-credential-types = {version = "1", default-features = false}
azure_storage = {version = "0.21", default-features = false}
futures = "0.3"
home = "0.5"
-object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]}
+object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "gcp"]}
once_cell = "1"
parquet = {version = "53", default-features = false, features = [
"arrow",
24 changes: 23 additions & 1 deletion README.md
@@ -156,7 +156,7 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM
```

## Object Store Support
-`pg_parquet` supports reading and writing Parquet files from/to `S3` and `Azure Blob Storage` object stores.
+`pg_parquet` supports reading and writing Parquet files from/to `S3`, `Azure Blob Storage` and `Google Cloud Storage` object stores.

> [!NOTE]
> To be able to write into an object store location, you need to grant the `parquet_object_store_write` role to your current postgres user.
@@ -239,6 +239,28 @@ Supported authorization methods' priority order is shown below:
2. SAS token,
3. Storage key.

#### Google Cloud Storage

The simplest way to configure the Google Cloud Storage client is to create a JSON config file like `/tmp/gcs.json`:

```bash
$ cat /tmp/gcs.json
{
"gcs_base_url": "http://localhost:4443",
"disable_oauth": true,
"client_email": "",
"private_key_id": "",
"private_key": ""
}
```
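
The `gcs_base_url` and `disable_oauth` fields above are emulator settings (they point the client at `fake-gcs-server` and skip authentication); for a real bucket you would instead populate `client_email`, `private_key_id` and `private_key` from a downloaded service account key and drop the emulator-specific fields.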

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
- `GOOGLE_SERVICE_ACCOUNT_KEY`: JSON-serialized service account key **(only via environment variables)**
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file **(only via environment variables)**

Supported Google Cloud Storage URI formats are shown below (a usage sketch follows):
- `gs://<bucket>/<path>`
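
Once the client is configured, a `gs://` location behaves like any other `COPY` target. A hypothetical session, where the table `product_example` and the bucket `testbucket` are placeholder names, not part of this change:

```bash
psql -c "COPY product_example TO 'gs://testbucket/products.parquet';"
psql -c "COPY product_example FROM 'gs://testbucket/products.parquet';"
```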

## Copy Options
`pg_parquet` supports the following options in the `COPY TO` command:
- `format parquet`: you need to specify this option to read or write Parquet files which do not end with the `.parquet[.<compression>]` extension,
1 change: 1 addition & 0 deletions src/object_store.rs
@@ -9,5 +9,6 @@ use crate::{

pub(crate) mod aws;
pub(crate) mod azure;
pub(crate) mod gcs;
pub(crate) mod local_file;
pub(crate) mod object_store_cache;
69 changes: 69 additions & 0 deletions src/object_store/gcs.rs
@@ -0,0 +1,69 @@
use std::sync::Arc;

use object_store::gcp::GoogleCloudStorageBuilder;
use url::Url;

use super::object_store_cache::ObjectStoreWithExpiration;

// create_gcs_object_store creates a GoogleCloudStorage object store from the given uri.
// It is configured via environment variables. Currently, only the following
// environment variables are supported:
// - GOOGLE_SERVICE_ACCOUNT_KEY
// - GOOGLE_SERVICE_ACCOUNT_PATH
pub(crate) fn create_gcs_object_store(uri: &Url) -> ObjectStoreWithExpiration {
    let bucket_name = parse_gcs_bucket(uri).unwrap_or_else(|| {
        panic!("unsupported gcs uri: {}", uri);
    });

    let mut gcs_builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket_name);

    let gcs_config = GoogleStorageConfig::load();

    // service account key
    if let Some(service_account_key) = gcs_config.service_account_key {
        gcs_builder = gcs_builder.with_service_account_key(&service_account_key);
    }

    // service account path
    if let Some(service_account_path) = gcs_config.service_account_path {
        gcs_builder = gcs_builder.with_service_account_path(&service_account_path);
    }

    let object_store = gcs_builder.build().unwrap_or_else(|e| panic!("{}", e));

    // the object_store crate handles refreshing the bearer token itself, so we do not need to track expiry here
    let expire_at = None;

    ObjectStoreWithExpiration {
        object_store: Arc::new(object_store),
        expire_at,
    }
}

pub(crate) fn parse_gcs_bucket(uri: &Url) -> Option<String> {
    let host = uri.host_str()?;

    // gs://{bucket}/key
    if uri.scheme() == "gs" {
        return Some(host.to_string());
    }

    None
}

// GoogleStorageConfig holds the configuration used to build the
// Google Cloud Storage object store.
struct GoogleStorageConfig {
    service_account_key: Option<String>,
    service_account_path: Option<String>,
}

impl GoogleStorageConfig {
    // load loads the Google Storage configuration from the environment.
    fn load() -> Self {
        Self {
            service_account_key: std::env::var("GOOGLE_SERVICE_ACCOUNT_KEY").ok(),
            service_account_path: std::env::var("GOOGLE_SERVICE_ACCOUNT_PATH").ok(),
        }
    }
}
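
Since `GoogleStorageConfig::load` reads these variables from the server's environment, they must be exported before postgres starts. A minimal sketch, assuming a key file at `/tmp/gcs.json` and a `pg_ctl`-managed cluster (both placeholders):

```bash
# Point the extension at a service account key file; set only one of the two variables.
export GOOGLE_SERVICE_ACCOUNT_PATH=/tmp/gcs.json
# export GOOGLE_SERVICE_ACCOUNT_KEY="$(cat /tmp/gcs.json)"

pg_ctl -D "$PGDATA" -l /tmp/pg.log restart
```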
16 changes: 10 additions & 6 deletions src/object_store/object_store_cache.rs
@@ -11,8 +11,10 @@
use url::Url;

use super::{
-    aws::parse_s3_bucket, azure::parse_azure_blob_container, create_azure_object_store,
-    create_local_file_object_store, create_s3_object_store,
+    aws::parse_s3_bucket,
+    azure::parse_azure_blob_container,
+    create_azure_object_store, create_local_file_object_store, create_s3_object_store,
+    gcs::{create_gcs_object_store, parse_gcs_bucket},
};

// OBJECT_STORE_CACHE is a global cache for object stores per Postgres session.
@@ -44,7 +46,7 @@
    fn get_or_create(&mut self, uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
        let (scheme, path) = ObjectStoreScheme::parse(uri).unwrap_or_else(|_| {
            panic!(
-               "unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
+               "unrecognized uri {}. pg_parquet supports local paths, s3://, azure:// or gs:// schemes.",
                uri
            )
        });
@@ -74,13 +76,14 @@

fn create(scheme: ObjectStoreScheme, uri: &Url, copy_from: bool) -> ObjectStoreWithExpiration {
    // object_store crate can recognize a bunch of different schemes and paths, but we only support
-   // local, azure, and s3 schemes with a subset of all supported paths.
+   // local, s3, azure and gs schemes with a subset of all supported paths.
    match scheme {
        ObjectStoreScheme::AmazonS3 => create_s3_object_store(uri),
        ObjectStoreScheme::MicrosoftAzure => create_azure_object_store(uri),
+       ObjectStoreScheme::GoogleCloudStorage => create_gcs_object_store(uri),
        ObjectStoreScheme::Local => create_local_file_object_store(uri, copy_from),
        _ => panic!(
-           "unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
+           "unsupported scheme {} in uri {}. pg_parquet supports local paths, s3://, azure:// or gs:// schemes.",
            uri.scheme(),
            uri
        ),
@@ -131,9 +134,10 @@
    let bucket = match scheme {
        ObjectStoreScheme::AmazonS3 => parse_s3_bucket(uri).unwrap_or_else(|| panic!("unsupported s3 uri: {uri}")),
        ObjectStoreScheme::MicrosoftAzure => parse_azure_blob_container(uri).unwrap_or_else(|| panic!("unsupported azure blob storage uri: {uri}")),
+       ObjectStoreScheme::GoogleCloudStorage => parse_gcs_bucket(uri).unwrap_or_else(|| panic!("unsupported gs uri: {uri}")),
        ObjectStoreScheme::Local => panic!("local paths should not be cached"),
        _ => panic!(
-           "unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
+           "unsupported scheme {} in uri {}. pg_parquet supports local paths, s3://, azure:// or gs:// schemes.",
            uri.scheme(),
            uri
        ),