From d5b300a8be1d84dd0b0db01d837ae7e5f4f464ce Mon Sep 17 00:00:00 2001 From: Micah Woods Date: Tue, 30 Jul 2024 14:31:43 -0400 Subject: [PATCH] Retry http errors (#103) --- lib/broadway_cloud_pub_sub/options.ex | 15 ++++++- lib/broadway_cloud_pub_sub/producer.ex | 9 ++++- lib/broadway_cloud_pub_sub/pull_client.ex | 34 +++++++++++++++- test/broadway_cloud_pub_sub/producer_test.exs | 14 +++++++ .../pull_client_test.exs | 40 +++++++++++++++++-- 5 files changed, 106 insertions(+), 6 deletions(-) diff --git a/lib/broadway_cloud_pub_sub/options.ex b/lib/broadway_cloud_pub_sub/options.ex index da222a5..0289879 100644 --- a/lib/broadway_cloud_pub_sub/options.ex +++ b/lib/broadway_cloud_pub_sub/options.ex @@ -13,13 +13,26 @@ defmodule BroadwayCloudPubSub.Options do # Handled by Broadway. broadway: [type: :any, doc: false], client: [ - type: :atom, + type: {:or, [:atom, :mod_arg]}, default: BroadwayCloudPubSub.PullClient, doc: """ A module that implements the BroadwayCloudPubSub.Client behaviour. This module is responsible for fetching and acknowledging the messages. Pay attention that all options passed to the producer will be forwarded to the client. It's up to the client to normalize the options it needs. + + The BroadwayCloudPubSub.PullClient is the default client and will + automatically retry the following errors [408, 500, 502, 503, 504, 522, + 524] up to 10 times with a 500ms pause between retries. This can be + configured by passing the module with options to the client: + + {BroadwayCloudPubSub.PullClient, + retry_codes: [502, 503], + retry_delay_ms: 300, + max_retries: 5} + + These options will be merged with the options to the producer and passed + to the client init/1 function. """ ], subscription: [ diff --git a/lib/broadway_cloud_pub_sub/producer.ex b/lib/broadway_cloud_pub_sub/producer.ex index 6a24f33..7050c90 100644 --- a/lib/broadway_cloud_pub_sub/producer.ex +++ b/lib/broadway_cloud_pub_sub/producer.ex @@ -161,7 +161,14 @@ defmodule BroadwayCloudPubSub.Producer do opts = NimbleOptions.validate!(client_opts, Options.definition()) ack_ref = broadway_opts[:name] - client = opts[:client] + + {client, opts} = + case opts[:client] do + {client, client_opts} -> {client, Keyword.merge(opts, client_opts)} + client -> {client, opts} + end + + opts = Keyword.put(opts, :client, client) opts = Keyword.put_new_lazy(opts, :token_generator, fn -> diff --git a/lib/broadway_cloud_pub_sub/pull_client.ex b/lib/broadway_cloud_pub_sub/pull_client.ex index 20e379d..7fd2fa7 100644 --- a/lib/broadway_cloud_pub_sub/pull_client.ex +++ b/lib/broadway_cloud_pub_sub/pull_client.ex @@ -10,6 +10,10 @@ defmodule BroadwayCloudPubSub.PullClient do @behaviour Client + @default_retry_codes [408, 500, 502, 503, 504, 522, 524] + @default_retry_delay_ms 500 + @default_max_retries 10 + @impl Client def prepare_to_connect(name, producer_opts) do case Keyword.fetch(producer_opts, :finch) do @@ -192,19 +196,47 @@ defmodule BroadwayCloudPubSub.PullClient do url = url(config, action) body = Jason.encode!(payload) headers = headers(config) + execute(url, body, headers, config, action, payload, max_retries(config)) + end + defp execute(url, body, headers, config, action, payload, retries_left) do case finch_request(config.finch, url, body, headers, config.receive_timeout) do {:ok, %Response{status: 200, body: body}} -> {:ok, Jason.decode!(body)} {:ok, %Response{} = resp} -> - {:error, format_error(url, resp)} + maybe_retry(resp, url, body, headers, config, action, payload, retries_left) {:error, err} -> {:error, format_error(url, err)} end end + defp maybe_retry(resp, url, body, headers, config, action, payload, retries_left) do + if should_retry(resp, config, retries_left) do + config |> retry_delay() |> Process.sleep() + execute(url, body, headers, config, action, payload, retries_left - 1) + else + {:error, format_error(url, resp)} + end + end + + defp should_retry(%Response{status: status}, config, retries_left) do + retries_left > 0 and status in retry_codes(config) + end + + defp max_retries(config) do + config[:max_retries] || @default_max_retries + end + + defp retry_codes(config) do + config[:retry_codes] || @default_retry_codes + end + + defp retry_delay(config) do + config[:retry_delay_ms] || @default_retry_delay_ms + end + defp finch_request(finch, url, body, headers, timeout) do :post |> Finch.build(url, headers, body) diff --git a/test/broadway_cloud_pub_sub/producer_test.exs b/test/broadway_cloud_pub_sub/producer_test.exs index f51432f..02b87f1 100644 --- a/test/broadway_cloud_pub_sub/producer_test.exs +++ b/test/broadway_cloud_pub_sub/producer_test.exs @@ -711,6 +711,20 @@ defmodule BroadwayCloudPubSub.ProducerTest do end end + test "passing extra options to client" do + {:ok, message_server} = MessageServer.start_link() + broadway_name = new_unique_name() + + {:ok, pid_1} = + start_broadway( + broadway_name, + message_server, + {FakeClient, max_retries: 3, retry_codes: [502, 503], retry_delay_ms: 300} + ) + + stop_broadway(pid_1) + end + test "support multiple topologies" do {:ok, message_server} = MessageServer.start_link() broadway_name = new_unique_name() diff --git a/test/broadway_cloud_pub_sub/pull_client_test.exs b/test/broadway_cloud_pub_sub/pull_client_test.exs index 6ebad6f..56d0aa2 100644 --- a/test/broadway_cloud_pub_sub/pull_client_test.exs +++ b/test/broadway_cloud_pub_sub/pull_client_test.exs @@ -124,6 +124,21 @@ defmodule BroadwayCloudPubSub.PullClientTest do end) end + def multiple_errors_on_pubsub(server, error_count: total_errors, error_status: error_status) do + {:ok, agent} = Agent.start_link(fn -> 1 end) + + Bypass.expect(server, fn conn -> + attempt = Agent.get_and_update(agent, fn num -> {num, num + 1} end) + + if attempt <= total_errors do + Plug.Conn.resp(conn, error_status, @empty_response) + else + Agent.stop(agent) + Plug.Conn.resp(conn, 200, @ordered_response) + end + end) + end + defp init_with_ack_builder(opts) do # mimics workflow from Producer.prepare_for_start/2 ack_ref = opts[:broadway][:name] @@ -151,7 +166,8 @@ defmodule BroadwayCloudPubSub.PullClientTest do max_number_of_messages: 10, subscription: "projects/foo/subscriptions/bar", token_generator: {__MODULE__, :generate_token, []}, - receive_timeout: :infinity + receive_timeout: :infinity, + max_retries: 0 ] } end @@ -172,6 +188,22 @@ defmodule BroadwayCloudPubSub.PullClientTest do assert message.metadata.orderingKey == "key1" end + test "retries if the option is set", %{ + opts: base_opts, + server: server + } do + multiple_errors_on_pubsub(server, error_count: 3, error_status: 502) + + {:ok, opts} = + base_opts + |> Keyword.put(:max_retries, 3) + |> Keyword.put(:retry_delay_ms, 0) + |> Keyword.put(:retry_codes, [502]) + |> PullClient.init() + + assert [_message] = PullClient.receive_messages(10, & &1, opts) + end + test "returns a list of Broadway.Message when payloadFormat is NONE", %{ opts: base_opts, server: server @@ -314,7 +346,8 @@ defmodule BroadwayCloudPubSub.PullClientTest do subscription: "projects/foo/subscriptions/bar", token_generator: {__MODULE__, :generate_token, []}, receive_timeout: :infinity, - topology_name: Broadway3 + topology_name: Broadway3, + max_retries: 0 ] } end @@ -399,7 +432,8 @@ defmodule BroadwayCloudPubSub.PullClientTest do subscription: "projects/foo/subscriptions/bar", token_generator: {__MODULE__, :generate_token, []}, receive_timeout: :infinity, - topology_name: Broadway3 + topology_name: Broadway3, + max_retries: 0 ] } end