Skip to content

Commit

Permalink
cont
Browse files Browse the repository at this point in the history
  • Loading branch information
meggart committed Feb 2, 2024
1 parent 312d6a1 commit 87aa1f1
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 36 deletions.
38 changes: 22 additions & 16 deletions src/batchgetindex.jl
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@


#Return true when the largest forward jump between two consecutive entries of `ids`
#is larger than the chunk size `first(cs)`. Backward jumps are not counted.
#An empty `ids` has no jumps, so no gap is reported.
function has_chunk_gap(cs, ids::AbstractVector{<:Integer})
    isempty(ids) && return false
    #Find largest jump in indices; the fold state is (largest jump so far, previous index)
    largest_jump, _ = foldl(ids; init=(0, first(ids))) do (largest, last), next
        (max(largest, next - last), next)
    end
    return largest_jump > first(cs)
end
function is_sparse_index(ids; density_threshold = 0.5)
minid, maxid = extrema(ids)
indexdensity = (length(ids) / (maxid - minid))
return indexdensity < density_threshold
#Fallback for every index that is not a vector of integers (e.g. multidimensional
#indices): always report a chunk gap for now, could be optimised in the future
function has_chunk_gap(cs, ids)
    return true
end

#Compute the number of possible indices in the hyperrectangle
#For an integer vector this is the length of the closed range minimum:maximum.
#The difference is taken as `hi - lo` so it never goes negative, which keeps the
#result correct for unsigned element types as well.
function span(v::AbstractVector{<:Integer})
    lo, hi = extrema(v)
    return hi - lo + 1
end
#Number of positions in the bounding box spanned by a vector of CartesianIndex:
#the product of the per-dimension extents between the smallest and largest index.
function span(v::AbstractVector{CartesianIndex{N}}) where N
    minind, maxind = extrema(v)
    return prod((maxind - minind + oneunit(minind)).I)
end
#Number of positions in the bounding box around the `true` entries of a Bool mask.
#The mask is turned into the CartesianIndex of every selected element first.
function span(v::AbstractArray{Bool})
    minind, maxind = extrema(view(CartesianIndices(size(v)), v))
    return prod((maxind - minind + oneunit(minind)).I)
end
#How many elements an index actually selects:
#the number of `true` entries for a Bool mask, the length for index vectors
numind(mask::AbstractArray{Bool}) = count(mask)
function numind(inds::Union{AbstractVector{<:Integer},AbstractVector{<:CartesianIndex}})
    return length(inds)
end

function need_batch(a,inds)
allow_multi = allow_multi_chunk_access(a)
cs = approx_chunksize(eachchunk(a))
map(cs,inds) do c, i
(allow_multi || has_chunk_gap(c,i)) && is_sparse_index(i)
end |> any
#An index counts as sparse when the fraction of its bounding box that is actually
#selected (numind / span) lies below `density_threshold`
function is_sparse_index(ids; density_threshold = 0.5)
    selected = numind(ids)
    boxsize = span(ids)
    return selected / boxsize < density_threshold
end

# Define fallbacks for reading and writing sparse data
function _readblock!(A::AbstractArray, A_ret, r::AbstractVector...)
#= function _readblock!(A::AbstractArray, A_ret, r::AbstractVector...)
if need_batch(A,r)
# Fall back to batchgetindex to do the readblock
A_ret .= batchgetindex(A, r...)
Expand Down Expand Up @@ -57,7 +63,7 @@ function _writeblock!(A::AbstractArray, A_ret, r::AbstractVector...)
writeblock!(A, A_temp, map(:, mi, ma)...)
end
return nothing
end
end =#

