Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: more sharding utilities #809

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/Sharding.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,16 @@ See also: [`Sharding.NamedSharding`](@ref)
"""
struct NoSharding <: AbstractSharding end

# A `NoSharding` keeps the data on exactly one device.
@inline function ndevices(::NoSharding)
    return 1
end

# An unsharded value carries no per-device slice information, hence `Nothing`.
@inline function shard_type(::Type{NoSharding}, _)
    return ShardInfo{NoSharding,Nothing}
end

# This allows us to mark entire branches as NoSharding
# Any property access on a `NoSharding` yields another `NoSharding`, so nested
# accesses such as `sharding.a.b` collapse a whole branch to "not sharded".
Base.getproperty(::NoSharding, x) = NoSharding()
# NOTE(review): this `::Symbol` method is a more specific duplicate of the one
# above; both return `NoSharding()` regardless of the requested property.
Base.getproperty(::NoSharding, x::Symbol) = NoSharding()

# Materialize `x` on a single device as one async buffer, reporting no sharding.
function (::NoSharding)(client::XLA.PJRT.Client, device, x::Union{AbstractArray,Number})
    # Fall back to the client's default device when the caller passed `nothing`.
    if device === nothing
        device = XLA.default_device(client)
    end
    return (XLA.PJRT.AsyncBuffer(client, x, device),), ShardInfo(NoSharding(), nothing)
end
Expand Down Expand Up @@ -185,6 +190,12 @@ struct NamedSharding{D1,D2,P<:Tuple} <: AbstractSharding
end
end

# A `NamedSharding` distributes data over every device in its mesh.
@inline function ndevices(sharding::NamedSharding)
    return length(sharding.mesh.device_ids)
end

# A `NamedSharding` is lowered to an `HloSharding` with the same mesh type
# parameters, so its shard type is that of the lowered `HloSharding`.
@inline shard_type(::Type{NamedSharding{D1,D2,P}}, N) where {D1,D2,P} =
    shard_type(HloSharding{D1,D2}, N)

function (sharding::NamedSharding)(
client::XLA.PJRT.Client, device::Nothing, x::Union{AbstractArray,Number}
)
Expand Down Expand Up @@ -226,6 +237,84 @@ function get_shardy_tensor_sharding_attribute(
)
end

# TODO: Something like NamedDims.jl will allow us to support NamedDimsSharding similar to
# `levanter`

"""
    DimsSharding(
        mesh::Mesh{M},
        dims::NTuple{D,Int},
        partition_spec;
        is_closed::NTuple{D,Bool}=ntuple(Returns(true), D),
        priority::NTuple{D,Int}=ntuple(i -> -1, D),
    )

Similar to [`NamedSharding`](@ref) but works for an arbitrary dimensional array.
Dimensions not specified in `dims` are replicated. If any dimension in `dims` is greater
than the total number of dimensions in the array, the corresponding `partition_spec`,
`is_closed` and `priority` are ignored. Additionally for any negative dimensions in
`dims`, the true dims are calculated as `ndims(x) + dim + 1`. A dims value of `0` will
throw an error.
"""
struct DimsSharding{M,D,P} <: AbstractSharding
    mesh::Mesh{M}              # device mesh the partitions map onto
    dims::NTuple{D,Int}        # array dimensions to shard (negative = from the end)
    partition_spec::P          # mesh axis name (or `nothing`) per entry of `dims`
    is_closed::NTuple{D,Bool}  # whether each dim is closed to further propagation
    priority::NTuple{D,Int}    # propagation priority per dim (-1 = default)

    function DimsSharding(
        mesh::Mesh{M},
        dims::NTuple{D,Int},
        partition_spec;
        is_closed::NTuple{D,Bool}=ntuple(Returns(true), length(partition_spec)),
        priority::NTuple{D,Int}=ntuple(i -> -1, length(partition_spec)),
    ) where {M,D}
        # Input validation: use a real error, not `@assert` (which can be
        # compiled out at higher optimization levels).
        length(partition_spec) == D || throw(
            ArgumentError(
                "length(partition_spec) = $(length(partition_spec)) must match " *
                "length(dims) = $D",
            ),
        )
        # Validity checks on the inputs are deferred to NamedSharding
        return new{M,D,typeof(partition_spec)}(
            mesh, dims, partition_spec, is_closed, priority
        )
    end
end

# A `DimsSharding` spans every device of its mesh.
@inline ndevices(sharding::DimsSharding) = length(sharding.mesh.device_ids)

# NOTE(review): `HloSharding{M,N}` is instantiated with the mesh parameter `M`
# and the array rank `N` as its two type parameters — confirm this matches the
# `HloSharding{D1,D2}` parameters produced by lowering; only `N` (via the slice
# tuple length) affects the resulting `ShardInfo` storage type.
@inline function shard_type(::Type{DimsSharding{M,D,P}}, N) where {M,D,P}
    return shard_type(HloSharding{M,N}, N)
end

"""
    standardize_sharding(sharding::DimsSharding, x)

