Merged
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dataops-testgen"
version = "4.38.3"
version = "4.38.6"
description = "DataKitchen's Data Quality DataOps TestGen"
authors = [
{ "name" = "DataKitchen, Inc.", "email" = "info@datakitchen.io" },
46 changes: 24 additions & 22 deletions testgen/__main__.py
@@ -385,7 +385,7 @@ def quick_start(
test_suite_id = "9df7489d-92b3-49f9-95ca-512160d7896f"

click.echo(f"run-profile with table_group_id: {table_group_id}")
message = run_profiling(table_group_id, run_date=now_date + time_delta)
message = run_profiling(table_group_id, run_date=now_date + time_delta)
click.echo("\n" + message)

LOG.info(f"run-test-generation with table_group_id: {table_group_id} test_suite: {settings.DEFAULT_TEST_SUITE_KEY}")
@@ -640,8 +640,6 @@ def list_ui_plugins():
def run_ui():
from testgen.ui.scripts import patch_streamlit

status_code: int = -1

use_ssl = os.path.isfile(settings.SSL_CERT_FILE) and os.path.isfile(settings.SSL_KEY_FILE)

patch_streamlit.patch(force=True)
@@ -656,25 +654,29 @@ def cancel_all_running():

cancel_all_running()

try:
app_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ui/app.py")
status_code = subprocess.check_call(
[ # noqa: S607
"streamlit",
"run",
app_file,
"--browser.gatherUsageStats=false",
"--client.showErrorDetails=none",
"--client.toolbarMode=minimal",
f"--server.sslCertFile={settings.SSL_CERT_FILE}" if use_ssl else "",
f"--server.sslKeyFile={settings.SSL_KEY_FILE}" if use_ssl else "",
"--",
f"{'--debug' if settings.IS_DEBUG else ''}",
],
env={**os.environ, "TG_JOB_SOURCE": "UI"}
)
except Exception:
LOG.exception(f"Testgen UI exited with status code {status_code}")
app_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ui/app.py")
process = subprocess.Popen(
[ # noqa: S607
"streamlit",
"run",
app_file,
"--browser.gatherUsageStats=false",
"--client.showErrorDetails=none",
"--client.toolbarMode=minimal",
f"--server.sslCertFile={settings.SSL_CERT_FILE}" if use_ssl else "",
f"--server.sslKeyFile={settings.SSL_KEY_FILE}" if use_ssl else "",
"--",
f"{'--debug' if settings.IS_DEBUG else ''}",
],
env={**os.environ, "TG_JOB_SOURCE": "UI"}
)
def term_ui(signum, _):
LOG.info(f"Sending termination signal {signum} to Testgen UI")
process.send_signal(signum)
signal.signal(signal.SIGINT, term_ui)
signal.signal(signal.SIGTERM, term_ui)
status_code = process.wait()
LOG.log(logging.ERROR if status_code != 0 else logging.INFO, f"Testgen UI exited with status code {status_code}")


@cli.command("run-app", help="Runs TestGen's application modules")
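The rewritten run_ui above replaces subprocess.check_call with subprocess.Popen so the parent process can forward SIGINT/SIGTERM to the Streamlit child and then wait for its exit status. A minimal standalone sketch of that signal-forwarding pattern, using a placeholder child command rather than TestGen's actual Streamlit invocation:

import logging
import signal
import subprocess
import sys

logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger("ui-launcher")

# Placeholder child; TestGen launches "streamlit run ui/app.py" at this point.
process = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

def term_child(signum, _frame):
    # Relay the signal to the child so it can shut down cleanly,
    # instead of letting the parent exit and orphan it.
    LOG.info("Forwarding signal %s to child process", signum)
    process.send_signal(signum)

signal.signal(signal.SIGINT, term_child)
signal.signal(signal.SIGTERM, term_child)

status_code = process.wait()
LOG.log(logging.ERROR if status_code != 0 else logging.INFO, "Child exited with status code %s", status_code)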
32 changes: 17 additions & 15 deletions testgen/commands/queries/execute_tests_query.py
@@ -6,7 +6,7 @@

from testgen.common import read_template_sql_file
from testgen.common.clean_sql import concat_columns
from testgen.common.database.database_service import get_flavor_service, replace_params
from testgen.common.database.database_service import get_flavor_service, get_tg_schema, replace_params
from testgen.common.models.connection import Connection
from testgen.common.models.table_group import TableGroup
from testgen.common.models.test_definition import TestRunType, TestScope
@@ -107,7 +107,7 @@ def _get_params(self, test_def: TestExecutionDef | None = None) -> dict:
"TEST_SUITE_ID": self.test_run.test_suite_id,
"TEST_RUN_ID": self.test_run.id,
"RUN_DATE": self.run_date,
"SQL_FLAVOR": self.flavor,
"SQL_FLAVOR": self.flavor,
"VARCHAR_TYPE": self.flavor_service.varchar_type,
"QUOTE": quote,
}
@@ -116,7 +116,9 @@ def _get_params(self, test_def: TestExecutionDef | None = None) -> dict:
params.update({
"TEST_TYPE": test_def.test_type,
"TEST_DEFINITION_ID": test_def.id,
"APP_SCHEMA_NAME": get_tg_schema(),
"SCHEMA_NAME": test_def.schema_name,
"TABLE_GROUPS_ID": self.table_group.id,
"TABLE_NAME": test_def.table_name,
"COLUMN_NAME": f"{quote}{test_def.column_name or ''}{quote}",
"COLUMN_NAME_NO_QUOTES": test_def.column_name,
Expand Down Expand Up @@ -146,7 +148,7 @@ def _get_params(self, test_def: TestExecutionDef | None = None) -> dict:
"MATCH_HAVING_CONDITION": f"HAVING {test_def.match_having_condition}" if test_def.match_having_condition else "",
"CUSTOM_QUERY": test_def.custom_query,
"COLUMN_TYPE": test_def.column_type,
"INPUT_PARAMETERS": self._get_input_parameters(test_def),
"INPUT_PARAMETERS": self._get_input_parameters(test_def),
})
return params

@@ -169,11 +171,11 @@ def _get_query(
query = query.replace(":", "\\:")

return query, None if no_bind else params

def get_active_test_definitions(self) -> tuple[dict]:
# Runs on App database
return self._get_query("get_active_test_definitions.sql")

def get_target_identifiers(self, schemas: Iterable[str]) -> tuple[str, dict]:
# Runs on Target database
filename = "get_target_identifiers.sql"
@@ -185,7 +187,7 @@ def get_target_identifiers(self, schemas: Iterable[str]) -> tuple[str, dict]:
return self._get_query(filename, f"flavors/{self.connection.sql_flavor}/validate_tests", extra_params=params)
except ModuleNotFoundError:
return self._get_query(filename, "flavors/generic/validate_tests", extra_params=params)

def get_test_errors(self, test_defs: list[TestExecutionDef]) -> list[list[UUID | str | datetime]]:
return [
[
@@ -205,15 +207,15 @@ def get_test_errors(self, test_defs: list[TestExecutionDef]) -> list[list[UUID |
None, # No result_measure on errors
] for td in test_defs if td.errors
]

def disable_invalid_test_definitions(self) -> tuple[str, dict]:
# Runs on App database
return self._get_query("disable_invalid_test_definitions.sql")

def update_historic_thresholds(self) -> tuple[str, dict]:
# Runs on App database
return self._get_query("update_historic_thresholds.sql")

def run_query_test(self, test_def: TestExecutionDef) -> tuple[str, dict]:
# Runs on Target database
folder = "generic" if test_def.template_name.endswith("_generic.sql") else self.flavor
@@ -225,7 +227,7 @@
extra_params={"DATA_SCHEMA": test_def.schema_name},
test_def=test_def,
)

def aggregate_cat_tests(
self,
test_defs: list[TestExecutionDef],
@@ -265,7 +267,7 @@ def add_query(test_defs: list[TestExecutionDef]) -> str:

aggregate_queries.append((query, None))
aggregate_test_defs.append(test_defs)

if single:
for td in test_defs:
# Add separate query for each test
@@ -296,9 +298,9 @@ def add_query(test_defs: list[TestExecutionDef]) -> str:
current_test_defs.append(td)

add_query(current_test_defs)

return aggregate_queries, aggregate_test_defs

def get_cat_test_results(
self,
aggregate_results: list[AggregateResult],
@@ -309,7 +311,7 @@ def get_cat_test_results(
test_defs = aggregate_test_defs[result["query_index"]]
result_measures = result["result_measures"].split("|")
result_codes = result["result_codes"].split(",")

for index, td in enumerate(test_defs):
test_results.append([
self.test_run.id,
@@ -329,7 +331,7 @@
])

return test_results

def update_test_results(self) -> list[tuple[str, dict]]:
# Runs on App database
return [
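The _get_params changes above add APP_SCHEMA_NAME and TABLE_GROUPS_ID to the parameter dictionary used when rendering the SQL templates. The project's replace_params helper is imported at the top of the file but its body is not shown in this diff; a simplified, hypothetical stand-in illustrating the placeholder-substitution idea (the real placeholder syntax may differ):

def replace_params_sketch(query: str, params: dict) -> str:
    # Substitute {NAME} tokens in a SQL template with values from the
    # params mapping; placeholders without a value are left untouched.
    for name, value in params.items():
        if value is not None:
            query = query.replace("{" + name + "}", str(value))
    return query

template = "SELECT COUNT(*) FROM {SCHEMA_NAME}.{TABLE_NAME} WHERE {CUSTOM_QUERY}"
print(replace_params_sketch(template, {"SCHEMA_NAME": "sales", "TABLE_NAME": "orders", "CUSTOM_QUERY": "1 = 1"}))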
1 change: 1 addition & 0 deletions testgen/commands/run_launch_db_config.py
@@ -53,6 +53,7 @@ def _get_params_mapping() -> dict:
"TABLE_GROUPS_NAME": settings.DEFAULT_TABLE_GROUPS_NAME,
"TEST_SUITE": settings.DEFAULT_TEST_SUITE_KEY,
"TEST_SUITE_DESCRIPTION": settings.DEFAULT_TEST_SUITE_DESCRIPTION,
"MONITOR_TEST_SUITE": settings.DEFAULT_MONITOR_TEST_SUITE_KEY,
"MAX_THREADS": settings.PROJECT_CONNECTION_MAX_THREADS,
"MAX_QUERY_CHARS": settings.PROJECT_CONNECTION_MAX_QUERY_CHAR,
"OBSERVABILITY_API_URL": settings.OBSERVABILITY_API_URL,
16 changes: 7 additions & 9 deletions testgen/commands/run_test_execution.py
@@ -54,7 +54,7 @@ def run_test_execution_in_background(test_suite_id: str | UUID):
def run_test_execution(test_suite_id: str | UUID, username: str | None = None, run_date: datetime | None = None) -> str:
if test_suite_id is None:
raise ValueError("Test Suite ID was not specified")

LOG.info(f"Starting test run for test suite {test_suite_id}")
time_delta = (run_date - datetime.now(UTC)) if run_date else timedelta()

@@ -112,9 +112,7 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
"CAT": partial(_run_cat_tests, sql_generator),
}
# Run metadata tests last so that results for other tests are available to them
# TODO: TURN ON WHEN ADDING METADATA TESTS
# for run_type in ["QUERY", "CAT", "METADATA"]:
for run_type in ["QUERY", "CAT"]:
for run_type in ["QUERY", "CAT", "METADATA"]:
if (run_test_defs := [td for td in valid_test_defs if td.run_type == run_type]):
run_functions[run_type](run_test_defs)
else:
@@ -198,7 +196,7 @@ def update_test_progress(progress: ThreadedProgress) -> None:
LOG.info(f"Writing {run_type} test errors")
for index, error in error_data.items():
test_defs[index].errors.append(error)

error_results = sql_generator.get_test_errors(test_defs)
write_to_app_db(error_results, sql_generator.result_columns, sql_generator.test_results_table)

@@ -219,7 +217,7 @@ def _run_cat_tests(sql_generator: TestExecutionSQL, test_defs: list[TestExecutio
total_count = len(test_defs)
LOG.info(f"Aggregating CAT tests: {total_count}")
aggregate_queries, aggregate_test_defs = sql_generator.aggregate_cat_tests(test_defs)

def update_aggegate_progress(progress: ThreadedProgress) -> None:
processed_count = sum(len(aggregate_test_defs[index]) for index in progress["indexes"])
test_run.set_progress(
@@ -251,7 +249,7 @@ def update_aggegate_progress(progress: ThreadedProgress) -> None:
error_test_defs: list[TestExecutionDef] = []
for index in aggregate_errors:
error_test_defs.extend(aggregate_test_defs[index])

single_queries, single_test_defs = sql_generator.aggregate_cat_tests(error_test_defs, single=True)

test_run.set_progress(
@@ -260,7 +258,7 @@
error="Rerunning errored tests singly",
)
test_run.save()

def update_single_progress(progress: ThreadedProgress) -> None:
test_run.set_progress(
"CAT",
@@ -293,7 +291,7 @@ def update_single_progress(progress: ThreadedProgress) -> None:
td = single_test_defs[index][0]
td.errors.append(error)
error_test_defs.append(td)

error_results = sql_generator.get_test_errors(error_test_defs)
write_to_app_db(error_results, sql_generator.result_columns, sql_generator.test_results_table)

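With the METADATA run type enabled above, the loop keeps the ordering promised by the comment: query and CAT tests execute first so their results are already persisted when metadata tests run. A stripped-down sketch of that dispatch pattern, with placeholder runner functions standing in for TestGen's internal ones:

from functools import partial

def run_query_tests(generator, defs):
    print(f"{generator}: running {len(defs)} QUERY tests")

def run_cat_tests(generator, defs):
    print(f"{generator}: running {len(defs)} CAT tests")

def run_metadata_tests(generator, defs):
    print(f"{generator}: running {len(defs)} METADATA tests")

sql_generator = "sql-generator"  # stand-in for the TestExecutionSQL instance
valid_test_defs = [
    {"run_type": "QUERY"}, {"run_type": "CAT"}, {"run_type": "METADATA"},
]

run_functions = {
    "QUERY": partial(run_query_tests, sql_generator),
    "CAT": partial(run_cat_tests, sql_generator),
    "METADATA": partial(run_metadata_tests, sql_generator),
}

# Metadata tests go last so results from the other run types are available to them.
for run_type in ["QUERY", "CAT", "METADATA"]:
    if (run_test_defs := [td for td in valid_test_defs if td["run_type"] == run_type]):
        run_functions[run_type](run_test_defs)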
3 changes: 1 addition & 2 deletions testgen/common/models/test_run.py
@@ -275,8 +275,7 @@ def init_progress(self) -> None:
"validation": {"label": "Validating test definitions"},
"QUERY": {"label": "Running query tests"},
"CAT": {"label": "Running aggregated tests"},
# TODO: TURN ON WHEN ADDING METADATA TESTS
# "METADATA": {"label": "Running metadata tests"},
"METADATA": {"label": "Running metadata tests"},
}
for key in self._progress:
self._progress[key].update({"key": key, "status": "Pending"})
8 changes: 8 additions & 0 deletions testgen/settings.py
@@ -299,6 +299,14 @@
defaults to: `default_suite_desc`
"""

DEFAULT_MONITOR_TEST_SUITE_KEY: str = os.getenv("DEFAULT_MONITOR_TEST_SUITE_NAME", "default-monitor-suite-1")
"""
Key to be assigned to the auto-generated monitoring test suite.

from env variable: `DEFAULT_MONITOR_TEST_SUITE_NAME`
defaults to: `default-monitor-suite-1`
"""

DEFAULT_PROFILING_TABLE_SET = os.getenv("DEFAULT_PROFILING_TABLE_SET", "")
"""
Comma separated list of specific table names to include when running
@@ -23,7 +23,8 @@ VALUES ('Monitor', 'Recency'),
('Monitor', 'Daily_Record_Ct'),
('Monitor', 'Monthly_Rec_Ct'),
('Monitor', 'Weekly_Rec_Ct'),
('Monitor', 'Table_Freshness');
('Monitor', 'Table_Freshness'),
('Monitor', 'Schema_Drift');


TRUNCATE TABLE test_templates;
63 changes: 63 additions & 0 deletions testgen/template/dbsetup_test_types/test_types_Schema_Drift.yaml
@@ -0,0 +1,63 @@
test_types:
id: '1512'
test_type: Schema_Drift
test_name_short: Schema Drift
test_name_long: Table Schema Changed
test_description: |-
Checks whether the table schema has changed
except_message: |-
Table schema has changed.
measure_uom: Was Schema Change Detected
measure_uom_description: null
selection_criteria: |-
TEMPLATE
dq_score_prevalence_formula: null
dq_score_risk_factor: null
column_name_prompt: null
column_name_help: null
default_parm_columns: null
default_parm_values: null
default_parm_prompts: null
default_parm_help: null
default_severity: Warning
run_type: METADATA
test_scope: tablegroup
dq_dimension: null
health_dimension: null
threshold_description: null
result_visualization: binary_chart
result_visualization_params: '{"legend":{"labels":{"0":"No Changes","1":"Changes"}}}'
usage_notes: |-
This test compares the current table column types with previously recorded data to check whether the table schema has changed, allowing you to track any changes to the table structure.
active: N
cat_test_conditions: []
target_data_lookups: []
test_templates:
- id: '2514'
test_type: Schema_Drift
sql_flavor: bigquery
template_name: ex_schema_drift_generic.sql
- id: '2414'
test_type: Schema_Drift
sql_flavor: databricks
template_name: ex_schema_drift_generic.sql
- id: '2214'
test_type: Schema_Drift
sql_flavor: mssql
template_name: ex_schema_drift_generic.sql
- id: '2314'
test_type: Schema_Drift
sql_flavor: postgresql
template_name: ex_schema_drift_generic.sql
- id: '2014'
test_type: Schema_Drift
sql_flavor: redshift
template_name: ex_schema_drift_generic.sql
- id: '2614'
test_type: Schema_Drift
sql_flavor: redshift_spectrum
template_name: ex_schema_drift_generic.sql
- id: '2114'
test_type: Schema_Drift
sql_flavor: snowflake
template_name: ex_schema_drift_generic.sql
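Every flavor entry above maps to the same ex_schema_drift_generic.sql template, so the schema-drift check shares one generic query across warehouses; run_query_test in execute_tests_query.py selects the "generic" template folder whenever the template name ends in _generic.sql. A small illustrative sketch of that resolution rule (the path layout here is an assumption, not taken from the repository):

FLAVOR_TEMPLATES = {
    # flavor -> template file, mirroring the YAML entries above
    "bigquery": "ex_schema_drift_generic.sql",
    "databricks": "ex_schema_drift_generic.sql",
    "mssql": "ex_schema_drift_generic.sql",
    "postgresql": "ex_schema_drift_generic.sql",
    "redshift": "ex_schema_drift_generic.sql",
    "redshift_spectrum": "ex_schema_drift_generic.sql",
    "snowflake": "ex_schema_drift_generic.sql",
}

def resolve_template_path(flavor: str) -> str:
    # Generic templates live under a shared folder; flavor-specific ones
    # live under a folder named after the SQL flavor.
    template = FLAVOR_TEMPLATES[flavor]
    folder = "generic" if template.endswith("_generic.sql") else flavor
    return f"flavors/{folder}/{template}"

print(resolve_template_path("snowflake"))  # flavors/generic/ex_schema_drift_generic.sql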