diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 2e403a00995..d792a288743 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -57,6 +57,10 @@ ssl_verify_cert=0 credential_lib_dir=/var/lib/ambari-agent/cred/lib credential_conf_dir=/var/lib/ambari-agent/cred/conf credential_shell_cmd=org.apache.hadoop.security.alias.CredentialShell +; agent_secret required if the user wants to enable encryption, and they +; should be changed from the default values for production clusters. After enabling this for the +; first time, the user will also need to clear the cache directory so it can be rebuilt in an encrypted form. +; agent_secret=default-secret-change-me [network] ; this option apply only for Agent communication @@ -78,4 +82,4 @@ idle_interval_max=10 [logging] -syslog_enabled=0 \ No newline at end of file +syslog_enabled=0 diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index 89df8711fe6..c37ac84611b 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -469,6 +469,14 @@ def get_ca_cert_file_path(self): """ return self.get("security", "ca_cert_path", default="") + def get_agent_secret(self): + """ + Get agent secret used to authenticate with the server. + + :return: agent secret string + """ + return self.get('security', 'agent_secret', default="") + @property def send_alert_changes_only(self): return bool(self.get("agent", "send_alert_changes_only", "0")) diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py index bbee0b28d2d..1100ffc0b62 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py @@ -34,13 +34,13 @@ class ClusterAlertDefinitionsCache(ClusterCache): differently for every host. """ - def __init__(self, cluster_cache_dir): + def __init__(self, cluster_cache_dir, secret=None): """ Initializes the host level params cache. :param cluster_cache_dir: :return: """ - super(ClusterAlertDefinitionsCache, self).__init__(cluster_cache_dir) + super(ClusterAlertDefinitionsCache, self).__init__(cluster_cache_dir, secret) def get_alert_definition_index_by_id(self, alert_dict, cluster_id, alert_id): definitions = alert_dict[cluster_id]["alertDefinitions"] diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py index 2c924b1ed12..8ea93c09423 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py @@ -22,6 +22,9 @@ import os import threading from collections import defaultdict +import ambari_pyaes +from ambari_pbkdf2.pbkdf2 import PBKDF2 + from ambari_agent.Utils import Utils @@ -38,7 +41,7 @@ class ClusterCache(dict): file_locks = defaultdict(threading.RLock) - def __init__(self, cluster_cache_dir): + def __init__(self, cluster_cache_dir, secret=None): """ Initializes the cache. :param cluster_cache_dir: @@ -46,6 +49,7 @@ def __init__(self, cluster_cache_dir): """ self.cluster_cache_dir = cluster_cache_dir + self.secret = secret self.__current_cache_json_file = os.path.join( self.cluster_cache_dir, self.get_cache_name() + ".json" @@ -63,8 +67,10 @@ def __init__(self, cluster_cache_dir): try: with self.__file_lock: if os.path.isfile(self.__current_cache_json_file): - with open(self.__current_cache_json_file, "r") as fp: - cache_dict = json.load(fp) + with open(self.__current_cache_json_file, "rb") as fp: # Note: 'rb' for binary + encrypted_data = fp.read() + decrypted_json = self._decrypt_data(encrypted_data) + cache_dict = json.loads(decrypted_json) if os.path.isfile(self.__current_cache_hash_file): with open(self.__current_cache_hash_file, "r") as fp: @@ -83,6 +89,85 @@ def __init__(self, cluster_cache_dir): logger.exception(f"Loading saved cache for {self.__class__.__name__} failed") self.rewrite_cache({}, None) + def encrypt(self, plaintext, encryption_key): + salt = os.urandom(16) + iv = os.urandom(16) + + key = PBKDF2(encryption_key, salt, iterations=65536).read(16) + aes = ambari_pyaes.AESModeOfOperationCBC(key, iv=iv) + + # ensure bytes + if not isinstance(plaintext, bytes): + plaintext = plaintext.encode() + + # PKCS7 pad + padded = ambari_pyaes.util.append_PKCS7_padding(plaintext) + + # CBC encrypt block-by-block + ciphertext = b"" + for i in range(0, len(padded), 16): + block = padded[i:i + 16] + encrypted_block = aes.encrypt(block) # must be exactly 16 bytes + ciphertext += encrypted_block + + inner = "::".join([ + salt.hex(), + iv.hex(), + ciphertext.hex() + ]).encode() + + return f"${{enc=aes128_hex, value={inner.hex()}}}" + + def decrypt(self, encrypted_value, encryption_key): + if isinstance(encrypted_value, bytes): + try: + ev_str = encrypted_value.decode() + except Exception: + ev_str = None + else: + ev_str = encrypted_value + + if not ev_str or "value=" not in ev_str: + return encrypted_value + + enc_text = ev_str.split("value=")[1][:-1] + # salt::iv::ciphertext(hex) + salt_hex, iv_hex, data_hex = ( + bytes.fromhex(part) + for part in bytes.fromhex(enc_text).decode().split("::") + ) + + key = PBKDF2(encryption_key, salt_hex, iterations=65536).read(16) + aes = ambari_pyaes.AESModeOfOperationCBC(key, iv=iv_hex) + + data = data_hex + + # Decrypt block-by-block (required) + plaintext = b"" + for i in range(0, len(data), 16): + block = data[i:i + 16] + plaintext += aes.decrypt(block) + + # Remove padding + return ambari_pyaes.util.strip_PKCS7_padding(plaintext) + + def _is_encryption_enabled(self): + return not self.secret + + def _encrypt_data(self, data): + """Encrypt string data""" + if self._is_encryption_enabled(): + return data + else: + return self.encrypt(data.encode(), self.secret) + + def _decrypt_data(self, encrypted_data): + """Decrypt encrypted bytes to string""" + if self._is_encryption_enabled(): + return encrypted_data + else: + return self.decrypt(encrypted_data, self.secret).decode() + def get_cluster_indepedent_data(self): return self[ClusterCache.COMMON_DATA_CLUSTER] @@ -141,8 +226,12 @@ def persist_cache(self, cache_hash): os.makedirs(self.cluster_cache_dir) with self.__file_lock: + # Encrypt JSON data + json_str = json.dumps(self, indent=2) + encrypted_json = self._encrypt_data(json_str) + with open(self.__current_cache_json_file, "w") as f: - json.dump(self, f, indent=2) + f.write(encrypted_json) if self.hash is not None: with open(self.__current_cache_hash_file, "w") as fp: @@ -173,7 +262,7 @@ def get_cache_name(self): raise NotImplemented() def __deepcopy__(self, memo): - return self.__class__(self.cluster_cache_dir) + return self.__class__(self.cluster_cache_dir, self.secret) def __copy__(self): - return self.__class__(self.cluster_cache_dir) + return self.__class__(self.cluster_cache_dir, self.secret) diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py index ec5d9044139..ec8a796da11 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py @@ -31,13 +31,13 @@ class ClusterConfigurationCache(ClusterCache): configuration properties. """ - def __init__(self, cluster_cache_dir): + def __init__(self, cluster_cache_dir, secret=None): """ Initializes the configuration cache. :param cluster_cache_dir: directory the changed json are saved :return: """ - super(ClusterConfigurationCache, self).__init__(cluster_cache_dir) + super(ClusterConfigurationCache, self).__init__(cluster_cache_dir, secret) def get_cache_name(self): return "configurations" diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py index e31717649fa..21ce4a2e259 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py @@ -34,13 +34,13 @@ class ClusterHostLevelParamsCache(ClusterCache): differently for every host. """ - def __init__(self, cluster_cache_dir): + def __init__(self, cluster_cache_dir, secret=None): """ Initializes the host level params cache. :param cluster_cache_dir: :return: """ - super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir) + super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir, secret) def get_cache_name(self): return "host_level_params" diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py index db5d70fcdbb..840da4c8503 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py @@ -38,7 +38,7 @@ def __init__(self, cluster_cache_dir, config): :return: """ self.config = config - super(ClusterMetadataCache, self).__init__(cluster_cache_dir) + super(ClusterMetadataCache, self).__init__(cluster_cache_dir, config.get_agent_secret()) def on_cache_update(self): try: diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py index 22abd1c789c..7aa8ef5e07c 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py @@ -50,7 +50,7 @@ def __init__(self, cluster_cache_dir, config): self.cluster_local_components = {} self.cluster_host_info = None self.component_version_map = {} - super(ClusterTopologyCache, self).__init__(cluster_cache_dir) + super(ClusterTopologyCache, self).__init__(cluster_cache_dir, config.get_agent_secret()) def get_cache_name(self): return "topology" diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 72c8b620f01..f14ca0f13c2 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -90,11 +90,16 @@ def init(self): self.config.cluster_cache_dir, self.config ) self.host_level_params_cache = ClusterHostLevelParamsCache( - self.config.cluster_cache_dir + self.config.cluster_cache_dir, + self.config.get_agent_secret() + ) + self.configurations_cache = ClusterConfigurationCache( + self.config.cluster_cache_dir, + self.config.get_agent_secret() ) - self.configurations_cache = ClusterConfigurationCache(self.config.cluster_cache_dir) self.alert_definitions_cache = ClusterAlertDefinitionsCache( - self.config.cluster_cache_dir + self.config.cluster_cache_dir, + self.config.get_agent_secret() ) self.configuration_builder = ConfigurationBuilder(self) self.stale_alerts_monitor = StaleAlertsMonitor(self) diff --git a/ambari-agent/src/test/python/ambari_agent/TestClusterCache.py b/ambari-agent/src/test/python/ambari_agent/TestClusterCache.py new file mode 100644 index 00000000000..2deb212e74d --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/TestClusterCache.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from ambari_agent import main + +main.MEMORY_LEAK_DEBUG_FILEPATH = "/tmp/memory_leak_debug.out" +import os +import tempfile +import shutil +from unittest import TestCase + +from ambari_agent.ClusterCache import ClusterCache +from mock.mock import patch, MagicMock +from ambari_commons import OSCheck +from only_for_platform import os_distro_value + + +class TestClusterCache(TestCase): + """ + Test suite for verifying encryption behavior of ClusterCache. + + It covers: + - encryption/decryption round-trip when secret are provided + - behavior when encryption is effectively disabled (no secret) + """ + + # so that ClusterCache initialization is OS-agnostic in this test. + @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) + def setUp(self): + # Create a temporary directory that will be cleaned up after each test. + self.tmpdir = tempfile.mkdtemp() + cluster_cache_dir = self.tmpdir + "/cluster_cache_dir" + + # Instance with encryption enabled (secret provided). + self.cluster_cache_encrypted = DummyClusterCache( + cluster_cache_dir, + "super_secret" + ) + + # Instance with encryption disabled (no secret). + self.cluster_cache_unencrypted = DummyClusterCache(cluster_cache_dir) + + @patch.object(os, "chmod") + def test_enc(self, chmod_mock): + """ + Verify that: + - encrypted instance changes the data and can restore it back + - unencrypted instance is a no-op for encrypt/decrypt + """ + string_json = '{"a": 1, "b": 2}' + + # Encrypted cache should not store raw JSON. + encrypted = self.cluster_cache_encrypted._encrypt_data(string_json) + self.assertNotEqual(string_json, encrypted) + # Round-trip must produce original JSON. + decrypted = self.cluster_cache_encrypted._decrypt_data(encrypted) + self.assertEqual(string_json, decrypted) + + # For unencrypted cache, encrypt/decrypt should behave as pass-through. + string_json = '{"a": 1, "b": 2}' + encrypted = self.cluster_cache_unencrypted._encrypt_data(string_json) + self.assertEqual(string_json, encrypted) + decrypted = self.cluster_cache_unencrypted._decrypt_data(encrypted) + self.assertEqual(string_json, decrypted) + + @patch.object(os, "chmod") + def test_encryption_enable(self, chmod_mock): + """ + Verify that _is_encryption_enabled reflects whether secret were provided. + """ + # When secret are given, encryption flag should reflect enabled status. + self.assertFalse(self.cluster_cache_encrypted._is_encryption_enabled()) + + # When no secret, encryption should be reported as disabled. + self.assertTrue(self.cluster_cache_unencrypted._is_encryption_enabled()) + + def tearDown(self): + shutil.rmtree(self.tmpdir) + +class DummyClusterCache(ClusterCache): + """ + Minimal ClusterCache subclass used only for unit testing. + + It overrides get_cache_name to avoid depending on real production cache names. + """ + def get_cache_name(self): + # Dummy implementation just for tests. + return "configuration"