Skip to content
Merged
51 changes: 49 additions & 2 deletions src/tower/_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import shlex
import tempfile
from collections.abc import Iterable as IterableABC
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path
Expand Down Expand Up @@ -38,6 +39,24 @@ def to_arg_list(self) -> list[str]:
DbtCommand("build"),
)

# Commands that support --select flag
# See: https://docs.getdbt.com/reference/node-selection/syntax
COMMANDS_WITH_SELECT: frozenset[str] = frozenset(
{
"run",
"test",
"build",
"compile",
"seed",
"snapshot",
"docs",
"list",
"ls",
"show",
"source",
}
)


def parse_command_plan(raw: str | None) -> tuple[DbtCommand, ...]:
"""
Expand Down Expand Up @@ -182,8 +201,10 @@ def run_dbt_workflow(config: DbtRunnerConfig) -> list[object]:
for command in config.commands:
args = command.to_arg_list()

if config.selector and not (
_has_flag(args, "--select") or _has_flag(args, "-s")
if (
config.selector
and command.name in COMMANDS_WITH_SELECT
and not (_has_flag(args, "--select") or _has_flag(args, "-s"))
):
args.extend(["--select", config.selector])

Expand Down Expand Up @@ -220,8 +241,34 @@ def run_dbt_workflow(config: DbtRunnerConfig) -> list[object]:


def _log_run_results(log: logging.Logger, entries: Iterable[object] | None) -> None:
"""Log individual model/test results from dbt commands that produce them.

Based on dbt-core's return types (see dbt.cli.main.dbtRunnerResult):

Commands returning RunExecutionResult (iterable, has node-level results):
- build, compile, run, seed, snapshot, test, run-operation

Commands returning non-iterable results:
- docs generate → CatalogArtifact
- parse → Manifest
- list/ls → List[str] (iterable but no node results)
- debug → bool
- clean, deps, init, docs serve → None

This function logs node-level results when available (RunExecutionResult).
For other return types, dbt's own logging is sufficient.
"""
if not entries:
return

if not isinstance(entries, IterableABC) or isinstance(entries, (str, bytes)):
result_type = type(entries).__name__
log.debug(
"Command returned %s (not iterable node results), skipping detailed logging",
result_type,
)
return

for entry in entries:
node = getattr(entry, "node", None)
status = getattr(entry, "status", None)
Expand Down
148 changes: 142 additions & 6 deletions tests/tower/test_dbt.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import os
import pytest
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
from unittest.mock import MagicMock, patch

import pytest

from tower._dbt import (
dbt,
COMMANDS_WITH_SELECT,
DEFAULT_COMMAND_PLAN,
DbtCommand,
DbtRunnerConfig,
DbtWorkflow,
parse_command_plan,
dbt,
load_profile_from_env,
parse_command_plan,
run_dbt_workflow,
DEFAULT_COMMAND_PLAN,
)


Expand Down Expand Up @@ -335,7 +337,7 @@ def test_run_workflow_success(
def test_run_workflow_with_selector(
self, temp_dbt_project, sample_profile, mock_dbt_runner
):
"""Test workflow with selector adds --select flag."""
"""Test workflow with selector adds --select flag to commands that support it."""
with patch("tower._dbt.dbtRunner", return_value=mock_dbt_runner):
config = DbtRunnerConfig(
project_path=temp_dbt_project,
Expand All @@ -350,6 +352,140 @@ def test_run_workflow_with_selector(
assert "--select" in call_args[0][0]
assert "tag:daily" in call_args[0][0]

def test_run_workflow_selector_not_added_to_unsupported_commands(
self, temp_dbt_project, sample_profile, mock_dbt_runner
):
"""Test workflow with selector does NOT add --select to commands that don't support it."""
with patch("tower._dbt.dbtRunner", return_value=mock_dbt_runner):
config = DbtRunnerConfig(
project_path=temp_dbt_project,
profile_payload=sample_profile,
commands=(DbtCommand("deps"),),
selector="tag:daily",
)
run_dbt_workflow(config)

