Skip to content

Commit

Permalink
Add support for Amazon S3 bucket storage backend.
Browse files Browse the repository at this point in the history
This adds support for using Amazon's S3 bucket as a storage backend via
the `AmazonS3Store` module. It is only implemented for the `Lwt` concurrency
library at this time since the underlying AWS-S3 library does not yet
have support for `Eio`.
  • Loading branch information
zoj613 committed Nov 19, 2024
1 parent c9fa270 commit 006b466
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 2 deletions.
30 changes: 29 additions & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,41 @@ jobs:
local-packages:
- zarr.opam

env:
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin

services:
minio:
image: fclairamb/minio-github-actions
ports:
- 9000:9000

name: Ocaml version - ${{ matrix.ocaml-compiler }} - ${{ matrix.os }}
steps:
- name: checkout
uses: actions/checkout@v4
with:
fetch-depth: 2

- name: Setup Minio
run: |
mkdir ~/.aws
echo '[default]' > ~/.aws/credentials
echo 'aws_access_key_id = minioadmin' >> ~/.aws/credentials
echo 'aws_secret_access_key = minioadmin' >> ~/.aws/credentials
pip3 install minio
python3 - <<'EOF'
from minio import Minio
minio = Minio(
'localhost:9000',
access_key='minioadmin',
secret_key='minioadmin',
secure=False
)
minio.make_bucket('test-bucket-lwt', location='us-east-1')
EOF
- name: setup-ocaml
uses: ocaml/setup-ocaml@v3
with:
Expand All @@ -45,7 +73,7 @@ jobs:
run: |
opam install --deps-only --with-test --with-doc --yes zarr
opam install bytesrw conf-zlib conf-zstd --yes
opam install lwt --yes
opam install lwt aws-s3-lwt --yes
opam exec -- dune build zarr zarr-sync zarr-lwt
- name: setup ocaml-5-specific
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ docs:
.PHONY: view-docs
view-docs: docs
chromium _build/default/_doc/_html/index.html