Convert a `DimsSharding` into an equivalent [`NamedSharding`](@ref) for the concrete
value `x`: negative entries of `dims` are resolved against `ndims(x)`, and every
dimension of `x` not mentioned in `dims` is replicated (open, default priority).

Throws an `ArgumentError` if any entry of `dims` is `0`.
"""
function standardize_sharding(sharding::DimsSharding, x::Union{AbstractArray,Number})
    final_dims = map(sharding.dims) do d
        # `0` is not a valid dimension; negative values count from the end.
        # Use a real error (not `@assert`) since this validates user input.
        iszero(d) && throw(ArgumentError("dims cannot contain 0"))
        return ifelse(d < 0, ndims(x) + d + 1, d)
    end

    # For each dimension of `x`, the position in `sharding.dims` that shards it
    # (`nothing` if that dimension is replicated).
    dim_indices = ntuple(i -> findfirst(==(i), final_dims), ndims(x))

    partition_spec = ntuple(ndims(x)) do i
        dim_index = dim_indices[i]
        dim_index === nothing && return nothing # replicated dimension
        return sharding.partition_spec[dim_index]
    end
    is_closed = ntuple(ndims(x)) do i
        dim_index = dim_indices[i]
        dim_index === nothing && return true # replicated dimension
        return sharding.is_closed[dim_index]
    end
    priority = ntuple(ndims(x)) do i
        dim_index = dim_indices[i]
        dim_index === nothing && return -1 # replicated dimension
        return sharding.priority[dim_index]
    end

    return NamedSharding(sharding.mesh, partition_spec; is_closed, priority)
end

# Applying a `DimsSharding` first normalizes it to a `NamedSharding` for the
# concrete value `x`, then delegates the actual device placement to it.
function (sharding::DimsSharding)(
    client::XLA.PJRT.Client, device::Nothing, x::Union{AbstractArray,Number}
)
    named = standardize_sharding(sharding, x)
    return named(client, device, x)
end

# HloSharding
# This stores the sharding information in the form of XLA.HloSharding, and provides a
# central type for the final storage. It also potentially saves us the pain of not having
Expand All @@ -244,6 +333,12 @@ struct HloSharding{D1,D2} <: AbstractSharding
end
end

# An `HloSharding` spreads data across every device of its mesh.
@inline function ndevices(sharding::HloSharding)
    return length(sharding.mesh.device_ids)
end

# The concrete shard info pairs the sharding with, per device, the index ranges
# (one `UnitRange` per array dimension, `N` of them) that the device owns.
@inline shard_type(::Type{HloSharding{D1,D2}}, N) where {D1,D2} =
    ShardInfo{HloSharding{D1,D2},Vector{NTuple{N,UnitRange{Int64}}}}

function Base.convert(::Type{HloSharding}, sharding::NamedSharding)
if MLIR.IR._has_context()
ctx = MLIR.IR.context()
Expand Down Expand Up @@ -321,6 +416,10 @@ struct ShardInfo{S,D} <: AbstractSharding
device_to_array_slices::D
end

# Delegate to the wrapped sharding. This keeps `ndevices` consistent with the
# per-type definitions (which use `length(mesh.device_ids)`) and also works for
# `ShardInfo{NoSharding}`, where property forwarding would turn `sharding.mesh`
# into a `NoSharding()` that has no length.
@inline ndevices(sharding::ShardInfo) = ndevices(sharding.sharding)

# The shard type of a `ShardInfo` is determined by the wrapped sharding type.
@inline shard_type(::Type{ShardInfo{S,D}}, N) where {S,D} = shard_type(S, N)

function Base.getproperty(sharding::ShardInfo, name::Symbol)
name ∈ (:sharding, :device_to_array_slices) && return getfield(sharding, name)
return getproperty(sharding.sharding, name)
Expand Down Expand Up @@ -348,6 +447,7 @@ Checks whether the given sharding refers to no sharding.
"""
is_sharded(::NoSharding) = false
# Every concrete sharding other than `NoSharding` counts as sharded.
is_sharded(::NamedSharding) = true
is_sharded(::DimsSharding) = true
is_sharded(::HloSharding) = true
# A `ShardInfo` is sharded iff the sharding it wraps is.
is_sharded(s::ShardInfo) = is_sharded(s.sharding)

