diff --git a/lib/marathon/binge_watch.ex b/lib/marathon/binge_watch.ex index 4f708d3..29a654f 100644 --- a/lib/marathon/binge_watch.ex +++ b/lib/marathon/binge_watch.ex @@ -11,6 +11,8 @@ defmodule Heimdall.Marathon.BingeWatch do streaming Marathon events to itself in `start_link/0`. """ + @name Heimdall.Marathon.BingeWatch + use GenServer require Logger @@ -18,10 +20,57 @@ defmodule Heimdall.Marathon.BingeWatch do alias Plug.Router.Utils def start_link(args) do - marathon_url = Keyword.get(args, :marathon_url) - {:ok, pid} = GenServer.start_link(__MODULE__, [marathon_url: marathon_url], []) - HTTPoison.get!(marathon_url <> "/v2/events", %{"Accept": "text/event-stream"}, stream_to: pid, recv_timeout: :infinity) - {:ok, pid} + GenServer.start_link(__MODULE__, args, name: @name) + end + + def init(args) do + state = reset_retries(args) + GenServer.cast(@name, :connect) + {:ok, state} + end + + def handle_cast(:connect, state) do + retries = Keyword.get(state, :retries) + :timer.sleep(delay_seconds(retries)) + marathon_url = Keyword.get(state, :marathon_url) + case connect_to_marathon(marathon_url) do + {:ok, _} -> + new_state = reset_retries(state) + {:noreply, new_state} + {:error, %HTTPoison.Error{reason: reason}} -> + Logger.warn "Failed to connect to Marathon: #{inspect(reason)}" + retry_connect(state) + {:error, reason} -> + Logger.warn "Failed to connect to Marathon: #{inspect(reason)}" + retry_connect(state) + end + end + + def handle_cast(_req, state) do + {:noreply, state} + end + + defp delay_seconds(retries) do + back_off = round(:math.pow(2, retries)) + :timer.seconds(back_off) + end + + defp increment_retries(state) do + Keyword.update(state, :retries, 0, fn r -> r + 1 end) + end + + defp reset_retries(state) do + Keyword.put(state, :retries, 0) + end + + defp retry_connect(state) do + new_state = increment_retries(state) + GenServer.cast(@name, :connect) + {:noreply, new_state} + end + + defp connect_to_marathon(marathon_url) do + HTTPoison.get(marathon_url <> "/v2/events", %{"Accept": "text/event-stream"}, stream_to: self, recv_timeout: 15_000) end @doc """ @@ -122,8 +171,8 @@ defmodule Heimdall.Marathon.BingeWatch do `handle_info/2` handles responses streamed in from Marathon If a response from Marathon gives back anything other than 200, - or if there is an error connecting, BingeWatch will stop with - the reason. + or if there is an error connecting, BingeWatch will attempt to + reconnect with exponential back-off. Any chunked response other than a carriage return (which is used as a keep-alive) from Marathon will trigger a reload of the @@ -132,11 +181,14 @@ defmodule Heimdall.Marathon.BingeWatch do Also other message to `handle_info/2` will be ignored. """ def handle_info(%HTTPoison.AsyncStatus{code: 200}, state) do - {:noreply, state} + Logger.info "Successfully connected to Marathon stream" + new_state = reset_retries(state) + {:noreply, new_state} end def handle_info(%HTTPoison.AsyncStatus{code: status}, state) do - {:stop, "Got error code from Marathon: " <> status, state} + Logger.warn "Got error status code from Marathon: #{status}" + retry_connect(state) end def handle_info(%HTTPoison.AsyncChunk{chunk: "\r\n"}, state) do @@ -150,7 +202,7 @@ defmodule Heimdall.Marathon.BingeWatch do {:ok, _routes} -> {:noreply, state} {:error, reason} -> - Logger.warn "Creating routes failed: #{reason}" + Logger.warn "Creating routes failed: #{inspect(reason)}" {:noreply, state} _ -> Logger.warn "Creating routes failed for unknown reason" @@ -158,8 +210,14 @@ defmodule Heimdall.Marathon.BingeWatch do end end + def handle_info(%HTTPoison.AsyncEnd{}, state) do + Logger.warn "Disconnected from Marathon, attempting to reconnect now" + retry_connect(state) + end + def handle_info(%HTTPoison.Error{reason: reason}, state) do - {:stop, reason, state} + Logger.warn "Got error from Marathon stream: #{inspect(reason)}" + retry_connect(state) end def handle_info(_msg, state) do diff --git a/lib/marathon/binge_watch/supervisor.ex b/lib/marathon/binge_watch/supervisor.ex index e7b2655..1e579d1 100644 --- a/lib/marathon/binge_watch/supervisor.ex +++ b/lib/marathon/binge_watch/supervisor.ex @@ -1,14 +1,15 @@ defmodule Heimdall.Marathon.BingeWatch.Supervisor do use Supervisor + @name Heimdall.Marathon.BingeWatch.Supervisor + def start_link(args) do - Supervisor.start_link(__MODULE__, args) + Supervisor.start_link(__MODULE__, args, name: @name) end def init(args) do - marathon_url = Keyword.get(args, :marathon_url) children = [ - worker(Heimdall.Marathon.BingeWatch, [[marathon_url: marathon_url]], restart: :permanent) + worker(Heimdall.Marathon.BingeWatch, [args], restart: :permanent) ] supervise(children, strategy: :one_for_one, max_retries: :infinity, max_seconds: 1)