.PHONY: minio
minio:
mkdir -p /tmp/minio/test-bucket-lwt
docker run --rm -it -p 9000:9000 -v /tmp/minio:/minio minio/minio:latest server /minio
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
(and (>= 4.14.0)))
(zarr (= :version))
(lwt (>= 2.5.1))
(aws-s3-lwt (>= 4.8.1))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ depends: [
"ocaml" {>= "4.14.0"}
"zarr" {= version}
"lwt" {>= "2.5.1"}
"aws-s3-lwt" {>= "4.8.1"}
"odoc" {with-doc}
"ounit2" {with-test}
"ppx_deriving" {with-test}
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt/src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name zarr-lwt)
(libraries
zarr
aws-s3-lwt
lwt
lwt.unix)
(ocamlopt_flags
Expand Down
165 changes: 165 additions & 0 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,168 @@ module FilesystemStore = struct

include Zarr.Storage.Make(IO)
end

module AmazonS3Store = struct
module Credentials = Aws_s3_lwt.Credentials
module S3 = Aws_s3_lwt.S3

open Deferred.Infix
open Deferred.Syntax

exception S3_error of S3.error

let empty_content () = S3.{
storage_class = Standard;
meta_headers = None;
etag = String.empty;
key = String.empty;
last_modified = 0.;
size = 0
}

let fold_or_catch ~not_found res =
let return_or_raise r () = match r with
| Ok v -> Deferred.return v
| Error e -> raise (S3_error e)
and on_exception ~not_found = function
| S3_error S3.Not_found -> Lwt.return (not_found ())
| exn -> raise exn

Check warning on line 172 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L172

Added line #L172 was not covered by tests
in
Lwt.catch (return_or_raise res) (on_exception ~not_found)

let raise_not_found k () = raise (Zarr.Storage.Key_not_found k)

Check warning on line 176 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L176

Added line #L176 was not covered by tests

let empty_Ls = Fun.const ([], S3.Ls.Done)

let fold_continuation ~return ~more = function
| S3.Ls.Done -> Deferred.return return
| S3.Ls.More continuation ->
continuation () >>= fold_or_catch ~not_found:empty_Ls >>= fun (xs, cont) ->
more xs cont

Check warning on line 184 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L182-L184

Added lines #L182 - L184 were not covered by tests

module IO = struct
module Deferred = Deferred

type t =
{bucket : string
;cred : Credentials.t
;endpoint : Aws_s3.Region.endpoint}

let size t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.head ~bucket ~credentials ~endpoint ~key () in
let+ c = fold_or_catch ~not_found:empty_content res in
c.size

let is_member t key =
let+ size = size t key in
if size = 0 then false else true

let get t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.get ~bucket ~credentials ~endpoint ~key () in
fold_or_catch ~not_found:(raise_not_found key) res

let get_partial_values t key ranges =
let read_range t key (ofs, len) =
let range = match len with
| None -> S3.{first = Some ofs; last = None}
| Some l -> S3.{first = Some ofs; last = Some (ofs + l - 1)}
in
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.get ~bucket ~credentials ~endpoint ~range ~key () in
let+ data = fold_or_catch ~not_found:(raise_not_found key) res in
[data]
in
Deferred.concat_map (read_range t key) ranges

let set t key data =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.put ~bucket ~credentials ~endpoint ~data ~key () in
let* _ = fold_or_catch ~not_found:(Fun.const String.empty) res in
Deferred.return_unit

let set_partial_values t key ?(append=false) rsv =
let* size = size t key in
let* ov = match size with
| 0 -> Deferred.return String.empty
| _ -> get t key
in
let f = if append || ov = String.empty then
fun acc (_, v) -> acc ^ v else
fun acc (rs, v) ->
let s = Bytes.unsafe_of_string acc in
Bytes.blit_string v 0 s rs String.(length v);
Bytes.unsafe_to_string s
in
set t key (List.fold_left f ov rsv)

let erase t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.delete ~bucket ~credentials ~endpoint ~key () in
fold_or_catch ~not_found:(Fun.const ()) res

let rec delete_keys t cont () =
let del t xs c = Deferred.iter (delete_content t) xs >>= delete_keys t c in

Check warning on line 249 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L249

Added line #L249 was not covered by tests
fold_continuation ~return:() ~more:(del t) cont

and delete_content t S3.{key; _} = erase t key

and erase_prefix t prefix =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
Deferred.iter (delete_content t) xs >>= delete_keys t rest

let rec list t =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.ls ~bucket ~credentials ~endpoint () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
accumulate_keys (List.map content_key xs) rest

and content_key S3.{key; _} = key

and accumulate_keys acc cont =
let append acc xs c = accumulate_keys (acc @ List.map content_key xs) c in

Check warning on line 269 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L269

Added line #L269 was not covered by tests
fold_continuation ~return:acc ~more:(append acc) cont

module S = Set.Make(String)

let rec partition_keys prefix ((l, r) as acc) cont =
let split ~acc ~prefix xs c = partition_keys prefix (List.fold_left (add prefix) acc xs) c in

Check warning on line 275 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L275

Added line #L275 was not covered by tests
fold_continuation ~return:(l, S.elements r) ~more:(split ~acc ~prefix) cont

and add prefix (l, r) (c : S3.content) =
let size = String.length prefix in
if not (String.contains_from c.key size '/') then c.key :: l, r else
l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r

and list_dir t prefix =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let* res = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
let init = List.fold_left (add prefix) ([], S.empty) xs in
partition_keys prefix init rest

let rec rename t prefix new_prefix =
let upload t (k, v) = set t k v in
let* xs = list t in
let to_delete = List.filter (String.starts_with ~prefix) xs in
let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in
let* () = Deferred.iter (upload t) data in
Deferred.iter (erase t) to_delete

and rename_and_add ~t ~prefix ~new_prefix acc k =
let l = String.length prefix in
let k' = new_prefix ^ String.sub k l (String.length k - l) in
let+ a = get t k in (k', a) :: acc
end

let with_open ?(scheme=`Http) ?(inet=`V4) ~region ~bucket ~profile f =
let* res = Credentials.Helper.get_credentials ~profile () in
let cred = Result.fold ~ok:Fun.id ~error:raise res in
let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
f IO.{bucket; cred; endpoint}

include Zarr.Storage.Make(IO)
end
32 changes: 31 additions & 1 deletion zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(** An Lwt-aware in-memory storage backend for Zarr v3 hierarchy. *)
module MemoryStore : sig include Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t end
module MemoryStore : Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t

(** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with type 'a Deferred.t = 'a Lwt.t
Expand All @@ -18,3 +18,33 @@ module FilesystemStore : sig
@raise Failure if [dir] is not a Zarr store path. *)
end

(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *)
module AmazonS3Store : sig
exception S3_error of Aws_s3_lwt.S3.error

include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t

val with_open :
?scheme:[ `Http | `Https ] ->
?inet:[ `V4 | `V6 ] ->
region:Aws_s3.Region.t ->
bucket:string ->
profile:string ->
(t -> 'a Lwt.t) ->
'a Lwt.t
(** [with_open ~region ~bucket ~profile f] opens an S3 bucket store with
bucket name [bucket] at region [region] using credentials specified by
profile [profile]. The credentials are read locally from a [~/.aws/credentials]
file or from an IAM service if the profile or file is not available.
Function [f] is applied to the store's open handle and its output is
returned to the caller.
{ul
{- [scheme] is the HTTP scheme to use when connecting to S3, and must be
one of [`Http | `Https]. Defaults to [`Http].}
{- [inet] is the IP version and must be one of [`V4 | `V6]. Defaults to [`V4].}
}
@raise S3_error if an error occurs while sending a request to the S3 service. *)
end
7 changes: 7 additions & 0 deletions zarr-lwt/test/test_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,18 @@ let _ =
(Zarr.Storage.Not_a_filesystem_store fn)
(fun () -> FilesystemStore.open_store fn);

(* ZipStore configuration *)
let zpath = tmp_dir ^ ".zip" in
(* AmazonS3Store configuration *)
let region = Aws_s3.Region.minio ~port:9000 ~host:"localhost" ()
and bucket = "test-bucket-lwt"
and profile = "default" in

Lwt_main.run @@ Lwt.join
[ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z)
(* test just opening the now exisitant archive created by the previous test. *)
;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit)
;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store))
;test_storage (module MemoryStore) @@ MemoryStore.create ()
;test_storage (module FilesystemStore) s])
])

0 comments on commit 006b466

Please sign in to comment.