Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix persistent term error, refactor version to match broadway #64

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## v0.1.0 (2021-08-30)

* Add selectable term storage
* Fix persistent term error

## v0.7.0 (2021-08-30)

* Add the following telemetry events:
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwaySQS.Producer,
queue_url: "https://sqs.amazonaws.com/1234567890/queue"
queue_url: "https://sqs.amazonaws.com/1234567890/queue",
storage: :ets
}
]
)
Expand Down
69 changes: 69 additions & 0 deletions lib/broadway_sqs/ets_storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
defmodule BroadwaySQS.Storage.ETS do
  @moduledoc """
  A simple term storage to avoid passing large amounts of data between processes.

  If you have a large amount of data and you want to avoid passing it between
  processes, you can use the `Storage.ETS`. The `Storage.ETS` creates a unique
  reference for it, allowing you to pass the reference around instead of the term.

  If the same term is put multiple times, it is stored only once, avoiding
  generating garbage. However, the stored terms are never removed.

  A common use case for this feature is in Broadway.Producer, which may need
  to pass a lot of information to acknowledge messages. With this module, you
  can store those terms when the producer starts and only pass the reference
  between messages:

      iex> ref = BroadwaySQS.Storage.ETS.put({:foo, :bar, :baz}) # On init
      iex> BroadwaySQS.Storage.ETS.get!(ref) # On ack
      {:foo, :bar, :baz}

  """

  use GenServer

  @name __MODULE__

  @doc false
  def start_link(_) do
    GenServer.start_link(__MODULE__, :ok, name: @name)
  end

  @doc """
  Gets a previously stored term.

  Raises `ArgumentError` if `ref` was never returned by `put/1`.
  """
  @spec get!(reference()) :: term()
  def get!(ref) when is_reference(ref) do
    :ets.lookup_element(@name, ref, 2)
  end

  @doc """
  Puts a term and returns the reference under which it is stored.

  If an identical term is already stored, its existing reference is returned
  and nothing is written.
  """
  @spec put(term()) :: reference()
  def put(term) do
    # Fast path: read the table directly from the caller. Only go through the
    # owning process (the sole writer of the :protected table) on a miss.
    find_by_term(term) || GenServer.call(@name, {:put, term}, :infinity)
  end

  # Callbacks

  @impl true
  def init(:ok) do
    ets = :ets.new(@name, [:named_table, :protected, :set, read_concurrency: true])
    {:ok, ets}
  end

  @impl true
  def handle_call({:put, term}, _from, state) do
    # Look again inside the server: another caller may have inserted the same
    # term between this caller's miss and the arrival of its request.
    ref = find_by_term(term) || insert(term)
    {:reply, ref, state}
  end

  # Stores `term` under a fresh reference and returns that reference.
  defp insert(term) do
    ref = make_ref()
    :ets.insert(@name, {ref, term})
    ref
  end

  # Returns the reference of the entry whose value is exactly `term`, or nil.
  #
  # The term is compared with `=:=` against a `{:const, term}` literal instead
  # of being used as a match pattern. As a pattern, atoms such as `:_` or
  # `:"$1"` inside the term would act as wildcards and could match (and return
  # the reference of) a different stored term.
  defp find_by_term(term) do
    match_spec = [{{:"$1", :"$2"}, [{:"=:=", :"$2", {:const, term}}], [:"$1"]}]

    case :ets.select(@name, match_spec) do
      [ref | _] -> ref
      [] -> nil
    end
  end
