diff --git a/lib/remote_persistent_term.ex b/lib/remote_persistent_term.ex index 55e4534..c028be5 100644 --- a/lib/remote_persistent_term.ex +++ b/lib/remote_persistent_term.ex @@ -69,6 +69,16 @@ defmodule RemotePersistentTerm do Currently only supports gzip (0x1F, 0x8B). """ + ], + version_fallback?: [ + type: :boolean, + required: false, + default: false, + doc: """ + If true, when deserialization fails, the system will attempt to use previous versions \ + of the term until a valid version is found or all versions are exhausted. \ + Only currently supported by the S3 fetcher. + """ ] ] @@ -77,7 +87,8 @@ defmodule RemotePersistentTerm do fetcher_state: term(), refresh_interval: pos_integer(), current_version: String.t(), - auto_decompress?: boolean() + auto_decompress?: boolean(), + version_fallback?: boolean() } defstruct [ :fetcher_mod, @@ -85,7 +96,8 @@ defmodule RemotePersistentTerm do :refresh_interval, :current_version, :name, - :auto_decompress? + :auto_decompress?, + :version_fallback? ] @doc """ @@ -138,7 +150,8 @@ defmodule RemotePersistentTerm do fetcher_state: fetcher_state, refresh_interval: opts[:refresh_interval], name: name(opts), - auto_decompress?: opts[:auto_decompress?] + auto_decompress?: opts[:auto_decompress?], + version_fallback?: opts[:version_fallback?] } if opts[:lazy_init?] do @@ -268,9 +281,15 @@ defmodule RemotePersistentTerm do start_meta, fn -> {status, version} = - with {:ok, current_version} <- state.fetcher_mod.current_version(state.fetcher_state), + with {:ok, current_version, updated_fetcher_state} <- + state.fetcher_mod.current_version(state.fetcher_state), true <- state.current_version != current_version, - :ok <- download_and_store_term(state, deserialize_fun, put_fun) do + :ok <- + download_and_store_term( + %{state | fetcher_state: updated_fetcher_state}, + deserialize_fun, + put_fun + ) do {:updated, current_version} else false -> @@ -307,9 +326,39 @@ defmodule RemotePersistentTerm do defp download_and_store_term(state, deserialize_fun, put_fun) do with {:ok, term} <- state.fetcher_mod.download(state.fetcher_state), - {:ok, decompressed} <- maybe_decompress(state, term), - {:ok, deserialized} <- deserialize_fun.(decompressed) do - put_fun.(deserialized) + {:ok, decompressed} <- maybe_decompress(state, term) do + try_deserialize_and_store(state, decompressed, deserialize_fun, put_fun) + end + end + + defp try_deserialize_and_store(state, term, deserialize_fun, put_fun) do + case deserialize_fun.(term) do + {:ok, deserialized} -> + put_fun.(deserialized) + + {:error, _reason} when state.version_fallback? -> + Logger.error( + "#{state.name} - failed to deserialize remote term, falling back to previous version" + ) + + try_previous_version(state, deserialize_fun, put_fun) + + error -> + error + end + end + + defp try_previous_version(state, deserialize_fun, put_fun) do + case state.fetcher_mod.previous_version(state.fetcher_state) do + {:ok, previous_state} -> + download_and_store_term( + %{state | fetcher_state: previous_state}, + deserialize_fun, + put_fun + ) + + {:error, _} = error -> + error end end diff --git a/lib/remote_persistent_term/fetcher.ex b/lib/remote_persistent_term/fetcher.ex index 30933a5..29b7d3b 100644 --- a/lib/remote_persistent_term/fetcher.ex +++ b/lib/remote_persistent_term/fetcher.ex @@ -20,10 +20,16 @@ defmodule RemotePersistentTerm.Fetcher do Check the current version of the remote term. Used to avoid downloading the same term multiple times. """ - @callback current_version(state()) :: {:ok, version()} | {:error, term()} + @callback current_version(state()) :: {:ok, version(), state()} | {:error, term()} @doc """ Download the term from the remote source. """ @callback download(state()) :: {:ok, term()} | {:error, term()} + + @doc """ + Get the previous version of the remote term. + Returns a new state that can be used to fetch the previous version. + """ + @callback previous_version(state()) :: {:ok, state()} | {:error, term()} end diff --git a/lib/remote_persistent_term/fetcher/http.ex b/lib/remote_persistent_term/fetcher/http.ex index 1facdc9..986ed64 100644 --- a/lib/remote_persistent_term/fetcher/http.ex +++ b/lib/remote_persistent_term/fetcher/http.ex @@ -66,8 +66,8 @@ defmodule RemotePersistentTerm.Fetcher.Http do end @impl true - def current_version(_state) do - {:ok, DateTime.utc_now() |> DateTime.to_string()} + def current_version(state) do + {:ok, DateTime.utc_now() |> DateTime.to_string(), state} end @impl true @@ -81,6 +81,9 @@ defmodule RemotePersistentTerm.Fetcher.Http do end end + @impl true + def previous_version(_state), do: {:error, :not_supported} + defp response_status(url, status) do if status < 300 do Logger.info("successfully downloaded remote term from #{url} with status #{status}") diff --git a/lib/remote_persistent_term/fetcher/s3.ex b/lib/remote_persistent_term/fetcher/s3.ex index f2d16f2..5661e80 100644 --- a/lib/remote_persistent_term/fetcher/s3.ex +++ b/lib/remote_persistent_term/fetcher/s3.ex @@ -14,9 +14,11 @@ defmodule RemotePersistentTerm.Fetcher.S3 do bucket: bucket, key: String.t(), region: region, - failover_buckets: [failover_bucket] | nil + failover_buckets: [failover_bucket] | nil, + version_id: String.t() | nil, + versions: [map()] | nil } - defstruct [:bucket, :key, :region, :failover_buckets] + defstruct [:bucket, :key, :region, :failover_buckets, :version_id, :versions] @failover_bucket_schema [ bucket: [ @@ -91,7 +93,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do message: "Found latest version of object" ) - {:ok, etag} + {:ok, etag, %{state | versions: versions}} else {:error, {:unexpected_response, %{body: reason}}} -> {:error, reason} @@ -136,9 +138,9 @@ defmodule RemotePersistentTerm.Fetcher.S3 do defp list_object_versions(state) do res = aws_client_request( - &ExAws.S3.get_bucket_object_versions/2, + :get_bucket_object_versions, state, - prefix: state.key + [[prefix: state.key]] ) with {:ok, %{body: %{versions: versions}}} <- res do @@ -147,7 +149,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do end defp get_object(state) do - aws_client_request(&ExAws.S3.get_object/2, state, state.key) + aws_client_request(:get_object, state, [state.key, [version_id: state.version_id]]) end defp find_latest([_ | _] = contents) do @@ -166,6 +168,45 @@ defmodule RemotePersistentTerm.Fetcher.S3 do defp find_latest(_), do: {:error, :not_found} + @impl true + def previous_version(state) do + Logger.info( + bucket: state.bucket, + key: state.key, + message: "About to fetch previous version of object", + version_id: state.version_id + ) + + versions = if state.versions, do: {:ok, state.versions}, else: list_object_versions(state) + + with {:ok, versions} <- versions, + {:ok, previous_version} <- find_previous_version(versions, state.version_id) do + {:ok, %{state | version_id: previous_version.version_id, versions: versions}} + else + {:error, reason} -> + Logger.error(%{ + bucket: state.bucket, + key: state.key, + reason: inspect(reason), + message: "Failed to get previous version of object" + }) + + {:error, reason} + end + end + + defp find_previous_version(versions, current_version_id) do + versions + |> Enum.sort_by(& &1.last_modified, :desc) + |> Enum.find(fn version -> + version.version_id != current_version_id + end) + |> case do + nil -> {:error, :no_previous_version} + version -> {:ok, version} + end + end + defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do perform_request(op, state.bucket, state.region, opts) end @@ -173,7 +214,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do defp aws_client_request( op, %{ - failover_buckets: [_|_] = failover_buckets + failover_buckets: [_ | _] = failover_buckets } = state, opts ) do @@ -222,8 +263,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do end end - defp perform_request(op, bucket, region, opts) do - op.(bucket, opts) + defp perform_request(func, bucket, region, opts) do + apply(ExAws.S3, func, [bucket | opts]) |> client().request(region: region) end diff --git a/lib/remote_persistent_term/fetcher/static.ex b/lib/remote_persistent_term/fetcher/static.ex index 1b8f4fc..09743e1 100644 --- a/lib/remote_persistent_term/fetcher/static.ex +++ b/lib/remote_persistent_term/fetcher/static.ex @@ -1,6 +1,6 @@ defmodule RemotePersistentTerm.Fetcher.Static do @moduledoc """ - A macro to help define a valid `RemotePersistentTerm.Fetcher` which + A macro to help define a valid `RemotePersistentTerm.Fetcher` which always returns some hardcoded static data. Mostly intended for testing purposes. @@ -22,10 +22,13 @@ defmodule RemotePersistentTerm.Fetcher.Static do def init(_), do: {:ok, []} @impl true - def current_version(_), do: {:ok, unquote(Keyword.get(opts, :version, "1"))} + def current_version(state), do: {:ok, unquote(Keyword.get(opts, :version, "1")), state} @impl true - def download(_), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))} + def download(state), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))} + + @impl true + def previous_version(_), do: {:error, :not_supported} end end end diff --git a/mix.exs b/mix.exs index e6cbd69..bbab609 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule RemotePersistentTerm.MixProject do use Mix.Project @name "RemotePersistentTerm" - @version "0.12.0" + @version "0.13.0" @repo_url "https://github.com/AppMonet/remote_persistent_term" def project do diff --git a/test/remote_persistent_term/fetcher/s3_test.exs b/test/remote_persistent_term/fetcher/s3_test.exs index 9e7d34d..a3fee3f 100644 --- a/test/remote_persistent_term/fetcher/s3_test.exs +++ b/test/remote_persistent_term/fetcher/s3_test.exs @@ -99,7 +99,7 @@ defmodule RemotePersistentTerm.Fetcher.S3Test do log = capture_log(fn -> result = S3.current_version(state) - assert {:ok, "current-etag"} = result + assert {:ok, "current-etag", _updated_state} = result end) assert log =~ "bucket: \"#{@bucket}\"" @@ -253,4 +253,58 @@ defmodule RemotePersistentTerm.Fetcher.S3Test do assert log =~ "Downloaded object from S3" end end + + describe "previous_version/1" do + test "finds the correct previous version when given a current version ID" do + versions = [ + %{ + version_id: "v3", + last_modified: "2025-05-08T09:58:38.000Z", + is_latest: "true" + }, + %{ + version_id: "v2", + last_modified: "2025-04-02T10:21:18.000Z", + is_latest: "false" + }, + %{ + version_id: "v1", + last_modified: "2025-04-02T09:10:37.000Z", + is_latest: "false" + } + ] + + expect(AwsClientMock, :request, fn operation, opts -> + assert operation.bucket == @bucket + assert operation.resource == "versions" + assert operation.params == [prefix: @key] + assert opts == [region: @region] + {:ok, %{body: %{versions: versions}}} + end) + + state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v3"} + assert {:ok, %{version_id: "v2"}} = S3.previous_version(state) + end + + test "returns error when there are no previous versions" do + versions = [ + %{ + version_id: "v1", + last_modified: "2025-04-02T09:10:37.000Z", + is_latest: "true" + } + ] + + expect(AwsClientMock, :request, fn operation, opts -> + assert operation.bucket == @bucket + assert operation.resource == "versions" + assert operation.params == [prefix: @key] + assert opts == [region: @region] + {:ok, %{body: %{versions: versions}}} + end) + + state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v1"} + assert {:error, :no_previous_version} = S3.previous_version(state) + end + end end