Skip to content

Commit

Permalink
Retry http errors (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwoods79 authored Jul 30, 2024
1 parent b4bdbec commit d5b300a
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 6 deletions.
15 changes: 14 additions & 1 deletion lib/broadway_cloud_pub_sub/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,26 @@ defmodule BroadwayCloudPubSub.Options do
# Handled by Broadway.
broadway: [type: :any, doc: false],
client: [
type: :atom,
type: {:or, [:atom, :mod_arg]},
default: BroadwayCloudPubSub.PullClient,
doc: """
A module that implements the BroadwayCloudPubSub.Client behaviour.
This module is responsible for fetching and acknowledging the messages.
Pay attention that all options passed to the producer will be forwarded
to the client. It's up to the client to normalize the options it needs.
The BroadwayCloudPubSub.PullClient is the default client and will
automatically retry the following errors [408, 500, 502, 503, 504, 522,
524] up to 10 times with a 500ms pause between retries. This can be
configured by passing the module with options to the client:
{BroadwayCloudPubSub.PullClient,
retry_codes: [502, 503],
retry_delay_ms: 300,
max_retries: 5}
These options will be merged with the options to the producer and passed
to the client init/1 function.
"""
],
subscription: [
Expand Down
9 changes: 8 additions & 1 deletion lib/broadway_cloud_pub_sub/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,14 @@ defmodule BroadwayCloudPubSub.Producer do
opts = NimbleOptions.validate!(client_opts, Options.definition())

ack_ref = broadway_opts[:name]
client = opts[:client]

{client, opts} =
case opts[:client] do
{client, client_opts} -> {client, Keyword.merge(opts, client_opts)}
client -> {client, opts}
end

opts = Keyword.put(opts, :client, client)

opts =
Keyword.put_new_lazy(opts, :token_generator, fn ->
Expand Down
34 changes: 33 additions & 1 deletion lib/broadway_cloud_pub_sub/pull_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defmodule BroadwayCloudPubSub.PullClient do

@behaviour Client

@default_retry_codes [408, 500, 502, 503, 504, 522, 524]
@default_retry_delay_ms 500
@default_max_retries 10

@impl Client
def prepare_to_connect(name, producer_opts) do
case Keyword.fetch(producer_opts, :finch) do
Expand Down Expand Up @@ -192,19 +196,47 @@ defmodule BroadwayCloudPubSub.PullClient do
url = url(config, action)
body = Jason.encode!(payload)
headers = headers(config)
execute(url, body, headers, config, action, payload, max_retries(config))
end

defp execute(url, body, headers, config, action, payload, retries_left) do
case finch_request(config.finch, url, body, headers, config.receive_timeout) do
{:ok, %Response{status: 200, body: body}} ->
{:ok, Jason.decode!(body)}

{:ok, %Response{} = resp} ->
{:error, format_error(url, resp)}
maybe_retry(resp, url, body, headers, config, action, payload, retries_left)

{:error, err} ->
{:error, format_error(url, err)}
end
end

defp maybe_retry(resp, url, body, headers, config, action, payload, retries_left) do
if should_retry(resp, config, retries_left) do
config |> retry_delay() |> Process.sleep()
execute(url, body, headers, config, action, payload, retries_left - 1)
else
{:error, format_error(url, resp)}
end
end

defp should_retry(%Response{status: status}, config, retries_left) do
retries_left > 0 and status in retry_codes(config)
end

defp max_retries(config) do
config[:max_retries] || @default_max_retries
end

defp retry_codes(config) do
config[:retry_codes] || @default_retry_codes
end

defp retry_delay(config) do
config[:retry_delay_ms] || @default_retry_delay_ms
end

defp finch_request(finch, url, body, headers, timeout) do
:post
|> Finch.build(url, headers, body)
Expand Down
14 changes: 14 additions & 0 deletions test/broadway_cloud_pub_sub/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,20 @@ defmodule BroadwayCloudPubSub.ProducerTest do
end
end

test "passing extra options to client" do
{:ok, message_server} = MessageServer.start_link()
broadway_name = new_unique_name()

{:ok, pid_1} =
start_broadway(
broadway_name,
message_server,
{FakeClient, max_retries: 3, retry_codes: [502, 503], retry_delay_ms: 300}
)

stop_broadway(pid_1)
end

test "support multiple topologies" do
{:ok, message_server} = MessageServer.start_link()
broadway_name = new_unique_name()
Expand Down
40 changes: 37 additions & 3 deletions test/broadway_cloud_pub_sub/pull_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ defmodule BroadwayCloudPubSub.PullClientTest do
end)
end

def multiple_errors_on_pubsub(server, error_count: total_errors, error_status: error_status) do
{:ok, agent} = Agent.start_link(fn -> 1 end)

Bypass.expect(server, fn conn ->
attempt = Agent.get_and_update(agent, fn num -> {num, num + 1} end)

if attempt <= total_errors do
Plug.Conn.resp(conn, error_status, @empty_response)
else
Agent.stop(agent)
Plug.Conn.resp(conn, 200, @ordered_response)
end
end)
end

defp init_with_ack_builder(opts) do
# mimics workflow from Producer.prepare_for_start/2
ack_ref = opts[:broadway][:name]
Expand Down Expand Up @@ -151,7 +166,8 @@ defmodule BroadwayCloudPubSub.PullClientTest do
max_number_of_messages: 10,
subscription: "projects/foo/subscriptions/bar",
token_generator: {__MODULE__, :generate_token, []},
receive_timeout: :infinity
receive_timeout: :infinity,
max_retries: 0
]
}
end
Expand All @@ -172,6 +188,22 @@ defmodule BroadwayCloudPubSub.PullClientTest do
assert message.metadata.orderingKey == "key1"
end

test "retries if the option is set", %{
opts: base_opts,
server: server
} do
multiple_errors_on_pubsub(server, error_count: 3, error_status: 502)

{:ok, opts} =
base_opts
|> Keyword.put(:max_retries, 3)
|> Keyword.put(:retry_delay_ms, 0)
|> Keyword.put(:retry_codes, [502])
|> PullClient.init()

assert [_message] = PullClient.receive_messages(10, & &1, opts)
end

test "returns a list of Broadway.Message when payloadFormat is NONE", %{
opts: base_opts,
server: server
Expand Down Expand Up @@ -314,7 +346,8 @@ defmodule BroadwayCloudPubSub.PullClientTest do
subscription: "projects/foo/subscriptions/bar",
token_generator: {__MODULE__, :generate_token, []},
receive_timeout: :infinity,
topology_name: Broadway3
topology_name: Broadway3,
max_retries: 0
]
}
end
Expand Down Expand Up @@ -399,7 +432,8 @@ defmodule BroadwayCloudPubSub.PullClientTest do
subscription: "projects/foo/subscriptions/bar",
token_generator: {__MODULE__, :generate_token, []},
receive_timeout: :infinity,
topology_name: Broadway3
topology_name: Broadway3,
max_retries: 0
]
}
end
Expand Down

0 comments on commit d5b300a

Please sign in to comment.