Skip to content

Commit

Permalink
Merge pull request #5 from pezra/streamify-retry
Browse files Browse the repository at this point in the history
Composable, stream based retry delay system
  • Loading branch information
safwank authored Jul 17, 2016
2 parents e90c422 + ce797d3 commit b122b8f
Show file tree
Hide file tree
Showing 5 changed files with 368 additions and 113 deletions.
41 changes: 30 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,42 @@ Check out the [API reference](https://hexdocs.pm/retry/Retry.html) for the lates

## Features

#### Linear retry
### Retrying

```
result = retry 5 in 500 do
SomeModule.flaky_function # Either raises a transient runtime error or returns an error tuple
The `retry(with: _, do: _)` macro provides a way to retry a block of code on failure with a variety of delay and give up behaviors. The execution of a block is considered a failure if it returns `:error`, `{:error, _}` or raises a runtime error.

#### Example -- exponential backoff

```elixir
result = retry with: exp_backoff |> randomize |> expiry(10000) do
ExternalApi.do_something # fails if other system is down
end
```
The first argument (5) is the number of retries and the second (500) is the period between attempts in milliseconds.
This will try the block, and return the result, as soon as it succeeds. On a failure this example will wait an exponentially increasing amount of time (`exp_backoff/0`). Each delay will be randomly adjusted to remain within +/-10% of its original value (`randomize/2`). And finally it will give up entirely if the block has not succeeded with in 10 seconds (`expiry/2`).

#### Exponential backoff
#### Example -- linear backoff

```elixir
result = retry with: lin_backoff(10, 2) |> cap(1000) |> Stream.take(10) do
ExternalApi.do_something # fails if other system is down
end
```
result = backoff 1000 do
SomeModule.flaky_function # Either raises a transient runtime error or returns an error tuple

This example doubles the delay with each retry, starting with 10 milliseconds, caps the delay at 1 second and gives up after 10 tries.

#### Delay streams

The `with:` option of `retry` accepts any `Stream` that yields integers. These integers will be interpreted as the amount of time to delay before retrying a failed operation. When the stream is exhausted `retry` will give up, returning the last value of the block.

##### Example

```elixir
result = retry with: Stream.cycle([500]) do
ExternalApi.do_something # fails if other system is down
end
```
The argument is the timeout (in milliseconds) before giving up. `backoff` accepts a optional argument `delay_cap` which is the maximum delay (in milliseconds) between attempts.

#### Circuit breaker
Work in progress.
This will retry failures forever, waiting .5 seconds between attempts.


`Retry.DelayStreams` provides a set of fully composable helper functions for building useful delay behaviors such as the ones in previous examples. See the `Retry.DelayStreams` module docs for full details and addition behavior not covered here. For convenience these functions are imported by `use Retry` so you can, usually, use them without prefixing them with the module name.
180 changes: 98 additions & 82 deletions lib/retry.ex
Original file line number Diff line number Diff line change
@@ -1,27 +1,99 @@
defmodule Retry do
@moduledoc """
Retry functions.
Provides a convenient interface to retrying behavior. All durations a
specified in milliseconds.
Examples
use Retry
import Stream
retry with: exp_backoff |> randomize |> cap(1000) |> expiry(10000) do
# interact with external service
end
retry with: lin_backoff(10, @fibonacci) |> cap(1000) |> take(10) do
# interact with external service
end
retry with: cycle([500]) |> take(10) do
# interact with external service
end
The first retry will exponentially increase the delay, fudging each delay up
to 10%, until the delay reaches 1 second and then give up after 10 seconds.
The second retry will linearly increase the retry from 10ms following a
Fibonacci pattern giving up after 10 attempts.
The third example show how we can product a delay stream using standard
`Stream` functionality. Any stream of integers may be used as the value of
`with:`.
"""

@doc false
defmacro __using__(_opts) do
quote do
import Retry
import Retry.DelayStreams
end
end

@doc """
Retry a block of code delaying between each attempt the duration specified by
the next item in the `with` delay stream.
Example
use Retry
import Stream
retry with: exp_backoff |> cap(1000) |> expiry(1000) do
# interact with external service
end
retry with: linear_backoff(@fibonacci) |> cap(1000) |> take(10) do
# interact with external service
end
retry with: cycle([500]) |> take(10) do
# interact with external service
end
"""
defmacro retry([with: stream_builder], do: block) do
quote do
fun = unquote(block_runner(block))
retry_delays = unquote(stream_builder)
delays = Stream.concat([0], retry_delays)

final_result = Enum.reduce_while(delays, nil, fn(delay, _last_result) ->
:timer.sleep(delay)
fun.()
end)

case final_result do
{:exception, e} -> raise e
result -> result
end
end
end


@doc """
Retry block of code a maximum number of times with a fixed delay between
attempts.
Example
```elixir
retry 5 in 500 do
# interact with external service
end
```
retry 5 in 500 do
# interact with external service
end
Runs the block up to 5 times with a half second sleep between each
attempt. Execution is deemed a failure if the block returns `{:error, _}` or
Expand All @@ -30,10 +102,8 @@ defmodule Retry do
"""
defmacro retry({:in, _, [retries, sleep]}, do: block) do
quote do
do_retry(
fixed_delays(unquote(retries), unquote(sleep)),
unquote(block_runner(block))
)
import Stream
retry([with: [unquote(sleep)] |> cycle |> take(unquote(retries))], do: unquote(block))
end
end

Expand All @@ -43,11 +113,9 @@ defmodule Retry do
Example
```elixir
backoff 1000, delay_cap: 100 do
# interact the external service
end
```
backoff 1000, delay_cap: 100 do
# interact the external service
end
Runs the block repeated until it succeeds or 1 second elapses with an
exponentially increasing delay between attempts. Execution is deemed a failure
Expand All @@ -60,40 +128,30 @@ defmodule Retry do
"""
defmacro backoff(time_budget, do: block) do
quote do
do_retry(
exp_backoff_delays(unquote(time_budget), :infinity),
unquote(block_runner(block))
import Stream

retry(
[with: exp_backoff
|> randomize
|> expiry(unquote(time_budget))],
do: unquote(block)
)
end
end
defmacro backoff(time_budget, delay_cap: delay_cap, do: block) do
quote do
do_retry(
exp_backoff_delays(unquote(time_budget), unquote(delay_cap)),
unquote(block_runner(block))
import Stream

retry(
[with: exp_backoff
|> randomize
|> cap(unquote(delay_cap))
|> expiry(unquote(time_budget))],
do: unquote(block)
)
end
end

@doc """
Executes fun until it succeeds or we have run out of retry_delays. Each retry
is preceded by a sleep of the specified retry delay.
"""
def do_retry(retry_delays, fun) do
delays = [0] |> Stream.concat(retry_delays)

final_result = delays |> Enum.reduce_while(nil, fn(delay, _last_result) ->
:timer.sleep(delay)
fun.()
end)

case final_result do
{:exception, e} -> raise e
result -> result
end
end

defp block_runner(block) do
quote do
Expand All @@ -110,46 +168,4 @@ defmodule Retry do
end
end
end

@doc """
Returns stream of delays that are exponentially increasing. Stream halts once
the specified budget of milliseconds has elapsed.
"""
def exp_backoff_delays(budget, delay_cap) do
{1, :os.system_time(:milli_seconds) + budget}
|> Stream.unfold(fn {failures, end_t} ->
next_delay = figure_exp_delay(failures, delay_cap)
now_t = :os.system_time(:milli_seconds)

cond do
now_t > end_t ->
nil # out of time
(now_t + next_delay) > end_t ->
{end_t - now_t, {failures + 1, end_t}} # one last try
true ->
{next_delay, {failures + 1, end_t}}
end
end)
end

@doc """
Returns stream that returns specified number of the specified delay.
"""
def fixed_delays(count, delay) do
[delay]
|> Stream.cycle
|> Stream.take(count)
end

defp figure_exp_delay(failures, :infinity) do
:erlang.round((1 + :random.uniform) * 10 * :math.pow(2, failures))
end
defp figure_exp_delay(failures, delay_cap) do
Enum.min([
figure_exp_delay(failures, :infinity),
delay_cap
])
end
end
Loading

0 comments on commit b122b8f

Please sign in to comment.