Expand Down
67 changes: 30 additions & 37 deletions src/Tracing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,9 @@ Base.@nospecializeinfer function traced_type_inner(
@nospecialize(sharding)
)
if Mode == ArrayToConcrete && T <: track_numbers
if !Sharding.is_sharded(sharding)
return ConcretePJRTNumber{T,1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
end
return ConcretePJRTNumber{
T,Sharding.ndevices(sharding),Sharding.shard_type(typeof(sharding), 0)
}
elseif (mode == NoStopTracedTrack || mode == TracedTrack || mode == TracedSetPath) &&
T <: track_numbers
return TracedRNumber{T}
Expand Down Expand Up @@ -300,11 +298,12 @@ Base.@nospecializeinfer function traced_type_inner(
if mode == ConcreteToTraced
throw("TracedRArray cannot be traced")
elseif mode == TracedToConcrete
if !Sharding.is_sharded(sharding)
return ConcretePJRTArray{T.parameters[1],T.parameters[2],1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
end
return ConcretePJRTArray{
T.parameters[1],
T.parameters[2],
Sharding.ndevices(sharding),
Sharding.shard_type(typeof(sharding), T.parameters[2]),
}
elseif mode == TracedTrack || mode == NoStopTracedTrack || mode == TracedSetPath
return T
else
Expand All @@ -322,14 +321,21 @@ Base.@nospecializeinfer function traced_type_inner(
if mode == ConcreteToTraced
throw("TracedRNumber cannot be traced")
elseif mode == TracedToConcrete
if !Sharding.is_sharded(sharding)
if T isa UnionAll
return UnionAll(T.var, ConcretePJRTNumber{T.var,1,Sharding.NoShardInfo})
end
return ConcretePJRTNumber{T.parameters[1],1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
if T isa UnionAll
return UnionAll(
T.var,
ConcretePJRTNumber{
T.var,
Sharding.ndevices(sharding),
Sharding.shard_type(typeof(sharding), 0),
},
)
end
return ConcretePJRTNumber{
T.parameters[1],
Sharding.ndevices(sharding),
Sharding.shard_type(typeof(sharding), 0),
}
elseif mode == TracedTrack || mode == NoStopTracedTrack || mode == TracedSetPath
return T
else
Expand All @@ -344,19 +350,9 @@ Base.@nospecializeinfer function traced_type_inner(
@nospecialize(track_numbers::Type),
@nospecialize(sharding)
)
if mode == ConcreteToTraced
throw("TracedRNG cannot be traced")
elseif mode == TracedToConcrete
if !Sharding.is_sharded(sharding)
return ConcreteRNG{1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
end
elseif mode == TracedTrack || mode == NoStopTracedTrack || mode == TracedSetPath
return T
else
throw("Unsupported mode: $mode")
end
return ConcreteRNG{
traced_type_inner(TracedRArray{UInt64,1}, seen, mode, track_numbers, sharding)
}
end

Base.@nospecializeinfer function traced_type_inner(
Expand Down Expand Up @@ -413,11 +409,9 @@ Base.@nospecializeinfer function traced_type_inner(
else
N = ndims(A)
if mode == ArrayToConcrete && T <: Reactant.ReactantPrimitive
if !Sharding.is_sharded(sharding)
return ConcretePJRTArray{T,N,1,Sharding.NoShardInfo}
else
error("TODO: implement sharding")
end
return ConcretePJRTArray{
T,N,Sharding.ndevices(sharding),Sharding.shard_type(typeof(sharding), N)
}
else
return Array{
traced_type_inner(T, seen, mode, track_numbers, getproperty(sharding, 1)),N
Expand Down Expand Up @@ -914,7 +908,7 @@ function make_tracer(
if !Sharding.is_sharded(sharding)
return prev
else
error("TODO: implement sharding")
return ConcretePJRTNumber(prev; sharding)
end
end
if mode != ConcreteToTraced
Expand Down Expand Up @@ -1106,7 +1100,6 @@ function make_tracer(
return nothing
end
RT = Core.Typeof(prev)
Sharding.is_sharded(sharding) && error("Cannot specify sharding for Numbers")
if RT <: track_numbers
if mode == ArrayToConcrete
return ConcretePJRTNumber(prev; sharding)
Expand Down
11 changes: 10 additions & 1 deletion src/xla/Sharding.jl
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,16 @@ function sharding_to_concrete_array_indices(
@assert n_shards > 0 "Invalid number of shards: $n_shards"
n_shards == 1 && return [1:dim]
shard_size, remainder = divrem(dim, n_shards)
@assert remainder == 0 "Dimension $dim not evenly divisible by $n_shards shards"

if remainder != 0
throw(
DimensionMismatch(
"Dimension of Size $(dim) cannot be partitioned into $(n_shards) \
shards each of size $(shard_size) (remainder = $(remainder)).",
),
)
end

return [(i * shard_size + 1):((i + 1) * shard_size) for i in 0:(n_shards - 1)]
end

Expand Down
3 changes: 3 additions & 0 deletions test/sharding.jl
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,19 @@ fn_test3(x) = sum(x; dims=1)
Sharding.NamedSharding(mesh, ("model", "data")),
Sharding.NamedSharding(mesh, ("model", nothing)),
Sharding.NamedSharding(mesh, (nothing, "data")),
Sharding.DimsSharding(mesh, (2,), (:data,)),
),
(
Sharding.NamedSharding(mesh, ("model", "data")),
Sharding.NamedSharding(mesh, (nothing, "data")),
Sharding.NoSharding(),
Sharding.DimsSharding(mesh, (-2,), (:model,)),
),
(
Sharding.NamedSharding(mesh, ("model", "data")),
Sharding.NoSharding(),
Sharding.NoSharding(),
Sharding.NamedSharding(mesh, ("model", "data")),
),
)
samples_ra = Reactant.to_rarray(samples; sharding=samples_sharding)
Expand Down
Loading