Skip to content

Commit

Permalink
Add support for Amazon S3 bucket storage backend.
Browse files Browse the repository at this point in the history
This adds support for using Amazon's S3 bucket as a storage backend via
the `AmazonS3Store` module. It is only implemented for the `Lwt` concurrency
library at this time since the underlying AWS-S3 library does not yet
have support for `Eio`.
  • Loading branch information
zoj613 committed Nov 18, 2024
1 parent c9fa270 commit b1eab4c
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 2 deletions.
30 changes: 29 additions & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,41 @@ jobs:
local-packages:
- zarr.opam

env:
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin

services:
minio:
image: fclairamb/minio-github-actions
ports:
- 9000:9000

name: Ocaml version - ${{ matrix.ocaml-compiler }} - ${{ matrix.os }}
steps:
- name: checkout
uses: actions/checkout@v4
with:
fetch-depth: 2

# Writes a default AWS credentials profile (minioadmin/minioadmin) and creates
# the `test-bucket-lwt` bucket on the MinIO service listening on localhost:9000.
- name: Setup Minio
run: |
mkdir ~/.aws
echo '[default]' > ~/.aws/credentials
echo 'aws_access_key_id = minioadmin' >> ~/.aws/credentials
echo 'aws_secret_access_key = minioadmin' >> ~/.aws/credentials
pip3 install minio
python3 - <<'EOF'
from minio import Minio
minio = Minio(
'localhost:9000',
access_key='minioadmin',
secret_key='minioadmin',
secure=False
)
minio.make_bucket('test-bucket-lwt', location='us-east-1')
EOF
- name: setup-ocaml
uses: ocaml/setup-ocaml@v3
with:
Expand All @@ -45,7 +73,7 @@ jobs:
run: |
opam install --deps-only --with-test --with-doc --yes zarr
opam install bytesrw conf-zlib conf-zstd --yes
opam install lwt --yes
opam install lwt aws-s3-lwt --yes
opam exec -- dune build zarr zarr-sync zarr-lwt
- name: setup ocaml-5-specific
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ docs:
.PHONY: view-docs
view-docs: docs
chromium _build/default/_doc/_html/index.html

