Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement partial encode/decode for sharding codec. #37

Merged
merged 5 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 185 additions & 98 deletions lib/codecs/array_to_bytes.ml

Large diffs are not rendered by default.

24 changes: 23 additions & 1 deletion lib/codecs/array_to_bytes.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,29 @@ module ArrayToBytes : sig
array_tobytes ->
('a, 'b) Util.array_repr ->
string ->
(('a, 'b, Bigarray.c_layout) Bigarray.Genarray.t, [> error]) result
(('a, 'b, Bigarray.c_layout) Bigarray.Genarray.t
,[> `Store_read of string | error]) result
val of_yojson : Yojson.Safe.t -> (array_tobytes, string) result
val to_yojson : array_tobytes -> Yojson.Safe.t
end

module ShardingIndexedCodec : sig
type t = internal_shard_config
val partial_encode :
t ->
((int * int option) list ->
(string list, [> `Store_read of string | error ] as 'c) result) ->
partial_setter ->
int ->
('a, 'b) Util.array_repr ->
(int array * 'a) list ->
(unit, 'c) result
val partial_decode :
t ->
((int * int option) list ->
(string list, [> `Store_read of string | error ] as 'c) result) ->
int ->
('a, 'b) Util.array_repr ->
(int * int array) list ->
((int * 'a) list, 'c) result
end
36 changes: 35 additions & 1 deletion lib/codecs/codecs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,45 @@
(fun acc c -> acc >>= BytesToBytes.encode c)
(ArrayToBytes.encode t.a2b y) t.b2b

let is_just_sharding : t -> bool = function
| {a2a = []; a2b = `ShardingIndexed _; b2b = []} -> true
| _ -> false

let partial_encode :
t ->
((int * int option) list ->
(string list, [> `Store_read of string | error] as 'c) result) ->
partial_setter ->
int ->
('a, 'b) Util.array_repr ->
(int array * 'a) list ->
(unit, 'c) result
= fun t f g bsize repr pairs ->
match t.a2b with
| `ShardingIndexed c ->
ShardingIndexedCodec.partial_encode c f g bsize repr pairs
| `Bytes _ -> failwith "bytes codec does not support partial encoding."

Check warning on line 113 in lib/codecs/codecs.ml

View check run for this annotation

Codecov / codecov/patch

lib/codecs/codecs.ml#L113

Added line #L113 was not covered by tests

let partial_decode :
t ->
((int * int option) list ->
(string list, [> `Store_read of string | error ] as 'c) result) ->
int ->
('a, 'b) Util.array_repr ->
(int * int array) list ->
((int * 'a) list, 'c) result
= fun t f s repr pairs ->
match t.a2b with
| `ShardingIndexed c ->
ShardingIndexedCodec.partial_decode c f s repr pairs
| `Bytes _ -> failwith "bytes codec does not support partial decoding."

Check warning on line 127 in lib/codecs/codecs.ml

View check run for this annotation

Codecov / codecov/patch

lib/codecs/codecs.ml#L127

Added line #L127 was not covered by tests

let decode :
t ->
('a, 'b) Util.array_repr ->
string ->
(('a, 'b, Bigarray.c_layout) Bigarray.Genarray.t, [> error]) result
(('a, 'b, Bigarray.c_layout) Bigarray.Genarray.t
,[> `Store_read of string | error]) result
= fun t repr x ->
List.fold_right
(fun c acc -> acc >>= BytesToBytes.decode c) t.b2b (Ok x)
Expand Down
26 changes: 25 additions & 1 deletion lib/codecs/codecs.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ module Chain : sig
the required codecs as defined in the Zarr Version 3 specification. *)
val default : t

(** [is_just_sharding t] is [true] if the codec chain [t] contains only
the [sharding_indexed] codec. *)
val is_just_sharding : t -> bool

(** [encode t x] computes the encoded byte string representation of
array chunk [x]. Returns an error upon failure. *)
val encode :
Expand All @@ -34,7 +38,27 @@ module Chain : sig
t ->
('a, 'b) Util.array_repr ->
string ->
(('a, 'b, Bigarray.c_layout) Bigarray.Genarray.t, [> error ]) result
(('a, 'b, Bigarray.c_layout) Bigarray.Genarray.t
,[> `Store_read of string | error ]) result

val partial_encode :
t ->
((int * int option) list ->
(string list, [> `Store_read of string | error ] as 'c) result) ->
partial_setter ->
int ->
('a, 'b) Util.array_repr ->
(int array * 'a) list ->
(unit, 'c) result

val partial_decode :
t ->
((int * int option) list ->
(string list, [> `Store_read of string | error ] as 'c) result) ->
int ->
('a, 'b) Util.array_repr ->
(int * int array) list ->
((int * 'a) list, 'c) result

(** [x = y] returns true if chain [x] is equal to chain [y],
and false otherwise. *)
Expand Down
3 changes: 3 additions & 0 deletions lib/codecs/codecs_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type error =
| `CodecChain of string
| `Sharding of int array * int array * string ]

type partial_setter = ?append:bool -> (int * string) list -> unit

module type Interface = sig
(** The type of [array -> array] codecs. *)
Expand Down Expand Up @@ -102,4 +103,6 @@ module type Interface = sig
| `Transpose_order of int array * string
| `CodecChain of string
| `Sharding of int array * int array * string ]

type partial_setter = ?append:bool -> (int * string) list -> unit
end
56 changes: 43 additions & 13 deletions lib/storage/filesystem.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,49 @@
with
| Sys_error _ -> Error (`Store_read fpath)

let get_partial_values t key ranges =
let open Util.Result_syntax in
In_channel.with_open_gen
In_channel.[Open_rdonly]
t.file_perm
(key_to_fspath t key)
(fun ic ->
let size = In_channel.length ic |> Int64.to_int in
List.fold_right
(fun (rs, len) acc ->
acc >>= fun xs ->
let len' =
match len with
| Some l -> l
| None -> size - rs
in
In_channel.seek ic @@ Int64.of_int rs;
match In_channel.really_input_string ic len' with
| Some s -> Ok (s :: xs)
| None ->

Check warning on line 50 in lib/storage/filesystem.ml

View check run for this annotation

Codecov / codecov/patch

lib/storage/filesystem.ml#L50

Added line #L50 was not covered by tests
Error (`Store_read "EOF reached before all bytes are read."))
ranges (Ok []))

let set t key value =
let filename = key_to_fspath t key in
create_parent_dir filename t.file_perm;
Out_channel.with_open_gen
Out_channel.[Open_wronly; Open_trunc; Open_creat]
t.file_perm
filename
(fun oc -> Out_channel.output_string oc value)
(fun oc -> Out_channel.output_string oc value; Out_channel.flush oc)

let set_partial_values t key ?(append=false) rvs =
let open Out_channel in
Out_channel.with_open_gen
[if append then Open_append else Open_wronly]
t.file_perm
(key_to_fspath t key)
(fun oc ->
List.iter
(fun (rs, value) ->
Out_channel.seek oc @@ Int64.of_int rs;
Out_channel.output_string oc value) rvs; Out_channel.flush oc)

let list t =
let module StrSet = Storage_intf.Base.StrSet in
Expand Down Expand Up @@ -68,23 +103,18 @@
try Sys.remove @@ key_to_fspath t key with
| Sys_error _ -> ()

let get_partial_values t kr_pairs =
Storage_intf.Base.get_partial_values
~get_fn:get t kr_pairs

let set_partial_values t krv_triplet =
Storage_intf.Base.set_partial_values
~set_fn:set ~get_fn:get t krv_triplet

let erase_values t keys =
Storage_intf.Base.erase_values
~erase_fn:erase t keys
let size t key =
In_channel.with_open_gen
In_channel.[Open_rdonly]
t.file_perm
(key_to_fspath t key)
(fun ic -> In_channel.length ic |> Int64.to_int)

let erase_prefix t pre =
Storage_intf.Base.erase_prefix
~list_fn:list ~erase_fn:erase t pre

let list_prefix pre t =
let list_prefix t pre =
Storage_intf.Base.list_prefix ~list_fn:list t pre

let list_dir t pre =
Expand Down
16 changes: 8 additions & 8 deletions lib/storage/memory.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@ module Impl = struct

let erase = StrMap.remove

let size t key =
get t key |> Result.get_ok |> String.length

let erase_prefix t pre =
StrMap.filter_map_inplace
(fun k v ->
if String.starts_with ~prefix:pre k then None else Some v) t

let get_partial_values t kr_pairs =
let get_partial_values t key ranges =
Storage_intf.Base.get_partial_values
~get_fn:get t kr_pairs
~get_fn:get t key ranges

let set_partial_values t krv_triplet =
let set_partial_values t key ?(append=false) rv =
Storage_intf.Base.set_partial_values
~set_fn:set ~get_fn:get t krv_triplet

let erase_values t keys =
Storage_intf.Base.erase_values ~erase_fn:erase t keys
~set_fn:set ~get_fn:get t key append rv

let list_prefix pre t =
let list_prefix t pre =
Storage_intf.Base.list_prefix ~list_fn:list t pre

let list_dir t pre =
Expand Down
Loading
Loading