From 19be1a644ae0ef10d0deb83e8b622e6de02e85b2 Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Thu, 28 Nov 2024 22:53:42 +0100 Subject: [PATCH] Add skeleton for an Eio backend --- aws-s3-eio.opam | 32 +++++++++ aws-s3-eio/.#io.ml | 1 + aws-s3-eio/bin/aws_cli_async.ml | 7 ++ aws-s3-eio/bin/dune | 6 ++ aws-s3-eio/credentials.ml | 6 ++ aws-s3-eio/dune | 6 ++ aws-s3-eio/io.ml | 113 ++++++++++++++++++++++++++++++++ aws-s3-eio/io.mli | 2 + aws-s3-eio/s3.ml | 5 ++ 9 files changed, 178 insertions(+) create mode 100644 aws-s3-eio.opam create mode 120000 aws-s3-eio/.#io.ml create mode 100644 aws-s3-eio/bin/aws_cli_async.ml create mode 100644 aws-s3-eio/bin/dune create mode 100644 aws-s3-eio/credentials.ml create mode 100644 aws-s3-eio/dune create mode 100644 aws-s3-eio/io.ml create mode 100644 aws-s3-eio/io.mli create mode 100644 aws-s3-eio/s3.ml diff --git a/aws-s3-eio.opam b/aws-s3-eio.opam new file mode 100644 index 0000000..ab4ff20 --- /dev/null +++ b/aws-s3-eio.opam @@ -0,0 +1,32 @@ +opam-version: "2.0" +maintainer: "Anders Fugmann " +authors: "Anders Fugmann" +license: "BSD-3-Clause" +homepage: "https://github.com/andersfugmann/aws-s3" +dev-repo: "git+https://github.com/andersfugmann/aws-s3" +bug-reports: "https://github.com/andersfugmann/aws-s3/issues" +doc: "https://andersfugmann.github.io/aws-s3/" +build: [ + ["dune" "subst"] {dev} + ["dune" "build" "-p" name "-j" jobs] +] +depends: [ + "ocaml" {>= "4.14.0"} + "dune" {>= "2.0.0"} + "aws-s3" {= version} + "cohttp-eio" + "eio" {>= "1.2" } +] +synopsis: "Ocaml library for accessing Amazon S3 - Eio version" +description: """ +This library provides access to Amazon Simple Storage Solution (S3). +The library supports: +* Copying file to and from s3 +* List files in S3 (from root) +* Delete single/multi object in S3 +* HEAD operation on single objects +* Streaming transfer to and from aws. +* Multi part upload (including s3 -> s3 copy) +* Fetching machine role/credentials (though IAM) + +This library uses eio for concurrency""" diff --git a/aws-s3-eio/.#io.ml b/aws-s3-eio/.#io.ml new file mode 120000 index 0000000..529b732 --- /dev/null +++ b/aws-s3-eio/.#io.ml @@ -0,0 +1 @@ +afu@slartibartfast.1073069:1729245178 \ No newline at end of file diff --git a/aws-s3-eio/bin/aws_cli_async.ml b/aws-s3-eio/bin/aws_cli_async.ml new file mode 100644 index 0000000..cb852f3 --- /dev/null +++ b/aws-s3-eio/bin/aws_cli_async.ml @@ -0,0 +1,7 @@ +module Aws = Aws_cli.Aws.Make(Aws_s3_async.Io) + +let exec cmd = + Async.Thread_safe.block_on_async_exn (fun () -> Aws.exec cmd) + +let () = + Aws_cli.Cli.parse exec diff --git a/aws-s3-eio/bin/dune b/aws-s3-eio/bin/dune new file mode 100644 index 0000000..69b37f8 --- /dev/null +++ b/aws-s3-eio/bin/dune @@ -0,0 +1,6 @@ +(executable + (name aws_cli_eio) + (public_name aws-cli-eio) + (libraries aws_cli aws-s3-eio) + (package aws-s3-eio) +) diff --git a/aws-s3-eio/credentials.ml b/aws-s3-eio/credentials.ml new file mode 100644 index 0000000..8be1fe3 --- /dev/null +++ b/aws-s3-eio/credentials.ml @@ -0,0 +1,6 @@ +(** Async aware Credentials. + For API documentation + @see <../../../aws-s3/Aws_s3/Credentials/Make/index.html>({!module:Aws_s3.Credentials.Make}) +*) +include Aws_s3.Credentials.Make(Io) +type t = Aws_s3.Credentials.t diff --git a/aws-s3-eio/dune b/aws-s3-eio/dune new file mode 100644 index 0000000..daafbbd --- /dev/null +++ b/aws-s3-eio/dune @@ -0,0 +1,6 @@ +(library + (name aws_s3_eio) + (public_name aws-s3-eio) + (synopsis "Eio backed for aws-s3") + (libraries aws-s3 eio ipaddr.unix cohttp-eio) +) diff --git a/aws-s3-eio/io.ml b/aws-s3-eio/io.ml new file mode 100644 index 0000000..e7c7208 --- /dev/null +++ b/aws-s3-eio/io.ml @@ -0,0 +1,113 @@ +module Deferred = struct + type 'a t = 'a + module Or_error = struct + type nonrec 'a t = ('a, exn) result t + let return v = Ok v + let fail exn = Error exn + let catch f = + match f () with + | v -> v + | exception exn -> Error exn + + let (>>=) : 'a t -> ('a -> 'b t) -> 'b t = fun v f -> + match v with + | Ok v -> f v + | err -> err + end + + let (>>=) v f = f v + let (>>|) v f = f v + let (>>=?) v f = + match v with + | Ok v -> f v + | err -> err + + let return v = v + let after delay = + (* Need some Eio function to delay: + Eio.Time.Sleep time_state (float delay) + *) + failwith "Not implemented" + let catch f = match f () with + | v -> Ok v + | exception exn -> Error exn + + (* Need some state to be able to spawn a new fiber *) + let async = (* Eio spawn a new fiber *) failwith "Not implemented" +end + +module Ivar = struct + type 'a t = ('a Eio.Promise.t * 'a Eio.Promise.u) + let create () = Eio.Promise.create () + let fill (_t, u) v = Eio.Promise.resolve u v + let wait (t, _u) = Eio.Promise.await t +end + +module Pipe = struct + (* Create an infinite pipe that can be closed. + Then the reader is closed any new writes will fail + When the writer is closed the read can read new message until last element is read + The pipe uses callbacks???? + *) + + type ('a, 'b) pipe = ('a, 'b) Pipe.pipe + type 'a writer = 'a Pipe.Writer.t + type 'a reader = 'a Pipe.Reader.t + + let flush writer = Pipe.downstream_flushed writer >>= fun _ -> return () + let read reader = Pipe.read reader >>= function + | `Eof -> return None + | `Ok v -> return (Some v) + let write writer data = + (* Pipe.write writer data *) + Pipe.write_without_pushback writer data; + return () + let close writer = Pipe.close writer + let close_reader reader = Pipe.close_read reader + let create_reader ~f = Pipe.create_reader ~close_on_exception:true f + let create_writer ~f = Pipe.create_writer f + let transfer reader writer = Pipe.transfer_id reader writer + let create () = Pipe.create () + let is_closed pipe = Pipe.is_closed pipe + let closed pipe = Pipe.closed pipe +end + +module Net = struct + let connect ?connect_timeout_ms ~inet ~host ~port ~scheme () = + let uri = + let scheme = match scheme with + | `Http -> "http" + | `Https -> "https" + in + Uri.make ~scheme ~host:host ~port () + in + let options = + let domain : Async_unix.Unix.socket_domain = + match inet with + | `V4 -> PF_INET + | `V6 -> PF_INET6 + in + Core_unix.[AI_FAMILY domain] + in + let close_socket_no_error = function + | Conduit_async.V3.Inet_sock socket -> try Socket.shutdown socket `Both; with _ -> () + in + let interrupt = match connect_timeout_ms with + | None -> None + | Some ms -> Some (Async.after (Time_float_unix.Span.of_int_ms ms)) + in + Async.try_with (fun () -> Conduit_async.V3.connect_uri ?interrupt ~options uri) >>=? fun (socket, ic, oc) -> + let reader = Reader.pipe ic in + don't_wait_for ( + Async_kernel.Pipe.closed reader >>= fun () -> + Monitor.try_with ~name:"Io.Net.connect connection-cleanup" (fun () -> + Writer.close oc >>= fun () -> + Reader.close ic >>= fun () -> + return () + ) >>= fun _ -> + close_socket_no_error socket; + return () + ); + let writer = Writer.pipe oc in + Deferred.Or_error.return (reader, writer) +end diff --git a/aws-s3-eio/io.mli b/aws-s3-eio/io.mli new file mode 100644 index 0000000..bf76ce8 --- /dev/null +++ b/aws-s3-eio/io.mli @@ -0,0 +1,2 @@ +include Aws_s3.Types.Io + with type 'a Deferred.t = 'a diff --git a/aws-s3-eio/s3.ml b/aws-s3-eio/s3.ml new file mode 100644 index 0000000..bf424b5 --- /dev/null +++ b/aws-s3-eio/s3.ml @@ -0,0 +1,5 @@ +(** Async aware S3 commands. + For API documentation + @see <../../../aws-s3/Aws_s3/S3/Make/index.html>({!module:Aws_s3.S3.Make}) +*) +include Aws_s3.S3.Make(Io)