From a852e2a55fee8963073fb49e417f3b68a8b6427f Mon Sep 17 00:00:00 2001 From: Louis Date: Thu, 8 May 2025 13:18:55 +0200 Subject: [PATCH 1/7] Add fallback to previous version --- lib/remote_persistent_term.ex | 46 ++++++++++++-- lib/remote_persistent_term/fetcher.ex | 6 ++ lib/remote_persistent_term/fetcher/http.ex | 3 + lib/remote_persistent_term/fetcher/s3.ex | 63 ++++++++++++++++--- lib/remote_persistent_term/fetcher/static.ex | 5 +- mix.exs | 2 +- .../fetcher/s3_test.exs | 54 ++++++++++++++++ 7 files changed, 163 insertions(+), 16 deletions(-) diff --git a/lib/remote_persistent_term.ex b/lib/remote_persistent_term.ex index 55e4534..fbca472 100644 --- a/lib/remote_persistent_term.ex +++ b/lib/remote_persistent_term.ex @@ -69,6 +69,15 @@ defmodule RemotePersistentTerm do Currently only supports gzip (0x1F, 0x8B). """ + ], + version_fallback?: [ + type: :boolean, + required: false, + default: false, + doc: """ + If true, when deserialization fails, the system will attempt to use previous versions \ + of the term until a valid version is found or all versions are exhausted. + """ ] ] @@ -77,7 +86,8 @@ defmodule RemotePersistentTerm do fetcher_state: term(), refresh_interval: pos_integer(), current_version: String.t(), - auto_decompress?: boolean() + auto_decompress?: boolean(), + version_fallback?: boolean() } defstruct [ :fetcher_mod, @@ -85,7 +95,8 @@ defmodule RemotePersistentTerm do :refresh_interval, :current_version, :name, - :auto_decompress? + :auto_decompress?, + :version_fallback? ] @doc """ @@ -138,7 +149,8 @@ defmodule RemotePersistentTerm do fetcher_state: fetcher_state, refresh_interval: opts[:refresh_interval], name: name(opts), - auto_decompress?: opts[:auto_decompress?] + auto_decompress?: opts[:auto_decompress?], + version_fallback?: opts[:version_fallback?] } if opts[:lazy_init?] do @@ -307,9 +319,31 @@ defmodule RemotePersistentTerm do defp download_and_store_term(state, deserialize_fun, put_fun) do with {:ok, term} <- state.fetcher_mod.download(state.fetcher_state), - {:ok, decompressed} <- maybe_decompress(state, term), - {:ok, deserialized} <- deserialize_fun.(decompressed) do - put_fun.(deserialized) + {:ok, decompressed} <- maybe_decompress(state, term) do + try_deserialize_and_store(state, decompressed, deserialize_fun, put_fun) + end + end + + defp try_deserialize_and_store(state, term, deserialize_fun, put_fun) do + case deserialize_fun.(term) do + {:ok, deserialized} -> + put_fun.(deserialized) + + {:error, _reason} = error when state.version_fallback? -> + case state.fetcher_mod.previous_version(state.fetcher_state) do + {:ok, previous_state} -> + download_and_store_term( + %{state | fetcher_state: previous_state}, + deserialize_fun, + put_fun + ) + + {:error, _} -> + error + end + + error -> + error end end diff --git a/lib/remote_persistent_term/fetcher.ex b/lib/remote_persistent_term/fetcher.ex index 30933a5..676e13a 100644 --- a/lib/remote_persistent_term/fetcher.ex +++ b/lib/remote_persistent_term/fetcher.ex @@ -26,4 +26,10 @@ defmodule RemotePersistentTerm.Fetcher do Download the term from the remote source. """ @callback download(state()) :: {:ok, term()} | {:error, term()} + + @doc """ + Get the previous version of the remote term. + Returns a new state that can be used to fetch the previous version. + """ + @callback previous_version(state()) :: {:ok, state()} | {:error, term()} end diff --git a/lib/remote_persistent_term/fetcher/http.ex b/lib/remote_persistent_term/fetcher/http.ex index 1facdc9..e94d6ac 100644 --- a/lib/remote_persistent_term/fetcher/http.ex +++ b/lib/remote_persistent_term/fetcher/http.ex @@ -81,6 +81,9 @@ defmodule RemotePersistentTerm.Fetcher.Http do end end + @impl true + def previous_version(_state), do: {:error, :no_previous_version} + defp response_status(url, status) do if status < 300 do Logger.info("successfully downloaded remote term from #{url} with status #{status}") diff --git a/lib/remote_persistent_term/fetcher/s3.ex b/lib/remote_persistent_term/fetcher/s3.ex index f2d16f2..7d126cc 100644 --- a/lib/remote_persistent_term/fetcher/s3.ex +++ b/lib/remote_persistent_term/fetcher/s3.ex @@ -14,9 +14,10 @@ defmodule RemotePersistentTerm.Fetcher.S3 do bucket: bucket, key: String.t(), region: region, - failover_buckets: [failover_bucket] | nil + failover_buckets: [failover_bucket] | nil, + version_id: String.t() | nil } - defstruct [:bucket, :key, :region, :failover_buckets] + defstruct [:bucket, :key, :region, :failover_buckets, :version_id] @failover_bucket_schema [ bucket: [ @@ -136,9 +137,9 @@ defmodule RemotePersistentTerm.Fetcher.S3 do defp list_object_versions(state) do res = aws_client_request( - &ExAws.S3.get_bucket_object_versions/2, + :get_bucket_object_versions, state, - prefix: state.key + [[prefix: state.key]] ) with {:ok, %{body: %{versions: versions}}} <- res do @@ -147,7 +148,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do end defp get_object(state) do - aws_client_request(&ExAws.S3.get_object/2, state, state.key) + # aws_client_request(&ExAws.S3.get_object/2, state, state.key) + aws_client_request(:get_object, state, [state.key, [version_id: state.version_id]]) end defp find_latest([_ | _] = contents) do @@ -166,6 +168,51 @@ defmodule RemotePersistentTerm.Fetcher.S3 do defp find_latest(_), do: {:error, :not_found} + @impl true + def previous_version(state) do + Logger.info( + bucket: state.bucket, + key: state.key, + message: "About to fetch previous version of object", + version_id: state.version_id + ) + + with {:ok, versions} <- list_object_versions(state), + {:ok, previous_version} <- find_previous_version(versions, state.version_id) do + {:ok, %{state | version_id: previous_version.version_id}} + else + {:error, reason} -> + Logger.error(%{ + bucket: state.bucket, + key: state.key, + reason: inspect(reason), + message: "Failed to get previous version of object" + }) + + {:error, reason} + end + end + + defp find_previous_version(versions, current_version_id) do + versions + |> Enum.sort_by( + fn version -> + {:ok, datetime, _} = DateTime.from_iso8601(version.last_modified) + datetime + end, + {:desc, DateTime} + ) + |> Enum.find(fn version -> + version.version_id != current_version_id + end) + |> case do + nil -> {:error, :no_previous_version} + version -> {:ok, version} + end + end + + defp aws_client_request(op, state, opts \\ []) + defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do perform_request(op, state.bucket, state.region, opts) end @@ -173,7 +220,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do defp aws_client_request( op, %{ - failover_buckets: [_|_] = failover_buckets + failover_buckets: [_ | _] = failover_buckets } = state, opts ) do @@ -222,8 +269,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do end end - defp perform_request(op, bucket, region, opts) do - op.(bucket, opts) + defp perform_request(func, bucket, region, opts) do + apply(ExAws.S3, func, [bucket | opts]) |> client().request(region: region) end diff --git a/lib/remote_persistent_term/fetcher/static.ex b/lib/remote_persistent_term/fetcher/static.ex index 1b8f4fc..3b2531f 100644 --- a/lib/remote_persistent_term/fetcher/static.ex +++ b/lib/remote_persistent_term/fetcher/static.ex @@ -1,6 +1,6 @@ defmodule RemotePersistentTerm.Fetcher.Static do @moduledoc """ - A macro to help define a valid `RemotePersistentTerm.Fetcher` which + A macro to help define a valid `RemotePersistentTerm.Fetcher` which always returns some hardcoded static data. Mostly intended for testing purposes. @@ -26,6 +26,9 @@ defmodule RemotePersistentTerm.Fetcher.Static do @impl true def download(_), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))} + + @impl true + def previous_version(_), do: {:error, :no_previous_version} end end end diff --git a/mix.exs b/mix.exs index e6cbd69..bbab609 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule RemotePersistentTerm.MixProject do use Mix.Project @name "RemotePersistentTerm" - @version "0.12.0" + @version "0.13.0" @repo_url "https://github.com/AppMonet/remote_persistent_term" def project do diff --git a/test/remote_persistent_term/fetcher/s3_test.exs b/test/remote_persistent_term/fetcher/s3_test.exs index 9e7d34d..b914007 100644 --- a/test/remote_persistent_term/fetcher/s3_test.exs +++ b/test/remote_persistent_term/fetcher/s3_test.exs @@ -253,4 +253,58 @@ defmodule RemotePersistentTerm.Fetcher.S3Test do assert log =~ "Downloaded object from S3" end end + + describe "previous_version/1" do + test "finds the correct previous version when given a current version ID" do + versions = [ + %{ + version_id: "v3", + last_modified: "2025-05-08T09:58:38.000Z", + is_latest: "true" + }, + %{ + version_id: "v2", + last_modified: "2025-04-02T10:21:18.000Z", + is_latest: "false" + }, + %{ + version_id: "v1", + last_modified: "2025-04-02T09:10:37.000Z", + is_latest: "false" + } + ] + + expect(AwsClientMock, :request, fn operation, opts -> + assert operation.bucket == @bucket + assert operation.resource == "versions" + assert operation.params == [prefix: @key] + assert opts == [region: @region] + {:ok, %{body: %{versions: versions}}} + end) + + state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v3"} + assert {:ok, %{version_id: "v2"}} = S3.previous_version(state) + end + + test "returns error when there are no previous versions" do + versions = [ + %{ + version_id: "v1", + last_modified: "2025-04-02T09:10:37.000Z", + is_latest: "true" + } + ] + + expect(AwsClientMock, :request, fn operation, opts -> + assert operation.bucket == @bucket + assert operation.resource == "versions" + assert operation.params == [prefix: @key] + assert opts == [region: @region] + {:ok, %{body: %{versions: versions}}} + end) + + state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v1"} + assert {:error, :no_previous_version} = S3.previous_version(state) + end + end end From 7a4bd976bb36a98cd01384d1cb105763b22b582d Mon Sep 17 00:00:00 2001 From: Louis Date: Thu, 8 May 2025 13:26:36 +0200 Subject: [PATCH 2/7] Delete comment --- lib/remote_persistent_term/fetcher/s3.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/remote_persistent_term/fetcher/s3.ex b/lib/remote_persistent_term/fetcher/s3.ex index 7d126cc..3e6ff58 100644 --- a/lib/remote_persistent_term/fetcher/s3.ex +++ b/lib/remote_persistent_term/fetcher/s3.ex @@ -148,7 +148,6 @@ defmodule RemotePersistentTerm.Fetcher.S3 do end defp get_object(state) do - # aws_client_request(&ExAws.S3.get_object/2, state, state.key) aws_client_request(:get_object, state, [state.key, [version_id: state.version_id]]) end From a73a5552b3276a69a4b20942f21f78d682b5115c Mon Sep 17 00:00:00 2001 From: Louis Date: Thu, 8 May 2025 13:31:58 +0200 Subject: [PATCH 3/7] Remove function uneccessary --- lib/remote_persistent_term/fetcher/s3.ex | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/remote_persistent_term/fetcher/s3.ex b/lib/remote_persistent_term/fetcher/s3.ex index 3e6ff58..e8d0080 100644 --- a/lib/remote_persistent_term/fetcher/s3.ex +++ b/lib/remote_persistent_term/fetcher/s3.ex @@ -210,8 +210,6 @@ defmodule RemotePersistentTerm.Fetcher.S3 do end end - defp aws_client_request(op, state, opts \\ []) - defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do perform_request(op, state.bucket, state.region, opts) end From de5797066322c345865b49b5c8097837b56e8c69 Mon Sep 17 00:00:00 2001 From: Louis Date: Thu, 8 May 2025 14:57:36 +0200 Subject: [PATCH 4/7] Small changes from pr comment --- lib/remote_persistent_term.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/remote_persistent_term.ex b/lib/remote_persistent_term.ex index fbca472..71b5467 100644 --- a/lib/remote_persistent_term.ex +++ b/lib/remote_persistent_term.ex @@ -76,7 +76,8 @@ defmodule RemotePersistentTerm do default: false, doc: """ If true, when deserialization fails, the system will attempt to use previous versions \ - of the term until a valid version is found or all versions are exhausted. + of the term until a valid version is found or all versions are exhausted. \ + Only currently supported by the S3 fetcher. """ ] ] @@ -330,6 +331,7 @@ defmodule RemotePersistentTerm do put_fun.(deserialized) {:error, _reason} = error when state.version_fallback? -> + Logger.error("#{state.name} - failed to deserialize remote term, falling back to previous version") case state.fetcher_mod.previous_version(state.fetcher_state) do {:ok, previous_state} -> download_and_store_term( From 55bfd44026836bd49d0e120961f7e8d2e55d1b7a Mon Sep 17 00:00:00 2001 From: Louis Date: Thu, 8 May 2025 14:57:56 +0200 Subject: [PATCH 5/7] Mix format --- lib/remote_persistent_term.ex | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/remote_persistent_term.ex b/lib/remote_persistent_term.ex index 71b5467..81022e5 100644 --- a/lib/remote_persistent_term.ex +++ b/lib/remote_persistent_term.ex @@ -331,7 +331,10 @@ defmodule RemotePersistentTerm do put_fun.(deserialized) {:error, _reason} = error when state.version_fallback? -> - Logger.error("#{state.name} - failed to deserialize remote term, falling back to previous version") + Logger.error( + "#{state.name} - failed to deserialize remote term, falling back to previous version" + ) + case state.fetcher_mod.previous_version(state.fetcher_state) do {:ok, previous_state} -> download_and_store_term( From 412070c3a0262e843811ee245b8e61b00e5487ec Mon Sep 17 00:00:00 2001 From: Louis Date: Thu, 8 May 2025 18:50:51 +0200 Subject: [PATCH 6/7] Cache the versions and sort date chronologically --- lib/remote_persistent_term.ex | 4 ++-- lib/remote_persistent_term/fetcher.ex | 2 +- lib/remote_persistent_term/fetcher/http.ex | 4 ++-- lib/remote_persistent_term/fetcher/s3.ex | 21 ++++++++----------- lib/remote_persistent_term/fetcher/static.ex | 4 ++-- .../fetcher/s3_test.exs | 2 +- 6 files changed, 17 insertions(+), 20 deletions(-) diff --git a/lib/remote_persistent_term.ex b/lib/remote_persistent_term.ex index 81022e5..f7bb58c 100644 --- a/lib/remote_persistent_term.ex +++ b/lib/remote_persistent_term.ex @@ -281,9 +281,9 @@ defmodule RemotePersistentTerm do start_meta, fn -> {status, version} = - with {:ok, current_version} <- state.fetcher_mod.current_version(state.fetcher_state), + with {:ok, current_version, updated_fetcher_state} <- state.fetcher_mod.current_version(state.fetcher_state), true <- state.current_version != current_version, - :ok <- download_and_store_term(state, deserialize_fun, put_fun) do + :ok <- download_and_store_term(%{state | fetcher_state: updated_fetcher_state}, deserialize_fun, put_fun) do {:updated, current_version} else false -> diff --git a/lib/remote_persistent_term/fetcher.ex b/lib/remote_persistent_term/fetcher.ex index 676e13a..29b7d3b 100644 --- a/lib/remote_persistent_term/fetcher.ex +++ b/lib/remote_persistent_term/fetcher.ex @@ -20,7 +20,7 @@ defmodule RemotePersistentTerm.Fetcher do Check the current version of the remote term. Used to avoid downloading the same term multiple times. """ - @callback current_version(state()) :: {:ok, version()} | {:error, term()} + @callback current_version(state()) :: {:ok, version(), state()} | {:error, term()} @doc """ Download the term from the remote source. diff --git a/lib/remote_persistent_term/fetcher/http.ex b/lib/remote_persistent_term/fetcher/http.ex index e94d6ac..a5fe2d5 100644 --- a/lib/remote_persistent_term/fetcher/http.ex +++ b/lib/remote_persistent_term/fetcher/http.ex @@ -66,8 +66,8 @@ defmodule RemotePersistentTerm.Fetcher.Http do end @impl true - def current_version(_state) do - {:ok, DateTime.utc_now() |> DateTime.to_string()} + def current_version(state) do + {:ok, DateTime.utc_now() |> DateTime.to_string(), state} end @impl true diff --git a/lib/remote_persistent_term/fetcher/s3.ex b/lib/remote_persistent_term/fetcher/s3.ex index e8d0080..5661e80 100644 --- a/lib/remote_persistent_term/fetcher/s3.ex +++ b/lib/remote_persistent_term/fetcher/s3.ex @@ -15,9 +15,10 @@ defmodule RemotePersistentTerm.Fetcher.S3 do key: String.t(), region: region, failover_buckets: [failover_bucket] | nil, - version_id: String.t() | nil + version_id: String.t() | nil, + versions: [map()] | nil } - defstruct [:bucket, :key, :region, :failover_buckets, :version_id] + defstruct [:bucket, :key, :region, :failover_buckets, :version_id, :versions] @failover_bucket_schema [ bucket: [ @@ -92,7 +93,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do message: "Found latest version of object" ) - {:ok, etag} + {:ok, etag, %{state | versions: versions}} else {:error, {:unexpected_response, %{body: reason}}} -> {:error, reason} @@ -176,9 +177,11 @@ defmodule RemotePersistentTerm.Fetcher.S3 do version_id: state.version_id ) - with {:ok, versions} <- list_object_versions(state), + versions = if state.versions, do: {:ok, state.versions}, else: list_object_versions(state) + + with {:ok, versions} <- versions, {:ok, previous_version} <- find_previous_version(versions, state.version_id) do - {:ok, %{state | version_id: previous_version.version_id}} + {:ok, %{state | version_id: previous_version.version_id, versions: versions}} else {:error, reason} -> Logger.error(%{ @@ -194,13 +197,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do defp find_previous_version(versions, current_version_id) do versions - |> Enum.sort_by( - fn version -> - {:ok, datetime, _} = DateTime.from_iso8601(version.last_modified) - datetime - end, - {:desc, DateTime} - ) + |> Enum.sort_by(& &1.last_modified, :desc) |> Enum.find(fn version -> version.version_id != current_version_id end) diff --git a/lib/remote_persistent_term/fetcher/static.ex b/lib/remote_persistent_term/fetcher/static.ex index 3b2531f..0a8ce9a 100644 --- a/lib/remote_persistent_term/fetcher/static.ex +++ b/lib/remote_persistent_term/fetcher/static.ex @@ -22,10 +22,10 @@ defmodule RemotePersistentTerm.Fetcher.Static do def init(_), do: {:ok, []} @impl true - def current_version(_), do: {:ok, unquote(Keyword.get(opts, :version, "1"))} + def current_version(state), do: {:ok, unquote(Keyword.get(opts, :version, "1")), state} @impl true - def download(_), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))} + def download(state), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))} @impl true def previous_version(_), do: {:error, :no_previous_version} diff --git a/test/remote_persistent_term/fetcher/s3_test.exs b/test/remote_persistent_term/fetcher/s3_test.exs index b914007..a3fee3f 100644 --- a/test/remote_persistent_term/fetcher/s3_test.exs +++ b/test/remote_persistent_term/fetcher/s3_test.exs @@ -99,7 +99,7 @@ defmodule RemotePersistentTerm.Fetcher.S3Test do log = capture_log(fn -> result = S3.current_version(state) - assert {:ok, "current-etag"} = result + assert {:ok, "current-etag", _updated_state} = result end) assert log =~ "bucket: \"#{@bucket}\"" From 0977c2b773efffa1809bf0dde4c3a0e25faaf34b Mon Sep 17 00:00:00 2001 From: Louis Date: Tue, 13 May 2025 16:13:07 +0200 Subject: [PATCH 7/7] Improve readability based on pr comments --- lib/remote_persistent_term.ex | 38 ++++++++++++-------- lib/remote_persistent_term/fetcher/http.ex | 2 +- lib/remote_persistent_term/fetcher/static.ex | 2 +- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/lib/remote_persistent_term.ex b/lib/remote_persistent_term.ex index f7bb58c..c028be5 100644 --- a/lib/remote_persistent_term.ex +++ b/lib/remote_persistent_term.ex @@ -281,9 +281,15 @@ defmodule RemotePersistentTerm do start_meta, fn -> {status, version} = - with {:ok, current_version, updated_fetcher_state} <- state.fetcher_mod.current_version(state.fetcher_state), + with {:ok, current_version, updated_fetcher_state} <- + state.fetcher_mod.current_version(state.fetcher_state), true <- state.current_version != current_version, - :ok <- download_and_store_term(%{state | fetcher_state: updated_fetcher_state}, deserialize_fun, put_fun) do + :ok <- + download_and_store_term( + %{state | fetcher_state: updated_fetcher_state}, + deserialize_fun, + put_fun + ) do {:updated, current_version} else false -> @@ -330,28 +336,32 @@ defmodule RemotePersistentTerm do {:ok, deserialized} -> put_fun.(deserialized) - {:error, _reason} = error when state.version_fallback? -> + {:error, _reason} when state.version_fallback? -> Logger.error( "#{state.name} - failed to deserialize remote term, falling back to previous version" ) - case state.fetcher_mod.previous_version(state.fetcher_state) do - {:ok, previous_state} -> - download_and_store_term( - %{state | fetcher_state: previous_state}, - deserialize_fun, - put_fun - ) - - {:error, _} -> - error - end + try_previous_version(state, deserialize_fun, put_fun) error -> error end end + defp try_previous_version(state, deserialize_fun, put_fun) do + case state.fetcher_mod.previous_version(state.fetcher_state) do + {:ok, previous_state} -> + download_and_store_term( + %{state | fetcher_state: previous_state}, + deserialize_fun, + put_fun + ) + + {:error, _} = error -> + error + end + end + defp maybe_decompress(%__MODULE__{auto_decompress?: true}, body) do case body do <<0x1F, 0x8B, _rest::binary>> = gzipped -> diff --git a/lib/remote_persistent_term/fetcher/http.ex b/lib/remote_persistent_term/fetcher/http.ex index a5fe2d5..986ed64 100644 --- a/lib/remote_persistent_term/fetcher/http.ex +++ b/lib/remote_persistent_term/fetcher/http.ex @@ -82,7 +82,7 @@ defmodule RemotePersistentTerm.Fetcher.Http do end @impl true - def previous_version(_state), do: {:error, :no_previous_version} + def previous_version(_state), do: {:error, :not_supported} defp response_status(url, status) do if status < 300 do diff --git a/lib/remote_persistent_term/fetcher/static.ex b/lib/remote_persistent_term/fetcher/static.ex index 0a8ce9a..09743e1 100644 --- a/lib/remote_persistent_term/fetcher/static.ex +++ b/lib/remote_persistent_term/fetcher/static.ex @@ -28,7 +28,7 @@ defmodule RemotePersistentTerm.Fetcher.Static do def download(state), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))} @impl true - def previous_version(_), do: {:error, :no_previous_version} + def previous_version(_), do: {:error, :not_supported} end end end