Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.diskbalancer;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
import static org.apache.hadoop.ozone.container.common.volume.StorageVolume.TMP_DIR_NAME;
import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.calculateVolumeDataDensity;
import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getIdealUsage;
import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getVolumeUsages;
Expand Down Expand Up @@ -66,7 +67,6 @@
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
Expand Down Expand Up @@ -164,8 +164,6 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
metrics = DiskBalancerServiceMetrics.create();

loadDiskBalancerInfo();

constructTmpDir();
}

/**
Expand All @@ -177,17 +175,45 @@ public synchronized void refresh(DiskBalancerInfo diskBalancerInfo) throws IOExc
applyDiskBalancerInfo(diskBalancerInfo);
}

private void constructTmpDir() throws IOException {
/**
* Cleans up stale diskBalancer temporary directories on startup.
*
* @throws IOException if cleanup fails
*/
private void cleanupTmpDir() throws IOException {
for (HddsVolume volume:
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
Path tmpDir = getDiskBalancerTmpDir(volume);
Path diskBalancerTmpDir = null;
try {
FileUtils.deleteDirectory(tmpDir.toFile());
FileUtils.forceMkdir(tmpDir.toFile());
File tmpDir = volume.getTmpDir();
if (tmpDir != null) {
// If tmpDir is initialized, use it directly
diskBalancerTmpDir = tmpDir.toPath().resolve(DISK_BALANCER_DIR);
} else {
// If tmpDir is not initialized, construct the path manually
// This handles the case where stale directories exist from previous
// failed moves even though volumes haven't been initialized yet
String clusterId = volume.getClusterID();
if (clusterId == null) {
// Skip volumes without clusterID - they're not properly formatted
continue;
}
String workDirName = volume.getWorkingDirName();
if (workDirName == null) {
workDirName = clusterId;
}
diskBalancerTmpDir = Paths.get(volume.getStorageDir().toString(),
workDirName, TMP_DIR_NAME, DISK_BALANCER_DIR);
}

// Clean up any existing diskBalancer directory from previous runs
if (diskBalancerTmpDir.toFile().exists()) {
FileUtils.deleteDirectory(diskBalancerTmpDir.toFile());
LOG.info("Cleaned up stale diskBalancer tmp directory: {}", diskBalancerTmpDir);
}
} catch (IOException ex) {
LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
ex);
throw ex;
LOG.warn("Failed to clean up diskBalancer tmp directory under volume {}: {}",
volume, diskBalancerTmpDir, ex);
}
}
}
Expand Down Expand Up @@ -340,6 +366,17 @@ public void setVersion(DiskBalancerVersion version) {
this.version = version;
}

@Override
public synchronized void start() {
// Clean up any stale diskBalancer tmp directories from previous runs
try {
cleanupTmpDir();
} catch (IOException e) {
LOG.warn("Failed to clean up diskBalancer tmp directories before starting service", e);
}
super.start();
}