end
49 changes: 47 additions & 2 deletions lib/broadway_sqs/ex_aws_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule BroadwaySQS.ExAwsClient do
"""

alias Broadway.{Message, Acknowledger}
alias BroadwaySQS.Storage.{ETS, PersistentTerm}
require BroadwaySQS.Validators
require Logger

@behaviour BroadwaySQS.SQSClient
Expand All @@ -16,8 +18,50 @@ defmodule BroadwaySQS.ExAwsClient do

@impl true
def init(opts) do
  # Validates the client options and stores the acknowledgement options in the
  # selected storage backend, so only a small handle (`ack_ref`) has to travel
  # with each message.
  #
  # Returns `{:ok, opts_map}`, where `opts_map` is `opts` as a map plus
  # `:ack_ref`, or `{:error, message}` as produced by the first failing
  # validation step.
  #
  # NOTE(review): ack/3 in this module reads the options back with
  # `:persistent_term.get(ack_ref)`; it needs to read through the same backend
  # that is selected here — confirm before relying on `storage: :ets`.

  # `:ets` is the default backend when `:storage` is absent or nil.
  storage = opts[:storage] || :ets

  with {:ok, queue_url} <- BroadwaySQS.Validators.validate(opts, :queue_url, required: true),
       {:ok, _receive_messages_opts} <-
         BroadwaySQS.Validators.validate_receive_messages_opts(opts),
       {:ok, config} <- BroadwaySQS.Validators.validate(opts, :config, default: []),
       {:ok, on_success} <- BroadwaySQS.Validators.validate(opts, :on_success, default: :ack),
       {:ok, on_failure} <- BroadwaySQS.Validators.validate(opts, :on_failure, default: :noop),
       {:ok, ack_ref} <-
         put_ack_options(storage, %{
           queue_url: queue_url,
           config: config,
           on_success: on_success,
           on_failure: on_failure
         }) do
    {:ok, Enum.into(opts, %{ack_ref: ack_ref})}
  end
end

# Stores the ack options in the requested backend and returns `{:ok, handle}`,
# or `{:error, message}` for an unknown backend. The misspelled `:persistant`
# is still accepted because that is the spelling this option was introduced with.
defp put_ack_options(:ets, ack_options), do: {:ok, ETS.put(ack_options)}

defp put_ack_options(storage, ack_options) when storage in [:persistent, :persistant],
  do: {:ok, PersistentTerm.put(ack_options)}

defp put_ack_options(storage, _ack_options),
  do: {:error, "expected :storage to be :ets or :persistent, got: #{inspect(storage)}"}

Expand All @@ -33,6 +77,7 @@ defmodule BroadwaySQS.ExAwsClient do

@impl Acknowledger
def ack(ack_ref, successful, failed) do
# ack_options = BroadwaySQS.PersistentTermStorage.get(ack_ref)
ack_options = :persistent_term.get(ack_ref)

messages =
Expand Down
73 changes: 73 additions & 0 deletions lib/broadway_sqs/persistent_term.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
defmodule BroadwaySQS.Storage.PersistentTerm do
  @moduledoc """
  A simple term storage to avoid passing large amounts of data between processes.

  If you have a large amount of data and you want to avoid passing it between
  processes, you can use the `PersistentTerm`. The `PersistentTerm` stores the
  data with `:persistent_term` and gives you back the key, allowing you to pass
  the key around instead of the term.

  If the same term is put multiple times, it is stored only once, avoiding
  generating garbage. Stored terms stay in place until `delete!/1` is called.

  A common use case for this feature is in Broadway.Producer, which may need
  to pass a lot of information to acknowledge messages. With this module, you
  can store those terms when the producer starts and only pass the key
  between messages:

      iex> ref = BroadwaySQS.Storage.PersistentTerm.put({:foo, :bar, :baz}) # On init
      iex> BroadwaySQS.Storage.PersistentTerm.get!(ref) # On ack
      {:foo, :bar, :baz}

  The returned key is the actual `:persistent_term` key, so
  `:persistent_term.get(ref)` returns the same value as `get!/1`.
  """

  use GenServer

  @name __MODULE__

  @doc false
  def start_link(_) do
    GenServer.start_link(__MODULE__, :ok, name: @name)
  end

  @doc """
  Gets a previously stored term.

  Raises `ArgumentError` if nothing is stored under `key`.
  """
  @spec get!(term()) :: term()
  def get!(key) do
    :persistent_term.get(key)
  end

  @doc """
  Delete a previously stored term.

  Returns `true` if a term was stored under `key` and has been erased,
  `false` otherwise.
  """
  @spec delete!(term()) :: boolean()
  def delete!(key) do
    :persistent_term.erase(key)
  end

  @doc """
  Puts a term and returns the key under which it is stored.

  Putting a term that was already put returns the same key.
  """
  @spec put(term()) :: term()
  def put(term) do
    GenServer.call(@name, {:put, term}, :infinity)
  end

  # Callbacks

  @impl true
  def init(:ok) do
    # State maps each stored term to the key handed out for it.
    {:ok, %{}}
  end

  @impl true
  def handle_call({:put, term}, _from, keys) do
    # Keys are namespaced with the module name so they cannot collide with
    # persistent terms owned by other code.
    key = Map.get_lazy(keys, term, fn -> {@name, make_ref()} end)

    # Always (re)publish: :persistent_term.put/2 does nothing when the stored
    # value is already equal, and this restores an entry removed by delete!/1.
    :persistent_term.put(key, term)

    {:reply, key, Map.put(keys, term, key)}
  end
end
135 changes: 135 additions & 0 deletions lib/broadway_sqs/validators.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
defmodule BroadwaySQS.Validators do
  @moduledoc """
  Validation helpers for the options accepted by the SQS client.

  Every validator returns `{:ok, value}` when the value is acceptable and
  `{:error, message}` otherwise, where `message` is a human-readable string.
  """

  # Limits of the SQS ReceiveMessage API: at most 10 messages per request and
  # a visibility timeout of at most 12 hours.
  @default_max_number_of_messages 10
  @max_num_messages_allowed_by_aws 10
  @max_visibility_timeout_allowed_by_aws_in_seconds 12 * 60 * 60

  # Attribute names accepted in the :attribute_names option.
  @supported_attributes [
    :sender_id,
    :sent_timestamp,
    :approximate_receive_count,
    :approximate_first_receive_timestamp,
    :sequence_number,
    :message_deduplication_id,
    :message_group_id,
    :aws_trace_header
  ]

  # Raises ArgumentError unless every entry is :on_success or :on_failure with
  # a valid value; returns :ok otherwise.
  @doc false
  def validate_configure_options!(options) do
    Enum.each(options, fn {option, value} ->
      with true <- option in [:on_success, :on_failure],
           {:ok, _} <- validate_option(option, value) do
        :ok
      else
        _ ->
          raise ArgumentError,
                "unsupported configure option #{inspect(option)} => #{inspect(value)}"
      end
    end)
  end

  # Validates `opts[key]`.
  #
  # `options` may contain:
  #   * :required - when true, a missing key is an error
  #   * :default  - value validated and returned when the key is missing
  #
  # A missing, non-required key without a default yields `{:ok, nil}`.
  @doc false
  def validate(opts, key, options \\ []) when is_list(opts) do
    has_key = Keyword.has_key?(opts, key)
    required = Keyword.get(options, :required, false)
    default = Keyword.get(options, :default)

    cond do
      has_key ->
        validate_option(key, opts[key])

      required ->
        {:error, "#{inspect(key)} is required"}

      default != nil ->
        validate_option(key, default)

      true ->
        {:ok, nil}
    end
  end

  # Validates a single option value. The guarded clauses below only match
  # invalid values; a valid value falls through to the final catch-all clause,
  # which accepts it (and any unknown option) unchanged.
  @doc false
  def validate_option(:config, value) when not is_list(value),
    do: validation_error(:config, "a keyword list", value)

  def validate_option(:queue_url, value) when not is_binary(value) or value == "",
    do: validation_error(:queue_url, "a non empty string", value)

  def validate_option(:wait_time_seconds, value) when not is_integer(value) or value < 0,
    do: validation_error(:wait_time_seconds, "a non negative integer", value)

  def validate_option(:max_number_of_messages, value)
      when value not in 1..@max_num_messages_allowed_by_aws do
    validation_error(
      :max_number_of_messages,
      "an integer between 1 and #{@max_num_messages_allowed_by_aws}",
      value
    )
  end

  def validate_option(:visibility_timeout, value)
      when value not in 0..@max_visibility_timeout_allowed_by_aws_in_seconds do
    validation_error(
      :visibility_timeout,
      "an integer between 0 and #{@max_visibility_timeout_allowed_by_aws_in_seconds}",
      value
    )
  end

  def validate_option(:attribute_names, value) do
    supported? = fn name -> name in @supported_attributes end

    if value == :all || (is_list(value) && Enum.all?(value, supported?)) do
      {:ok, value}
    else
      validation_error(
        :attribute_names,
        ":all or a list containing any of #{inspect(@supported_attributes)}",
        value
      )
    end
  end

  def validate_option(:message_attribute_names, value) do
    non_empty_string? = fn name -> is_binary(name) && name != "" end

    if value == :all || (is_list(value) && Enum.all?(value, non_empty_string?)) do
      {:ok, value}
    else
      validation_error(:message_attribute_names, ":all or a list of non empty strings", value)
    end
  end

  def validate_option(option, value) when option in [:on_success, :on_failure] do
    if value in [:ack, :noop] do
      {:ok, value}
    else
      validation_error(option, ":ack or :noop", value)
    end
  end

  def validate_option(_, value), do: {:ok, value}

  # Builds the `{:error, message}` tuple returned for an invalid option.
  @doc false
  def validation_error(option, expected, value) do
    {:error, "expected #{inspect(option)} to be #{expected}, got: #{inspect(value)}"}
  end

  # Validates the options forwarded to the receive-messages request and returns
  # `{:ok, keyword}` containing only the options that ended up with a truthy
  # value, or the first `{:error, message}` encountered.
  @doc false
  def validate_receive_messages_opts(opts) do
    with {:ok, wait_time_seconds} <- validate(opts, :wait_time_seconds),
         {:ok, attribute_names} <- validate(opts, :attribute_names),
         {:ok, message_attribute_names} <- validate(opts, :message_attribute_names),
         {:ok, max_number_of_messages} <-
           validate(opts, :max_number_of_messages, default: @default_max_number_of_messages),
         {:ok, visibility_timeout} <- validate(opts, :visibility_timeout) do
      validated_opts = [
        max_number_of_messages: max_number_of_messages,
        wait_time_seconds: wait_time_seconds,
        visibility_timeout: visibility_timeout,
        attribute_names: attribute_names,
        message_attribute_names: message_attribute_names
      ]

      {:ok, Enum.filter(validated_opts, fn {_, value} -> value end)}
    end
  end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule BroadwaySqs.MixProject do
use Mix.Project

@version "0.7.0"
@version "0.1.0"
@description "A SQS connector for Broadway"

def project do
Expand Down