From d7392c83fccb28a886f83d6784885f9ac453e193 Mon Sep 17 00:00:00 2001 From: Michael Crumm Date: Mon, 6 May 2024 15:24:51 -0700 Subject: [PATCH] Handle messages without data (#99) Messages with payloadFormat: "NONE" may not contain the data attribute. Note something may have changed on Google's side because I believe this was previously handled by the nil check. The nil data case was originally fixed in #14 so I am leaving the explicit match for messages with nil data. The attribute match will cover the cases where no data attribute exists on the payload. --- lib/broadway_cloud_pub_sub/pull_client.ex | 7 +++-- .../pull_client_test.exs | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/lib/broadway_cloud_pub_sub/pull_client.ex b/lib/broadway_cloud_pub_sub/pull_client.ex index a794522..96a3468 100644 --- a/lib/broadway_cloud_pub_sub/pull_client.ex +++ b/lib/broadway_cloud_pub_sub/pull_client.ex @@ -167,12 +167,13 @@ defmodule BroadwayCloudPubSub.PullClient do end end - defp decode_message(%{"data" => nil} = message), do: message - - defp decode_message(%{"data" => encoded_data} = message) do + defp decode_message(%{"data" => encoded_data} = message) when is_binary(encoded_data) do %{message | "data" => Base.decode64!(encoded_data)} end + defp decode_message(%{"data" => nil} = message), do: message + defp decode_message(%{"attributes" => %{"payloadFormat" => "NONE"}} = message), do: message + defp headers(config) do token = get_token(config) [{"authorization", "Bearer #{token}"}, {"content-type", "application/json"}] diff --git a/test/broadway_cloud_pub_sub/pull_client_test.exs b/test/broadway_cloud_pub_sub/pull_client_test.exs index e1b15e2..6ebad6f 100644 --- a/test/broadway_cloud_pub_sub/pull_client_test.exs +++ b/test/broadway_cloud_pub_sub/pull_client_test.exs @@ -79,6 +79,23 @@ defmodule BroadwayCloudPubSub.PullClientTest do } """ + @no_payload_response """ + { + "receivedMessages": [ + { + "ackId": "1", + "message": { + "attributes": { + "payloadFormat": "NONE" + }, + "messageId": "20240501001", + "publishTime": "2024-05-01T13:07:41.716Z" + } + } + ] + } + """ + setup do server = Bypass.open() base_url = "http://localhost:#{server.port}" @@ -155,6 +172,20 @@ defmodule BroadwayCloudPubSub.PullClientTest do assert message.metadata.orderingKey == "key1" end + test "returns a list of Broadway.Message when payloadFormat is NONE", %{ + opts: base_opts, + server: server + } do + on_pubsub_request(server, fn _, _ -> + {:ok, @no_payload_response} + end) + + {:ok, opts} = PullClient.init(base_opts) + + assert [message] = PullClient.receive_messages(10, & &1, opts) + assert message.metadata.messageId == "20240501001" + end + test "returns a list of Broadway.Message with :data and :metadata set", %{ opts: base_opts } do