@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
Expand Down Expand Up @@ -462,8 +499,8 @@ public BackgroundTaskResult call() {
container.readLock();
try {
// Step 1: Copy container to new Volume's tmp Dir
diskBalancerTmpDir = destVolume.getTmpDir().toPath()
.resolve(DISK_BALANCER_DIR).resolve(String.valueOf(containerId));
diskBalancerTmpDir = getDiskBalancerTmpDir(destVolume)
.resolve(String.valueOf(containerId));
ozoneContainer.getController().copyContainer(containerData, diskBalancerTmpDir);

// Step 2: verify checksum and Transition Temp container to Temp C1-RECOVERING
Expand Down Expand Up @@ -682,9 +719,8 @@ public long calculateBytesToMove(List<VolumeFixedUsage> inputVolumeSet) {
return totalBytesToMove;
}

private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) {
return Paths.get(hddsVolume.getVolumeRootDir())
.resolve(StorageVolume.TMP_DIR_NAME).resolve(DISK_BALANCER_DIR);
private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) throws IOException {
return hddsVolume.getTmpDir().toPath().resolve(DISK_BALANCER_DIR);
}

public DiskBalancerServiceMetrics getMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.hadoop.ozone.container.diskbalancer;

import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
import static org.apache.hadoop.ozone.container.common.volume.StorageVolume.TMP_DIR_NAME;
import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.getVolumeUsages;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -52,6 +54,7 @@
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
Expand Down Expand Up @@ -114,6 +117,45 @@ public void cleanup() throws IOException {
volumeSet.shutdown();
}

/**
* Creates stale diskBalancer directories to simulate leftover directories
* from previous failed container moves.
*
* @param volumeSet the volume set containing volumes to create stale dirs for
* @param clusterId the cluster ID to use when constructing paths for uninitialized volumes
* @throws IOException if directory creation fails
*/
private void createStaleDiskBalancerDirs(VolumeSet volSet, String clusterId)
throws IOException {
List<StorageVolume> volumes = volSet.getVolumesList();
for (StorageVolume volume : volumes) {
if (volume instanceof HddsVolume) {
HddsVolume hddsVolume = (HddsVolume) volume;
File staleDiskBalancerDir;

File volumeTmpDir = hddsVolume.getTmpDir();
if (volumeTmpDir != null) {
// If tmpDir is initialized, use it directly
staleDiskBalancerDir = new File(volumeTmpDir, DiskBalancerService.DISK_BALANCER_DIR);
} else {
// If tmpDir is not initialized, construct the path manually
File clusterIdDir = new File(hddsVolume.getHddsRootDir(), clusterId);
File tmpDirPath = new File(clusterIdDir, TMP_DIR_NAME);
staleDiskBalancerDir = new File(tmpDirPath, DiskBalancerService.DISK_BALANCER_DIR);
}

// Create stale directory with some content
assertTrue(staleDiskBalancerDir.mkdirs(),
"Failed to create stale diskBalancer directory: " + staleDiskBalancerDir.getAbsolutePath());
File staleContainerDir = new File(staleDiskBalancerDir, "12345");
assertTrue(staleContainerDir.mkdirs());
// Verify stale directory exists before cleanup
assertTrue(staleDiskBalancerDir.exists(),
"Stale diskBalancer directory should exist before cleanup");
}
}
}

@ContainerTestVersionInfo.ContainerTest
public void testUpdateService(ContainerTestVersionInfo versionInfo) throws Exception {
setLayoutAndSchemaForTest(versionInfo);
Expand Down Expand Up @@ -361,4 +403,116 @@ public void testDiskBalancerConfigurationThresholdValidation(double threshold,
assertEquals(expectedThreshold, config.getThreshold(), 0.0001);
}
}

@ContainerTestVersionInfo.ContainerTest
public void testDiskBalancerCleansUpStaleTmpDir(ContainerTestVersionInfo versionInfo) throws Exception {
setLayoutAndSchemaForTest(versionInfo);
// Start volumes to initialize tmp directories
volumeSet.startAllVolume();

ContainerSet containerSet = ContainerSet.newReadOnlyContainerSet(1000);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
metrics, c -> {
}, new ContainerChecksumTreeManager(conf));

// Create stale diskBalancer directories to simulate leftover from previous run
createStaleDiskBalancerDirs(volumeSet, scmId);

// Use actual DiskBalancerService (not TestImpl) to test the real start() method
OzoneContainer ozoneContainer = mockDependencies(containerSet, keyValueHandler, null);
DiskBalancerService svc = new DiskBalancerService(ozoneContainer, 1000, 1000,
TimeUnit.MILLISECONDS, 1, conf);

// Start the service, which should clean up stale tmp directories via cleanupTmpDir()
svc.start();

// Verify stale diskBalancer tmp directories are cleaned up
for (StorageVolume volume : volumeSet.getVolumesList()) {
if (volume instanceof HddsVolume) {
HddsVolume hddsVolume = (HddsVolume) volume;
File volumeTmpDir = hddsVolume.getTmpDir();
File diskBalancerTmpDir = new File(volumeTmpDir, DiskBalancerService.DISK_BALANCER_DIR);

// Verify stale directory is cleaned up (should not exist)
assertFalse(diskBalancerTmpDir.exists(),
"Stale diskBalancer tmp directory should be cleaned up on startup");
}
}

svc.shutdown();
}

