Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions integration_test/myxql/upsert_all_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,87 @@ defmodule Ecto.Integration.UpsertAllTest do

test "on conflict ignore" do
post = [title: "first", uuid: "6fa459ea-ee8a-3ca4-894e-db77e160355e"]
assert TestRepo.insert_all(Post, [post], on_conflict: :nothing) ==
{1, nil}
assert TestRepo.insert_all(Post, [post], on_conflict: :nothing) ==
{1, nil}
# Default :nothing behavior uses ON DUPLICATE KEY UPDATE x = x workaround
# which always reports rows as affected
assert TestRepo.insert_all(Post, [post], on_conflict: :nothing) == {1, nil}
assert TestRepo.insert_all(Post, [post], on_conflict: :nothing) == {1, nil}
end

test "insert_mode: :ignore_errors" do
post = [title: "first", uuid: "6fa459ea-ee8a-3ca4-894e-db77e160355e"]
# First insert succeeds - 1 row inserted
assert TestRepo.insert_all(Post, [post],
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {1, nil}

# Second insert is ignored due to duplicate - 0 rows inserted (INSERT IGNORE behavior)
assert TestRepo.insert_all(Post, [post],
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {0, nil}
end

test "insert_mode: :ignore_errors with mixed records (some conflicts, some new)" do
# Insert an existing post
existing_uuid = "6fa459ea-ee8a-3ca4-894e-db77e160355e"
existing_post = [title: "existing", uuid: existing_uuid]

assert TestRepo.insert_all(Post, [existing_post],
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {1, nil}

# Now insert a batch with one duplicate and two new records
new_uuid1 = "7fa459ea-ee8a-3ca4-894e-db77e160355f"
new_uuid2 = "8fa459ea-ee8a-3ca4-894e-db77e160355a"

posts = [
[title: "new post 1", uuid: new_uuid1],
[title: "duplicate", uuid: existing_uuid],
[title: "new post 2", uuid: new_uuid2]
]

# With INSERT IGNORE, only 2 rows should be inserted (the non-duplicates)
assert TestRepo.insert_all(Post, posts,
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {2, nil}

# Verify the data - should have 3 posts total (1 existing + 2 new)
assert length(TestRepo.all(Post)) == 3

# Verify the existing post was not modified
[original] = TestRepo.all(from p in Post, where: p.uuid == ^existing_uuid)
assert original.title == "existing"

# Verify new posts were inserted
assert TestRepo.exists?(from p in Post, where: p.uuid == ^new_uuid1)
assert TestRepo.exists?(from p in Post, where: p.uuid == ^new_uuid2)
end

test "insert_mode: :ignore_errors with all duplicates" do
# Insert initial posts
uuid1 = "1fa459ea-ee8a-3ca4-894e-db77e160355e"
uuid2 = "2fa459ea-ee8a-3ca4-894e-db77e160355e"
initial_posts = [[title: "first", uuid: uuid1], [title: "second", uuid: uuid2]]

assert TestRepo.insert_all(Post, initial_posts,
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {2, nil}

# Try to insert all duplicates
duplicate_posts = [[title: "dup1", uuid: uuid1], [title: "dup2", uuid: uuid2]]

# All are duplicates, so 0 rows inserted
assert TestRepo.insert_all(Post, duplicate_posts,
on_conflict: :nothing,
insert_mode: :ignore_errors
) == {0, nil}

# Verify count unchanged
assert length(TestRepo.all(Post)) == 2
end

test "on conflict keyword list" do
Expand Down
36 changes: 32 additions & 4 deletions lib/ecto/adapters/myxql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,25 @@ defmodule Ecto.Adapters.MyXQL do
automatically commits after some commands like CREATE TABLE.
Therefore MySQL migrations does not run inside transactions.

### Upserts

When using `on_conflict: :nothing`, the adapter uses the
`ON DUPLICATE KEY UPDATE x = x` workaround to simulate "do nothing"
behavior. This always reports 1 affected row regardless of whether
the row was actually inserted or ignored.

If you need accurate row counts (0 when ignored, 1 when inserted),
you can opt into MySQL's `INSERT IGNORE` by specifying:

Repo.insert_all(Post, posts,
on_conflict: :nothing,
insert_mode: :ignore_errors)

Note that `INSERT IGNORE` has broader semantics in MySQL - it also
ignores certain type conversion errors, not just duplicate key conflicts.
The `insert_mode: :ignore_errors` option only affects the behavior of
`on_conflict: :nothing`.
Comment on lines +122 to +123
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be a little bit misleading. It can effect the behaviour of any insert query because we are injecting it without looking at what conflict option was chosen.

So maybe rewriting this to say it changes the default sql for :nothing would be clearer. And probably having a specific example showing it with the option on and off to really drive the point home.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait you are checking the conflict option when injecting it. Then I think there is a separate question if we should do that. Because IGNORE is an independent modifier in MySQL. We should probably allow it to be added whenever the user wants or raise if we want to only allow this one use case.

Personally I like the former. What do you think @josevalim ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. We should probably have a section exclusively on the :insert_mode option and in there say it affects the behaviour of on_conflct: :nothing as well, as the mean feature here is no longer the upsert itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably allow it to be added whenever the user wants or raise if we want to only allow this one use case.

Yes, exactly. We should allow insert_mode everywhere, regardless of :on_conflict, and then document that it affects the on_conflict behaviour too.


## Old MySQL versions

### JSON support
Expand Down Expand Up @@ -319,7 +338,10 @@ defmodule Ecto.Adapters.MyXQL do

key = primary_key!(schema_meta, returning)
{fields, values} = :lists.unzip(params)
sql = @conn.insert(prefix, source, fields, [fields], on_conflict, [], [])

# Extract insert_mode and pass it to the connection's insert function
insert_opts = if opts[:insert_mode], do: [insert_mode: opts[:insert_mode]], else: []
sql = @conn.insert(prefix, source, fields, [fields], on_conflict, [], [], insert_opts)

opts =
if is_nil(Keyword.get(opts, :cache_statement)) do
Expand All @@ -330,9 +352,15 @@ defmodule Ecto.Adapters.MyXQL do

case Ecto.Adapters.SQL.query(adapter_meta, sql, values ++ query_params, opts) do
{:ok, %{num_rows: 0}} ->
raise "insert operation failed to insert any row in the database. " <>
"This may happen if you have trigger or other database conditions rejecting operations. " <>
"The emitted SQL was: #{sql}"
# With INSERT IGNORE (insert_mode: :ignore_errors), 0 rows means the row
# was ignored due to a conflict, which is expected behavior
if opts[:insert_mode] == :ignore_errors do
{:ok, []}
else
raise "insert operation failed to insert any row in the database. " <>
"This may happen if you have trigger or other database conditions rejecting operations. " <>
"The emitted SQL was: #{sql}"
end

# We were used to check if num_rows was 1 or 2 (in case of upserts)
# but MariaDB supports tables with System Versioning, and in those
Expand Down
46 changes: 33 additions & 13 deletions lib/ecto/adapters/myxql/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -179,41 +179,61 @@ if Code.ensure_loaded?(MyXQL) do
end

@impl true
def insert(prefix, table, header, rows, on_conflict, [], []) do
def insert(prefix, table, header, rows, on_conflict, returning, placeholders, opts \\ [])

def insert(prefix, table, header, rows, on_conflict, [], [], opts) do
fields = quote_names(header)
insert_keyword = insert_keyword(on_conflict, opts)

[
"INSERT INTO ",
insert_keyword,
quote_table(prefix, table),
" (",
fields,
") ",
insert_all(rows) | on_conflict(on_conflict, header)
insert_all(rows) | on_conflict(on_conflict, header, opts)
]
end

def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, []) do
def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, [], _opts) do
error!(nil, ":returning is not supported in insert/insert_all by MySQL")
end

def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, _placeholders) do
def insert(_prefix, _table, _header, _rows, _on_conflict, _returning, _placeholders, _opts) do
error!(nil, ":placeholders is not supported by MySQL")
end

defp on_conflict({_, _, [_ | _]}, _header) do
# INSERT IGNORE when insert_mode: :ignore_errors is passed
defp insert_keyword({:nothing, _, _}, opts) do
if Keyword.get(opts, :insert_mode) == :ignore_errors do
"INSERT IGNORE INTO "
else
"INSERT INTO "
end
end

defp insert_keyword(_, _opts), do: "INSERT INTO "

defp on_conflict({_, _, [_ | _]}, _header, _opts) do
error!(nil, ":conflict_target is not supported in insert/insert_all by MySQL")
end

defp on_conflict({:raise, _, []}, _header) do
defp on_conflict({:raise, _, []}, _header, _opts) do
[]
end

defp on_conflict({:nothing, _, []}, [field | _]) do
quoted = quote_name(field)
[" ON DUPLICATE KEY UPDATE ", quoted, " = " | quoted]
# With insert_mode: :ignore_errors, INSERT IGNORE handles conflicts - no ON DUPLICATE KEY needed
defp on_conflict({:nothing, _, []}, [field | _], opts) do
if Keyword.get(opts, :insert_mode) == :ignore_errors do
[]
else
# Default :nothing - uses workaround to simulate "do nothing" behavior
quoted = quote_name(field)
[" ON DUPLICATE KEY UPDATE ", quoted, " = " | quoted]
end
end

defp on_conflict({fields, _, []}, _header) when is_list(fields) do
defp on_conflict({fields, _, []}, _header, _opts) when is_list(fields) do
[
" ON DUPLICATE KEY UPDATE "
| Enum.map_intersperse(fields, ?,, fn field ->
Expand All @@ -223,11 +243,11 @@ if Code.ensure_loaded?(MyXQL) do
]
end

defp on_conflict({%{wheres: []} = query, _, []}, _header) do
defp on_conflict({%{wheres: []} = query, _, []}, _header, _opts) do
[" ON DUPLICATE KEY " | update_all(query, "UPDATE ")]
end

defp on_conflict({_query, _, []}, _header) do
defp on_conflict({_query, _, []}, _header, _opts) do
error!(
nil,
"Using a query with :where in combination with the :on_conflict option is not supported by MySQL"
Expand Down
2 changes: 1 addition & 1 deletion lib/ecto/adapters/postgres/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ if Code.ensure_loaded?(Postgrex) do
end

@impl true
def insert(prefix, table, header, rows, on_conflict, returning, placeholders) do
def insert(prefix, table, header, rows, on_conflict, returning, placeholders, _opts \\ []) do
counter_offset = length(placeholders) + 1

values =
Expand Down
2 changes: 1 addition & 1 deletion lib/ecto/adapters/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ defmodule Ecto.Adapters.SQL do
rows -> unzip_inserts(header, rows)
end

sql = conn.insert(prefix, source, header, rows, on_conflict, returning, placeholders)
sql = conn.insert(prefix, source, header, rows, on_conflict, returning, placeholders, opts)

opts =
if is_nil(Keyword.get(opts, :cache_statement)) do
Expand Down
3 changes: 2 additions & 1 deletion lib/ecto/adapters/sql/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ defmodule Ecto.Adapters.SQL.Connection do
rows :: [[atom | nil]],
on_conflict :: Ecto.Adapter.Schema.on_conflict(),
returning :: [atom],
placeholders :: [term]
placeholders :: [term],
opts :: Keyword.t()
) :: iodata

@doc """
Expand Down
2 changes: 1 addition & 1 deletion lib/ecto/adapters/tds/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ if Code.ensure_loaded?(Tds) do
end

@impl true
def insert(prefix, table, header, rows, on_conflict, returning, placeholders) do
def insert(prefix, table, header, rows, on_conflict, returning, placeholders, _opts \\ []) do
counter_offset = length(placeholders) + 1
[] = on_conflict(on_conflict, header)
returning = returning(returning, "INSERTED")
Expand Down
21 changes: 19 additions & 2 deletions test/ecto/adapters/myxql_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ defmodule Ecto.Adapters.MyXQLTest do
defp delete_all(query), do: query |> SQL.delete_all() |> IO.iodata_to_binary()
defp execute_ddl(query), do: query |> SQL.execute_ddl() |> Enum.map(&IO.iodata_to_binary/1)

defp insert(prefx, table, header, rows, on_conflict, returning) do
IO.iodata_to_binary(SQL.insert(prefx, table, header, rows, on_conflict, returning, []))
defp insert(prefx, table, header, rows, on_conflict, returning, opts \\ []) do
IO.iodata_to_binary(SQL.insert(prefx, table, header, rows, on_conflict, returning, [], opts))
end

defp update(prefx, table, fields, filter, returning) do
Expand Down Expand Up @@ -1466,6 +1466,7 @@ defmodule Ecto.Adapters.MyXQLTest do
end

test "insert with on duplicate key" do
# Default :nothing uses ON DUPLICATE KEY UPDATE workaround
query = insert(nil, "schema", [:x, :y], [[:x, :y]], {:nothing, [], []}, [])

assert query ==
Expand Down Expand Up @@ -1499,6 +1500,22 @@ defmodule Ecto.Adapters.MyXQLTest do
end
end

test "insert with insert_mode: :ignore_errors" do
# INSERT IGNORE via insert_mode: :ignore_errors option
query =
insert(
nil,
"schema",
[:x, :y],
[[:x, :y]],
{:nothing, [], []},
[],
insert_mode: :ignore_errors
)

assert query == ~s{INSERT IGNORE INTO `schema` (`x`,`y`) VALUES (?,?)}
end

test "insert with query" do
select_query = from("schema", select: [:id]) |> plan(:all)

Expand Down