# Start a local MinIO server in Docker on port 9000, backed by /tmp/minio.
# The `test-bucket-lwt` directory is created up front under the served path.
.PHONY: minio
minio:
mkdir -p /tmp/minio/test-bucket-lwt
docker run --rm -it -p 9000:9000 -v /tmp/minio:/minio minio/minio:latest server /minio
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
(and (>= 4.14.0)))
(zarr (= :version))
(lwt (>= 2.5.1))
(aws-s3-lwt (>= 4.8.1))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ depends: [
"ocaml" {>= "4.14.0"}
"zarr" {= version}
"lwt" {>= "2.5.1"}
"aws-s3-lwt" {>= "4.8.1"}
"odoc" {with-doc}
"ounit2" {with-test}
"ppx_deriving" {with-test}
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt/src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name zarr-lwt)
(libraries
zarr
aws-s3-lwt
lwt
lwt.unix)
(ocamlopt_flags
Expand Down
177 changes: 177 additions & 0 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,180 @@ module FilesystemStore = struct

include Zarr.Storage.Make(IO)
end

module AmazonS3Store = struct
module Credentials = Aws_s3_lwt.Credentials
module S3 = Aws_s3_lwt.S3

open Deferred.Syntax

exception S3_error of S3.error

(* Placeholder object metadata: zero size, blank key and etag, no meta
   headers, standard storage class. *)
let empty_content () : S3.content =
  { S3.size = 0
  ; last_modified = 0.
  ; key = String.empty
  ; etag = String.empty
  ; meta_headers = None
  ; storage_class = S3.Standard }

(* An empty listing page with no continuation, whatever the argument. *)
let empty_Ls = fun _ -> ([], S3.Ls.Done)

(* Projects the object key out of a listing entry. *)
let content_key (c : S3.content) = c.S3.key

(* [raise_not_found k ()] raises [Zarr.Storage.Key_not_found k]. *)
let raise_not_found k = fun () -> raise (Zarr.Storage.Key_not_found k)

Check warning on line 169 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L169

Added line #L169 was not covered by tests

(* [fold_and_catch ~not_found res] converts the result of an S3 request into
   a promise:
   - [Ok v] resolves to [v];
   - [Error Not_found] resolves to [not_found ()]; if [not_found] itself
     raises, the promise is rejected with that exception;
   - any other error rejects the promise with [S3_error e].

   The result is matched directly rather than raising [S3_error] and
   catching it again, and every failure is delivered as a rejected promise
   instead of a synchronous exception. *)
let fold_and_catch :
  not_found:(unit -> 'b) -> ('a, S3.error) result -> 'b Lwt.t
  = fun ~not_found res ->
  match res with
  | Ok v -> Lwt.return v
  | Error S3.Not_found -> Lwt.wrap not_found
  | Error e -> Lwt.fail (S3_error e)

Check warning on line 180 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L180

Added line #L180 was not covered by tests

module IO = struct
module Deferred = Deferred

(* Handle to an S3-backed store; each field is forwarded to every S3 request
   made by the functions of this module. *)
type t =
{bucket : string (* passed as [~bucket] to S3 calls *)
;cred : Credentials.t (* passed as [~credentials] to S3 calls *)
;endpoint : Aws_s3.Region.endpoint} (* passed as [~endpoint] to S3 calls *)

(* [size t key] issues a HEAD request for [key] and returns the reported
   object size; a missing key yields the size of [empty_content ()], i.e. 0. *)
let size t key =
  let {bucket; cred = credentials; endpoint} = t in
  let* head = S3.head ~bucket ~credentials ~endpoint ~key () in
  let+ (meta : S3.content) = fold_and_catch ~not_found:empty_content head in
  meta.S3.size

(* [is_member t key] tells whether an object is stored under [key].

   Existence is taken from the outcome of the HEAD request itself: a
   successful response means the key exists, a not-found response means it
   does not. Deriving membership from the object size would report an
   existing zero-length object as absent. Errors other than not-found are
   reported by [fold_and_catch]. *)
let is_member t key =
  let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
  let* res = S3.head ~bucket ~credentials ~endpoint ~key () in
  fold_and_catch ~not_found:(Fun.const false) (Result.map (Fun.const true) res)

(* [get t key] downloads the whole object stored under [key].
   A not-found response is turned into [Zarr.Storage.Key_not_found key]. *)
let get t key =
  let {bucket; cred = credentials; endpoint} = t in
  let* response = S3.get ~bucket ~credentials ~endpoint ~key () in
  fold_and_catch ~not_found:(raise_not_found key) response

(* [get_partial_values t key ranges] fetches one byte range of [key] per
   element of [ranges] and returns the fetched pieces in the same order.
   Each range is [(offset, Some length)] or [(offset, None)] for everything
   from [offset] onwards. A not-found response is turned into
   [Zarr.Storage.Key_not_found key]. *)
let get_partial_values t key ranges =
  let {bucket; cred = credentials; endpoint} = t in
  let fetch (first, len) =
    (* S3 ranges are inclusive, hence the [- 1] on the last byte index. *)
    let last = Option.map (fun n -> first + n - 1) len in
    let range : S3.range = {first = Some first; last} in
    let* response = S3.get ~bucket ~credentials ~endpoint ~range ~key () in
    let+ chunk = fold_and_catch ~not_found:(raise_not_found key) response in
    [chunk]
  in
  Deferred.concat_map fetch ranges

(* [set t key data] uploads [data] under [key]; the value returned by the
   PUT request is discarded. *)
let set t key data =
  let {bucket; cred = credentials; endpoint} = t in
  let* response = S3.put ~bucket ~credentials ~endpoint ~data ~key () in
  let+ (_ : string) = fold_and_catch ~not_found:(fun () -> String.empty) response in
  ()

(* [set_partial_values t key ?append rsv] rewrites the object at [key] after
   applying the [(offset, bytes)] pairs of [rsv] to its current contents.

   - When [append] is true, or the current value is empty (including a
     missing key), the byte strings are concatenated after the current value
     in list order and the offsets are ignored.
   - Otherwise each byte string is written over the current value at its
     offset, in list order.

   The whole object is read, modified in memory and written back.

   @raise Invalid_argument if an overwrite does not fit inside the current
   value. *)
let set_partial_values t key ?(append=false) rsv =
  let* size = size t key in
  let* ov = match size with
    | 0 -> Deferred.return String.empty
    | _ -> get t key
  in
  let nv =
    if append || String.length ov = 0 then
      (* One concatenation instead of repeated [^] in a fold. *)
      String.concat String.empty (ov :: List.map snd rsv)
    else begin
      (* Work on a private copy: strings are immutable and must not be
         mutated in place through [Bytes.unsafe_of_string]. *)
      let buf = Bytes.of_string ov in
      List.iter
        (fun (ofs, v) -> Bytes.blit_string v 0 buf ofs (String.length v))
        rsv;
      (* [buf] is not used after this point, so the conversion is safe. *)
      Bytes.unsafe_to_string buf
    end
  in
  set t key nv

(* [erase t key] deletes the object stored under [key]; a not-found response
   is treated as success. *)
let erase t key =
  let {bucket; cred = credentials; endpoint} = t in
  let* outcome = S3.delete ~bucket ~credentials ~endpoint ~key () in
  fold_and_catch ~not_found:(fun () -> ()) outcome

(* [delete_keys t cont] walks the remaining pages of a listing, deleting
   every entry of each page before requesting the next one. *)
let rec delete_keys (t : t) (cont : S3.Ls.cont) : unit Deferred.t =
  match cont with
  | S3.Ls.Done -> Deferred.return_unit
  | S3.Ls.More next ->
    let* page = next () in
    let* contents, remaining = fold_and_catch ~not_found:empty_Ls page in
    let* () = Deferred.iter (delete_content t) contents in
    delete_keys t remaining

Check warning on line 251 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L247-L251

Added lines #L247 - L251 were not covered by tests

(* Deletes the object a listing entry refers to. *)
and delete_content t (c : S3.content) = erase t c.S3.key

(* [erase_prefix t prefix] lists the keys starting with [prefix] and deletes
   them page by page. *)
and erase_prefix t prefix =
  let {bucket; cred = credentials; endpoint} = t in
  let* first_page = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
  let* contents, remaining = fold_and_catch ~not_found:empty_Ls first_page in
  let* () = Deferred.iter (delete_content t) contents in
  delete_keys t remaining

(* [list t] returns the key of every object in the bucket, in the order the
   listing pages report them. *)
let rec list t =
  let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
  let* res = S3.ls ~bucket ~credentials ~endpoint () in
  let* xs, rest = fold_and_catch ~not_found:empty_Ls res in
  accumulate_keys (List.map content_key xs) rest

(* [accumulate_keys acc cont] follows the listing continuation [cont] to its
   end and returns [acc] followed by the keys of every remaining page, in
   page order.

   Pages are collected newest-first and concatenated once at the end;
   appending each page to the growing accumulator with [@] would copy the
   accumulator on every page and be quadratic in the number of keys. *)
and accumulate_keys :
  string list -> S3.Ls.cont -> string list Deferred.t
  = fun acc cont ->
  let rec gather pages = function
    | S3.Ls.Done -> Deferred.return (List.concat (acc :: List.rev pages))
    | S3.Ls.More k ->
      let* res = k () in
      let* xs, rest = fold_and_catch ~not_found:empty_Ls res in
      gather (List.map content_key xs :: pages) rest
  in
  gather [] cont

Check warning on line 275 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L272-L275

Added lines #L272 - L275 were not covered by tests

module S = Set.Make(String)

(* [partition_add_keys prefix acc cont] follows the listing continuation
   [cont] to its end, folding every entry of every page into [acc] with
   [add ~prefix]. *)
let rec partition_add_keys
  (prefix : string) (acc : string list * S.t) (cont : S3.Ls.cont)
  : (string list * S.t) Deferred.t =
  match cont with
  | S3.Ls.Done -> Deferred.return acc
  | S3.Ls.More next ->
    let* page = next () in
    let* contents, remaining = fold_and_catch ~not_found:empty_Ls page in
    let updated = List.fold_left (add ~prefix) acc contents in
    partition_add_keys prefix updated remaining

Check warning on line 287 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L283-L287

Added lines #L283 - L287 were not covered by tests

(* [add ~prefix (keys, prefixes) c] classifies one listing entry. If the key
   of [c] has a slash at or after position [String.length prefix], the key is
   truncated just after the first such slash and added to [prefixes];
   otherwise the full key is consed onto [keys]. *)
and add ~prefix (keys, prefixes) (c : S3.content) =
  let start = String.length prefix in
  if String.contains_from c.key start '/' then
    let stop = String.index_from c.key start '/' in
    (keys, S.add (String.sub c.key 0 (stop + 1)) prefixes)
  else
    (c.key :: keys, prefixes)

(* [list_dir t prefix] lists the keys starting with [prefix] and splits them
   into keys with no further slash after the prefix and the set of distinct
   sub-prefixes, returned as a list. *)
and list_dir t prefix =
  let {bucket; cred = credentials; endpoint} = t in
  let* first_page = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
  let* contents, remaining = fold_and_catch ~not_found:empty_Ls first_page in
  let seed = List.fold_left (add ~prefix) ([], S.empty) contents in
  let+ keys, prefixes = partition_add_keys prefix seed remaining in
  (keys, S.elements prefixes)

(* [rename t prefix new_prefix] moves every key starting with [prefix] to the
   same key with [prefix] replaced by [new_prefix]. All affected values are
   read first, then written under their new keys, and only then are the old
   keys erased. *)
let rec rename t prefix new_prefix =
  let* all_keys = list t in
  let old_keys = List.filter (fun k -> String.starts_with ~prefix k) all_keys in
  let* moved =
    Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] old_keys in
  let* () = Deferred.iter (fun (k, v) -> set t k v) moved in
  Deferred.iter (erase t) old_keys

