Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -173,33 +173,117 @@ public void testWalStateInMemoryCdcCluster() throws Exception {
}

/**
* Test WAL mode change for a cache group contains multiple caches.
* Test for WAL enable/disable commands.
* @throws Exception If failed.
*/
@Test
public void testWalChangeForMultiCacheGroup() throws Exception {
public void testWalManagementOperations() throws Exception {
clusterState = 0; // PDS cluster.

IgniteEx srv = startGrids(2);
srv.cluster().state(ClusterState.ACTIVE);

srv.createCache(new CacheConfiguration<>("cache1")
.setGroupName("testGroup"));
.setGroupName("group1"));
srv.createCache(new CacheConfiguration<>("cache2")
.setGroupName("testGroup"));
.setGroupName("group1"));
srv.createCache(new CacheConfiguration<>("cache3")
.setGroupName("group2"));
srv.createCache("cache4");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "group1,group2,cache4"));
outputContains(".*group1.*true.*true.*true.*true.*false");
outputContains(".*group2.*true.*true.*true.*true.*false");
outputContains(".*cache4.*true.*true.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "group1"));
outputContains("Successfully disabled WAL for groups:");
outputContains("group1");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "group1"));
outputContains(".*group1.*true.*false.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "group1,group2"));
outputContains("Successfully disabled WAL for groups:");
outputContains("group1");
outputContains("group2");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "group1,group2"));
outputContains(".*group1.*true.*false.*true.*true.*false");
outputContains(".*group2.*true.*false.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "cache4,nonExistentGroup"));
outputContains("Successfully disabled WAL for groups:");
outputContains("cache4");
outputContains("Errors occurred:");
outputContains("Cache group not found: nonExistentGroup");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "cache4"));
outputContains(".*cache4.*true.*false.*true.*true.*false");

//Error when using cache name instead of group name
assertEquals(EXIT_CODE_OK, execute("--wal", "enable", "--groups", "cache3"));
outputContains("Errors occurred:");
outputContains("Cache group not found: cache3");

assertEquals(EXIT_CODE_OK, execute("--wal", "enable", "--groups", "group2,cache4"));
outputContains("Successfully enabled WAL for groups:");
outputContains("group2");
outputContains("cache4");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "group2,cache4"));
outputContains(".*group2.*true.*true.*true.*true.*false");
outputContains(".*cache4.*true.*true.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable"));
outputContains("Successfully disabled WAL for groups:");
outputContains("group1");
outputContains("group2");
outputContains("cache4");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "testGroup"));
outputContains(".*testGroup.*true.*true.*true.*true.*false");
assertEquals(EXIT_CODE_OK, execute("--wal", "state"));
outputContains(".*group1.*true.*false.*true.*true.*false");
outputContains(".*group2.*true.*false.*true.*true.*false");
outputContains(".*cache4.*true.*false.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "enable"));
outputContains("Successfully enabled WAL for groups:");
outputContains("group1");
outputContains("group2");
outputContains("cache4");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "testGroup"));
assertEquals(EXIT_CODE_OK, execute("--wal", "state"));
outputContains(".*group1.*true.*true.*true.*true.*false");
outputContains(".*group2.*true.*true.*true.*true.*false");
outputContains(".*cache4.*true.*true.*true.*true.*false");
}

/**
* Test WAL mode change attempts for non-persistent cache groups.
* @throws Exception If failed.
*/
@Test
public void testWalChangeForNonPersistentCaches() throws Exception {
IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0));
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(false)));

IgniteEx srv = startGrid(cfg);
srv.cluster().state(ClusterState.ACTIVE);

srv.createCache("cache1");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "testGroup"));
outputContains(".*testGroup.*true.*false.*true.*true.*false");
assertEquals(EXIT_CODE_OK, execute("--wal", "enable", "--groups", "cache1"));
outputContains("Errors occurred:");
outputContains("cache1.*Cannot change WAL mode because persistence is not enabled for cache\\(s\\)");

assertEquals(EXIT_CODE_OK, execute("--wal", "enable", "--groups", "testGroup"));
assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "cache1"));
outputContains(".*cache1.*false.*false.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "testGroup"));
outputContains(".*testGroup.*true.*true.*true.*true.*false");
assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "cache1"));
outputContains("Errors occurred:");
outputContains("cache1.*Cannot change WAL mode because persistence is not enabled for cache\\(s\\)");
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.ignite.internal.management.wal;

import java.util.List;
import java.util.function.Consumer;
import org.apache.ignite.internal.management.api.ComputeCommand;

