From 16777418a76ebf829e3218a13d6f37273351bf9e Mon Sep 17 00:00:00 2001 From: Michael Crumm Date: Thu, 2 May 2024 12:39:01 -0700 Subject: [PATCH 1/3] Handle messages without data Messages with payloadFormat: "NONE" may not contain the data attribute. Note something may have changed on Google's side because I belive this was previously handled by the nil check. Closes #98 --- lib/broadway_cloud_pub_sub/pull_client.ex | 2 ++ .../pull_client_test.exs | 31 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/lib/broadway_cloud_pub_sub/pull_client.ex b/lib/broadway_cloud_pub_sub/pull_client.ex index a794522..eb5e120 100644 --- a/lib/broadway_cloud_pub_sub/pull_client.ex +++ b/lib/broadway_cloud_pub_sub/pull_client.ex @@ -173,6 +173,8 @@ defmodule BroadwayCloudPubSub.PullClient do %{message | "data" => Base.decode64!(encoded_data)} end + defp decode_message(message), do: message + defp headers(config) do token = get_token(config) [{"authorization", "Bearer #{token}"}, {"content-type", "application/json"}] diff --git a/test/broadway_cloud_pub_sub/pull_client_test.exs b/test/broadway_cloud_pub_sub/pull_client_test.exs index e1b15e2..6ebad6f 100644 --- a/test/broadway_cloud_pub_sub/pull_client_test.exs +++ b/test/broadway_cloud_pub_sub/pull_client_test.exs @@ -79,6 +79,23 @@ defmodule BroadwayCloudPubSub.PullClientTest do } """ + @no_payload_response """ + { + "receivedMessages": [ + { + "ackId": "1", + "message": { + "attributes": { + "payloadFormat": "NONE" + }, + "messageId": "20240501001", + "publishTime": "2024-05-01T13:07:41.716Z" + } + } + ] + } + """ + setup do server = Bypass.open() base_url = "http://localhost:#{server.port}" @@ -155,6 +172,20 @@ defmodule BroadwayCloudPubSub.PullClientTest do assert message.metadata.orderingKey == "key1" end + test "returns a list of Broadway.Message when payloadFormat is NONE", %{ + opts: base_opts, + server: server + } do + on_pubsub_request(server, fn _, _ -> + {:ok, @no_payload_response} + end) + + {:ok, opts} = PullClient.init(base_opts) + + assert [message] = PullClient.receive_messages(10, & &1, opts) + assert message.metadata.messageId == "20240501001" + end + test "returns a list of Broadway.Message with :data and :metadata set", %{ opts: base_opts } do From c1f39a58d4e8f2c8e279d3cce16e95ad76e9aa0a Mon Sep 17 00:00:00 2001 From: Michael Crumm Date: Thu, 2 May 2024 15:57:52 -0700 Subject: [PATCH 2/3] Decode only binary message data --- lib/broadway_cloud_pub_sub/pull_client.ex | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/broadway_cloud_pub_sub/pull_client.ex b/lib/broadway_cloud_pub_sub/pull_client.ex index eb5e120..e87f19a 100644 --- a/lib/broadway_cloud_pub_sub/pull_client.ex +++ b/lib/broadway_cloud_pub_sub/pull_client.ex @@ -167,9 +167,7 @@ defmodule BroadwayCloudPubSub.PullClient do end end - defp decode_message(%{"data" => nil} = message), do: message - - defp decode_message(%{"data" => encoded_data} = message) do + defp decode_message(%{"data" => encoded_data} = message) when is_binary(encoded_data) do %{message | "data" => Base.decode64!(encoded_data)} end From d94293828cd64f202a65f684515c4e742b42c5c0 Mon Sep 17 00:00:00 2001 From: Michael Crumm Date: Mon, 6 May 2024 13:47:22 -0700 Subject: [PATCH 3/3] Explicit match on nil data and payloadFormat NONE The nil data case was originally fixed in #14 so I am leaving the explicit match for messages with nil data. The attribute match will cover the cases where no data attribute exists on the payload. --- lib/broadway_cloud_pub_sub/pull_client.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/broadway_cloud_pub_sub/pull_client.ex b/lib/broadway_cloud_pub_sub/pull_client.ex index e87f19a..96a3468 100644 --- a/lib/broadway_cloud_pub_sub/pull_client.ex +++ b/lib/broadway_cloud_pub_sub/pull_client.ex @@ -171,7 +171,8 @@ defmodule BroadwayCloudPubSub.PullClient do %{message | "data" => Base.decode64!(encoded_data)} end - defp decode_message(message), do: message + defp decode_message(%{"data" => nil} = message), do: message + defp decode_message(%{"attributes" => %{"payloadFormat" => "NONE"}} = message), do: message defp headers(config) do token = get_token(config)