macro implement_batchgetindex(t)
t = esc(t)
Expand Down
63 changes: 43 additions & 20 deletions src/diskarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,31 @@ Determines a list of tuples used to perform the read or write operations. The re
- `temp_indices` indices for reading from temp array
- `data_indices` indices for reading from data array
"""
Base.@assume_effects :foldable resolve_indices(a, i) = _resolve_indices(eachchunk(a).chunks,i,(),(),(),(),())
Base.@assume_effects :foldable resolve_indices(a::AbstractVector,i::Tuple{AbstractVector{<:Integer}}) = _resolve_indices(eachchunk(a).chunks,i,(),(),(),(),())
Base.@assume_effects :foldable need_batch(a,i) = _need_batch(eachchunk(a).chunks,i)
function _need_batch(cs, i)
res, csrem = need_batch_index(first(i),)
#Entry point of the index resolution: starts the recursion over the chunk description of `a`
#with five empty accumulator tuples; `nb` is forwarded unchanged and defaults to `Val{false}()`
Base.@assume_effects :foldable resolve_indices(a, i, nb=Val{false}()) = _resolve_indices(eachchunk(a).chunks,i,(),(),(),(),(),nb)
#Same call for an `AbstractVector` `a` indexed with a single vector of integers
Base.@assume_effects :foldable resolve_indices(a::AbstractVector,i::Tuple{AbstractVector{<:Integer}},nb=Val{false}()) = _resolve_indices(eachchunk(a).chunks,i,(),(),(),(),(),nb)
#Check whether the index tuple `i` needs batch handling; the result of
#`allow_multi_chunk_access(a)` is passed along to the per-index checks
Base.@assume_effects :foldable need_batch(a,i) = _need_batch(eachchunk(a).chunks,i,allow_multi_chunk_access(a))
#Check the indices one at a time and stop at the first one that needs batching;
#each check hands back the chunk entries left over for the remaining indices
function _need_batch(cs, i, am)
    needs_batch, remaining_cs = need_batch_index(first(i), cs, am)
    return needs_batch || _need_batch(remaining_cs, Base.tail(i), am)
end
function _resolve_indices(cs,i,output_size,temp_sizes,output_indices,temp_indices,data_indices)
#The recursion ends with `false` as soon as the chunk tuple or the index tuple is
#exhausted; the first method covers the case where both run out at the same time
_need_batch(::Tuple{}, ::Tuple{}, am) = false
_need_batch(::Tuple{}, i, am) = false
_need_batch(cs, ::Tuple{}, am) = false
#Scalars, contiguous unit ranges and colons never need batching; they consume exactly
#one chunk entry. All `AbstractUnitRange`s are covered (matching `process_index`), so
#ranges like `Base.OneTo` skip the sparse-index check as well.
function need_batch_index(::Union{Integer,AbstractUnitRange,Colon}, cs, _)
    return false, Base.tail(cs)
end
#Generic index: batching is needed when the index is sparse and either multi-chunk
#access is allowed or the index has a gap larger than a chunk. Returns that flag
#together with the chunk entries not consumed by `i`.
function need_batch_index(i, cs, allow_multi)
    csnow, csrem = splitcs(i, cs)
    gap_or_multi = allow_multi || has_chunk_gap(approx_chunksize.(csnow), i)
    needs_batch = gap_or_multi && is_sparse_index(i)
    return needs_batch, csrem
end
#Process the first index of `i`, append its contribution to each of the five
#accumulator tuples and recurse on the remaining indices and chunk entries.
#`nb` is passed unchanged to `process_index` and to the recursive call.
function _resolve_indices(cs,i,output_size,temp_sizes,output_indices,temp_indices,data_indices,nb)
    inow = first(i)
    #process_index also returns the chunk entries left for the following indices,
    #so it must be called exactly once per index
    outsize, tempsize, outinds, tempinds, datainds, cs = process_index(inow, cs, nb)
    output_size = (output_size..., outsize...)
    output_indices = (output_indices..., outinds...)
    temp_sizes = (temp_sizes..., tempsize...)
    temp_indices = (temp_indices..., tempinds...)
    data_indices = (data_indices..., datainds...)
    return _resolve_indices(cs, Base.tail(i), output_size, temp_sizes, output_indices, temp_indices, data_indices, nb)
end
#End of the recursion: no chunk entries and no indices left, return the accumulated tuples.
#The trailing `nb` flag is accepted (and ignored) so that this method matches the
#8-argument recursive call; the 7-argument form keeps working through the default.
function _resolve_indices(::Tuple{},::Tuple{},output_size,temp_sizes,output_indices,temp_indices,data_indices,nb=Val{false}())
    return output_size, temp_sizes, output_indices, temp_indices, data_indices
end
#No dimension left in array, only singular indices allowed
Expand Down Expand Up @@ -96,6 +106,11 @@ function resolve_indices(a, i::Tuple{<:AbstractVector{<:Integer}})
length.(i),tempsize,(Colon(),),(tempinds,),datainds
end
#Every process_index method returns, in this order:
#outsize, tempsize, outinds,tempinds,datainds,cs
#Without batching the flag is simply dropped
process_index(i, cs, ::Val{false}) = process_index(i,cs)
#With batching the three index tuples are wrapped in `MultiRead`. The remaining chunk
#tuple `cs` has to be returned as well, because the caller destructures six values.
function process_index(i, cs, ::Val{true})
    outsize, tempsize, outinds, tempinds, datainds, cs = process_index(i, cs)
    return outsize, tempsize, MultiRead(outinds), MultiRead(tempinds), MultiRead(datainds), cs
end
#Scalar index: adds no output dimension, is read through the one-element range
#`inow:inow` and consumes one chunk entry
function process_index(inow::Integer, cs)
    datarange = inow:inow
    return ((), 1, (), (1,), (datarange,), Base.tail(cs))
end
function process_index(::Colon, cs)
s = arraysize_from_chunksize(first(cs))
Expand All @@ -104,22 +119,21 @@ end
#Contiguous range: output and temporary array both get `length(i)` elements along this
#dimension, both are addressed with a colon, and the range itself is read from the data
function process_index(i::AbstractUnitRange, cs)
    n = length(i)
    return (n,), (n,), (Colon(),), (Colon(),), (i,), Base.tail(cs)
end
function process_index(i::AbstractVector{<:Integer}, cs)
#Integer vector without batching: the whole range minimum:maximum is read into the
#temporary array, and the requested entries are picked from it using the indices
#shifted so that the minimum maps to 1
function process_index(i::AbstractVector{<:Integer}, cs, ::Val{false})
    lo, hi = extrema(i)
    shifted = i .- (lo - 1)
    return (length(i),), ((hi - lo + 1),), (Colon(),), (shifted,), (lo:hi,), Base.tail(cs)
end
function process_index(i::AbstractArray{Bool,N}, cs) where N
csnow, csrem = splitcs(size(i),(),cs)
#Bool mask without batching: read the bounding box of the `true` entries into the
#temporary array and select from it with the matching window of the mask
function process_index(i::AbstractArray{Bool,N}, cs, ::Val{false}) where N
    csnow, csrem = splitcs(i, cs)
    s = arraysize_from_chunksize.(csnow)
    #smallest and largest selected position along every masked dimension
    cartmin, cartmax = extrema(view(CartesianIndices(s), i))
    lo = cartmin.I
    hi = cartmax.I
    tempsize = hi .- lo .+ 1
    maskwindow = view(i, range.(lo, hi)...)
    return (sum(i),), tempsize, (Colon(),), (maskwindow,), range.(lo, hi), csrem
end
function process_index(i::AbstractVector{<:CartesianIndex{N}}, cs) where N
csnow, csrem = splitcs(first(i).I,(),cs)
function process_index(i::AbstractVector{<:CartesianIndex{N}}, cs, ::Val{false}) where N
csnow, csrem = splitcs(i,cs)
s = arraysize_from_chunksize.(csnow)
cindmin,cindmax = extrema(view(CartesianIndices(s),i))
indmin,indmax = cindmin.I,cindmax.I
Expand All @@ -128,6 +142,9 @@ function process_index(i::AbstractVector{<:CartesianIndex{N}}, cs) where N
tempinds = i .- tempoffset
(length(i),), tempsize, (Colon(),), (tempinds,), range.(indmin,indmax), csrem
end
#Split the chunk tuple `cs` into the leading entries covered by the index `i` and the
#rest. CartesianIndex vectors and Bool masks cover one entry per dimension, every
#other index covers exactly one entry.
function splitcs(i::AbstractVector{<:CartesianIndex}, cs)
    return splitcs(first(i).I, (), cs)
end
function splitcs(i::AbstractArray{Bool}, cs)
    return splitcs(size(i), (), cs)
end
function splitcs(_, cs)
    return (first(cs),), Base.tail(cs)
end
#Move one entry from `csrem` to `csnow` for every element of `si` until `si` is empty
function splitcs(si, csnow, csrem)
    return splitcs(Base.tail(si), (csnow..., first(csrem)), Base.tail(csrem))
end
splitcs(::Tuple{}, csnow, csrem) = (csnow, csrem)

Expand Down Expand Up @@ -156,12 +173,18 @@ function create_outputarray(out,a,output_size)
end
#No output array was supplied: allocate an uninitialised `Array` with the element
#type of `a` and the requested size
function create_outputarray(::Nothing, a, output_size)
    T = eltype(a)
    return Array{T}(undef, output_size...)
end
#Read the elements of `a` selected by the indices `i`.
#Non-batch path: resolve the indices, read the covering block into a temporary array
#with `readblock!` and copy the requested elements into the output array.
#Batch path: only the index resolution is done so far.
function getindex_disk!(out, a, i...)
    if need_batch(a, i)
        #The batch flag is the third positional argument of resolve_indices (a `Val`),
        #it is not a keyword argument
        output_size, temparray_size, output_indices, temparray_indices, data_indices =
            resolve_indices(a, i, Val{true}())
        @debug "Doing batch stuff"
        #TODO: the batched read itself is not implemented in this branch yet, so
        #nothing is read or returned here
    else
        output_size, temparray_size, output_indices, temparray_indices, data_indices =
            resolve_indices(a, i)
        #@debug output_size, temparray_size, output_indices, temparray_indices, data_indices
        outputarray = create_outputarray(out, a, output_size)
        temparray = Array{eltype(a)}(undef, temparray_size...)
        readblock!(a, temparray, data_indices...)
        transfer_results!(outputarray, temparray, output_indices, temparray_indices)
    end
end

function transfer_results!(outputarray, temparray, output_indices, temparray_indices)
Expand Down

0 comments on commit 87aa1f1

Please sign in to comment.