From f9f71d8dc435ea44e8acd127d32d12c5ccc396e3 Mon Sep 17 00:00:00 2001 From: lushiji Date: Tue, 4 Nov 2025 11:03:44 +0800 Subject: [PATCH] [feat][client] PCIP-5 Support Pull Consumer --- pcip/pcip-5.md | 179 +++++++++++ pom.xml | 1 + pulsar-client-common-contrib/pom.xml | 12 + .../pulsar/client/api/PulsarPullConsumer.java | 99 ++++++ .../api/impl/PulsarPullConsumerImpl.java | 296 ++++++++++++++++++ .../pulsar/client/common/Constants.java | 22 ++ .../pulsar/client/common/ConsumeStats.java | 34 ++ .../pulsar/client/common/PullRequest.java | 83 +++++ .../pulsar/client/common/PullResponse.java | 29 ++ .../pulsar/client/common/package-info.java} | 12 +- .../client/util/OffsetToMessageIdCache.java | 107 +++++++ .../util/OffsetToMessageIdCacheProvider.java | 27 ++ .../pulsar/client/util/PulsarAdminUtils.java | 142 +++++++++ .../pulsar/client/util/ReaderCache.java | 213 +++++++++++++ .../client/util/ReaderCacheProvider.java | 41 +++ .../pulsar/client/util/package-info.java | 14 + .../src/test/java/PulsarPullConsumerTest.java | 256 +++++++++++++++ 17 files changed, 1556 insertions(+), 11 deletions(-) create mode 100644 pcip/pcip-5.md create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java rename pulsar-client-common-contrib/src/{test/java/DemoTest.java => main/java/org/apache/pulsar/client/common/package-info.java} (75%) create mode 100644 
pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java create mode 100644 pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java diff --git a/pcip/pcip-5.md b/pcip/pcip-5.md new file mode 100644 index 0000000..33d4be3 --- /dev/null +++ b/pcip/pcip-5.md @@ -0,0 +1,179 @@ +# PCIP-5: Pull Consumer Implementation for Apache Pulsar + +# Background knowledge + +- **Pulsar Consumers**: Pulsar currently supports push-based consumption models (exclusive/shared/failover/key-shared). + This proposal adds pull-based consumption. +- **Message Positioning**: Pulsar uses composite MessageIDs (ledgerId + entryId + partitionId), while systems like + Kafka/RocketMQ use monotonic offsets. +- **Offset Mapping**: https://github.com/apache/pulsar/pull/24220 can be used to convert between offsets and Pulsar's + MessageIDs. + +# Motivation + +System Migration Requirement: The organization plans to migrate from RocketMQ to Pulsar, requiring a unified MQ client +abstraction layer to conceal implementation details and support seamless engine replacement. + +Interface Compatibility Issues: + +- Pulsar lacks a native offset retrieval interface (pull/fetch model). +- RocketMQ/Kafka use monotonically increasing numeric offsets to locate messages, whereas Pulsar employs a composite + MessageID (ledgerId + entryId + partitionId). 
+
+Objective: Implement a RocketMQ-like Pull Consumer to support precise offset control and reduce migration costs.
+
+# Goals
+
+## In Scope
+
+| Goal                       | Description                                                               |
+|----------------------------|---------------------------------------------------------------------------|
+| **Precise Offset Control** | Supports specifying partition, offset, pull count, and byte size.         |
+| **Resource Efficiency**    | Reuses Reader connections with LRU cache management.                      |
+| **Easy Integration**       | Compatible with Pulsar’s existing API, requiring no Broker modifications. |
+
+## Out of Scope
+
+NA
+
+# High Level Design
+
+```mermaid
+graph TD
+    A[PullConsumer] -->|pull| B[Offset Mapping]
+    B -->|convert| C[MessageID]
+    C -->|seek| D[Reader Pool]
+    D -->|fetch| E[Broker]
+    A -->|ack| F[Offset-Based Ack]
+    F -->|convert| G[MessageID]
+    G -->|cumulative ack| H[Consumer]
+```
+
+Key components:
+
+1. **`PulsarPullConsumer` interface**: Standard pull consumer API
+2. **Offset ↔ MessageID Cache**: Partition-scoped mapping layer
+3. **Reader Pool**: Managed resource pool with LRU eviction
+4. **Partition Locking**: Thread-safe access coordination
+
+## Detailed Design
+
+### Design & Implementation Details
+
+**Core Interface** `PulsarPullConsumer`:
+
+```java
+public interface PulsarPullConsumer<T> extends AutoCloseable {
+    void start() throws PulsarClientException;
+
+    PullResponse<T> pull(PullRequest request);
+
+    void ack(long offset, int partition) throws PulsarClientException;
+
+    void ack(long offset, int partition, CommandAck.AckType ackType) throws PulsarClientException;
+
+    long searchOffset(int partition, long timestamp) throws PulsarAdminException;
+
+    ConsumeStats getConsumeStats(int partition) throws PulsarAdminException;
+}
+
+// Built with PullRequest.builder()
+public class PullRequest {
+    private long offset;
+    private int partition;
+    private int maxMessages;
+    private int maxBytes;
+    private Duration timeout;
+}
+```
+
+**Reader Management**:
+
+```java
+public class ReaderCache<T> {
+    // topic partition -> LRU cache of (next offset -> reader)
+    private final Map<String, Cache<Long, Reader<T>>> readerCacheMap;
+
+    public Reader<T> getReader(String topic, long offset) {
+        // 1. Acquire partition lock
+        // 2. Get/create LRU cache (default: max 100 readers/partition)
+        // 3. Remove reader from cache for exclusive use
+    }
+
+    public void releaseReader(String topic, long nextOffset, Reader<T> reader) {
+        // Return reader to cache if still connected
+    }
+}
+```
+
+**Offset Mapping**:
+
+```java
+public class OffsetToMessageIdCache {
+    private final Map<String, LoadingCache<Long, MessageId>> partitionCaches;
+
+    public MessageId getMessageIdByOffset(String topic, long offset) {
+        // 1. Check caffeine cache (default: max 1000 entries/partition)
+        // 2. On cache miss: pulsarAdmin.topics().getMessageIdByIndex()
+        // 3. Populate cache
+    }
+}
+```
+
+### Public-facing Changes
+
+#### Public API
+
+**New Interfaces**:
+
+```java
+// Entry point (the last argument is an optional ConsumerBuilder; null selects the default)
+PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
+    nonPartitionedTopic, subscription,
+    brokerCluster, Schema.BYTES,
+    () -> pulsarClient, pulsarAdmin, null);
+
+// Usage
+PullResponse<byte[]> response = pullConsumer1.pull(
+    PullRequest.builder()
+        .offset(offset)
+        .partition(Constants.PARTITION_NONE_INDEX)
+        .maxMessages(10)
+        .maxBytes(1024 * 1024)
+        .timeout(java.time.Duration.ofSeconds(10))
+        .build());
+List<Message<byte[]>> messages = response.getMessages();
+```
+
+## Get Started
+
+### Quick Start
+
+```java
+// 1. Create pull consumer
+PulsarPullConsumer<byte[]> consumer = new PulsarPullConsumerImpl<>(
+    nonPartitionedTopic, subscription,
+    brokerCluster, Schema.BYTES,
+    () -> pulsarClient, pulsarAdmin, null);
+
+consumer.start(); // Initialize connections
+
+// 2. Pull up to 10 messages from the non-partitioned topic starting at offset 200
+PullResponse<byte[]> response = consumer.pull(
+    PullRequest.builder()
+        .offset(200L)
+        .partition(Constants.PARTITION_NONE_INDEX)
+        .maxMessages(10)
+        .maxBytes(1024 * 1024)
+        .timeout(java.time.Duration.ofSeconds(10))
+        .build());
+
+// 3. Process messages
+response.getMessages().forEach(msg -> {
+    System.out.println("Received: " + new String(msg.getData()));
+    // Store last offset
+});
+
+// 4. Acknowledge up to last offset
+consumer.ack(209L, Constants.PARTITION_NONE_INDEX);
+
+// 5. 
Close resources +consumer.close(); +``` \ No newline at end of file diff --git a/pom.xml b/pom.xml index e65edca..7573376 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ 1.20.1 4.13.1 5.12.0 + 3.2.0 diff --git a/pulsar-client-common-contrib/pom.xml b/pulsar-client-common-contrib/pom.xml index db36c7d..5c5723d 100644 --- a/pulsar-client-common-contrib/pom.xml +++ b/pulsar-client-common-contrib/pom.xml @@ -26,9 +26,21 @@ pulsar-client-common-contrib + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + + + org.apache.pulsar + pulsar-client-admin + ${pulsar.version} + test + org.apache.pulsar pulsar-client-all + ${pulsar.version} 2024 diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java new file mode 100644 index 0000000..ea63939 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.client.api; + +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.common.ConsumeStats; +import org.apache.pulsar.client.common.PullRequest; +import org.apache.pulsar.client.common.PullResponse; +import org.apache.pulsar.common.api.proto.CommandAck; + +/** + * Pull-based consumer interface with enhanced offset management capabilities. + * + *

Features: + * + *

    + *
  • Precise offset control with partition-aware operations + *
  • Thread-safe design for concurrent access + *
  • Support for both partitioned and non-partitioned topics + *
  • Built-in offset to message ID mapping + *
+ * + * @param message payload type + */ +public interface PulsarPullConsumer extends AutoCloseable { + + /** + * Initializes consumer resources and establishes connections. + * + * @throws PulsarClientException if client initialization fails + */ + void start() throws PulsarClientException; + + /** + * Pulls messages from the specified partition starting from the given offset. + * + * @param request pull request configuration + * @return immutable list of messages starting from the specified offset + * @throws IllegalArgumentException for invalid request parameters + */ + PullResponse pull(PullRequest request); + + /** + * Acknowledges all messages up to the specified offset (inclusive). + * + * @param offset target offset to acknowledge + * @param partition partition index (use -1 for non-partitioned topics) + * @throws PulsarClientException for acknowledgment failures + * @throws IllegalArgumentException for invalid partition index + */ + void ack(long offset, int partition) throws PulsarClientException; + + /** + * Acknowledges all messages up to the specified offset (inclusive). + * + * @param offset target offset to acknowledge + * @param partition partition index (use -1 for non-partitioned topics) + * @param ackType ackType Individual(0),Cumulative(1); + * @throws PulsarClientException for acknowledgment failures + * @throws IllegalArgumentException for invalid partition index + */ + void ack(long offset, int partition, CommandAck.AckType ackType) throws PulsarClientException; + + /** + * Finds the latest message offset before or at the specified timestamp. 
+ * + * @param partition partition index (use -1 for non-partitioned topics) + * @param timestamp target timestamp in milliseconds + * @return corresponding message offset + * @throws PulsarAdminException for admin operation failures + * @throws IllegalArgumentException for invalid partition index + */ + long searchOffset(int partition, long timestamp) throws PulsarAdminException; + + /** + * Retrieves consumption statistics for the specified partition. + * + * @param partition partition index (use -1 for non-partitioned topics) + * @return current consumption offset + * @throws PulsarAdminException for stats retrieval failures + * @throws IllegalArgumentException for invalid partition index + */ + ConsumeStats getConsumeStats(int partition) throws PulsarAdminException; + + /** Releases all resources and closes connections gracefully. */ + @Override + void close(); +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java new file mode 100644 index 0000000..74bba7b --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java @@ -0,0 +1,296 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.client.api.impl; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarPullConsumer; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.common.ConsumeStats; +import org.apache.pulsar.client.common.PullRequest; +import org.apache.pulsar.client.common.PullResponse; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; +import org.apache.pulsar.client.util.OffsetToMessageIdCache; +import org.apache.pulsar.client.util.OffsetToMessageIdCacheProvider; +import org.apache.pulsar.client.util.PulsarAdminUtils; +import org.apache.pulsar.client.util.ReaderCache; +import org.apache.pulsar.client.util.ReaderCacheProvider; +import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PulsarPullConsumerImpl implements PulsarPullConsumer { + private static final Logger log = LoggerFactory.getLogger(PulsarPullConsumerImpl.class); + private static final String PARTITION_SPLICER = "-partition-"; 
+ private static final int DEFAULT_READ_TIMEOUT_MS = 30_000; + + private final String topic; + private final String subscription; + private final String brokerCluster; + private final Schema schema; + private final Map> consumerMap; + private final OffsetToMessageIdCache offsetToMessageIdCache; + private final ReaderCache readerCache; + private final PulsarAdmin pulsarAdmin; + private final Supplier pulsarClientSupplier; + private final ConsumerBuilder consumerBuilder; + + private volatile int partitionCount; + + public PulsarPullConsumerImpl( + String topic, + String subscription, + String brokerCluster, + Schema schema, + Supplier clientSupplier, + PulsarAdmin admin, + ConsumerBuilder consumerBuilder) { + this.topic = Objects.requireNonNull(topic, "Topic must not be null"); + this.subscription = Objects.requireNonNull(subscription, "Subscription must not be null"); + this.brokerCluster = Objects.requireNonNull(brokerCluster, "Broker cluster must not be null"); + this.schema = Objects.requireNonNull(schema, "Schema must not be null"); + this.pulsarClientSupplier = + Objects.requireNonNull(clientSupplier, "PulsarClient must not be null"); + this.pulsarAdmin = Objects.requireNonNull(admin, "PulsarAdmin must not be null"); + this.consumerMap = new ConcurrentHashMap<>(); + this.offsetToMessageIdCache = + OffsetToMessageIdCacheProvider.getOrCreateCache(admin, brokerCluster); + this.readerCache = + ReaderCacheProvider.getOrCreateReaderCache( + this.subscription, brokerCluster, schema, clientSupplier.get(), offsetToMessageIdCache); + this.consumerBuilder = + consumerBuilder == null + ? 
getPulsarClient().newConsumer(schema).subscriptionType(SubscriptionType.Failover) + : consumerBuilder; + } + + @Override + public void start() throws PulsarClientException { + try { + initializePartitions(); + } catch (PulsarAdminException e) { + throw new PulsarClientException("Failed to initialize partitions", e); + } + } + + private void initializePartitions() throws PulsarAdminException, PulsarClientException { + PartitionedTopicMetadata metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topic); + this.partitionCount = metadata.partitions; + + if (partitionCount == 0) { + subscribeToTopic(topic); + return; + } + + for (int i = 0; i < partitionCount; i++) { + String partitionTopic = buildPartitionTopic(topic, i); + subscribeToTopic(partitionTopic); + } + } + + private void subscribeToTopic(String topicName) throws PulsarClientException { + Consumer consumer = + consumerBuilder.topic(topicName).subscriptionName(subscription).subscribe(); + consumerMap.put(topicName, consumer); + log.debug("Subscribed to topic: {}", topicName); + } + + private PulsarClient getPulsarClient() { + return Objects.requireNonNull( + pulsarClientSupplier.get(), + "PulsarClient supplier returned null. Ensure PulsarClient is properly initialized."); + } + + @Override + public PullResponse pull(PullRequest request) { + validatePullParameters(request.getMaxMessages(), request.getMaxBytes()); + + String partitionTopic = buildPartitionTopic(topic, request.getPartition()); + Reader reader = null; + Message lastMessage = null; + try { + reader = readerCache.getReader(partitionTopic, request.getOffset()); + List> messages = + readMessages( + reader, request.getMaxMessages(), request.getMaxBytes(), request.getTimeout()); + lastMessage = messages.isEmpty() ? 
null : messages.get(messages.size() - 1); + return new PullResponse<>(reader.hasMessageAvailable(), messages); + } catch (PulsarClientException e) { + log.error( + "Failed to pull messages from topic {} at offset {}", + partitionTopic, + request.getOffset(), + e); + return new PullResponse<>(false, Collections.emptyList()); + } finally { + if (reader != null) { + releaseReader( + partitionTopic, + reader, + lastMessage != null ? lastMessage.getIndex().get() + 1 : request.getOffset()); + } + } + } + + private List> readMessages( + Reader reader, int maxMessages, int maxBytes, Duration timeout) { + List> messages = new ArrayList<>(Math.min(maxMessages, 1024)); + int totalBytes = 0; + long deadline = System.nanoTime() + timeout.toNanos(); + + while (messages.size() < maxMessages && totalBytes < maxBytes) { + long remaining = deadline - System.nanoTime(); + if (remaining <= 0) { + break; + } + + try { + Message msg = + reader.readNext( + (int) Math.min(TimeUnit.NANOSECONDS.toMillis(remaining), DEFAULT_READ_TIMEOUT_MS), + TimeUnit.MILLISECONDS); + if (msg == null) { + break; + } + + messages.add(msg); + totalBytes += msg.getData().length; + } catch (PulsarClientException e) { + log.warn("Error reading message from {}", reader.getTopic(), e); + break; + } + } + return Collections.unmodifiableList(messages); + } + + @Override + public void ack(long offset, int partition) throws PulsarClientException { + ack(offset, partition, CommandAck.AckType.Cumulative); + } + + @Override + public void ack(long offset, int partition, CommandAck.AckType ackType) + throws PulsarClientException { + String partitionTopic = buildPartitionTopic(topic, partition); + Consumer consumer = consumerMap.get(partitionTopic); + if (consumer == null) { + throw new PulsarClientException("Consumer not found for partition: " + partition); + } + + MessageId messageId = offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset); + if (messageId == null) { + throw new 
PulsarClientException("MessageID not found for offset: " + offset); + } + + if (ackType == CommandAck.AckType.Individual) { + TopicMessageIdImpl topicMessageId = + new TopicMessageIdImpl(partitionTopic, (MessageIdAdv) messageId); + consumer.acknowledge(topicMessageId); + } else { + if (consumer instanceof MultiTopicsConsumerImpl) { + TopicMessageIdImpl topicMessageId = + new TopicMessageIdImpl(partitionTopic, (MessageIdAdv) messageId); + consumer.acknowledgeCumulative(topicMessageId); + } else { + consumer.acknowledgeCumulative(messageId); + } + } + } + + @Override + public long searchOffset(int partition, long timestamp) throws PulsarAdminException { + String partitionTopic = buildPartitionTopic(topic, partition); + return PulsarAdminUtils.searchOffset(partitionTopic, timestamp, brokerCluster, pulsarAdmin); + } + + @Override + public ConsumeStats getConsumeStats(int partition) throws PulsarAdminException { + String partitionTopic = buildPartitionTopic(topic, partition); + return PulsarAdminUtils.getConsumeStats( + partitionTopic, partition, subscription, brokerCluster, pulsarAdmin); + } + + @Override + public void close() { + closeResources(); + } + + private void closeResources() { + consumerMap + .values() + .forEach( + consumer -> { + try { + consumer.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close consumer for topic {}", consumer.getTopic(), e); + } + }); + + try { + offsetToMessageIdCache.cleanup(); + } catch (Exception e) { + log.warn("Error cleaning offset cache", e); + } + } + + private void validatePullParameters(int maxMessages, int maxBytes) { + if (maxMessages <= 0) { + throw new IllegalArgumentException("maxMessages must be positive"); + } + if (maxBytes <= 0) { + throw new IllegalArgumentException("maxBytes must be positive"); + } + } + + private String buildPartitionTopic(String baseTopic, int partition) { + if (partitionCount > 0 && partition >= partitionCount) { + throw new IllegalArgumentException( + String.format( + 
"Invalid partition %d for topic %s with %d partitions", + partition, baseTopic, partitionCount)); + } + return partitionCount == 0 ? baseTopic : baseTopic + PARTITION_SPLICER + partition; + } + + private void releaseReader(String topicPartition, Reader reader, long nextOffset) { + try { + if (reader.isConnected()) { + readerCache.releaseReader(topicPartition, nextOffset, reader); + } + } catch (Exception e) { + log.warn("Error releasing reader for {}", topicPartition, e); + } + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java new file mode 100644 index 0000000..a40fc05 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.client.common; + +import java.time.Duration; + +public class Constants { + public static final int PARTITION_NONE_INDEX = -1; + public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofSeconds(30); + public static final String PARTITIONED_TOPIC_SUFFIX = "-partitioned-"; +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java new file mode 100644 index 0000000..58b19b8 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.client.common; + +import lombok.Data; + +@Data +public class ConsumeStats { + private String topic; + private String group; + private long minOffset; + private long maxOffset; + private long lastConsumedOffset; + + public ConsumeStats( + String topic, String group, long minOffset, long maxOffset, long lastConsumedOffset) { + this.topic = topic; + this.group = group; + this.minOffset = minOffset; + this.maxOffset = maxOffset; + this.lastConsumedOffset = lastConsumedOffset; + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java new file mode 100644 index 0000000..02179a6 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.common; + +import java.time.Duration; +import lombok.Data; + +/** Configuration object for pull requests. 
*/ +@Data +public class PullRequest { + private final long offset; + private final int partition; + private final int maxMessages; + private final int maxBytes; + private final Duration timeout; + + private PullRequest(Builder builder) { + this.offset = builder.offset; + this.partition = builder.partition; + this.maxMessages = builder.maxMessages; + this.maxBytes = builder.maxBytes; + this.timeout = builder.timeout; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private long offset = -1L; + private int partition = Constants.PARTITION_NONE_INDEX; + private int maxMessages = 100; + private int maxBytes = 10_485_760; // 10MB + private Duration timeout = Constants.DEFAULT_OPERATION_TIMEOUT; + + public Builder offset(long offset) { + this.offset = offset; + return this; + } + + public Builder partition(int partition) { + this.partition = partition; + return this; + } + + public Builder maxMessages(int maxMessages) { + this.maxMessages = maxMessages; + return this; + } + + public Builder maxBytes(int maxBytes) { + this.maxBytes = maxBytes; + return this; + } + + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public PullRequest build() { + validate(); + return new PullRequest(this); + } + + private void validate() { + if (maxMessages <= 0 || maxBytes <= 0) { + throw new IllegalArgumentException("Max messages/bytes must be positive"); + } + } + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java new file mode 100644 index 0000000..63cdd91 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.common; + +import java.util.List; +import lombok.Data; +import org.apache.pulsar.client.api.Message; + +@Data +public class PullResponse { + boolean hasMoreMsg; + List> messages; + + public PullResponse(boolean hasMoreMsg, List> messages) { + this.hasMoreMsg = hasMoreMsg; + this.messages = messages; + } +} diff --git a/pulsar-client-common-contrib/src/test/java/DemoTest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java similarity index 75% rename from pulsar-client-common-contrib/src/test/java/DemoTest.java rename to pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java index 7b2a7c4..4873d10 100644 --- a/pulsar-client-common-contrib/src/test/java/DemoTest.java +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java @@ -11,14 +11,4 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.Test; - -@Slf4j -public class DemoTest { - - @Test - public void testDemo() { - log.info("=== Test started ==="); - } -} +package org.apache.pulsar.client.common; diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java new file mode 100644 index 0000000..f1ecc00 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; + +/** + * Cache for mapping offsets to MessageIds with the following features. 
- Per-partition caching with + * configurable size and expiration - Thread-safe cache initialization and access - Automatic cache + * cleanup + */ +public class OffsetToMessageIdCache { + private static final int DEFAULT_MAX_CACHE_SIZE = 1000; + private static final Duration DEFAULT_EXPIRE_AFTER_ACCESS = Duration.ofMinutes(5); + + private final PulsarAdmin pulsarAdmin; + private final Map> partitionCaches = + new ConcurrentHashMap<>(); + private final int maxCacheSize; + private final Duration expireAfterAccess; + + public OffsetToMessageIdCache(PulsarAdmin pulsarAdmin) { + this(pulsarAdmin, DEFAULT_MAX_CACHE_SIZE, DEFAULT_EXPIRE_AFTER_ACCESS); + } + + public OffsetToMessageIdCache( + PulsarAdmin pulsarAdmin, int maxCacheSize, Duration expireAfterAccess) { + this.pulsarAdmin = Objects.requireNonNull(pulsarAdmin, "PulsarAdmin must not be null"); + this.maxCacheSize = validatePositive(maxCacheSize, "Cache size must be positive"); + this.expireAfterAccess = + Objects.requireNonNull(expireAfterAccess, "Expire duration must not be null"); + } + + private LoadingCache createCache(String partitionTopic) { + return Caffeine.newBuilder() + .maximumSize(maxCacheSize) + .expireAfterAccess(expireAfterAccess) + .recordStats() + .build(offset -> loadMessageId(partitionTopic, offset)); + } + + private MessageId loadMessageId(String partitionTopic, Long offset) throws PulsarAdminException { + if (offset < 0) { + return MessageId.earliest; + } + return pulsarAdmin.topics().getMessageIdByIndex(partitionTopic, offset); + } + + public MessageId getMessageIdByOffset(String partitionTopic, long offset) { + return partitionCaches.computeIfAbsent(partitionTopic, this::createCache).get(offset); + } + + public void putMessageIdByOffset(String partitionTopic, long offset, MessageId messageId) { + partitionCaches.computeIfAbsent(partitionTopic, this::createCache).put(offset, messageId); + } + + /** Cleans up all cached entries and releases resources. 
*/ + public void cleanup() { + partitionCaches + .values() + .forEach( + cache -> { + cache.invalidateAll(); + cache.cleanUp(); + }); + partitionCaches.clear(); + } + + /** + * Removes cache for specific partition topic. + * + * @param partitionTopic topic partition to remove + */ + public void removePartitionCache(String partitionTopic) { + LoadingCache cache = partitionCaches.remove(partitionTopic); + if (cache != null) { + cache.invalidateAll(); + cache.cleanUp(); + } + } + + private static int validatePositive(int value, String errorMessage) { + if (value <= 0) { + throw new IllegalArgumentException(errorMessage); + } + return value; + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java new file mode 100644 index 0000000..dc2e740 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.client.util; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pulsar.client.admin.PulsarAdmin; + +public class OffsetToMessageIdCacheProvider { + private static final Map CACHE_MAP = new ConcurrentHashMap<>(); + + public static OffsetToMessageIdCache getOrCreateCache( + PulsarAdmin pulsarAdmin, String brokerCluster) { + return CACHE_MAP.computeIfAbsent(brokerCluster, key -> new OffsetToMessageIdCache(pulsarAdmin)); + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java new file mode 100644 index 0000000..7f517d9 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java @@ -0,0 +1,142 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.client.util; + +import java.util.Optional; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.common.ConsumeStats; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; + +public class PulsarAdminUtils { + + public static long searchOffset( + String partitionTopic, long timestamp, String brokerCluster, PulsarAdmin pulsarAdmin) + throws PulsarAdminException { + MessageIdAdv messageId = + (MessageIdAdv) pulsarAdmin.topics().getMessageIdByTimestamp(partitionTopic, timestamp); + return extractMessageIndex(partitionTopic, messageId, brokerCluster, pulsarAdmin); + } + + public static ConsumeStats getConsumeStats( + String partitionTopic, + int partition, + String subscription, + String brokerCluster, + PulsarAdmin pulsarAdmin) + throws PulsarAdminException { + ConsumeStats consumeStats = new ConsumeStats(partitionTopic, subscription, -1L, -1L, -1L); + + PersistentTopicInternalStats internalStats = + pulsarAdmin.topics().getInternalStats(partitionTopic); + if (internalStats == null || internalStats.ledgers.isEmpty()) { + return consumeStats; + } + + String consumedPosition = + internalStats.cursors.containsKey(subscription) + ? 
internalStats.cursors.get(subscription).markDeletePosition + : "-1:-1"; + MessageIdAdv consumedMessageId = parseMessageIdFromString(consumedPosition, partition); + String maxPosition = internalStats.lastConfirmedEntry; + MessageIdAdv maxMessageId = parseMessageIdFromString(maxPosition, partition); + String minPosition = internalStats.ledgers.get(0).ledgerId + ":-1"; + MessageIdAdv minMessageId = parseMessageIdFromString(minPosition, partition); + + // Ensure consumedMessageId is not less than minMessageId + if (consumedMessageId.compareTo(minMessageId) < 0) { + consumedMessageId = minMessageId; + } + + consumeStats.setLastConsumedOffset( + extractMessageIndex(partitionTopic, consumedMessageId, brokerCluster, pulsarAdmin)); + consumeStats.setMaxOffset( + extractMessageIndex(partitionTopic, maxMessageId, brokerCluster, pulsarAdmin)); + consumeStats.setMinOffset( + extractMessageIndex(partitionTopic, minMessageId, brokerCluster, pulsarAdmin)); + return consumeStats; + } + + // Common message processing logic + private static long extractMessageIndex( + String topic, MessageIdAdv messageId, String brokerCluster, PulsarAdmin pulsarAdmin) + throws PulsarAdminException { + if (messageId == null || messageId.getLedgerId() < 0) { + return -1; + } + long ledgerId = messageId.getLedgerId(); + long entryId = messageId.getEntryId(); + if (ledgerId > 0 && entryId < 0) { + entryId = 0; + return getMessageIndex( + topic, + new MessageIdImpl(ledgerId, entryId, messageId.getPartitionIndex()), + brokerCluster, + pulsarAdmin) + - 1; + } else { + return getMessageIndex(topic, messageId, brokerCluster, pulsarAdmin); + } + } + + private static long getMessageIndex( + String topic, MessageIdAdv messageId, String brokerCluster, PulsarAdmin pulsarAdmin) + throws PulsarAdminException { + Message message = + pulsarAdmin.topics().getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId()); + + if (message == null) { + throw new PulsarAdminException("No messages found for " + 
messageId + " in topic " + topic); + } + + Optional indexOptional = message.getIndex(); + if (indexOptional.isPresent()) { + long index = indexOptional.get(); + OffsetToMessageIdCacheProvider.getOrCreateCache(pulsarAdmin, brokerCluster) + .putMessageIdByOffset(topic, index, messageId); + return index; + } else { + throw new PulsarAdminException( + "Message index not found for " + messageId + " in topic " + topic); + } + } + + private static long processMessageId( + String topic, MessageIdAdv messageId, String brokerCluster, PulsarAdmin pulsarAdmin) + throws PulsarAdminException { + try { + return extractMessageIndex(topic, messageId, brokerCluster, pulsarAdmin); + } catch (NumberFormatException e) { + throw new PulsarAdminException("Invalid ID components: " + messageId, e); + } + } + + private static MessageIdAdv parseMessageIdFromString(String messageIdStr, int partition) + throws PulsarAdminException { + String[] parts = messageIdStr.split(":"); + if (parts.length < 2) { + throw new PulsarAdminException("Invalid message ID format: " + messageIdStr); + } + try { + long ledgerId = Long.parseLong(parts[0]); + long entryId = Long.parseLong(parts[1]); + return new MessageIdImpl(ledgerId, entryId, partition); + } catch (NumberFormatException e) { + throw new PulsarAdminException("Invalid message ID components: " + messageIdStr, e); + } + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java new file mode 100644 index 0000000..6c2ff4b --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java @@ -0,0 +1,213 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ReaderCache provides thread-safe management of Pulsar Reader instances with the following + * features. 
- Partition-level locking for concurrent access - LRU eviction with size-based and + * access-time-based policies - Automatic resource cleanup + */ +public class ReaderCache { + private static final Logger log = LoggerFactory.getLogger(ReaderCache.class); + private static final int DEFAULT_MAX_CACHE_SIZE = 100; + private static final Duration DEFAULT_EXPIRE_AFTER_ACCESS = Duration.ofMinutes(5); + private static final int MAX_RETRIES = 3; + private static final long RETRY_DELAY_MS = 100; + + private final String subscription; + private final PulsarClient pulsarClient; + private final OffsetToMessageIdCache offsetToMessageIdCache; + private final Schema schema; + private final Map partitionLocks = new ConcurrentHashMap<>(); + private final Map>> readerCacheMap = + new ConcurrentHashMap<>(); + + private final int maxCacheSize; + private final Duration expireAfterAccess; + + public ReaderCache( + String subscription, + PulsarClient pulsarClient, + OffsetToMessageIdCache offsetToMessageIdCache, + Schema schema) { + this( + subscription, + pulsarClient, + offsetToMessageIdCache, + schema, + DEFAULT_MAX_CACHE_SIZE, + DEFAULT_EXPIRE_AFTER_ACCESS); + } + + public ReaderCache( + String subscription, + PulsarClient pulsarClient, + OffsetToMessageIdCache offsetToMessageIdCache, + Schema schema, + int maxCacheSize, + Duration expireAfterAccess) { + this.subscription = subscription; + this.pulsarClient = Objects.requireNonNull(pulsarClient); + this.offsetToMessageIdCache = Objects.requireNonNull(offsetToMessageIdCache); + this.schema = Objects.requireNonNull(schema); + this.maxCacheSize = maxCacheSize; + this.expireAfterAccess = expireAfterAccess; + } + + private LoadingCache> createCache(String partitionTopic) { + return Caffeine.newBuilder() + .maximumSize(maxCacheSize) + .expireAfterAccess(expireAfterAccess) + .removalListener( + (RemovalListener>) + (key, reader, cause) -> { + // Do not close reader on explicit removal + if (reader != null && cause != RemovalCause.EXPLICIT) 
{ + closeReaderSilently(reader); + } + }) + .recordStats() + .build(key -> createReaderWithRetry(partitionTopic, key)); + } + + private Reader createReaderWithRetry(String partitionTopic, Long offset) + throws PulsarClientException { + int attempts = 0; + while (attempts < MAX_RETRIES) { + try { + return pulsarClient + .newReader(schema) + .readerName("reader-" + subscription) + .startMessageId(offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset)) + .topic(partitionTopic) + .create(); + } catch (PulsarClientException e) { + if (++attempts >= MAX_RETRIES) { + throw e; + } + handleRetry(partitionTopic, offset, attempts, e); + } + } + throw new PulsarClientException("Failed to create reader after " + MAX_RETRIES + " attempts"); + } + + private void handleRetry(String partitionTopic, Long offset, int attempt, Exception e) { + log.warn( + "Reader creation failed [Topic: {}][Offset: {}] Attempt {}/{}: {}", + partitionTopic, + offset, + attempt, + MAX_RETRIES, + e.getMessage()); + try { + Thread.sleep(RETRY_DELAY_MS * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during retry", ie); + } + } + + public Reader getReader(String partitionTopic, long offset) { + final ReentrantLock lock = + partitionLocks.computeIfAbsent(partitionTopic, k -> new ReentrantLock()); + lock.lock(); + try { + LoadingCache> cache = + readerCacheMap.computeIfAbsent(partitionTopic, pt -> createCache(pt)); + Reader reader = cache.get(offset); + // Ensure exclusive use: remove from cache but don't close + cache.invalidate(offset); + return reader; + } finally { + lock.unlock(); + } + } + + // Allows users to proactively return the reader + public void releaseReader(String partitionTopic, long offset, Reader reader) { + readerCacheMap.computeIfPresent( + partitionTopic, + (pt, cache) -> { + if (reader.isConnected()) { + cache.put(offset, reader); + } + return cache; + }); + } + + public void cleanup() { + 
readerCacheMap.forEach( + (partition, cache) -> { + cache.invalidateAll(); + cache.cleanUp(); + }); + readerCacheMap.clear(); + partitionLocks.clear(); + } + + public CacheStats getCacheStats(String partitionTopic) { + LoadingCache cache = readerCacheMap.get(partitionTopic); + return cache != null ? new CacheStats(cache.stats()) : null; + } + + private void closeReaderSilently(Reader reader) { + try { + if (reader != null && reader.isConnected()) { + reader.close(); + } + } catch (Exception e) { + log.error("Error closing reader for topic {}", reader.getTopic(), e); + } + } + + public static class CacheStats { + private final com.github.benmanes.caffeine.cache.stats.CacheStats stats; + + CacheStats(com.github.benmanes.caffeine.cache.stats.CacheStats stats) { + this.stats = stats; + } + + public long hitCount() { + return stats.hitCount(); + } + + public long missCount() { + return stats.missCount(); + } + + public long loadSuccessCount() { + return stats.loadSuccessCount(); + } + + public long loadFailureCount() { + return stats.loadFailureCount(); + } + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java new file mode 100644 index 0000000..c8281ec --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; + +public class ReaderCacheProvider { + private static final Map, ReaderCache>> CACHE_MAP = + new ConcurrentHashMap<>(); + + public static ReaderCache getOrCreateReaderCache( + String subscription, + String brokerCluster, + Schema schema, + PulsarClient client, + OffsetToMessageIdCache offsetToMessageIdCache) { + Map, ReaderCache> partitionReaderCache = + CACHE_MAP.computeIfAbsent(brokerCluster, key -> new ConcurrentHashMap<>()); + @SuppressWarnings("unchecked") + ReaderCache cache = + (ReaderCache) + partitionReaderCache.computeIfAbsent( + schema, + key -> new ReaderCache<>(subscription, client, offsetToMessageIdCache, schema)); + return cache; + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java new file mode 100644 index 0000000..e53ef19 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java @@ -0,0 +1,14 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.client.util; diff --git a/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java b/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java new file mode 100644 index 0000000..177ee75 --- /dev/null +++ b/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java @@ -0,0 +1,256 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarPullConsumer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.impl.PulsarPullConsumerImpl; +import org.apache.pulsar.client.common.Constants; +import org.apache.pulsar.client.common.ConsumeStats; +import org.apache.pulsar.client.common.PullRequest; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +public class PulsarPullConsumerTest { + // todo: 
We can not add a mock test before the https://github.com/apache/pulsar/pull/24220 is + // released + String brokerUrl = "pulsar://127.0.0.1:6650"; + String serviceUrl = "http://127.0.0.1:8080"; + + String nonPartitionedTopic = "persistent://public/default/my-topic1"; + String partitionedTopic = "persistent://public/default/my-topic2"; + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerUrl).build(); + PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceUrl).build(); + + public PulsarPullConsumerTest() throws PulsarClientException {} + + @BeforeClass + public void setup() throws Exception { + try { + pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopic); + pulsarAdmin.topics().createPartitionedTopic(partitionedTopic, 2); + log.info("Created topics: {}, {}", nonPartitionedTopic, partitionedTopic); + } catch (Exception e) { + log.info( + "Topics already exist, skipping creation: {}, {}", nonPartitionedTopic, partitionedTopic); + } + } + + @AfterClass + public void cleanup() throws Exception { + pulsarAdmin.topics().delete(nonPartitionedTopic, true); + pulsarAdmin.topics().deletePartitionedTopic(partitionedTopic, true); + if (pulsarClient != null) { + pulsarClient.close(); + } + if (pulsarAdmin != null) { + pulsarAdmin.close(); + } + } + + @DataProvider(name = "testData") + public Object[][] testData() { + return new Object[][] { + {nonPartitionedTopic, Constants.PARTITION_NONE_INDEX}, + {partitionedTopic, 0}, + {partitionedTopic, 1} + }; + } + + /** + * Case test design: 1. Single partition topicA 1. Send one thousand messages 2. Create a + * PullConsumer, subscribe to topicA, and pull messages. 3. Verify message Exactly-once 4. Verify + * message consumption status 2. Multi-partition topicB 1. Send one thousand messages 3. Create + * multiple PullConsumers and subscribe to each partition of topicB. 4. Each PullConsumer pulls + * messages and verifies that the message is Exactly-once. 5. 
Verify message consumption status + */ + @Test(dataProvider = "testData") + public void testPullConsumer(String topic, int partitionIndex) throws Exception { + log.info("Starting testPullConsumer with topic: {}, partitionIndex: {}", topic, partitionIndex); + topic = + partitionIndex == Constants.PARTITION_NONE_INDEX + ? topic + : topic + "-partition-" + partitionIndex; + String subscription = "my-subscription"; + String brokerCluster = "sit"; + @Cleanup + PulsarPullConsumer pullConsumer = + new PulsarPullConsumerImpl<>( + topic, + subscription, + brokerCluster, + Schema.BYTES, + () -> pulsarClient, + pulsarAdmin, + null); + pullConsumer.start(); + + @Cleanup + Producer producer = + pulsarClient.newProducer(Schema.BYTES).topic(topic).enableBatching(false).create(); + + Set sent = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + String message = "Hello-Pulsar-" + i; + MessageId messageId = producer.send(message.getBytes()); + sent.add(message); + log.info("Sent message: {} with id: {}", message, messageId); + } + + ConsumeStats consumeStats = pullConsumer.getConsumeStats(partitionIndex); + long offset = consumeStats.getLastConsumedOffset(); + Set received = new HashSet<>(); + while (true) { + List> messages = + pullConsumer + .pull( + PullRequest.builder() + .offset(offset) + .partition(partitionIndex) + .maxMessages(10) + .maxBytes(1024 * 1024) + .timeout(java.time.Duration.ofSeconds(10)) + .build()) + .getMessages(); + log.info("Pulled {} messages from topic {}", messages.size(), topic); + if (messages.isEmpty()) { + log.info("No more messages to pull, exiting..."); + break; + } + for (Message message : messages) { + if (!received.add(new String(message.getData()))) { + log.error("Duplicate message detected: {}", new String(message.getData())); + } + } + long consumedIndex = messages.get(messages.size() - 1).getIndex().get(); + pullConsumer.ack(consumedIndex, partitionIndex); + offset = consumedIndex + 1; + log.info("Acknowledged messages up to index: {}", 
consumedIndex); + } + offset = pullConsumer.getConsumeStats(partitionIndex).getLastConsumedOffset(); + log.info("Final consume offset for non-partitioned topic: {}", offset); + log.info( + "received {} unique messages from non-partitioned topic, it is equals to sent {}", + received.size(), + received.equals(sent)); + assert received.equals(sent) : "Received messages do not match sent messages"; + } + + @Test(dataProvider = "testData") + public void testSearchOffset(String topic, int partitionIndex) throws Exception { + topic = + partitionIndex == Constants.PARTITION_NONE_INDEX + ? topic + : topic + "-partition-" + partitionIndex; + String subscription = "my-subscription"; + String brokerCluster = "sit"; + @Cleanup + PulsarPullConsumer pullConsumer = + new PulsarPullConsumerImpl<>( + topic, + subscription, + brokerCluster, + Schema.BYTES, + () -> pulsarClient, + pulsarAdmin, + null); + pullConsumer.start(); + + @Cleanup + Producer producer = + pulsarClient.newProducer(Schema.BYTES).topic(topic).enableBatching(false).create(); + + long timestamp = 0; + MessageIdAdv messageId = null; + for (int i = 0; i < 10; i++) { + String message = "Hello-Pulsar-" + i; + timestamp = System.currentTimeMillis(); + messageId = (MessageIdAdv) producer.send(message.getBytes()); + } + for (int i = 0; i < 10; i++) { + String message = "Hello-Pulsar-" + i; + producer.send(message.getBytes()); + System.currentTimeMillis(); + } + long offset = pullConsumer.searchOffset(partitionIndex, timestamp); + MessageIdAdv searchedMessageId = + (MessageIdAdv) pulsarAdmin.topics().getMessageIdByIndex(topic, offset); + assert messageId.getEntryId() == searchedMessageId.getEntryId() + && messageId.getLedgerId() == searchedMessageId.getLedgerId() + : "Searched message ID does not match expected message ID"; + } + + @Test + public void testGetConsumeStats() { + try { + String subscription = "test-subscription"; + String brokerCluster = "sit"; + @Cleanup + PulsarPullConsumer pullConsumer = + new 
PulsarPullConsumerImpl<>( + nonPartitionedTopic, + subscription, + brokerCluster, + Schema.BYTES, + () -> pulsarClient, + pulsarAdmin, + null); + pullConsumer.start(); + + @Cleanup + Producer producer = + pulsarClient + .newProducer(Schema.BYTES) + .topic(nonPartitionedTopic) + .enableBatching(false) + .create(); + + for (int i = 0; i < 10; i++) { + String message = "Hello-Pulsar-" + i; + producer.send(message.getBytes()); + } + + ConsumeStats offset = pullConsumer.getConsumeStats(Constants.PARTITION_NONE_INDEX); + log.info("Initial consume offset for topic {}: {}", nonPartitionedTopic, offset); + Assert.assertEquals(offset.getLastConsumedOffset(), -1L); + Assert.assertEquals(offset.getMaxOffset(), 9L); + Assert.assertEquals(offset.getMinOffset(), -1L); + // Simulate some message processing + pullConsumer.ack(offset.getLastConsumedOffset() + 10, Constants.PARTITION_NONE_INDEX); + Thread.sleep(1000); + ConsumeStats newOffset = pullConsumer.getConsumeStats(Constants.PARTITION_NONE_INDEX); + Assert.assertEquals(newOffset.getLastConsumedOffset(), 9L); + Assert.assertEquals(newOffset.getMaxOffset(), 9L); + Assert.assertEquals(newOffset.getMinOffset(), -1L); + log.info("New consume offset after ack: {}", newOffset); + } catch (Exception e) { + log.error("Error during testExamineConsumeStats", e); + } + } +}