(* [rename_and_add ~t ~prefix ~new_prefix acc k] reads the value at [k] and
   conses it onto [acc], paired with [k] rewritten under [new_prefix]. *)
and rename_and_add ~t ~prefix ~new_prefix acc k =
  let plen = String.length prefix in
  let suffix = String.sub k plen (String.length k - plen) in
  let+ payload = get t k in
  (new_prefix ^ suffix, payload) :: acc
end

(* [with_open ?scheme ?inet ~region ~bucket ~profile f] fetches the
   credentials for [profile], builds the endpoint for [region] and applies
   [f] to the resulting store handle. If the credentials lookup returns an
   error, the exception it carries is raised. *)
let with_open ?(scheme=`Http) ?(inet=`V4) ~region ~bucket ~profile f =
  let* lookup = Credentials.Helper.get_credentials ~profile () in
  let cred = match lookup with
    | Ok c -> c
    | Error exn -> raise exn
  in
  let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
  f {IO.bucket; cred; endpoint}

include Zarr.Storage.Make(IO)
end
32 changes: 31 additions & 1 deletion zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(** An Lwt-aware in-memory storage backend for Zarr v3 hierarchy. *)
module MemoryStore : sig include Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t end
module MemoryStore : Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t

(** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with type 'a Deferred.t = 'a Lwt.t
Expand All @@ -18,3 +18,33 @@ module FilesystemStore : sig
@raise Failure if [dir] is not a Zarr store path. *)
end

