From a53c5051ac7bd7d6055a1ea3037a9bc2a173bb6d Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Fri, 30 Aug 2024 14:27:43 +0300 Subject: [PATCH 1/6] IGNITE-22530 CDC: Add regex filters for cache names --- .../test/java/org/apache/ignite/util/CdcCommandTest.java | 3 ++- .../java/org/apache/ignite/util/CdcResendCommandTest.java | 3 ++- .../src/main/java/org/apache/ignite/cdc/CdcConsumer.java | 8 +++++--- .../main/java/org/apache/ignite/internal/cdc/CdcMain.java | 2 +- .../apache/ignite/internal/cdc/WalRecordsConsumer.java | 7 ++++--- .../test/java/org/apache/ignite/cdc/AbstractCdcTest.java | 4 ++-- .../org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java | 5 +++-- .../org/apache/ignite/cdc/CdcPushMetricsExporterTest.java | 5 +++-- .../src/test/java/org/apache/ignite/cdc/CdcSelfTest.java | 5 +++-- .../internal/ducktest/tests/cdc/CountingCdcConsumer.java | 3 ++- .../apache/ignite/internal/cdc/CdcIndexRebuildTest.java | 3 ++- .../java/org/apache/ignite/cdc/CdcConfigurationTest.java | 3 ++- 12 files changed, 31 insertions(+), 20 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index 241b3adc51eeb..6c9d2eae10c10 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.Serializable; +import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -194,7 +195,7 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception { CdcConfiguration cfg = new CdcConfiguration(); cfg.setConsumer(new UserCdcConsumer() { - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { appStarted.countDown(); } }); diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java index c333fd3ed0dac..cb17d9ba7bb94 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.util; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -217,7 +218,7 @@ synchronized List events() { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op } diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java index 592bc71ef545b..a5a508b687b62 100644 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java @@ -17,6 +17,7 @@ package org.apache.ignite.cdc; +import java.nio.file.Path; import java.util.Iterator; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteBinary; @@ -33,7 +34,7 @@ * This consumer will receive data change events during ignite-cdc process invocation. * The lifecycle of the consumer is the following: * @@ -66,8 +67,9 @@ public interface CdcConsumer { /** * Starts the consumer. * @param mreg Metric registry for consumer specific metrics. + * @param cdcDir Path to Change Data Capture Directory. */ - public void start(MetricRegistry mreg); + public void start(MetricRegistry mreg, Path cdcDir); /** * Handles entry changes events. @@ -131,7 +133,7 @@ public interface CdcConsumer { /** * Stops the consumer. - * This method can be invoked only after {@link #start(MetricRegistry)}. + * This method can be invoked only after {@link #start(MetricRegistry, Path)}. */ public void stop(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index d7f22658fdd7a..7110b6ca8ffaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -339,7 +339,7 @@ public void runX() throws Exception { committedSegmentOffset.value(walState.get1().fileOffset()); } - consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer"))); + consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), cdcDir); started = true; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 3b111d50a197e..94ce645f1dbd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.cdc; +import java.nio.file.Path; import java.util.EnumSet; import java.util.Iterator; import java.util.NoSuchElementException; @@ -188,8 +189,8 @@ public void onCacheDestroyEvents(Iterator caches) { * @param cdcConsumerReg CDC consumer metric registry. * @throws IgniteCheckedException If failed. */ - public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg) throws IgniteCheckedException { - consumer.start(cdcConsumerReg); + public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException { + consumer.start(cdcConsumerReg, cdcDir); evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer"); lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process"); @@ -200,7 +201,7 @@ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg) /** * Stops the consumer. - * This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl)}. + * This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path)}. */ public void stop() { consumer.stop(); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index 6f1180389558d..c4ed4b61f9621 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -308,7 +308,7 @@ public abstract static class TestCdcConsumer implements CdcConsumer { private volatile boolean stopped; /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { stopped = false; } @@ -462,7 +462,7 @@ public static class TrackCacheEventsConsumer implements CdcConsumer { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op. } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java index cc6f520187d24..9988ccf905603 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.cdc; import java.io.File; +import java.nio.file.Path; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -92,8 +93,8 @@ public void testCdcStartWithNonDefaultWorkDir() throws Exception { CdcConfiguration cdcCfg = new CdcConfiguration(); cdcCfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() { - @Override public void start(MetricRegistry mreg) { - super.start(mreg); + @Override public void start(MetricRegistry mreg, Path cdcDir) { + super.start(mreg, cdcDir); started.countDown(); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java index 4b4d49790d951..feeecacf976d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.cdc; +import java.nio.file.Path; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -201,10 +202,10 @@ public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { Ignite ignite = Ignition.start(destClusterCliCfg); - super.start(mreg); + super.start(mreg, cdcDir); ignite.log().info("TestIgniteToIgniteConsumer started."); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 1c2132c497bb9..f55a6e51adec2 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.Serializable; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -418,7 +419,7 @@ public void testReadOneByOneForBackup() throws Exception { // No-op. } - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op. } }; @@ -512,7 +513,7 @@ public void testReadFromNextEntry() throws Exception { // No-op. } - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op. } }, cfg)); diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java index 8af23d0c36185..d43da27eb6fa0 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.ducktest.tests.cdc; +import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteLogger; @@ -42,7 +43,7 @@ public class CountingCdcConsumer implements CdcConsumer { private final AtomicLong objectsConsumed = new AtomicLong(); /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { log.info("CountingCdcConsumer started"); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java index e45cdaae45bc9..4613fd0e1b887 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.cdc; +import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -140,7 +141,7 @@ public void reset() { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op. } diff --git a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java index 7ace260bcfc00..4675fb310fc87 100644 --- a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.cdc; +import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; @@ -123,7 +124,7 @@ public static class TestCdcConsumer implements CdcConsumer { public CountDownLatch startLatch = new CountDownLatch(1); /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { springString2 = ctx.getBean("springString2", String.class); startLatch.countDown(); From bc3a673837d95573e9030ca775e181457216ac22 Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Sun, 18 May 2025 21:58:51 +0300 Subject: [PATCH 2/6] IGNITE-22530 Add CdcConsumerEx interface --- .../apache/ignite/util/CdcCommandTest.java | 3 +- .../ignite/util/CdcResendCommandTest.java | 3 +- .../org/apache/ignite/cdc/CdcConsumer.java | 8 ++--- .../org/apache/ignite/cdc/CdcConsumerEx.java | 34 +++++++++++++++++++ .../apache/ignite/internal/cdc/CdcMain.java | 2 +- .../internal/cdc/WalRecordsConsumer.java | 7 +++- .../apache/ignite/cdc/AbstractCdcTest.java | 4 +-- .../ignite/cdc/CdcNonDefaultWorkDirTest.java | 5 ++- .../cdc/CdcPushMetricsExporterTest.java | 5 ++- .../org/apache/ignite/cdc/CdcSelfTest.java | 5 ++- .../tests/cdc/CountingCdcConsumer.java | 3 +- .../internal/cdc/CdcIndexRebuildTest.java | 3 +- .../ignite/cdc/CdcConfigurationTest.java | 3 +- 13 files changed, 57 insertions(+), 28 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index 6c9d2eae10c10..241b3adc51eeb 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -19,7 +19,6 @@ import java.io.File; import java.io.Serializable; -import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -195,7 +194,7 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception { CdcConfiguration cfg = new CdcConfiguration(); cfg.setConsumer(new UserCdcConsumer() { - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { appStarted.countDown(); } }); diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java index cb17d9ba7bb94..c333fd3ed0dac 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.util; -import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -218,7 +217,7 @@ synchronized List events() { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op } diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java index a5a508b687b62..592bc71ef545b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java @@ -17,7 +17,6 @@ package org.apache.ignite.cdc; -import java.nio.file.Path; import java.util.Iterator; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteBinary; @@ -34,7 +33,7 @@ * This consumer will receive data change events during ignite-cdc process invocation. * The lifecycle of the consumer is the following: *
    - *
  • Start of the consumer {@link #start(MetricRegistry, Path)}.
  • + *
  • Start of the consumer {@link #start(MetricRegistry)}.
  • *
  • Notification of the consumer by the {@link #onEvents(Iterator)} call.
  • *
  • Stop of the consumer {@link #stop()}.
  • *
@@ -67,9 +66,8 @@ public interface CdcConsumer { /** * Starts the consumer. * @param mreg Metric registry for consumer specific metrics. - * @param cdcDir Path to Change Data Capture Directory. */ - public void start(MetricRegistry mreg, Path cdcDir); + public void start(MetricRegistry mreg); /** * Handles entry changes events. @@ -133,7 +131,7 @@ public interface CdcConsumer { /** * Stops the consumer. - * This method can be invoked only after {@link #start(MetricRegistry, Path)}. + * This method can be invoked only after {@link #start(MetricRegistry)}. */ public void stop(); diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java new file mode 100644 index 0000000000000..035fb566ed30e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.nio.file.Path; +import org.apache.ignite.metric.MetricRegistry; + +/** + * Extended CdcConsumer interface which provides overloaded {@link CdcConsumerEx#start(MetricRegistry, Path)} method + * required for CDC regex filters. + */ +public interface CdcConsumerEx extends CdcConsumer { + /** + * Starts the consumer. + * @param mreg Metric registry for consumer specific metrics. + * @param cdcDir Path to Change Data Capture Directory. + */ + void start(MetricRegistry mreg, Path cdcDir); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 7110b6ca8ffaa..b7cbbc8c4ece9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -339,7 +339,7 @@ public void runX() throws Exception { committedSegmentOffset.value(walState.get1().fileOffset()); } - consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), cdcDir); + consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), ft.walCdc().toPath()); started = true; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 94ce645f1dbd1..3295ae5589d44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -27,6 +27,7 @@ import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cdc.CdcCacheEvent; import org.apache.ignite.cdc.CdcConsumer; +import org.apache.ignite.cdc.CdcConsumerEx; import org.apache.ignite.cdc.CdcEvent; import org.apache.ignite.cdc.TypeMapping; import org.apache.ignite.internal.pagemem.wal.WALIterator; @@ -187,10 +188,14 @@ public void onCacheDestroyEvents(Iterator caches) { * * @param cdcReg CDC metric registry. * @param cdcConsumerReg CDC consumer metric registry. + * @param cdcDir Path to Change Data Capture Directory. * @throws IgniteCheckedException If failed. */ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException { - consumer.start(cdcConsumerReg, cdcDir); + if (consumer instanceof CdcConsumerEx) + ((CdcConsumerEx) consumer).start(cdcConsumerReg, cdcDir); + else + consumer.start(cdcConsumerReg); evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer"); lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process"); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index c4ed4b61f9621..6f1180389558d 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -308,7 +308,7 @@ public abstract static class TestCdcConsumer implements CdcConsumer { private volatile boolean stopped; /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { stopped = false; } @@ -462,7 +462,7 @@ public static class TrackCacheEventsConsumer implements CdcConsumer { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op. } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java index 9988ccf905603..cc6f520187d24 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.cdc; import java.io.File; -import java.nio.file.Path; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -93,8 +92,8 @@ public void testCdcStartWithNonDefaultWorkDir() throws Exception { CdcConfiguration cdcCfg = new CdcConfiguration(); cdcCfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() { - @Override public void start(MetricRegistry mreg, Path cdcDir) { - super.start(mreg, cdcDir); + @Override public void start(MetricRegistry mreg) { + super.start(mreg); started.countDown(); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java index feeecacf976d5..4b4d49790d951 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.cdc; -import java.nio.file.Path; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -202,10 +201,10 @@ public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { Ignite ignite = Ignition.start(destClusterCliCfg); - super.start(mreg, cdcDir); + super.start(mreg); ignite.log().info("TestIgniteToIgniteConsumer started."); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index f55a6e51adec2..1c2132c497bb9 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -19,7 +19,6 @@ import java.io.File; import java.io.Serializable; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -419,7 +418,7 @@ public void testReadOneByOneForBackup() throws Exception { // No-op. } - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op. } }; @@ -513,7 +512,7 @@ public void testReadFromNextEntry() throws Exception { // No-op. } - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op. } }, cfg)); diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java index d43da27eb6fa0..8af23d0c36185 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.ducktest.tests.cdc; -import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteLogger; @@ -43,7 +42,7 @@ public class CountingCdcConsumer implements CdcConsumer { private final AtomicLong objectsConsumed = new AtomicLong(); /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { log.info("CountingCdcConsumer started"); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java index 4613fd0e1b887..e45cdaae45bc9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.cdc; -import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -141,7 +140,7 @@ public void reset() { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op. } diff --git a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java index 4675fb310fc87..7ace260bcfc00 100644 --- a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.cdc; -import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; @@ -124,7 +123,7 @@ public static class TestCdcConsumer implements CdcConsumer { public CountDownLatch startLatch = new CountDownLatch(1); /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { springString2 = ctx.getBean("springString2", String.class); startLatch.countDown(); From 246848f665f535c80df1727e7919c7b06d018d47 Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Sun, 8 Jun 2025 21:57:32 +0300 Subject: [PATCH 3/6] IGNITE-22530 Add CdcRegexMatcher interface --- .../apache/ignite/cdc/CdcRegexMatcher.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java new file mode 100644 index 0000000000000..08aade859db1e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +/** + * Regexp matcher that processes user's regexp patterns for CDC caches names. + */ +public interface CdcRegexMatcher { + /** + * Finds and processes match between cache name and user's regexp patterns. + * + * @param cacheName Cache name. + * @return True if cache name matches user's regexp patterns. + */ + boolean match(String cacheName); +} From ebbde3846d3d5389ac9264aa3f651ce18bc62ea7 Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Wed, 9 Jul 2025 16:49:46 +0300 Subject: [PATCH 4/6] IGNITE-22530 Move CdcConsumerEx to org.apache.ignite.internal --- .../org/apache/ignite/{ => internal}/cdc/CdcConsumerEx.java | 4 +++- .../org/apache/ignite/internal/cdc/WalRecordsConsumer.java | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) rename modules/core/src/main/java/org/apache/ignite/{ => internal}/cdc/CdcConsumerEx.java (94%) diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java similarity index 94% rename from modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java rename to modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java index 035fb566ed30e..57153b7018c56 100644 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java @@ -15,9 +15,11 @@ * limitations under the License. */ -package org.apache.ignite.cdc; +package org.apache.ignite.internal.cdc; import java.nio.file.Path; + +import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.metric.MetricRegistry; /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 3295ae5589d44..18a21ef84ce96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -27,7 +27,6 @@ import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cdc.CdcCacheEvent; import org.apache.ignite.cdc.CdcConsumer; -import org.apache.ignite.cdc.CdcConsumerEx; import org.apache.ignite.cdc.CdcEvent; import org.apache.ignite.cdc.TypeMapping; import org.apache.ignite.internal.pagemem.wal.WALIterator; From 9241f9c16800eb6aa544ae7f01d64a830aa3ff33 Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Mon, 3 Nov 2025 20:39:21 +0300 Subject: [PATCH 5/6] IGNITE-22530 Remove CdcRegexMatcher interface --- .../apache/ignite/cdc/CdcRegexMatcher.java | 31 ------------------- 1 file changed, 31 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java deleted file mode 100644 index 08aade859db1e..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cdc; - -/** - * Regexp matcher that processes user's regexp patterns for CDC caches names. - */ -public interface CdcRegexMatcher { - /** - * Finds and processes match between cache name and user's regexp patterns. - * - * @param cacheName Cache name. - * @return True if cache name matches user's regexp patterns. - */ - boolean match(String cacheName); -} From 231a3d37c2fb5ef1ca178870132de9b7d9eddc2b Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Tue, 6 Jan 2026 22:50:25 +0300 Subject: [PATCH 6/6] IGNITE-22530 Make CdcConsumerEx accept caches from IgniteConfiguration#getCacheConfiguration --- .../org/apache/ignite/internal/cdc/CdcConsumerEx.java | 6 ++++-- .../java/org/apache/ignite/internal/cdc/CdcMain.java | 10 +++++++++- .../apache/ignite/internal/cdc/WalRecordsConsumer.java | 8 +++++--- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java index 57153b7018c56..0510253910ad5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java @@ -18,12 +18,13 @@ package org.apache.ignite.internal.cdc; import java.nio.file.Path; +import java.util.List; import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.metric.MetricRegistry; /** - * Extended CdcConsumer interface which provides overloaded {@link CdcConsumerEx#start(MetricRegistry, Path)} method + * Extended CdcConsumer interface which provides overloaded {@link CdcConsumerEx#start(MetricRegistry, Path, List)} method * required for CDC regex filters. */ public interface CdcConsumerEx extends CdcConsumer { @@ -31,6 +32,7 @@ public interface CdcConsumerEx extends CdcConsumer { * Starts the consumer. * @param mreg Metric registry for consumer specific metrics. * @param cdcDir Path to Change Data Capture Directory. + * @param cacheNames List of cache names. */ - void start(MetricRegistry mreg, Path cdcDir); + void start(MetricRegistry mreg, Path cdcDir, List cacheNames); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index b7cbbc8c4ece9..a54d2933bf7e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -26,11 +26,13 @@ import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -41,6 +43,7 @@ import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.cdc.CdcEvent; import org.apache.ignite.cdc.TypeMapping; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -339,7 +342,12 @@ public void runX() throws Exception { committedSegmentOffset.value(walState.get1().fileOffset()); } - consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), ft.walCdc().toPath()); + List cacheNames = Arrays.stream(igniteCfg.getCacheConfiguration()) + .map(CacheConfiguration::getName) + .collect(Collectors.toList()); + + consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), ft.walCdc().toPath(), + cacheNames); started = true; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 18a21ef84ce96..0feab16818520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.util.EnumSet; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -188,11 +189,12 @@ public void onCacheDestroyEvents(Iterator caches) { * @param cdcReg CDC metric registry. * @param cdcConsumerReg CDC consumer metric registry. * @param cdcDir Path to Change Data Capture Directory. + * @param cacheNames List of cache names. * @throws IgniteCheckedException If failed. */ - public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException { + public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir, List cacheNames) throws IgniteCheckedException { if (consumer instanceof CdcConsumerEx) - ((CdcConsumerEx) consumer).start(cdcConsumerReg, cdcDir); + ((CdcConsumerEx) consumer).start(cdcConsumerReg, cdcDir, cacheNames); else consumer.start(cdcConsumerReg); @@ -205,7 +207,7 @@ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, /** * Stops the consumer. - * This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path)}. + * This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path, List)}. */ public void stop() { consumer.stop();