/** */
public class WalDisableCommand implements ComputeCommand<WalDisableCommand.WalDisableCommandArg, Void> {
public class WalDisableCommand implements ComputeCommand<WalDisableCommand.WalDisableCommandArg, WalSetStateTaskResult> {
/** {@inheritDoc} */
@Override public Class<WalSetStateTask> taskClass() {
return WalSetStateTask.class;
Expand All @@ -41,6 +43,25 @@ public class WalDisableCommand implements ComputeCommand<WalDisableCommand.WalDi
return "Are you sure? Any node failure without WAL can lead to the loss of all PDS data. CDC events will be lost without WAL.";
}

/** {@inheritDoc} */
@Override public void printResult(WalDisableCommandArg arg, WalSetStateTaskResult res, Consumer<String> printer) {
String operation = arg instanceof WalEnableCommand.WalEnableCommandArg ? "enable" : "disable";
List<String> successGrps = res.successGroups();
List<String> errors = res.errorMessages();

if (!successGrps.isEmpty()) {
printer.accept("Successfully " + operation + "d WAL for groups:");
for (String grp : successGrps)
printer.accept(" " + grp);
}

if (errors != null && !errors.isEmpty()) {
printer.accept("Errors occurred:");
for (String error : errors)
printer.accept(" " + error);
}
}

/** */
public static class WalDisableCommandArg extends WalStateCommandArg {
/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.ignite.internal.management.wal;

import java.util.function.Consumer;
import org.apache.ignite.internal.management.api.ComputeCommand;
import org.apache.ignite.internal.management.wal.WalDisableCommand.WalDisableCommandArg;

/** */
public class WalEnableCommand implements ComputeCommand<WalDisableCommandArg, Void> {
public class WalEnableCommand implements ComputeCommand<WalDisableCommandArg, WalSetStateTaskResult> {
/** {@inheritDoc} */
@Override public Class<WalSetStateTask> taskClass() {
return WalSetStateTask.class;
Expand All @@ -37,6 +38,12 @@ public class WalEnableCommand implements ComputeCommand<WalDisableCommandArg, Vo
return WalEnableCommandArg.class;
}

/** {@inheritDoc} */
@Override public void printResult(WalDisableCommandArg arg, WalSetStateTaskResult res, Consumer<String> printer) {
WalDisableCommand cmd = new WalDisableCommand();
cmd.printResult(arg, res, printer);
}

/** */
public static class WalEnableCommandArg extends WalDisableCommandArg {
/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.management.wal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand All @@ -32,22 +33,42 @@
import org.jetbrains.annotations.Nullable;

/** */
public class WalSetStateTask extends VisorMultiNodeTask<WalDisableCommandArg, Void, Void> {
public class WalSetStateTask extends VisorMultiNodeTask<WalDisableCommandArg, WalSetStateTaskResult, WalSetStateTaskResult> {
/** */
private static final long serialVersionUID = 0;

/** {@inheritDoc} */
@Override protected VisorJob<WalDisableCommandArg, Void> job(WalDisableCommandArg arg) {
return new WalDisableJob(arg, false);
@Override protected VisorJob<WalDisableCommandArg, WalSetStateTaskResult> job(WalDisableCommandArg arg) {
return new WalDisableJob(arg, debug);
}

/** {@inheritDoc} */
@Override protected @Nullable Void reduce0(List<ComputeJobResult> res) throws IgniteException {
return null;
@Override protected @Nullable WalSetStateTaskResult reduce0(List<ComputeJobResult> res) throws IgniteException {
Set<String> successGrps = new HashSet<>();
List<String> errors = new ArrayList<>();

for (ComputeJobResult jobRes : res) {
if (jobRes.getException() != null) {
Throwable e = jobRes.getException();
errors.add("Node " + jobRes.getNode().consistentId() + ": Task execution failed - " + e.getMessage());
}
else {
WalSetStateTaskResult result = jobRes.getData();
if (result.successGroups() != null)
successGrps.addAll(result.successGroups());
if (!Boolean.TRUE.equals(result.success()) && result.errorMessages() != null)
errors.addAll(result.errorMessages());
}
}

if (errors.isEmpty())
return new WalSetStateTaskResult(new ArrayList<>(successGrps));
else
return new WalSetStateTaskResult(new ArrayList<>(successGrps), errors);
}

/** */
private static class WalDisableJob extends VisorJob<WalDisableCommandArg, Void> {
private static class WalDisableJob extends VisorJob<WalDisableCommandArg, WalSetStateTaskResult> {
/** */
private static final long serialVersionUID = 0;

Expand All @@ -57,22 +78,50 @@ protected WalDisableJob(@Nullable WalDisableCommandArg arg, boolean debug) {
}

/** {@inheritDoc} */
@Override protected Void run(@Nullable WalDisableCommandArg arg) throws IgniteException {
Set<String> grps = F.isEmpty(arg.groups()) ? null : new HashSet<>(Arrays.asList(arg.groups()));
@Override protected WalSetStateTaskResult run(@Nullable WalDisableCommandArg arg) throws IgniteException {
Set<String> requestedGrps = F.isEmpty(arg.groups()) ? null : new HashSet<>(Arrays.asList(arg.groups()));
boolean isEnable = arg instanceof WalEnableCommandArg;
List<String> successGrps = new ArrayList<>();
List<String> errors = new ArrayList<>();

try {
Set<String> availableGrps = new HashSet<>();

for (CacheGroupContext gctx : ignite.context().cache().cacheGroups()) {
String grpName = gctx.cacheOrGroupName();
for (CacheGroupContext gctx : ignite.context().cache().cacheGroups()) {
String grpName = gctx.cacheOrGroupName();
availableGrps.add(grpName);

if (grps != null && !grps.contains(grpName))
continue;
if (requestedGrps != null && !requestedGrps.contains(grpName))
continue;

if (arg instanceof WalEnableCommandArg)
ignite.cluster().enableWal(grpName);
try {
if (isEnable)
ignite.cluster().enableWal(grpName);
else
ignite.cluster().disableWal(grpName);

successGrps.add(grpName);
}
catch (Exception e) {
errors.add("Failed to " + (isEnable ? "enable" : "disable") +
" WAL for cache group: " + grpName + " - " + e.getMessage());
}
}

for (String requestedGrp : requestedGrps) {
if (!availableGrps.contains(requestedGrp))
errors.add("Cache group not found: " + requestedGrp);
}

if (errors.isEmpty())
return new WalSetStateTaskResult(successGrps);
else
ignite.cluster().disableWal(grpName);
return new WalSetStateTaskResult(successGrps, errors);
}
catch (Exception e) {
errors.add("Failed to execute operation - " + e.getMessage());
return new WalSetStateTaskResult(successGrps, errors);
}

return null;
}
}
}
Loading