# Verify --select was NOT added for deps command
call_args = mock_dbt_runner.invoke.call_args
assert "--select" not in call_args[0][0]

def test_run_workflow_selector_not_added_if_already_present(
self, temp_dbt_project, sample_profile, mock_dbt_runner
):
"""Test workflow doesn't add --select if it's already in command args."""
with patch("tower._dbt.dbtRunner", return_value=mock_dbt_runner):
config = DbtRunnerConfig(
project_path=temp_dbt_project,
profile_payload=sample_profile,
commands=(DbtCommand("build", ("--select", "models/")),),
selector="tag:daily",
)
run_dbt_workflow(config)

# Verify only one --select flag is present (from command args)
call_args = mock_dbt_runner.invoke.call_args[0][0]
select_count = call_args.count("--select")
assert select_count == 1
assert "models/" in call_args
assert "tag:daily" not in call_args

def test_run_workflow_with_multiple_commands_mixed_select_support(
self, temp_dbt_project, sample_profile, mock_dbt_runner
):
"""Test workflow with multiple commands, some supporting --select and some not."""
with patch("tower._dbt.dbtRunner", return_value=mock_dbt_runner):
config = DbtRunnerConfig(
project_path=temp_dbt_project,
profile_payload=sample_profile,
commands=(
DbtCommand("deps"),
DbtCommand("build"),
DbtCommand("docs", ("generate",)),
),
selector="tag:daily",
)
run_dbt_workflow(config)

# Check all three invocations
assert mock_dbt_runner.invoke.call_count == 3

# First call (deps) should NOT have --select
deps_args = mock_dbt_runner.invoke.call_args_list[0][0][0]
assert "--select" not in deps_args

# Second call (build) should have --select
build_args = mock_dbt_runner.invoke.call_args_list[1][0][0]
assert "--select" in build_args
assert "tag:daily" in build_args

# Third call (docs generate) - docs is in COMMANDS_WITH_SELECT
# so it will have --select (even though docs generate may not need it)
docs_args = mock_dbt_runner.invoke.call_args_list[2][0][0]
assert "--select" in docs_args
assert "tag:daily" in docs_args
assert "generate" in docs_args

@pytest.mark.parametrize(
"command_name",
[
"run",
"test",
"build",
"compile",
"seed",
"snapshot",
"docs",
"list",
"ls",
"show",
"source",
],
)
def test_commands_with_select_support(
self, temp_dbt_project, sample_profile, mock_dbt_runner, command_name
):
"""Test that all commands in COMMANDS_WITH_SELECT get --select flag when selector is provided."""
# Verify the command is actually in COMMANDS_WITH_SELECT
assert command_name in COMMANDS_WITH_SELECT

with patch("tower._dbt.dbtRunner", return_value=mock_dbt_runner):
config = DbtRunnerConfig(
project_path=temp_dbt_project,
profile_payload=sample_profile,
commands=(DbtCommand(command_name),),
selector="tag:daily",
)
run_dbt_workflow(config)

# Verify --select was added
call_args = mock_dbt_runner.invoke.call_args[0][0]
assert "--select" in call_args
assert "tag:daily" in call_args

@pytest.mark.parametrize(
"command_name",
["deps", "clean", "debug", "init"],
)
def test_commands_without_select_support(
self, temp_dbt_project, sample_profile, mock_dbt_runner, command_name
):
"""Test that commands not in COMMANDS_WITH_SELECT do NOT get --select flag even when selector is provided."""
# Verify the command is NOT in COMMANDS_WITH_SELECT
assert command_name not in COMMANDS_WITH_SELECT

with patch("tower._dbt.dbtRunner", return_value=mock_dbt_runner):
config = DbtRunnerConfig(
project_path=temp_dbt_project,
profile_payload=sample_profile,
commands=(DbtCommand(command_name),),
selector="tag:daily",
)
run_dbt_workflow(config)

# Verify --select was NOT added
call_args = mock_dbt_runner.invoke.call_args[0][0]
assert "--select" not in call_args

def test_run_workflow_with_full_refresh(
self, temp_dbt_project, sample_profile, mock_dbt_runner
):
Expand Down