Merge pull request #64 from adtzlr/enable-parallel
Re-enable parallel evaluation of function calls
adtzlr authored Feb 7, 2023
2 parents 31ecb73 + 78ef43c commit 5c595d2
Showing 7 changed files with 213 additions and 71 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -45,7 +45,7 @@ def fun(F, mu=1):
return mu / 2 * (J ** (-2 / 3) * I1 - 3)
```

The hessian of the scalar-valued function w.r.t. the chosen function argument (here, `wrt=0` or `wrt="F"`) is evaluated by variational calculus (Forward Mode AD implemented as Hyper-Dual Tensors). The function is called once for each component of the hessian (symmetry is taken care of). The function and the gradient are evaluated with no additional computational cost. ~~Optionally, the function calls are executed in parallel (threaded)~~ (currently disabled, will be re-enabled soon).
The hessian of the scalar-valued function w.r.t. the chosen function argument (here, `wrt=0` or `wrt="F"`) is evaluated by variational calculus (Forward Mode AD implemented as Hyper-Dual Tensors). The function is called once for each component of the hessian (symmetry is taken care of). The function and the gradient are evaluated with no additional computational cost. Optionally, the function, gradient and hessian calls are executed in parallel (threaded).

```python
import numpy as np
# …
```
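As a usage sketch (not part of this commit): with the change above, threaded evaluation is requested via `parallel=True`. The body of `fun` below is reconstructed from the project README and, like the array shape, is an illustrative assumption; only the signature and the `return` line appear in the excerpt above.

```python
import numpy as np
import tensortrax as tr
import tensortrax.math as tm


def fun(F, mu=1):
    # isochoric neo-Hookean strain energy; the intermediate lines are reconstructed
    # from the README (assumption), only the return line is shown in the diff above
    C = F.T @ F
    I1 = tm.trace(C)
    J = tm.linalg.det(F)
    return mu / 2 * (J ** (-2 / 3) * I1 - 3)


# illustrative batch of deformation gradients with two trailing axes
F = np.eye(3).reshape(3, 3, 1, 1) + np.random.rand(3, 3, 8, 2000) / 10

# hessian, gradient and function values, evaluated in parallel (threaded)
d2WdF2, dWdF, W = tr.hessian(fun, wrt="F", ntrax=2, parallel=True, full_output=True)(F=F)
```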
4 changes: 2 additions & 2 deletions docs/benchmark/benchmark.py
@@ -30,15 +30,15 @@ def pre(n, **kwargs):
return C, stress, elasticity


tensors = 2 ** np.arange(1, 23, 2)
tensors = 2 ** np.arange(0, 21, 2)
time_gradient = []
time_hessian = []

print("")
print("| Tensors | Gradient in s | Hessian in s |")
print("| ------- | ------------- | ------------ |")

kwargs = dict(ntrax=1, sym=True, parallel=True)
kwargs = dict(ntrax=1, sym=True, parallel=False)
number = 3

for n in tensors:
48 changes: 34 additions & 14 deletions docs/benchmark/benchmark_tensortrax_vs_autograd.py
@@ -12,12 +12,12 @@

import matplotlib.pyplot as plt
import numpy as np
from autograd import jacobian
from autograd import numpy as anp

import tensortrax as tr
import tensortrax.math as tm

from autograd import numpy as anp, jacobian


def fun_tensortrax(C):
return tm.trace(C) - tm.log(tm.linalg.det(C))
@@ -65,45 +65,65 @@ def pre_autograd(n, **kwargs):
for i, n in enumerate(tensors):
c, stress, elasticity = pre_tensortrax(n, **kwargs)
C, Stress, Elasticity = pre_autograd(n, **kwargs)

s = stress(c)
e = elasticity(c)

S = Stress(C)
E = Elasticity(C)

assert np.allclose(s, S)
assert np.allclose(e, E)

time_gradient_tensortrax.append(timeit(lambda: stress(c), number=number) / number)
time_hessian_tensortrax.append(timeit(lambda: elasticity(c), number=number) / number)
time_hessian_tensortrax.append(
timeit(lambda: elasticity(c), number=number) / number
)
time_gradient_autograd.append(timeit(lambda: Stress(C), number=number) / number)
time_hessian_autograd.append(timeit(lambda: Elasticity(C), number=number) / number)

print(f"...Evaluate timings... {i+1}/{len(tensors)}")

print("")
print("| | (Tensortrax) | (Autograd) | |")
print("| Tensors | Gradient in s | Gradient in s | Speedup |")
print("| ------- | ------------- | ------------- | ------- |")
for n, t_grad_trax, t_grad_autograd in zip(tensors, time_gradient_tensortrax, time_gradient_autograd):
for n, t_grad_trax, t_grad_autograd in zip(
tensors, time_gradient_tensortrax, time_gradient_autograd
):
speedup = t_grad_autograd / t_grad_trax
print(f"| {n:7d} | {t_grad_trax:13.5f} | {t_grad_autograd:13.5f} | x{speedup:6.2f} |")
print(
f"| {n:7d} | {t_grad_trax:13.5f} | {t_grad_autograd:13.5f} | x{speedup:6.2f} |"
)

print("")
print("")
print("| | (Tensortrax) | (Autograd) | |")
print("| Tensors | Hessian in s | Hessian in s | Speedup |")
print("| ------- | ------------- | ------------- | ------- |")
for n, t_hess_trax, t_hess_autograd in zip(tensors, time_hessian_tensortrax, time_hessian_autograd):
for n, t_hess_trax, t_hess_autograd in zip(
tensors, time_hessian_tensortrax, time_hessian_autograd
):
speedup = t_hess_autograd / t_hess_trax
print(f"| {n:7d} | {t_hess_trax:13.5f} | {t_hess_autograd:13.5f} | x{speedup:6.2f} |")

print(
f"| {n:7d} | {t_hess_trax:13.5f} | {t_hess_autograd:13.5f} | x{speedup:6.2f} |"
)


plt.figure()
plt.title(r"Strain Energy Function $\psi(C) = \mathrm{tr}(C) - \ln(\det(C))$")
plt.loglog(tensors, time_gradient_tensortrax, "C0", label="Gradient (Tensortrax) $\partial \psi~/~\partial C$")
plt.loglog(tensors, time_gradient_autograd, "C0--", label="Gradient (Autograd) $\partial \psi~/~\partial C$")
plt.loglog(
tensors,
time_gradient_tensortrax,
"C0",
label="Gradient (Tensortrax) $\partial \psi~/~\partial C$",
)
plt.loglog(
tensors,
time_gradient_autograd,
"C0--",
label="Gradient (Autograd) $\partial \psi~/~\partial C$",
)
plt.loglog(
tensors,
time_hessian_tensortrax,
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -43,7 +43,7 @@ classifiers = [
]
dynamic = ["version"]
requires-python = ">=3.7"
dependencies = ["numpy"]
dependencies = ["numpy", "joblib"]

[tool.setuptools.dynamic]
version = {attr = "tensortrax.__about__.__version__"}
2 changes: 1 addition & 1 deletion src/tensortrax/__about__.py
@@ -2,4 +2,4 @@
tensorTRAX: Math on (Hyper-Dual) Tensors with Trailing Axes.
"""

__version__ = "0.7.0"
__version__ = "0.8.0"
213 changes: 167 additions & 46 deletions src/tensortrax/_evaluate.py
@@ -10,9 +10,9 @@

from copy import copy
from functools import wraps
from threading import Thread

import numpy as np
from joblib import Parallel, cpu_count, delayed

from ._tensor import Tensor, Δδ, broadcast_to, f, δ
from .math._special import from_triu_1d, from_triu_2d, triu_1d
@@ -65,16 +65,111 @@ def add_tensor(
return args_out, kwargs_out, tensor.shape, trax


def partition(args, kwargs, wrt, ntrax, parallel, chunks=None, batch=100, axis=None):
"""Partition function (keyword) arguments into a list of (keyword) arguments. Only
top-level args and kwargs with equal shapes to be splitted are allowed."""

# deactivate parallel evaluation if no trailing axes are present
if ntrax == 0:
parallel = False

# get shape of trailing axes, define axis and chunks
# if size of chosen axis is below batch, deactivate parallel evaluation
if parallel:
# get shape of trailing axes
trax = (kwargs[wrt] if isinstance(wrt, str) else args[wrt]).shape[-ntrax:]

# select axis
if axis is None:
axis = -(1 + np.argmax(trax[::-1]))

# define chunks
if chunks is None:
if (trax[axis] // batch) > 0:
chunks = min(trax[-1] // batch, cpu_count())
else:
parallel = False

if not parallel:
list_of_args_kwargs = [(args, kwargs)]
chunks = 1
axis = -1

else:
# generate list with args and kwargs for chunks
list_of_args_kwargs = [[list(args), {**kwargs}] for chunk in range(chunks)]

# test if object has attribute shape (is tensor or array)
isactive = lambda x: hasattr(x, "shape") and np.all(
np.isin(trax, x.shape[-ntrax:])
)

# iterate through args and split tensor-like objects
args_partitioned = []
for i, arg in enumerate(args):
if isactive(arg):
args_partitioned.append((i, np.array_split(arg, chunks, axis=axis)))

# replace arguments by chunks
for i, arg in enumerate(list_of_args_kwargs):
for j, argp in args_partitioned:
list_of_args_kwargs[i][0][j] = argp[i]

# iterate through kwargs and split tensor-like objects
kwargs_partitioned = []
for key, value in kwargs.items():
if isactive(value):
kwargs_partitioned.append(
(key, np.array_split(value, chunks, axis=axis))
)

# replace keyword arguments by chunks
for i, kwarg in enumerate(list_of_args_kwargs):
for key, value in kwargs_partitioned:
list_of_args_kwargs[i][1][key] = value[i]

return list_of_args_kwargs, chunks, axis


def concatenate_results(res, axis, full_output):
"Concatenate results (with optional full output) on existing axis."

def concat(arrays, axis):
"Concatenate arrays, fall-back to first item if shape of first array is zero."

if len(arrays[0].shape) == 0 or len(arrays) == 1:
return arrays[0]
else:
return np.concatenate(arrays, axis=axis)

if full_output:
nres = len(res[0])
return [concat([r[a] for r in res], axis=axis) for a in range(nres)]
else:
return concat(res, axis=axis)
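
Taken together, `partition` and `concatenate_results` implement a split-map-concatenate scheme: the largest trailing axis is cut into at most `cpu_count()` chunks (and only if it holds at least `batch` entries), every chunk is evaluated by a threaded worker, and the per-chunk results are joined again. The snippet below is a standalone illustration of that pattern with plain NumPy and joblib; the array shape, the batch size of 100 and the `np.trace` kernel are arbitrary choices, not library code.

```python
import numpy as np
from joblib import Parallel, cpu_count, delayed

x = np.random.rand(3, 3, 10000)  # data with one trailing axis of size 10000
chunks = max(1, min(x.shape[-1] // 100, cpu_count()))  # cap the number of chunks


def kernel(xc):
    "Evaluate one chunk (here: traces of the leading 3x3 blocks)."
    return np.trace(xc)


# split the trailing axis, evaluate the chunks in threads, re-join the results
res = Parallel(n_jobs=chunks, prefer="threads")(
    delayed(kernel)(xc) for xc in np.array_split(x, chunks, axis=-1)
)
out = np.concatenate(res, axis=-1)

assert np.allclose(out, kernel(x))  # identical to the serial result
```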


def function(fun, wrt=0, ntrax=0, parallel=False):
"Evaluate a scalar-valued function."

@wraps(fun)
def evaluate_function(*args, **kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, False, False
def kernel(args, kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, False, False
)
func = fun(*args, **kwargs)
return f(func)

list_of_args_kwargs, chunks, axis = partition(
args, kwargs, wrt, ntrax, parallel
)
func = fun(*args, **kwargs)
return f(func)

res = Parallel(n_jobs=chunks, prefer="threads")(
delayed(kernel)(*args_chunk) for args_chunk in list_of_args_kwargs
)

return concatenate_results(res=res, axis=axis, full_output=False)

return evaluate_function

@@ -84,21 +84,29 @@ def gradient(fun, wrt=0, ntrax=0, parallel=False, full_output=False, sym=False):

@wraps(fun)
def evaluate_gradient(*args, **kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, sym, True, False
def kernel(args, kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, sym, True, False
)
func = fun(*args, **kwargs)
grad = δ(func) if sym is False else from_triu_1d(δ(func))
grad = broadcast_to(grad, (*shape, *trax))
if full_output:
trax = (1,) if len(trax) == 0 else trax
zeros = np.zeros_like(shape) if sym is False else (0,)
funct = f(func)[(*zeros,)]
funct = broadcast_to(funct, (*trax,))
return grad, funct
else:
return grad

list_of_args, chunks, axis = partition(args, kwargs, wrt, ntrax, parallel)

res = Parallel(n_jobs=chunks, prefer="threads")(
delayed(kernel)(*args_chunk) for args_chunk in list_of_args
)
func = fun(*args, **kwargs)
grad = δ(func) if sym is False else from_triu_1d(δ(func))
grad = broadcast_to(grad, (*shape, *trax))

if full_output:
trax = (1,) if len(trax) == 0 else trax
zeros = np.zeros_like(shape) if sym is False else (0,)
funct = f(func)[(*zeros,)]
funct = broadcast_to(funct, (*trax,))
return grad, funct
else:
return grad

return concatenate_results(res=res, axis=axis, full_output=full_output)

return evaluate_gradient

@@ -108,28 +108,37 @@ def hessian(fun, wrt=0, ntrax=0, parallel=False, full_output=False, sym=False):

@wraps(fun)
def evaluate_hessian(*args, **kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, sym, False, True
def kernel(args, kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, sym, False, True
)
func = fun(*args, **kwargs)
hess = Δδ(func) if sym is False else from_triu_2d(Δδ(func))

if full_output:
grad = δ(func) if sym is False else from_triu_1d(δ(func))
zeros = np.zeros_like(shape) if sym is False else (0,)
grad = grad[(*[slice(None) for a in shape], *zeros)]
grad = broadcast_to(grad, (*shape, *trax))
funct = f(func)[
(
*zeros,
*zeros,
)
]
funct = broadcast_to(funct, (*trax,))
trax = (1,) if len(trax) == 0 else trax
return hess, grad, funct
else:
return hess

list_of_args, chunks, axis = partition(args, kwargs, wrt, ntrax, parallel)

res = Parallel(n_jobs=chunks, prefer="threads")(
delayed(kernel)(*args_chunk) for args_chunk in list_of_args
)
func = fun(*args, **kwargs)
hess = Δδ(func) if sym is False else from_triu_2d(Δδ(func))

if full_output:
grad = δ(func) if sym is False else from_triu_1d(δ(func))
zeros = np.zeros_like(shape) if sym is False else (0,)
grad = grad[(*[slice(None) for a in shape], *zeros)]
grad = broadcast_to(grad, (*shape, *trax))
funct = f(func)[
(
*zeros,
*zeros,
)
]
funct = broadcast_to(funct, (*trax,))
trax = (1,) if len(trax) == 0 else trax
return hess, grad, funct
else:
return hess
return concatenate_results(res=res, axis=axis, full_output=full_output)

return evaluate_hessian

@@ -139,15 +139,24 @@ def jacobian(fun, wrt=0, ntrax=0, parallel=False, full_output=False):

@wraps(fun)
def evaluate_jacobian(*args, **kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, True, False
def kernel(args, kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, True, False
)
func = fun(*args, **kwargs)

if full_output:
return δ(func), f(func).reshape(*func.shape, *trax)
else:
return δ(func)

list_of_args, chunks, axis = partition(args, kwargs, wrt, ntrax, parallel)

res = Parallel(n_jobs=chunks, prefer="threads")(
delayed(kernel)(*args_chunk) for args_chunk in list_of_args
)
func = fun(*args, **kwargs)

if full_output:
return δ(func), f(func).reshape(*func.shape, *trax)
else:
return δ(func)
return concatenate_results(res=res, axis=axis, full_output=full_output)

return evaluate_jacobian

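To close, a hedged end-to-end sketch of what the commit enables: with `parallel=False` the wrappers evaluate a single chunk exactly as before, while `parallel=True` splits the trailing axis and evaluates the chunks in threads, so both paths must return the same concatenated result. The strain-energy function matches the benchmark script above; the shapes and the symmetric batch of tensors are illustrative assumptions.

```python
import numpy as np
import tensortrax as tr
import tensortrax.math as tm


def W(C):
    # strain energy function, as in docs/benchmark/benchmark.py above
    return tm.trace(C) - tm.log(tm.linalg.det(C))


# illustrative batch of symmetric right Cauchy-Green tensors (one trailing axis)
F = np.eye(3).reshape(3, 3, 1) + np.random.rand(3, 3, 20000) / 10
C = np.einsum("ki...,kj...->ij...", F, F)

# serial (single chunk) vs. threaded evaluation: the results must coincide
dWdC = tr.gradient(W, wrt=0, ntrax=1, sym=True, parallel=False)(C)
dWdC_threaded = tr.gradient(W, wrt=0, ntrax=1, sym=True, parallel=True)(C)

assert np.allclose(dWdC, dWdC_threaded)
```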