diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/AbstractKeyspaceIntegrationTestBase.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/AbstractKeyspaceIntegrationTestBase.java
index 82a05f00bf..e5a9eaf98d 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/AbstractKeyspaceIntegrationTestBase.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/AbstractKeyspaceIntegrationTestBase.java
@@ -54,7 +54,7 @@ public abstract class AbstractKeyspaceIntegrationTestBase {
   /**
    * Access is protected via {@link #createDriverSession()} method and closed in {@link #cleanUp()}.
    */
-  private CqlSession cqlSession;
+  protected CqlSession cqlSession;
 
   @BeforeAll
   public static void enableLog() {
@@ -350,7 +350,7 @@ protected boolean executeCqlStatement(SimpleStatement... statements) {
    * Synchronized to avoid creating multiple sessions, performance is not a concern. Session is
    * closed in {@link #cleanUp()} method.
    */
-  private synchronized CqlSession createDriverSession() {
+  protected synchronized CqlSession createDriverSession() {
     if (cqlSession == null) {
       int port = Integer.getInteger(IntegrationTestUtils.CASSANDRA_CQL_PORT_PROP);
       String dc;
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/SessionEvictionIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/SessionEvictionIntegrationTest.java
new file mode 100644
index 0000000000..2e13040e83
--- /dev/null
+++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/SessionEvictionIntegrationTest.java
@@ -0,0 +1,175 @@
+package io.stargate.sgv2.jsonapi.api.v1;
+
+import static io.stargate.sgv2.jsonapi.api.v1.ResponseAssertions.*;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.github.dockerjava.api.DockerClient;
+import io.quarkus.logging.Log;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+import io.stargate.sgv2.jsonapi.testresource.IsolatedDseTestResource;
+import io.stargate.sgv2.jsonapi.testresource.StargateTestResource;
+import java.net.InetSocketAddress;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+
+/**
+ * Checks that the API recovers after all database nodes fail: requests must fail with a 500 while
+ * the isolated database container is paused, and succeed again once it has been unpaused.
+ */
+@QuarkusIntegrationTest
+@QuarkusTestResource(IsolatedDseTestResource.class)
+public class SessionEvictionIntegrationTest extends AbstractCollectionIntegrationTestBase {
+
+  /** Maximum time, in milliseconds, to wait for the database to respond again. */
+  private static final long RECOVERY_TIMEOUT_MS = 30_000L;
+
+  /** Delay, in milliseconds, between two recovery probes. */
+  private static final long RECOVERY_POLL_INTERVAL_MS = 500L;
+
+  /**
+   * Builds, on first use, a driver session against the isolated container's mapped CQL port rather
+   * than the shared port system property read by the base class.
+   */
+  @Override
+  protected synchronized CqlSession createDriverSession() {
+    if (cqlSession == null) {
+      int port = requireIsolatedContainer().getMappedPort(9042);
+      String dc =
+          (StargateTestResource.isDse() || StargateTestResource.isHcd()) ? "dc1" : "datacenter1";
+      cqlSession =
+          new CqlSessionBuilder()
+              .withLocalDatacenter(dc)
+              .addContactPoint(new InetSocketAddress("localhost", port))
+              .withAuthCredentials("cassandra", "cassandra") // default admin password :)
+              .build();
+    }
+    return cqlSession;
+  }
+
+  @Test
+  public void testSessionEvictionOnAllNodesFailed() throws Exception {
+
+    // 1. Insert initial data to ensure the database is healthy before the test
+    insertDoc(
+        """
+        {
+          "insertOne": {
+            "document": {
+              "name": "before_crash"
+            }
+          }
+        }
+        """);
+
+    // 2. Pause the container to simulate DB failure
+    GenericContainer<?> dbContainer = requireIsolatedContainer();
+    DockerClient dockerClient = dbContainer.getDockerClient();
+    String containerId = dbContainer.getContainerId();
+    Log.info("Pausing Database Container to simulate failure (Freeze)...");
+    dockerClient.pauseContainerCmd(containerId).exec();
+
+    try {
+      // 3. Verify failure: The application should receive a 500 error/AllNodeFailedException.
+      // Only the status is asserted; the error message text is not checked.
+      givenHeadersPostJsonThen(
+              """
+              {
+                "insertOne": {
+                  "document": {
+                    "name": "after_crash"
+                  }
+                }
+              }
+              """)
+          .statusCode(500)
+          .body("$", responseIsErrorWithStatus());
+    } finally {
+      // 4. Always unpause the container in finally block to ensure cleanup
+      Log.info("Unpausing Database Container to simulate recovery...");
+      dockerClient.unpauseContainerCmd(containerId).exec();
+    }
+
+    // 5. Wait for the database to become responsive again
+    waitForDbRecovery();
+
+    // 6. Verify Session Recovery: The application should have evicted the bad session
+    // and created a new one automatically.
+    insertDoc(
+        """
+        {
+          "insertOne": {
+            "document": {
+              "name": "after_recovery"
+            }
+          }
+        }
+        """);
+
+    Log.info("Test Passed: Session recovered after DB restart.");
+  }
+
+  /**
+   * Returns the isolated database container started by {@link IsolatedDseTestResource}.
+   *
+   * @throws IllegalStateException if the container has not been started
+   */
+  private static GenericContainer<?> requireIsolatedContainer() {
+    GenericContainer<?> container = IsolatedDseTestResource.getIsolatedContainer();
+    if (container == null) {
+      throw new IllegalStateException("Isolated container not started!");
+    }
+    return container;
+  }
+
+  /**
+   * Polls the API every {@link #RECOVERY_POLL_INTERVAL_MS} ms until a request returns 200.
+   *
+   * @throws InterruptedException if the thread is interrupted while waiting between probes
+   * @throws IllegalStateException if no probe succeeds within {@link #RECOVERY_TIMEOUT_MS} ms
+   */
+  private void waitForDbRecovery() throws InterruptedException {
+    Log.info("Waiting for DB to recover...");
+    // Lightweight probe: a filtered find that only needs the database to answer
+    String json =
+        """
+        {
+          "find": {
+            "filter": {"name": "check_recovery"}
+          }
+        }
+        """;
+    long deadline = System.currentTimeMillis() + RECOVERY_TIMEOUT_MS;
+
+    while (System.currentTimeMillis() < deadline) {
+      try {
+        int statusCode =
+            RestAssured.given()
+                .port(getTestPort())
+                .headers(getHeaders())
+                .contentType(ContentType.JSON)
+                .body(json)
+                .when()
+                .post(GeneralResource.BASE_PATH + "/" + keyspaceName + "/" + collectionName)
+                .getStatusCode();
+
+        // 200 OK means the DB handled the request (even if empty result)
+        if (statusCode == 200) {
+          Log.info("DB recovered!");
+          return;
+        }
+      } catch (Exception e) {
+        // Any probe failure just means "not ready yet": keep retrying until the deadline.
+        // The catch stays as broad as before so that no client-side error aborts the wait.
+        Log.debug("DB not responsive yet: " + e);
+      }
+
+      // An interrupt propagates to the caller instead of being swallowed
+      Thread.sleep(RECOVERY_POLL_INTERVAL_MS);
+    }
+    throw new IllegalStateException("DB failed to recover within " + RECOVERY_TIMEOUT_MS + "ms");
+  }
+}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java b/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
index 61e7fb685b..41d2be62e3 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
@@ -151,12 +151,17 @@ public Map start() {
     propsBuilder.put(
         "stargate.jsonapi.embedding.providers.vertexai.models[0].parameters[0].required", "true");
     if (this.containerNetworkId.isPresent()) {
-      String host = System.getProperty("stargate.int-test.cassandra.host");
+      String host =
+          env.getOrDefault(
+              "stargate.int-test.cassandra.host",
+              System.getProperty("stargate.int-test.cassandra.host"));
       propsBuilder.put("stargate.jsonapi.operations.database-config.cassandra-end-points", host);
     } else {
-      int port = Integer.getInteger(IntegrationTestUtils.CASSANDRA_CQL_PORT_PROP);
-      propsBuilder.put(
-          "stargate.jsonapi.operations.database-config.cassandra-port", String.valueOf(port));
+      String port =
+          env.getOrDefault(
+              IntegrationTestUtils.CASSANDRA_CQL_PORT_PROP,
+              System.getProperty(IntegrationTestUtils.CASSANDRA_CQL_PORT_PROP));
+      propsBuilder.put("stargate.jsonapi.operations.database-config.cassandra-port", port);
     }
     if (isDse() || isHcd()) {
       propsBuilder.put("stargate.jsonapi.operations.database-config.local-datacenter", "dc1");
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/testresource/IsolatedDseTestResource.java b/src/test/java/io/stargate/sgv2/jsonapi/testresource/IsolatedDseTestResource.java
new file mode 100644
index 0000000000..a21398a4f3
--- /dev/null
+++ b/src/test/java/io/stargate/sgv2/jsonapi/testresource/IsolatedDseTestResource.java
@@ -0,0 +1,43 @@
+package io.stargate.sgv2.jsonapi.testresource;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+
+/**
+ * {@link DseTestResource} variant that does not copy its properties into JVM system properties
+ * and hands the started container to tests through {@link #getIsolatedContainer()}.
+ */
+public class IsolatedDseTestResource extends DseTestResource {
+
+  /** Container recorded by the most recent {@link #start()}; reset to {@code null} on stop. */
+  private static GenericContainer<?> isolatedContainer;
+
+  /**
+   * Starts the database, records the container for static access, and returns the parent's
+   * properties minus the {@code stargate.int-test.cassandra.cql-port} entry.
+   */
+  @Override
+  public Map<String, String> start() {
+    Map<String, String> props = new HashMap<>(super.start());
+    isolatedContainer = getCassandraContainer();
+    props.remove("stargate.int-test.cassandra.cql-port");
+    return props;
+  }
+
+  @Override
+  protected void exposeSystemProperties(Map<String, String> props) {
+    // Do not expose system properties to avoid interfering with other tests running in parallel
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+    isolatedContainer = null;
+  }
+
+  /** Returns the container recorded by {@link #start()}, or {@code null} if none is recorded. */
+  public static GenericContainer<?> getIsolatedContainer() {
+    return isolatedContainer;
+  }
+}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/testresource/StargateTestResource.java b/src/test/java/io/stargate/sgv2/jsonapi/testresource/StargateTestResource.java
index 60627c6097..a48af9bafc 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/testresource/StargateTestResource.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/testresource/StargateTestResource.java
@@ -83,11 +83,16 @@ public Map start() {
     propsBuilder.put("stargate.jsonapi.operations.vectorize-enabled", "true");
 
     ImmutableMap props = propsBuilder.build();
-    props.forEach(System::setProperty);
+    exposeSystemProperties(props);
     LOG.info("Using props map for the integration tests: %s".formatted(props));
     return props;
   }
 
+  /** Publishes the given properties as JVM system properties; subclasses may override. */
+  protected void exposeSystemProperties(Map<String, String> props) {
+    props.forEach(System::setProperty);
+  }
+
   @Override
   public void stop() {
     if (null != cassandraContainer && !cassandraContainer.isShouldBeReused()) {
@@ -110,6 +115,11 @@ public static String getPersistenceModule() {
         "testing.containers.cluster-persistence", "persistence-cassandra-4.0");
   }
 
+  /** Returns the Cassandra container held by this resource. */
+  public GenericContainer<?> getCassandraContainer() {
+    return cassandraContainer;
+  }
+
   public static boolean isDse() {
     String dse = System.getProperty("testing.containers.cluster-dse", null);
     return "true".equals(dse);