From b1eab4c672498ce76b9cf538ad952d8990dff8a6 Mon Sep 17 00:00:00 2001 From: Zolisa Bleki Date: Sun, 17 Nov 2024 19:45:19 +0200 Subject: [PATCH] Add support for Amazon S3 bucket storage backend. This adds support for using Amazon's S3 bucket as a storage backend via the `AmazonS3Store` module. It is only implemented for the `Lwt` concurrency library at this time since the underlying AWS-S3 library does not yet have support for `Eio`. --- .github/workflows/build-and-test.yml | 30 ++++- Makefile | 5 + dune-project | 1 + zarr-lwt.opam | 1 + zarr-lwt/src/dune | 1 + zarr-lwt/src/storage.ml | 177 +++++++++++++++++++++++++++ zarr-lwt/src/storage.mli | 32 ++++- zarr-lwt/test/test_lwt.ml | 7 ++ 8 files changed, 252 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 2608ab6b..5f01af23 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -29,6 +29,16 @@ jobs: local-packages: - zarr.opam + env: + AWS_ACCESS_KEY_ID: minioadmin + AWS_SECRET_ACCESS_KEY: minioadmin + + services: + minio: + image: fclairamb/minio-github-actions + ports: + - 9000:9000 + name: Ocaml version - ${{ matrix.ocaml-compiler }} - ${{ matrix.os }} steps: - name: checkout @@ -36,6 +46,24 @@ jobs: with: fetch-depth: 2 + - name: Setup Minio + run: | + mkdir ~/.aws + echo '[default]' > ~/.aws/credentials + echo 'aws_access_key_id = minioadmin' >> ~/.aws/credentials + echo 'aws_secret_access_key = minioadmin' >> ~/.aws/credentials + pip3 install minio + python3 - <<'EOF' + from minio import Minio + minio = Minio( + 'localhost:9000', + access_key='minioadmin', + secret_key='minioadmin', + secure=False + ) + minio.make_bucket('test-bucket-lwt', location='us-east-1') + EOF + - name: setup-ocaml uses: ocaml/setup-ocaml@v3 with: @@ -45,7 +73,7 @@ jobs: run: | opam install --deps-only --with-test --with-doc --yes zarr opam install bytesrw conf-zlib conf-zstd --yes - opam install lwt --yes + opam install lwt aws-s3-lwt --yes opam exec -- dune build zarr zarr-sync zarr-lwt - name: setup ocaml-5-specific diff --git a/Makefile b/Makefile index 1ca1b2a2..5319f402 100644 --- a/Makefile +++ b/Makefile @@ -26,3 +26,8 @@ docs: .PHONY: view-docs view-docs: docs chromium _build/default/_doc/_html/index.html + +.PHONY: minio +minio: + mkdir -p /tmp/minio/test-bucket-lwt + docker run --rm -it -p 9000:9000 -v /tmp/minio:/minio minio/minio:latest server /minio diff --git a/dune-project b/dune-project index aaa4c9e0..7232da60 100644 --- a/dune-project +++ b/dune-project @@ -63,6 +63,7 @@ (and (>= 4.14.0))) (zarr (= :version)) (lwt (>= 2.5.1)) + (aws-s3-lwt (>= 4.8.1)) (odoc :with-doc) (ounit2 :with-test) (ppx_deriving :with-test) diff --git a/zarr-lwt.opam b/zarr-lwt.opam index 4e14ce44..f5e43a41 100644 --- a/zarr-lwt.opam +++ b/zarr-lwt.opam @@ -13,6 +13,7 @@ depends: [ "ocaml" {>= "4.14.0"} "zarr" {= version} "lwt" {>= "2.5.1"} + "aws-s3-lwt" {>= "4.8.1"} "odoc" {with-doc} "ounit2" {with-test} "ppx_deriving" {with-test} diff --git a/zarr-lwt/src/dune b/zarr-lwt/src/dune index 459c8244..9df95697 100644 --- a/zarr-lwt/src/dune +++ b/zarr-lwt/src/dune @@ -3,6 +3,7 @@ (public_name zarr-lwt) (libraries zarr + aws-s3-lwt lwt lwt.unix) (ocamlopt_flags diff --git a/zarr-lwt/src/storage.ml b/zarr-lwt/src/storage.ml index 85417811..857d3b9e 100644 --- a/zarr-lwt/src/storage.ml +++ b/zarr-lwt/src/storage.ml @@ -144,3 +144,180 @@ module FilesystemStore = struct include Zarr.Storage.Make(IO) end + +module AmazonS3Store = struct + module Credentials = Aws_s3_lwt.Credentials + module S3 = Aws_s3_lwt.S3 + + open Deferred.Syntax + + exception S3_error of S3.error + + let empty_content () = S3.{ + size = 0; + last_modified = 0.; + key = String.empty; + etag = String.empty; + meta_headers = None; + storage_class = Standard + } + + let empty_Ls = Fun.const ([], S3.Ls.Done) + + let content_key S3.{key; _} = key + + let raise_not_found k () = raise (Zarr.Storage.Key_not_found k) + + let fold_and_catch : + not_found:(unit -> 'b) -> ('a, S3.error) result -> 'b Lwt.t + = fun ~not_found res -> + Lwt.catch + (fun () -> match res with + | Ok v -> Deferred.return v + | Error e -> raise (S3_error e)) + (function + | S3_error Not_found -> Lwt.return (not_found ()) + | exn -> raise exn) + + module IO = struct + module Deferred = Deferred + + type t = + {bucket : string + ;cred : Credentials.t + ;endpoint : Aws_s3.Region.endpoint} + + let size t key = + let content_size S3.{size; _} = size in + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let* res = S3.head ~bucket ~credentials ~endpoint ~key () in + let+ c = fold_and_catch ~not_found:empty_content res in + content_size c + + let is_member t key = + let+ size = size t key in + if size = 0 then false else true + + let get t key = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let* res = S3.get ~bucket ~credentials ~endpoint ~key () in + fold_and_catch ~not_found:(raise_not_found key) res + + let get_partial_values t key ranges = + let read_range ~t ~key (ofs, len) = + let range : S3.range = match len with + | None -> {first = Some ofs; last = None} + | Some l -> {first = Some ofs; last = Some (ofs + l - 1)} + in + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let* res = S3.get ~bucket ~credentials ~endpoint ~range ~key () in + let+ data = fold_and_catch ~not_found:(raise_not_found key) res in + [data] + in + Deferred.concat_map (read_range ~t ~key) ranges + + let set t key data = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let* res = S3.put ~bucket ~credentials ~endpoint ~data ~key () in + let* _ = fold_and_catch ~not_found:(Fun.const String.empty) res in + Deferred.return_unit + + let set_partial_values t key ?(append=false) rsv = + let* size = size t key in + let* ov = match size with + | 0 -> Deferred.return String.empty + | _ -> get t key + in + let f = if append || ov = String.empty then + fun acc (_, v) -> acc ^ v else + fun acc (rs, v) -> + let s = Bytes.unsafe_of_string acc in + Bytes.blit_string v 0 s rs String.(length v); + Bytes.unsafe_to_string s + in + set t key (List.fold_left f ov rsv) + + let erase t key = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let* res = S3.delete ~bucket ~credentials ~endpoint ~key () in + fold_and_catch ~not_found:(Fun.const ()) res + + let rec delete_keys : t -> S3.Ls.cont -> unit Deferred.t = fun t -> function + | Done -> Deferred.return_unit + | More k -> + let* res = k () in + let* xs, rest = fold_and_catch ~not_found:empty_Ls res in + let* () = Deferred.iter (delete_content t) xs in + delete_keys t rest + + and delete_content t c = erase t (content_key c) + + and erase_prefix t prefix = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let* res = S3.ls ~bucket ~credentials ~endpoint ~prefix () in + let* xs, rest = fold_and_catch ~not_found:empty_Ls res in + let* () = Deferred.iter (delete_content t) xs in + delete_keys t rest + + let rec list t = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let* res = S3.ls ~bucket ~credentials ~endpoint () in + let* xs, rest = fold_and_catch ~not_found:empty_Ls res in + accumulate_keys (List.map content_key xs) rest + + and accumulate_keys : + string list -> S3.Ls.cont -> string list Deferred.t + = fun acc -> function + | Done -> Deferred.return acc + | More k -> + let* res = k () in + let* xs, rest = fold_and_catch ~not_found:empty_Ls res in + accumulate_keys (acc @ List.map content_key xs) rest + + module S = Set.Make(String) + + let rec partition_add_keys : + string -> string list * S.t -> S3.Ls.cont -> (string list * S.t) Deferred.t + = fun prefix acc -> function + | Done -> Deferred.return acc + | More k -> + let* res = k () in + let* xs, rest = fold_and_catch ~not_found:empty_Ls res in + let acc' = List.fold_left (add ~prefix) acc xs in + partition_add_keys prefix acc' rest + + and add ~prefix (l, r) (c : S3.content) = + let size = String.length prefix in + if not (String.contains_from c.key size '/') then c.key :: l, r else + l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r + + and list_dir t prefix = + let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in + let* res = S3.ls ~bucket ~credentials ~endpoint ~prefix () in + let* xs, rest = fold_and_catch ~not_found:empty_Ls res in + let init = List.fold_left (add ~prefix) ([], S.empty) xs in + let+ keys, prefixes = partition_add_keys prefix init rest in + (keys, S.elements prefixes) + + let rec rename t prefix new_prefix = + let remove ~t (k, v) = set t k v in + let* xs = list t in + let to_delete = List.filter (String.starts_with ~prefix) xs in + let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in + let* () = Deferred.iter (remove ~t) data in + Deferred.iter (erase t) to_delete + + and rename_and_add ~t ~prefix ~new_prefix acc k = + let l = String.length prefix in + let k' = new_prefix ^ String.sub k l (String.length k - l) in + let+ a = get t k in (k', a) :: acc + end + + let with_open ?(scheme=`Http) ?(inet=`V4) ~region ~bucket ~profile f = + let* res = Credentials.Helper.get_credentials ~profile () in + let cred = Result.fold ~ok:Fun.id ~error:raise res in + let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in + f IO.{bucket; cred; endpoint} + + include Zarr.Storage.Make(IO) +end diff --git a/zarr-lwt/src/storage.mli b/zarr-lwt/src/storage.mli index 08a2d2fc..c5053034 100644 --- a/zarr-lwt/src/storage.mli +++ b/zarr-lwt/src/storage.mli @@ -1,5 +1,5 @@ (** An Lwt-aware in-memory storage backend for Zarr v3 hierarchy. *) -module MemoryStore : sig include Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t end +module MemoryStore : Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t (** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *) module ZipStore : Zarr.Zip.S with type 'a Deferred.t = 'a Lwt.t @@ -18,3 +18,33 @@ module FilesystemStore : sig @raise Failure if [dir] is not a Zarr store path. *) end + +(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *) +module AmazonS3Store : sig + exception S3_error of Aws_s3_lwt.S3.error + + include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t + + val with_open : + ?scheme:[ `Http | `Https ] -> + ?inet:[ `V4 | `V6 ] -> + region:Aws_s3.Region.t -> + bucket:string -> + profile:string -> + (t -> 'a Lwt.t) -> + 'a Lwt.t + (** [with_open ~region ~bucket ~profile f] opens an S3 bucket store with + bucket name [bucket] at region [region] using credentials specified by + profile [profile]. The credentials are read locally from a [~/.aws/credentials] + file or from an IAM service if the profile or file is not available. + Function [f] is applied to the store's open handle and its output is + returned to the caller. + + {ul + {- [scheme] is the HTTP scheme to use when connecting to S3, and must be + one of [`Http | `Https]. Defaults to [`Http].} + {- [inet] is the IP version and must be one of [`V4 | `V6]. Defaults to [`V4].} + } + + @raise S3_error if an error occurs while sending a request to the S3 service. *) +end diff --git a/zarr-lwt/test/test_lwt.ml b/zarr-lwt/test/test_lwt.ml index e3bf3f98..cf0b8932 100644 --- a/zarr-lwt/test/test_lwt.ml +++ b/zarr-lwt/test/test_lwt.ml @@ -141,11 +141,18 @@ let _ = (Zarr.Storage.Not_a_filesystem_store fn) (fun () -> FilesystemStore.open_store fn); + (* ZipStore configuration *) let zpath = tmp_dir ^ ".zip" in + (* AmazonS3Store configuration *) + let region = Aws_s3.Region.minio ~port:9000 ~host:"localhost" () + and bucket = "test-bucket-lwt" + and profile = "default" in + Lwt_main.run @@ Lwt.join [ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z) (* test just opening the now exisitant archive created by the previous test. *) ;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit) + ;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store)) ;test_storage (module MemoryStore) @@ MemoryStore.create () ;test_storage (module FilesystemStore) s]) ])