(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *)
module AmazonS3Store : sig
exception S3_error of Aws_s3_lwt.S3.error
(** Raised when a request to the S3 service fails; carries the error
reported by the underlying S3 client. *)

include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t

val with_open :
?scheme:[ `Http | `Https ] ->
?inet:[ `V4 | `V6 ] ->
region:Aws_s3.Region.t ->
bucket:string ->
profile:string ->
(t -> 'a Lwt.t) ->
'a Lwt.t
(** [with_open ~region ~bucket ~profile f] opens an S3 bucket store with
bucket name [bucket] at region [region] using credentials specified by
profile [profile]. The credentials are read locally from a [~/.aws/credentials]
file or from an IAM service if the profile or file is not available.
Function [f] is applied to the store's open handle and its output is
returned to the caller.
{ul
{- [scheme] is the HTTP scheme to use when connecting to S3, and must be
one of [`Http | `Https]. Defaults to [`Http].}
{- [inet] is the IP version and must be one of [`V4 | `V6]. Defaults to [`V4].}
}
@raise S3_error if an error occurs while sending a request to the S3 service. *)
end
7 changes: 7 additions & 0 deletions zarr-lwt/test/test_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,18 @@ let _ =
(Zarr.Storage.Not_a_filesystem_store fn)
(fun () -> FilesystemStore.open_store fn);

(* ZipStore configuration *)
let zpath = tmp_dir ^ ".zip" in
(* AmazonS3Store configuration *)
let region = Aws_s3.Region.minio ~port:9000 ~host:"localhost" ()
and bucket = "test-bucket-lwt"
and profile = "default" in

Lwt_main.run @@ Lwt.join
[ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z)
(* test just opening the now-existing archive created by the previous test. *)
;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit)
;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store))
;test_storage (module MemoryStore) @@ MemoryStore.create ()
;test_storage (module FilesystemStore) s])
])

0 comments on commit b1eab4c

Please sign in to comment.