Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub const STACKABLE_LOG_DIR: &str = "/stackable/log";
pub const LOG_CONFIG_DIR: &str = "/stackable/app/log_config";
pub const AIRFLOW_HOME: &str = "/stackable/airflow";
pub const AIRFLOW_CONFIG_FILENAME: &str = "webserver_config.py";
pub const AIRFLOW_DAGS_FOLDER: &str = "/stackable/app/allDAGs";

pub const TEMPLATE_VOLUME_NAME: &str = "airflow-executor-pod-template";
pub const TEMPLATE_LOCATION: &str = "/templates";
Expand Down Expand Up @@ -594,11 +595,19 @@ impl AirflowRole {
format!(
"cp -RL {CONFIG_PATH}/{AIRFLOW_CONFIG_FILENAME} {AIRFLOW_HOME}/{AIRFLOW_CONFIG_FILENAME}"
),
format!("mkdir {AIRFLOW_DAGS_FOLDER}"),
// graceful shutdown part
COMMON_BASH_TRAP_FUNCTIONS.to_string(),
remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
];

for (i, _) in airflow.spec.cluster_config.dags_git_sync.iter().enumerate() {
command.push(
format!("ln -s /stackable/app/git-{i}/current /stackable/app/allDAGs/current-{i}")
.to_string(),
)
}

if resolved_product_image.product_version.starts_with("3.") {
// Start-up commands have changed in 3.x.
// See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and
Expand Down
44 changes: 23 additions & 21 deletions rust/operator-binary/src/env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use stackable_operator::{

use crate::{
crd::{
AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
TEMPLATE_LOCATION, TEMPLATE_NAME,
AIRFLOW_DAGS_FOLDER, AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR,
STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME,
authentication::{
AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
},
Expand Down Expand Up @@ -154,12 +154,11 @@ pub fn build_airflow_statefulset_envs(
);
}

let dags_folder = get_dags_folder(git_sync_resources);
env.insert(
AIRFLOW_CORE_DAGS_FOLDER.into(),
EnvVar {
name: AIRFLOW_CORE_DAGS_FOLDER.into(),
value: Some(dags_folder),
value: Some(AIRFLOW_DAGS_FOLDER.to_owned()),
..Default::default()
},
);
Expand Down Expand Up @@ -288,23 +287,17 @@ pub fn build_airflow_statefulset_envs(
Ok(transform_map_to_vec(env))
}

pub fn get_dags_folder(git_sync_resources: &git_sync::v1alpha1::GitSyncResources) -> String {
let git_sync_count = git_sync_resources.git_content_folders.len();
if git_sync_count > 1 {
tracing::warn!(
"There are {git_sync_count} git-sync entries: Only the first one will be considered.",
);
}

pub fn get_dags_folder(git_sync_resources: &git_sync::v1alpha1::GitSyncResources) -> Vec<String> {
let mut git_folders = Vec::<String>::new();
// If DAG provisioning via git-sync is not configured, set a default value
// so that PYTHONPATH can refer to it. N.B. nested variables need to be
// resolved, so that /stackable/airflow is used instead of $AIRFLOW_HOME.
// see https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder
git_sync_resources
.git_content_folders_as_string()
.first()
.cloned()
.unwrap_or("/stackable/airflow/dags".to_string())

for folder in git_sync_resources.git_content_folders_as_string() {
git_folders.push(folder)
}
git_folders
}

// This set of environment variables is a standard set that is not dependent on any
Expand All @@ -314,7 +307,17 @@ fn static_envs(
) -> BTreeMap<String, EnvVar> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();

let dags_folder = get_dags_folder(git_sync_resources);
let dags_folders = get_dags_folder(git_sync_resources);
let mut dag_python_path = String::new();

// TODO: Might be there is a better solution to this
for (i, dags_folder) in dags_folders.iter().enumerate() {
dag_python_path.push_str(dags_folder);
// Can't append ":" if it's last entry
if i != (dags_folders.len() - 1) {
dag_python_path.push(':');
}
}

env.insert(
PYTHONPATH.into(),
Expand All @@ -323,7 +326,7 @@ fn static_envs(
// dependencies can be found: this must be the actual path and not a variable.
// Also include the airflow site-packages by default (for airflow and kubernetes classes etc.)
name: PYTHONPATH.into(),
value: Some(format!("{LOG_CONFIG_DIR}:{dags_folder}")),
value: Some(format!("{LOG_CONFIG_DIR}:{dag_python_path}")),
..Default::default()
},
);
Expand Down Expand Up @@ -407,12 +410,11 @@ pub fn build_airflow_template_envs(

// the config map also requires the dag-folder location as this will be passed on
// to the pods started by airflow.
let dags_folder = get_dags_folder(git_sync_resources);
env.insert(
AIRFLOW_CORE_DAGS_FOLDER.into(),
EnvVar {
name: AIRFLOW_CORE_DAGS_FOLDER.into(),
value: Some(dags_folder),
value: Some(AIRFLOW_DAGS_FOLDER.to_owned()),
..Default::default()
},
);
Expand Down