Skip to content

Commit 0e37920

Browse files
author
morten.lund@maskon.no
committed
Initial suggestion
1 parent d82af23 commit 0e37920

File tree

12 files changed

+333
-16
lines changed

12 files changed

+333
-16
lines changed

lib/data_layer.ex

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,31 @@ defmodule AshPostgres.DataLayer do
252252
]
253253
}
254254

255+
@partitioning %Spark.Dsl.Section{
256+
name: :partitioning,
257+
describe: """
258+
A section for configuring the initial partitioning of the table
259+
""",
260+
examples: [
261+
"""
262+
partitioning do
263+
method :list
264+
attribute :post
265+
end
266+
"""
267+
],
268+
schema: [
269+
method: [
270+
type: {:one_of, [:range, :list, :hash]},
271+
doc: "Specifying what partitioning method to use"
272+
],
273+
attribute: [
274+
type: :atom,
275+
doc: "The attribute to partition on"
276+
]
277+
]
278+
}
279+
255280
@postgres %Spark.Dsl.Section{
256281
name: :postgres,
257282
describe: """
@@ -262,7 +287,8 @@ defmodule AshPostgres.DataLayer do
262287
@custom_statements,
263288
@manage_tenant,
264289
@references,
265-
@check_constraints
290+
@check_constraints,
291+
@partitioning
266292
],
267293
modules: [
268294
:repo

lib/data_layer/info.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,4 +222,14 @@ defmodule AshPostgres.DataLayer.Info do
222222
def manage_tenant_update?(resource) do
223223
Extension.get_opt(resource, [:postgres, :manage_tenant], :update?, false)
224224
end
225+
226+
@doc "Partitioning method"
227+
def partitioning_method(resource) do
228+
Extension.get_opt(resource, [:postgres, :partitioning], :method, nil)
229+
end
230+
231+
@doc "Partitioning attribute"
232+
def partitioning_attribute(resource) do
233+
Extension.get_opt(resource, [:postgres, :partitioning], :attribute, nil)
234+
end
225235
end

lib/migration_generator/migration_generator.ex

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,14 +1192,25 @@ defmodule AshPostgres.MigrationGenerator do
11921192

11931193
defp group_into_phases(
11941194
[
1195-
%Operation.CreateTable{table: table, schema: schema, multitenancy: multitenancy} | rest
1195+
%Operation.CreateTable{
1196+
table: table,
1197+
schema: schema,
1198+
multitenancy: multitenancy,
1199+
partitioning: partitioning
1200+
}
1201+
| rest
11961202
],
11971203
nil,
11981204
acc
11991205
) do
12001206
group_into_phases(
12011207
rest,
1202-
%Phase.Create{table: table, schema: schema, multitenancy: multitenancy},
1208+
%Phase.Create{
1209+
table: table,
1210+
schema: schema,
1211+
multitenancy: multitenancy,
1212+
partitioning: partitioning
1213+
},
12031214
acc
12041215
)
12051216
end
@@ -1801,7 +1812,8 @@ defmodule AshPostgres.MigrationGenerator do
18011812
table: snapshot.table,
18021813
schema: snapshot.schema,
18031814
multitenancy: snapshot.multitenancy,
1804-
old_multitenancy: empty_snapshot.multitenancy
1815+
old_multitenancy: empty_snapshot.multitenancy,
1816+
partitioning: snapshot.partitioning
18051817
}
18061818
| acc
18071819
])
@@ -2836,7 +2848,8 @@ defmodule AshPostgres.MigrationGenerator do
28362848
repo: AshPostgres.DataLayer.Info.repo(resource, :mutate),
28372849
multitenancy: multitenancy(resource),
28382850
base_filter: AshPostgres.DataLayer.Info.base_filter_sql(resource),
2839-
has_create_action: has_create_action?(resource)
2851+
has_create_action: has_create_action?(resource),
2852+
partitioning: partitioning(resource)
28402853
}
28412854

28422855
hash =
@@ -2911,6 +2924,20 @@ defmodule AshPostgres.MigrationGenerator do
29112924
end)
29122925
end
29132926

2927+
defp partitioning(resource) do
2928+
method = AshPostgres.DataLayer.Info.partitioning_method(resource)
2929+
attribute = AshPostgres.DataLayer.Info.partitioning_attribute(resource)
2930+
2931+
if not is_nil(method) and not is_nil(attribute) do
2932+
%{
2933+
method: method,
2934+
attribute: attribute
2935+
}
2936+
else
2937+
nil
2938+
end
2939+
end
2940+
29142941
defp multitenancy(resource) do
29152942
strategy = Ash.Resource.Info.multitenancy_strategy(resource)
29162943
attribute = Ash.Resource.Info.multitenancy_attribute(resource)

lib/migration_generator/operation.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ defmodule AshPostgres.MigrationGenerator.Operation do
131131

132132
defmodule CreateTable do
133133
@moduledoc false
134-
defstruct [:table, :schema, :multitenancy, :old_multitenancy]
134+
defstruct [:table, :schema, :multitenancy, :old_multitenancy, :partitioning]
135135
end
136136

137137
defmodule AddAttribute do

lib/migration_generator/phase.ex

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,21 @@ defmodule AshPostgres.MigrationGenerator.Phase do
33

44
defmodule Create do
55
@moduledoc false
6-
defstruct [:table, :schema, :multitenancy, operations: [], commented?: false]
6+
defstruct [:table, :schema, :multitenancy, partitioning: nil, operations: [], commented?: false]
77

88
import AshPostgres.MigrationGenerator.Operation.Helper, only: [as_atom: 1]
99

10-
def up(%{schema: schema, table: table, operations: operations, multitenancy: multitenancy}) do
10+
def up(%{schema: schema, table: table, operations: operations, multitenancy: multitenancy, partitioning: partitioning}) do
1111
if multitenancy.strategy == :context do
12-
"create table(:#{as_atom(table)}, primary_key: false, prefix: prefix()) do\n" <>
12+
arguments = arguments([prefix("prefix()"), options(partitioning: partitioning)])
13+
14+
"create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <>
1315
Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <>
1416
"\nend"
1517
else
16-
opts =
17-
if schema do
18-
", prefix: \"#{schema}\""
19-
else
20-
""
21-
end
18+
arguments = arguments([prefix(schema), options(partitioning: partitioning)])
2219

23-
"create table(:#{as_atom(table)}, primary_key: false#{opts}) do\n" <>
20+
"create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <>
2421
Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <>
2522
"\nend"
2623
end
@@ -40,6 +37,27 @@ defmodule AshPostgres.MigrationGenerator.Phase do
4037
"drop table(:#{as_atom(table)}#{opts})"
4138
end
4239
end
40+
41+
def arguments(["",""]), do: ""
42+
def arguments(arguments), do: ", " <> Enum.join(Enum.reject(arguments, &is_nil(&1)), ",")
43+
44+
def prefix(nil), do: nil
45+
def prefix(schema), do: "prefix: #{schema}"
46+
47+
def options(_options, _acc \\ [])
48+
def options([], []), do: ""
49+
def options([], acc), do: "options: \"#{Enum.join(acc, " ")}\""
50+
51+
def options([{:partitioning, %{method: method, attribute: attribute}} | rest], acc) do
52+
option = "PARTITION BY #{String.upcase(Atom.to_string(method))} (#{attribute})"
53+
54+
rest
55+
|> options(acc ++ [option])
56+
end
57+
58+
def options([_| rest], acc) do
59+
options(rest, acc)
60+
end
4361
end
4462

4563
defmodule Alter do

lib/partitioning.ex

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
defmodule AshPostgres.Partitioning do
2+
@moduledoc false
3+
4+
@doc """
5+
Create a new partition for a resource
6+
"""
7+
def create_partition(resource, opts) do
8+
repo = AshPostgres.DataLayer.Info.repo(resource)
9+
10+
resource
11+
|> AshPostgres.DataLayer.Info.partitioning_method()
12+
|> case do
13+
:range ->
14+
create_range_partition(repo, resource, opts)
15+
16+
:list ->
17+
create_list_partition(repo, resource, opts)
18+
19+
:hash ->
20+
create_hash_partition(repo, resource, opts)
21+
22+
unsupported_method ->
23+
raise "Invalid partition method, got: #{unsupported_method}"
24+
end
25+
end
26+
27+
@doc """
28+
Check if partition exists
29+
"""
30+
def exists?(resource, opts) do
31+
repo = AshPostgres.DataLayer.Info.repo(resource)
32+
key = Keyword.fetch!(opts, :key)
33+
table = AshPostgres.DataLayer.Info.table(resource)
34+
partition_name = table <> "_" <> "#{key}"
35+
36+
partition_exists?(repo, resource, partition_name)
37+
end
38+
39+
# TBI
40+
defp create_range_partition(repo, resource, opts) do
41+
end
42+
43+
defp create_list_partition(repo, resource, opts) do
44+
key = Keyword.fetch!(opts, :key)
45+
table = AshPostgres.DataLayer.Info.table(resource)
46+
partition_name = table <> "_" <> "#{key}"
47+
48+
if partition_exists?(repo, resource, partition_name) do
49+
{:error, :allready_exists}
50+
else
51+
Ecto.Adapters.SQL.query(
52+
repo,
53+
"CREATE TABLE #{partition_name} PARTITION OF public.#{table} FOR VALUES IN (#{key})"
54+
)
55+
56+
if partition_exists?(repo, resource, partition_name) do
57+
:ok
58+
else
59+
{:error, "Unable to create partition"}
60+
end
61+
end
62+
end
63+
64+
# TBI
65+
defp create_hash_partition(repo, resource, opts) do
66+
end
67+
68+
defp partition_exists?(repo, resource, parition_name) do
69+
%Postgrex.Result{} =
70+
result =
71+
repo
72+
|> Ecto.Adapters.SQL.query!(
73+
"select table_name from information_schema.tables t where t.table_schema = 'public' and t.table_name = $1",
74+
[parition_name]
75+
)
76+
77+
result.num_rows > 0
78+
end
79+
end
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
{
2+
"attributes": [
3+
{
4+
"allow_nil?": false,
5+
"default": "fragment(\"gen_random_uuid()\")",
6+
"generated?": false,
7+
"primary_key?": true,
8+
"references": null,
9+
"size": null,
10+
"source": "id",
11+
"type": "uuid"
12+
},
13+
{
14+
"allow_nil?": false,
15+
"default": "1",
16+
"generated?": false,
17+
"primary_key?": true,
18+
"references": null,
19+
"size": null,
20+
"source": "key",
21+
"type": "bigint"
22+
}
23+
],
24+
"base_filter": null,
25+
"check_constraints": [],
26+
"custom_indexes": [],
27+
"custom_statements": [],
28+
"has_create_action": false,
29+
"hash": "7FE5D9659135887A47FAE2729CEB0281FA8FF392EDB3B43426EAFD89A1518FEB",
30+
"identities": [],
31+
"multitenancy": {
32+
"attribute": null,
33+
"global": null,
34+
"strategy": null
35+
},
36+
"partitioning": {
37+
"attribute": "key",
38+
"method": "list"
39+
},
40+
"repo": "Elixir.AshPostgres.TestRepo",
41+
"schema": null,
42+
"table": "partitioned_posts"
43+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
defmodule AshPostgres.TestRepo.Migrations.PartitionedPost do
2+
@moduledoc """
3+
Updates resources based on their most recent snapshots.
4+
5+
This file was autogenerated with `mix ash_postgres.generate_migrations`
6+
"""
7+
8+
use Ecto.Migration
9+
10+
def up do
11+
create table(:partitioned_posts, primary_key: false, options: "PARTITION BY LIST (key)") do
12+
add(:id, :uuid, null: false, default: fragment("gen_random_uuid()"), primary_key: true)
13+
add(:key, :bigint, null: false, default: 1, primary_key: true)
14+
end
15+
end
16+
17+
def down do
18+
drop(table(:partitioned_posts))
19+
end
20+
end

test/migration_generator_test.exs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,56 @@ defmodule AshPostgres.MigrationGeneratorTest do
304304
end
305305
end
306306

307+
describe "creating initial snapshots for resources with partitioning" do
308+
setup do
309+
on_exit(fn ->
310+
File.rm_rf!("test_snapshots_path")
311+
File.rm_rf!("test_migration_path")
312+
end)
313+
314+
defposts do
315+
postgres do
316+
partitioning do
317+
method(:list)
318+
attribute(:title)
319+
end
320+
end
321+
322+
attributes do
323+
uuid_primary_key(:id)
324+
attribute(:title, :string, public?: true)
325+
end
326+
end
327+
328+
defdomain([Post])
329+
330+
AshPostgres.MigrationGenerator.generate(Domain,
331+
snapshot_path: "test_snapshots_path",
332+
migration_path: "test_migration_path",
333+
quiet: false,
334+
format: false
335+
)
336+
337+
:ok
338+
end
339+
340+
test "the migration sets up resources correctly" do
341+
# the snapshot exists and contains valid json
342+
assert File.read!(Path.wildcard("test_snapshots_path/test_repo/posts/*.json"))
343+
|> Jason.decode!(keys: :atoms!)
344+
345+
assert [file] =
346+
Path.wildcard("test_migration_path/**/*_migrate_resources*.exs")
347+
|> Enum.reject(&String.contains?(&1, "extensions"))
348+
349+
file_contents = File.read!(file)
350+
351+
# the migration creates the table with options specifing how to partition the table
352+
assert file_contents =~
353+
~S{create table(:posts, primary_key: false, options: "PARTITION BY LIST (title)") do}
354+
end
355+
end
356+
307357
describe "custom_indexes with `concurrently: true`" do
308358
setup do
309359
on_exit(fn ->

0 commit comments

Comments
 (0)