diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index e3d3ce29..9f89e7b2 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -70,6 +70,7 @@ pub const STACKABLE_LOG_DIR: &str = "/stackable/log"; pub const LOG_CONFIG_DIR: &str = "/stackable/app/log_config"; pub const AIRFLOW_HOME: &str = "/stackable/airflow"; pub const AIRFLOW_CONFIG_FILENAME: &str = "webserver_config.py"; +pub const AIRFLOW_DAGS_FOLDER: &str = "/stackable/app/allDAGs"; pub const TEMPLATE_VOLUME_NAME: &str = "airflow-executor-pod-template"; pub const TEMPLATE_LOCATION: &str = "/templates"; @@ -594,11 +595,19 @@ impl AirflowRole { format!( "cp -RL {CONFIG_PATH}/{AIRFLOW_CONFIG_FILENAME} {AIRFLOW_HOME}/{AIRFLOW_CONFIG_FILENAME}" ), + format!("mkdir {AIRFLOW_DAGS_FOLDER}"), // graceful shutdown part COMMON_BASH_TRAP_FUNCTIONS.to_string(), remove_vector_shutdown_file_command(STACKABLE_LOG_DIR), ]; + for (i, _) in airflow.spec.cluster_config.dags_git_sync.iter().enumerate() { + command.push( + format!("ln -s /stackable/app/git-{i}/current /stackable/app/allDAGs/current-{i}") + .to_string(), + ) + } + if resolved_product_image.product_version.starts_with("3.") { // Start-up commands have changed in 3.x. // See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 92960842..a7686d66 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -15,8 +15,8 @@ use stackable_operator::{ use crate::{ crd::{ - AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, STACKABLE_LOG_DIR, - TEMPLATE_LOCATION, TEMPLATE_NAME, + AIRFLOW_DAGS_FOLDER, AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, + STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME, authentication::{ AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, }, @@ -154,12 +154,11 @@ pub fn build_airflow_statefulset_envs( ); } - let dags_folder = get_dags_folder(git_sync_resources); env.insert( AIRFLOW_CORE_DAGS_FOLDER.into(), EnvVar { name: AIRFLOW_CORE_DAGS_FOLDER.into(), - value: Some(dags_folder), + value: Some(AIRFLOW_DAGS_FOLDER.to_owned()), ..Default::default() }, ); @@ -288,23 +287,17 @@ pub fn build_airflow_statefulset_envs( Ok(transform_map_to_vec(env)) } -pub fn get_dags_folder(git_sync_resources: &git_sync::v1alpha1::GitSyncResources) -> String { - let git_sync_count = git_sync_resources.git_content_folders.len(); - if git_sync_count > 1 { - tracing::warn!( - "There are {git_sync_count} git-sync entries: Only the first one will be considered.", - ); - } - +pub fn get_dags_folder(git_sync_resources: &git_sync::v1alpha1::GitSyncResources) -> Vec { + let mut git_folders = Vec::::new(); // If DAG provisioning via git-sync is not configured, set a default value // so that PYTHONPATH can refer to it. N.B. nested variables need to be // resolved, so that /stackable/airflow is used instead of $AIRFLOW_HOME. // see https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder - git_sync_resources - .git_content_folders_as_string() - .first() - .cloned() - .unwrap_or("/stackable/airflow/dags".to_string()) + + for folder in git_sync_resources.git_content_folders_as_string() { + git_folders.push(folder) + } + git_folders } // This set of environment variables is a standard set that is not dependent on any @@ -314,7 +307,17 @@ fn static_envs( ) -> BTreeMap { let mut env: BTreeMap = BTreeMap::new(); - let dags_folder = get_dags_folder(git_sync_resources); + let dags_folders = get_dags_folder(git_sync_resources); + let mut dag_python_path = String::new(); + + // TODO: Might be there is a better solution to this + for (i, dags_folder) in dags_folders.iter().enumerate() { + dag_python_path.push_str(dags_folder); + // Can't append ":" if it's last entry + if i != (dags_folders.len() - 1) { + dag_python_path.push(':'); + } + } env.insert( PYTHONPATH.into(), @@ -323,7 +326,7 @@ fn static_envs( // dependencies can be found: this must be the actual path and not a variable. // Also include the airflow site-packages by default (for airflow and kubernetes classes etc.) name: PYTHONPATH.into(), - value: Some(format!("{LOG_CONFIG_DIR}:{dags_folder}")), + value: Some(format!("{LOG_CONFIG_DIR}:{dag_python_path}")), ..Default::default() }, ); @@ -407,12 +410,11 @@ pub fn build_airflow_template_envs( // the config map also requires the dag-folder location as this will be passed on // to the pods started by airflow. - let dags_folder = get_dags_folder(git_sync_resources); env.insert( AIRFLOW_CORE_DAGS_FOLDER.into(), EnvVar { name: AIRFLOW_CORE_DAGS_FOLDER.into(), - value: Some(dags_folder), + value: Some(AIRFLOW_DAGS_FOLDER.to_owned()), ..Default::default() }, );