@ContainerTestVersionInfo.ContainerTest
public void testDiskBalancerCleanupWhenTmpDirNotInitialized(ContainerTestVersionInfo versionInfo) throws Exception {
setLayoutAndSchemaForTest(versionInfo);
// Create a fresh volume set WITHOUT calling createDbInstancesForTestIfNeeded
// This simulates volumes that are formatted but tmpDir is not initialized
MutableVolumeSet testVolumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);

// Format volumes and ensure clusterID directory exists, but DON'T create tmp dirs
// This simulates the scenario where tmpDir is null
for (StorageVolume volume : testVolumeSet.getVolumesList()) {
if (volume instanceof HddsVolume) {
HddsVolume hddsVolume = (HddsVolume) volume;
// Format volume to set clusterID
hddsVolume.format(scmId);
// Manually create the clusterID directory (needed for tmpDir creation)
// but don't call createWorkingDir() or createTmpDirs()
File clusterIdDir = new File(hddsVolume.getHddsRootDir(), scmId);
if (!clusterIdDir.exists()) {
assertTrue(clusterIdDir.mkdirs(),
"Failed to create clusterID directory: " + clusterIdDir.getAbsolutePath());
}
// Verify tmpDir is null (not initialized)
assertNull(hddsVolume.getTmpDir());
}
}

// Create stale diskBalancer directories manually to simulate leftover from failed move
// This tests the scenario where stale dirs exist even though tmpDir is not initialized
createStaleDiskBalancerDirs(testVolumeSet, scmId);

ContainerSet containerSet = ContainerSet.newReadOnlyContainerSet(1000);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeUuid, containerSet, testVolumeSet,
metrics, c -> {
}, new ContainerChecksumTreeManager(conf));

// Use actual DiskBalancerService (not TestImpl) to test the real start() method
OzoneContainer ozoneContainer = mockDependencies(containerSet, keyValueHandler, null);
// Override getVolumeSet to return our test volume set
when(ozoneContainer.getVolumeSet()).thenReturn(testVolumeSet);

DiskBalancerService svc = new DiskBalancerService(ozoneContainer, 1000, 1000,
TimeUnit.MILLISECONDS, 1, conf);

// Start the service - cleanup should handle volumes with uninitialized tmpDir
// and clean up stale directories even when tmpDir is null
svc.start();

// Verify stale directories are cleaned up even though tmpDir is not initialized
List<StorageVolume> volumes = testVolumeSet.getVolumesList();
for (StorageVolume volume : volumes) {
if (volume instanceof HddsVolume) {
HddsVolume hddsVolume = (HddsVolume) volume;
// tmpDir should still be null - cleanup doesn't initialize it
assertNull(hddsVolume.getTmpDir(),
"tmpDir should not be initialized by cleanup, it will be created lazily");

// Verify stale diskBalancer directory is cleaned up
File hddsRootDir = hddsVolume.getHddsRootDir();
File expectedDiskBalancerTmpDir = new File(new File(hddsRootDir, scmId),
TMP_DIR_NAME + File.separator + DiskBalancerService.DISK_BALANCER_DIR);
assertFalse(expectedDiskBalancerTmpDir.exists(),
"Stale diskBalancer directory should be cleaned up even when tmpDir is not initialized");
}
}

svc.shutdown();
testVolumeSet.shutdown();
}
}
Loading