From 2a988fa80c9aad480df8873be880a4d667964bfc Mon Sep 17 00:00:00 2001 From: xiangying Date: Sun, 27 Apr 2025 15:19:30 +0800 Subject: [PATCH] [feat][client] pcip-5 Support Pull Consumer --- pcip/pcip-5.md | 179 ++++++++++++ pom.xml | 1 + pulsar-client-common-contrib/pom.xml | 6 + .../pulsar/client/api/PulsarPullConsumer.java | 88 ++++++ .../api/impl/PulsarPullConsumerImpl.java | 254 ++++++++++++++++++ .../pulsar/client/common/Constants.java | 22 ++ .../pulsar/client/common/ConsumeStats.java | 34 +++ .../pulsar/client/common/PullRequest.java | 85 ++++++ .../pulsar/client/common/PullResponse.java | 29 ++ .../pulsar/client/common/package-info.java} | 12 +- .../client/util/OffsetToMessageIdCache.java | 110 ++++++++ .../util/OffsetToMessageIdCacheProvider.java | 26 ++ .../pulsar/client/util/PulsarAdminUtils.java | 122 +++++++++ .../pulsar/client/util/ReaderCache.java | 189 +++++++++++++ .../client/util/ReaderCacheProvider.java | 33 +++ .../pulsar/client/util/package-info.java | 14 + .../src/test/java/PulsarPullConsumerTest.java | 233 ++++++++++++++++ 17 files changed, 1426 insertions(+), 11 deletions(-) create mode 100644 pcip/pcip-5.md create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java rename pulsar-client-common-contrib/src/{test/java/DemoTest.java => main/java/org/apache/pulsar/client/common/package-info.java} (75%) create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java create mode 100644 pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java create mode 100644 pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java diff --git a/pcip/pcip-5.md b/pcip/pcip-5.md new file mode 100644 index 0000000..e0bcb6a --- /dev/null +++ b/pcip/pcip-5.md @@ -0,0 +1,179 @@ +# PCIP-4: Pull Consumer Implementation for Apache Pulsar + +# Background knowledge + +- **Pulsar Consumers**: Pulsar currently supports push-based consumption models (exclusive/shared/failover/key-shared). + This proposal adds pull-based consumption. +- **Message Positioning**: Pulsar uses composite MessageIDs (ledgerId + entryId + partitionId), while systems like + Kafka/RocketMQ use monotonic offsets. +- **Offset Mapping**: https://github.com/apache/pulsar/pull/24220 can be used to convert between offsets and Pulsar's + MessageIDs. + +# Motivation + +System Migration Requirement: The organization plans to migrate from RocketMQ to Pulsar, requiring a unified MQ client +abstraction layer to conceal implementation details and support seamless engine replacement. + +Interface Compatibility Issues: + +- Pulsar lacks a native offset retrieval interface (pull/fetch model). +- RocketMQ/Kafka use monotonically increasing numeric offsets to locate messages, whereas Pulsar employs a composite + MessageID (ledgerId + entryId + partitionId). + +Objective: Implement a RocketMQ-like Pull Consumer to support precise offset control and reduce migration costs. + +# Goals + +## In Scope + +| Goal | Description | +|----------------------------|---------------------------------------------------------------------------| +| **Precise Offset Control** | Supports specifying partition, offset, pull count, and byte size. | +| **Resource Efficiency** | Reuses Reader connections with LRU cache management. | +| **Easy Integration** | Compatible with Pulsar’s existing API, requiring no Broker modifications. | + +## Out of Scope + +NA + +# High Level Design + +```mermaid +graph TD + A[PullConsumer] -->|pull| B[Offset Mapping] + B -->|convert| C[MessageID] + C -->|seek| D[Reader Pool] + D -->|fetch| E[Broker] + A -->|ack| F[Offset-Based Ack] + F -->|convert| G[MessageID] + G -->|cumulative ack| H[Consumer] +``` + +Key components: + +1. **`PulsarPullConsumer` interface**: Standard pull consumer API +2. **Offset ↔ MessageID Cache**: Partition-scoped mapping layer +3. **Reader Pool**: Managed resource pool with LRU eviction +4. **Partition Locking**: Thread-safe access coordination + +## Detailed Design + +### Design & Implementation Details + +**Core Interface** `PulsarPullConsumer`: + +```java +public interface PulsarPullConsumer extends AutoCloseable { + void start() throws PulsarClientException; + + List> pull(PullRequest request); + + void ack(long offset, int partition) throws PulsarClientException; + + long searchOffset(int partition, long timestamp) throws PulsarAdminException; + + long getConsumeStats(int partition) throws PulsarAdminException; + + class PullRequest { + private long offset; + private int partition; + private int maxMessages; + private int maxBytes; + private Duration timeout; + } +} +``` + +**Reader Management** : + +```java +public class ReaderCache { + private final Map>> readerCacheMap; + + public Reader getReader(String topic, long offset) { + // 1. Acquire partition lock + // 2. Get/create LRU cache (default: max 100 readers/partition) + // 3. Remove reader from cache for exclusive use + } + + public void releaseReader(String topic, long nextOffset, Reader reader) { + // Return reader to cache if still connected + } +} +``` + +**Offset Mapping**: + +```java +public class OffsetToMessageIdCache { + private final Map> partitionCaches; + + public MessageId getMessageIdByOffset(String topic, long offset) { + // 1. Check caffeine cache (default: max 1000 entries/partition) + // 2. On cache miss: pulsarAdmin.topics().getMessageIDByOffset() + // 3. Populate cache + } +} +``` + +### Public-facing Changes + +#### Public API + +**New Interfaces**: + +```java +// Entry point +PulsarPullConsumer pullConsumer1 = new PulsarPullConsumerImpl<>( + nonPartitionedTopic, subscription, + brokerCluster, Schema.BYTES, + pulsarClient, pulsarAdmin); + +// Usage +List> messages = pullConsumer1.pull( + PulsarPullConsumer.PullRequest.builder() + .offset(offset) + .partition(PulsarPullConsumer.PARTITION_NONE) + .maxMessages(10) + .maxBytes(1024 * 1024) + .timeout(java.time.Duration.ofSeconds(10)) + .build()); +``` + +## Get Started + +### Quick Start + +```java +// 1. Create pull consumer +PulsarPullConsumer pullConsumer1 = new PulsarPullConsumerImpl<>( + nonPartitionedTopic, subscription, + brokerCluster, Schema.BYTES, + pulsarClient, pulsarAdmin); + +consumer.start(); // Initialize connections + +// 2. Pull messages from partition 0 starting at offset 200 +List> batch = pullConsumer1.pull( + PulsarPullConsumer.PullRequest.builder() + .offset(offset) + .partition(PulsarPullConsumer.PARTITION_NONE) + .maxMessages(10) + .maxBytes(1024 * 1024) + .timeout(java.time.Duration.ofSeconds(10)) + .build()); +); + +// 3. Process messages batch. + +forEach(msg ->{ + System.out.println("Received: "+new String(msg.getData())); + // Store last offset + }); + +// 4. Acknowledge up to last offset +consumer.ack(250L,0); + +// 5. Close resources +consumer.close(); +``` \ No newline at end of file diff --git a/pom.xml b/pom.xml index e57a6e2..a5a3499 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ 1.20.1 4.13.1 5.12.0 + 3.2.0 diff --git a/pulsar-client-common-contrib/pom.xml b/pulsar-client-common-contrib/pom.xml index 722fa20..f4911a9 100644 --- a/pulsar-client-common-contrib/pom.xml +++ b/pulsar-client-common-contrib/pom.xml @@ -30,6 +30,12 @@ org.apache.pulsar pulsar-client-all + ${pulsar.version} + + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java new file mode 100644 index 0000000..963dc3a --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/PulsarPullConsumer.java @@ -0,0 +1,88 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.api; + +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.common.ConsumeStats; +import org.apache.pulsar.client.common.PullRequest; +import org.apache.pulsar.client.common.PullResponse; + +/** + * Pull-based consumer interface with enhanced offset management capabilities. + * + *

Features:

+ *
    + *
  • Precise offset control with partition-aware operations
  • + *
  • Thread-safe design for concurrent access
  • + *
  • Support for both partitioned and non-partitioned topics
  • + *
  • Built-in offset to message ID mapping
  • + *
+ * + * @param message payload type + */ +public interface PulsarPullConsumer extends AutoCloseable { + + /** + * Initializes consumer resources and establishes connections. + * + * @throws PulsarClientException if client initialization fails + */ + void start() throws PulsarClientException; + + /** + * Pulls messages from the specified partition starting from the given offset. + * + * @param request pull request configuration + * @return immutable list of messages starting from the specified offset + * @throws IllegalArgumentException for invalid request parameters + */ + PullResponse pull(PullRequest request); + + /** + * Acknowledges all messages up to the specified offset (inclusive). + * + * @param offset target offset to acknowledge + * @param partition partition index (use -1 for non-partitioned topics) + * @throws PulsarClientException for acknowledgment failures + * @throws IllegalArgumentException for invalid partition index + */ + void ack(long offset, int partition) throws PulsarClientException; + + /** + * Finds the latest message offset before or at the specified timestamp. + * + * @param partition partition index (use -1 for non-partitioned topics) + * @param timestamp target timestamp in milliseconds + * @return corresponding message offset + * @throws PulsarAdminException for admin operation failures + * @throws IllegalArgumentException for invalid partition index + */ + long searchOffset(int partition, long timestamp) throws PulsarAdminException; + + /** + * Retrieves consumption statistics for the specified partition. + * + * @param partition partition index (use -1 for non-partitioned topics) + * @return current consumption offset + * @throws PulsarAdminException for stats retrieval failures + * @throws IllegalArgumentException for invalid partition index + */ + ConsumeStats getConsumeStats(int partition) throws PulsarAdminException; + + /** + * Releases all resources and closes connections gracefully. + */ + @Override + void close(); +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java new file mode 100644 index 0000000..9ad6e5e --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/api/impl/PulsarPullConsumerImpl.java @@ -0,0 +1,254 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.api.impl; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarPullConsumer; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.common.ConsumeStats; +import org.apache.pulsar.client.common.PullRequest; +import org.apache.pulsar.client.common.PullResponse; +import org.apache.pulsar.client.util.OffsetToMessageIdCache; +import org.apache.pulsar.client.util.OffsetToMessageIdCacheProvider; +import org.apache.pulsar.client.util.PulsarAdminUtils; +import org.apache.pulsar.client.util.ReaderCache; +import org.apache.pulsar.client.util.ReaderCacheProvider; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PulsarPullConsumerImpl implements PulsarPullConsumer { + private static final Logger log = LoggerFactory.getLogger(PulsarPullConsumerImpl.class); + private static final String PARTITION_SPLICER = "-partition-"; + private static final int DEFAULT_READ_TIMEOUT_MS = 30_000; + + private final String topic; + private final String subscription; + private final String brokerCluster; + private final Schema schema; + private final Map> consumerMap; + private final OffsetToMessageIdCache offsetToMessageIdCache; + private final ReaderCache readerCache; + private final PulsarAdmin pulsarAdmin; + private final Supplier pulsarClientSupplier; + + private volatile int partitionCount; + + public PulsarPullConsumerImpl(String topic, + String subscription, + String brokerCluster, + Schema schema, + Supplier clientSupplier, + PulsarAdmin admin) { + this.topic = Objects.requireNonNull(topic, "Topic must not be null"); + this.subscription = Objects.requireNonNull(subscription, "Subscription must not be null"); + this.brokerCluster = Objects.requireNonNull(brokerCluster, "Broker cluster must not be null"); + this.schema = Objects.requireNonNull(schema, "Schema must not be null"); + this.pulsarClientSupplier = Objects.requireNonNull(clientSupplier, "PulsarClient must not be null"); + this.pulsarAdmin = Objects.requireNonNull(admin, "PulsarAdmin must not be null"); + this.consumerMap = new ConcurrentHashMap<>(); + this.offsetToMessageIdCache = OffsetToMessageIdCacheProvider.getOrCreateCache(admin, brokerCluster); + this.readerCache = + ReaderCacheProvider.getOrCreateReaderCache(brokerCluster, schema, clientSupplier.get(), + offsetToMessageIdCache); + } + + @Override + public void start() throws PulsarClientException { + try { + initializePartitions(); + } catch (PulsarAdminException e) { + throw new PulsarClientException("Failed to initialize partitions", e); + } + } + + private void initializePartitions() throws PulsarAdminException, PulsarClientException { + PartitionedTopicMetadata metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topic); + this.partitionCount = metadata.partitions; + + if (partitionCount == 0) { + subscribeToTopic(topic); + return; + } + + for (int i = 0; i < partitionCount; i++) { + String partitionTopic = buildPartitionTopic(topic, i); + subscribeToTopic(partitionTopic); + } + } + + private void subscribeToTopic(String topicName) throws PulsarClientException { + Consumer consumer = getPulsarClient().newConsumer(schema) + .topic(topicName) + .subscriptionName(subscription) + .subscribe(); + consumerMap.put(topicName, consumer); + log.debug("Subscribed to topic: {}", topicName); + } + + private PulsarClient getPulsarClient() { + return Objects.requireNonNull(pulsarClientSupplier.get(), + "PulsarClient supplier returned null. Ensure PulsarClient is properly initialized."); + } + + @Override + public PullResponse pull(PullRequest request) { + validatePullParameters(request.getMaxMessages(), request.getMaxBytes()); + + String partitionTopic = buildPartitionTopic(topic, request.getPartition()); + Reader reader = null; + Message lastMessage = null; + try { + reader = readerCache.getReader(partitionTopic, request.getOffset()); + List> messages = readMessages(reader, request.getMaxMessages(), request.getMaxBytes(), + request.getTimeout()); + lastMessage = messages.isEmpty() ? null : messages.get(messages.size() - 1); + return new PullResponse<>(reader.hasMessageAvailable(), messages); + } catch (PulsarClientException e) { + log.error("Failed to pull messages from topic {} at offset {}", partitionTopic, request.getOffset(), e); + return new PullResponse<>(false, Collections.emptyList()); + } finally { + if (reader != null) { + releaseReader(partitionTopic, reader, + lastMessage != null ? lastMessage.getIndex().get() + 1 : request.getOffset()); + } + } + } + + private List> readMessages(Reader reader, + int maxMessages, + int maxBytes, + Duration timeout) { + List> messages = new ArrayList<>(Math.min(maxMessages, 1024)); + int totalBytes = 0; + long deadline = System.nanoTime() + timeout.toNanos(); + + while (messages.size() < maxMessages && totalBytes < maxBytes) { + long remaining = deadline - System.nanoTime(); + if (remaining <= 0) { + break; + } + + try { + Message msg = reader.readNext( + (int) Math.min(TimeUnit.NANOSECONDS.toMillis(remaining), DEFAULT_READ_TIMEOUT_MS), + TimeUnit.MILLISECONDS + ); + if (msg == null) { + break; + } + + messages.add(msg); + totalBytes += msg.getData().length; + } catch (PulsarClientException e) { + log.warn("Error reading message from {}", reader.getTopic(), e); + break; + } + } + return Collections.unmodifiableList(messages); + } + + @Override + public void ack(long offset, int partition) throws PulsarClientException { + String partitionTopic = buildPartitionTopic(topic, partition); + Consumer consumer = consumerMap.get(partitionTopic); + if (consumer == null) { + throw new PulsarClientException("Consumer not found for partition: " + partition); + } + + MessageId messageId = offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset); + if (messageId == null) { + throw new PulsarClientException("MessageID not found for offset: " + offset); + } + consumer.acknowledgeCumulative(messageId); + } + + @Override + public long searchOffset(int partition, long timestamp) throws PulsarAdminException { + String partitionTopic = buildPartitionTopic(topic, partition); + return PulsarAdminUtils.searchOffset(partitionTopic, timestamp, brokerCluster, pulsarAdmin); + } + + @Override + public ConsumeStats getConsumeStats(int partition) throws PulsarAdminException { + String partitionTopic = buildPartitionTopic(topic, partition); + return PulsarAdminUtils.getConsumeStats(partitionTopic, partition, subscription, brokerCluster, pulsarAdmin); + } + + @Override + public void close() { + closeResources(); + } + + private void closeResources() { + consumerMap.values().forEach(consumer -> { + try { + consumer.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close consumer for topic {}", consumer.getTopic(), e); + } + }); + + try { + offsetToMessageIdCache.cleanup(); + } catch (Exception e) { + log.warn("Error cleaning offset cache", e); + } + } + + private void validatePullParameters(int maxMessages, int maxBytes) { + if (maxMessages <= 0) { + throw new IllegalArgumentException("maxMessages must be positive"); + } + if (maxBytes <= 0) { + throw new IllegalArgumentException("maxBytes must be positive"); + } + } + + private String buildPartitionTopic(String baseTopic, int partition) { + if (partitionCount > 0 && partition >= partitionCount) { + throw new IllegalArgumentException(String.format( + "Invalid partition %d for topic %s with %d partitions", + partition, baseTopic, partitionCount + )); + } + return partitionCount == 0 ? baseTopic : baseTopic + PARTITION_SPLICER + partition; + } + + private void releaseReader(String topicPartition, Reader reader, long nextOffset) { + try { + if (reader.isConnected()) { + readerCache.releaseReader(topicPartition, nextOffset, reader); + } + } catch (Exception e) { + log.warn("Error releasing reader for {}", topicPartition, e); + } + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java new file mode 100644 index 0000000..564a5a9 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/Constants.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.common; + +import java.time.Duration; + +public class Constants { + public static final int PARTITION_NONE_INDEX = -1; + public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofSeconds(30); + public static final String PARTITIONED_TOPIC_SUFFIX = "-partitioned-"; +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java new file mode 100644 index 0000000..c095e0a --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/ConsumeStats.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.common; + +import lombok.Data; + +@Data +public class ConsumeStats { + private String topic; + private String group; + private long minOffset; + private long maxOffset; + private long lastConsumedOffset; + + public ConsumeStats(String topic, String group, long minOffset, long maxOffset, long lastConsumedOffset) { + this.topic = topic; + this.group = group; + this.minOffset = minOffset; + this.maxOffset = maxOffset; + this.lastConsumedOffset = lastConsumedOffset; + } + +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java new file mode 100644 index 0000000..76da426 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullRequest.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.common; + +import java.time.Duration; +import lombok.Data; + +/** + * Configuration object for pull requests. + */ +@Data +public class PullRequest { + private final long offset; + private final int partition; + private final int maxMessages; + private final int maxBytes; + private final Duration timeout; + + private PullRequest(Builder builder) { + this.offset = builder.offset; + this.partition = builder.partition; + this.maxMessages = builder.maxMessages; + this.maxBytes = builder.maxBytes; + this.timeout = builder.timeout; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private long offset = -1L; + private int partition = Constants.PARTITION_NONE_INDEX; + private int maxMessages = 100; + private int maxBytes = 10_485_760; // 10MB + private Duration timeout = Constants.DEFAULT_OPERATION_TIMEOUT; + + public Builder offset(long offset) { + this.offset = offset; + return this; + } + + public Builder partition(int partition) { + this.partition = partition; + return this; + } + + public Builder maxMessages(int maxMessages) { + this.maxMessages = maxMessages; + return this; + } + + public Builder maxBytes(int maxBytes) { + this.maxBytes = maxBytes; + return this; + } + + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public PullRequest build() { + validate(); + return new PullRequest(this); + } + + private void validate() { + if (maxMessages <= 0 || maxBytes <= 0) { + throw new IllegalArgumentException("Max messages/bytes must be positive"); + } + } + } +} \ No newline at end of file diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java new file mode 100644 index 0000000..e453b30 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/PullResponse.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.common; + +import java.util.List; +import lombok.Data; +import org.apache.pulsar.client.api.Message; + +@Data +public class PullResponse { + boolean hasMoreMsg; + List> messages; + + public PullResponse(boolean hasMoreMsg, List> messages) { + this.hasMoreMsg = hasMoreMsg; + this.messages = messages; + } +} diff --git a/pulsar-client-common-contrib/src/test/java/DemoTest.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java similarity index 75% rename from pulsar-client-common-contrib/src/test/java/DemoTest.java rename to pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java index 7b2a7c4..dd94298 100644 --- a/pulsar-client-common-contrib/src/test/java/DemoTest.java +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/common/package-info.java @@ -11,14 +11,4 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.Test; - -@Slf4j -public class DemoTest { - - @Test - public void testDemo() { - log.info("=== Test started ==="); - } -} +package org.apache.pulsar.client.common; \ No newline at end of file diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java new file mode 100644 index 0000000..c81216d --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCache.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; + +/** + * Cache for mapping offsets to MessageIds with the following features. + * - Per-partition caching with configurable size and expiration + * - Thread-safe cache initialization and access + * - Automatic cache cleanup + */ +public class OffsetToMessageIdCache { + private static final int DEFAULT_MAX_CACHE_SIZE = 1000; + private static final Duration DEFAULT_EXPIRE_AFTER_ACCESS = Duration.ofMinutes(5); + + private final PulsarAdmin pulsarAdmin; + private final Map> partitionCaches = new ConcurrentHashMap<>(); + private final int maxCacheSize; + private final Duration expireAfterAccess; + + public OffsetToMessageIdCache(PulsarAdmin pulsarAdmin) { + this(pulsarAdmin, DEFAULT_MAX_CACHE_SIZE, DEFAULT_EXPIRE_AFTER_ACCESS); + } + + public OffsetToMessageIdCache(PulsarAdmin pulsarAdmin, + int maxCacheSize, + Duration expireAfterAccess) { + this.pulsarAdmin = Objects.requireNonNull(pulsarAdmin, "PulsarAdmin must not be null"); + this.maxCacheSize = validatePositive(maxCacheSize, "Cache size must be positive"); + this.expireAfterAccess = Objects.requireNonNull(expireAfterAccess, + "Expire duration must not be null"); + } + + private LoadingCache createCache(String partitionTopic) { + return Caffeine.newBuilder() + .maximumSize(maxCacheSize) + .expireAfterAccess(expireAfterAccess) + .recordStats() + .build(offset -> loadMessageId(partitionTopic, offset)); + } + + private MessageId loadMessageId(String partitionTopic, Long offset) throws PulsarAdminException { + if (offset < 0) { + return MessageId.earliest; + } + return pulsarAdmin.topics().getMessageIdByIndex(partitionTopic, offset); + } + + public MessageId getMessageIdByOffset(String partitionTopic, long offset) { + return partitionCaches + .computeIfAbsent(partitionTopic, this::createCache) + .get(offset); + } + + public void putMessageIdByOffset(String partitionTopic, long offset, MessageId messageId) { + partitionCaches + .computeIfAbsent(partitionTopic, this::createCache) + .put(offset, messageId); + } + + /** + * Cleans up all cached entries and releases resources. + */ + public void cleanup() { + partitionCaches.values().forEach(cache -> { + cache.invalidateAll(); + cache.cleanUp(); + }); + partitionCaches.clear(); + } + + /** + * Removes cache for specific partition topic. + * @param partitionTopic topic partition to remove + */ + public void removePartitionCache(String partitionTopic) { + LoadingCache cache = partitionCaches.remove(partitionTopic); + if (cache != null) { + cache.invalidateAll(); + cache.cleanUp(); + } + } + + private static int validatePositive(int value, String errorMessage) { + if (value <= 0) { + throw new IllegalArgumentException(errorMessage); + } + return value; + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java new file mode 100644 index 0000000..ec5a461 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/OffsetToMessageIdCacheProvider.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pulsar.client.admin.PulsarAdmin; + +public class OffsetToMessageIdCacheProvider { + private static final Map CACHE_MAP = new ConcurrentHashMap<>(); + + public static OffsetToMessageIdCache getOrCreateCache(PulsarAdmin pulsarAdmin, String brokerCluster) { + return CACHE_MAP.computeIfAbsent(brokerCluster, key -> new OffsetToMessageIdCache(pulsarAdmin)); + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java new file mode 100644 index 0000000..d1f9150 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/PulsarAdminUtils.java @@ -0,0 +1,122 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; + +import java.util.Optional; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.common.ConsumeStats; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; + +public class PulsarAdminUtils { + + public static long searchOffset(String partitionTopic, long timestamp, String brokerCluster, + PulsarAdmin pulsarAdmin) throws PulsarAdminException { + MessageIdAdv messageId = (MessageIdAdv) pulsarAdmin.topics().getMessageIdByTimestamp(partitionTopic, timestamp); + return extractMessageIndex(partitionTopic, messageId, brokerCluster, pulsarAdmin); + } + + public static ConsumeStats getConsumeStats(String partitionTopic, int partition, String subscription, + String brokerCluster, PulsarAdmin pulsarAdmin) + throws PulsarAdminException { + ConsumeStats consumeStats = new ConsumeStats(partitionTopic, subscription, -1L, -1L, -1L); + + PersistentTopicInternalStats internalStats = pulsarAdmin.topics().getInternalStats(partitionTopic); + if (internalStats == null || internalStats.ledgers.isEmpty()) { + return consumeStats; + } + + String consumedPosition = internalStats.cursors.containsKey(subscription) + ? internalStats.cursors.get(subscription).markDeletePosition + : "-1:-1"; + MessageIdAdv consumedMessageId = parseMessageIdFromString(consumedPosition, partition); + String maxPosition = internalStats.lastConfirmedEntry; + MessageIdAdv maxMessageId = parseMessageIdFromString(maxPosition, partition); + String minPosition = internalStats.ledgers.get(0).ledgerId + ":-1"; + MessageIdAdv minMessageId = parseMessageIdFromString(minPosition, partition); + + // Ensure consumedMessageId is not less than minMessageId + if (consumedMessageId.compareTo(minMessageId) < 0) { + consumedMessageId = minMessageId; + } + + consumeStats.setLastConsumedOffset( + extractMessageIndex(partitionTopic, consumedMessageId, brokerCluster, pulsarAdmin)); + consumeStats.setMaxOffset(extractMessageIndex(partitionTopic, maxMessageId, brokerCluster, pulsarAdmin)); + consumeStats.setMinOffset(extractMessageIndex(partitionTopic, minMessageId, brokerCluster, pulsarAdmin)); + return consumeStats; + } + + // Common message processing logic + private static long extractMessageIndex(String topic, MessageIdAdv messageId, String brokerCluster, + PulsarAdmin pulsarAdmin) throws PulsarAdminException { + if (messageId == null || messageId.getLedgerId() < 0) { + return -1; + } + long ledgerId = messageId.getLedgerId(); + long entryId = messageId.getEntryId(); + if (ledgerId > 0 && entryId < 0) { + entryId = 0; + return getMessageIndex(topic, new MessageIdImpl(ledgerId, entryId, messageId.getPartitionIndex()), + brokerCluster, pulsarAdmin) - 1; + } else { + return getMessageIndex(topic, messageId, brokerCluster, pulsarAdmin); + } + } + + private static long getMessageIndex(String topic, MessageIdAdv messageId, String brokerCluster, + PulsarAdmin pulsarAdmin) throws PulsarAdminException { + List> messages = + pulsarAdmin.topics().getMessagesById(topic, messageId.getLedgerId(), messageId.getEntryId()); + + if (messages == null || messages.isEmpty()) { + throw new PulsarAdminException("No messages found for " + messageId+ " in topic " + topic); + } + + return messages.stream().map(Message::getIndex).filter(Optional::isPresent).mapToLong(opt -> { + long index = opt.get(); + OffsetToMessageIdCacheProvider.getOrCreateCache(pulsarAdmin, brokerCluster) + .putMessageIdByOffset(topic, index, messageId); + return index; + }).findFirst().orElseThrow(() -> new PulsarAdminException("Missing message index in " + messageId)); + } + + private static long processMessageId(String topic, MessageIdAdv messageId, String brokerCluster, + PulsarAdmin pulsarAdmin) throws PulsarAdminException { + try { + return extractMessageIndex(topic, messageId, brokerCluster, pulsarAdmin); + } catch (NumberFormatException e) { + throw new PulsarAdminException("Invalid ID components: " + messageId, e); + } + } + + private static MessageIdAdv parseMessageIdFromString(String messageIdStr, int partition) + throws PulsarAdminException { + String[] parts = messageIdStr.split(":"); + if (parts.length < 2) { + throw new PulsarAdminException("Invalid message ID format: " + messageIdStr); + } + try { + long ledgerId = Long.parseLong(parts[0]); + long entryId = Long.parseLong(parts[1]); + return new MessageIdImpl(ledgerId, entryId, partition); + } catch (NumberFormatException e) { + throw new PulsarAdminException("Invalid message ID components: " + messageIdStr, e); + } + } + +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java new file mode 100644 index 0000000..f0a7095 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCache.java @@ -0,0 +1,189 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ReaderCache provides thread-safe management of Pulsar Reader instances with the following features. + * - Partition-level locking for concurrent access + * - LRU eviction with size-based and access-time-based policies + * - Automatic resource cleanup + */ +public class ReaderCache { + private static final Logger log = LoggerFactory.getLogger(ReaderCache.class); + private static final int DEFAULT_MAX_CACHE_SIZE = 100; + private static final Duration DEFAULT_EXPIRE_AFTER_ACCESS = Duration.ofMinutes(5); + private static final int MAX_RETRIES = 3; + private static final long RETRY_DELAY_MS = 100; + + private final PulsarClient pulsarClient; + private final OffsetToMessageIdCache offsetToMessageIdCache; + private final Schema schema; + private final Map partitionLocks = new ConcurrentHashMap<>(); + private final Map>> readerCacheMap = new ConcurrentHashMap<>(); + + private final int maxCacheSize; + private final Duration expireAfterAccess; + + public ReaderCache(PulsarClient pulsarClient, + OffsetToMessageIdCache offsetToMessageIdCache, + Schema schema) { + this(pulsarClient, offsetToMessageIdCache, schema, DEFAULT_MAX_CACHE_SIZE, DEFAULT_EXPIRE_AFTER_ACCESS); + } + + public ReaderCache(PulsarClient pulsarClient, + OffsetToMessageIdCache offsetToMessageIdCache, + Schema schema, + int maxCacheSize, + Duration expireAfterAccess) { + this.pulsarClient = Objects.requireNonNull(pulsarClient); + this.offsetToMessageIdCache = Objects.requireNonNull(offsetToMessageIdCache); + this.schema = Objects.requireNonNull(schema); + this.maxCacheSize = maxCacheSize; + this.expireAfterAccess = expireAfterAccess; + } + + private LoadingCache> createCache(String partitionTopic) { + return Caffeine.newBuilder() + .maximumSize(maxCacheSize) + .expireAfterAccess(expireAfterAccess) + .removalListener((RemovalListener>) (key, reader, cause) -> { + // Do not close reader on explicit removal + if (reader != null && cause != RemovalCause.EXPLICIT) { + closeReaderSilently(reader); + } + }) + .recordStats() + .build(key -> createReaderWithRetry(partitionTopic, key)); + } + + private Reader createReaderWithRetry(String partitionTopic, Long offset) throws PulsarClientException { + int attempts = 0; + while (attempts < MAX_RETRIES) { + try { + return pulsarClient.newReader(schema) + .startMessageId(offsetToMessageIdCache.getMessageIdByOffset(partitionTopic, offset)) + .topic(partitionTopic) + .create(); + } catch (PulsarClientException e) { + if (++attempts >= MAX_RETRIES) { + throw e; + } + handleRetry(partitionTopic, offset, attempts, e); + } + } + throw new PulsarClientException("Failed to create reader after " + MAX_RETRIES + " attempts"); + } + + private void handleRetry(String partitionTopic, Long offset, int attempt, Exception e) { + log.warn("Reader creation failed [Topic: {}][Offset: {}] Attempt {}/{}: {}", + partitionTopic, offset, attempt, MAX_RETRIES, e.getMessage()); + try { + Thread.sleep(RETRY_DELAY_MS * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during retry", ie); + } + } + + public Reader getReader(String partitionTopic, long offset) { + final ReentrantLock lock = partitionLocks.computeIfAbsent(partitionTopic, k -> new ReentrantLock()); + lock.lock(); + try { + LoadingCache> cache = readerCacheMap.computeIfAbsent( + partitionTopic, + pt -> createCache(pt) + ); + Reader reader = cache.get(offset); + // Ensure exclusive use: remove from cache but don't close + cache.invalidate(offset); + return reader; + } finally { + lock.unlock(); + } + } + + // Allows users to proactively return the reader + public void releaseReader(String partitionTopic, long offset, Reader reader) { + readerCacheMap.computeIfPresent(partitionTopic, (pt, cache) -> { + if (reader.isConnected()) { + cache.put(offset, reader); + } + return cache; + }); + } + + public void cleanup() { + readerCacheMap.forEach((partition, cache) -> { + cache.invalidateAll(); + cache.cleanUp(); + }); + readerCacheMap.clear(); + partitionLocks.clear(); + } + + public CacheStats getCacheStats(String partitionTopic) { + LoadingCache cache = readerCacheMap.get(partitionTopic); + return cache != null ? new CacheStats(cache.stats()) : null; + } + + private void closeReaderSilently(Reader reader) { + try { + if (reader != null && reader.isConnected()) { + reader.close(); + } + } catch (Exception e) { + log.error("Error closing reader for topic {}", reader.getTopic(), e); + } + } + + public static class CacheStats { + private final com.github.benmanes.caffeine.cache.stats.CacheStats stats; + + CacheStats(com.github.benmanes.caffeine.cache.stats.CacheStats stats) { + this.stats = stats; + } + + public long hitCount() { + return stats.hitCount(); + } + + public long missCount() { + return stats.missCount(); + } + + public long loadSuccessCount() { + return stats.loadSuccessCount(); + } + + public long loadFailureCount() { + return stats.loadFailureCount(); + } + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java new file mode 100644 index 0000000..6ee84cb --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/ReaderCacheProvider.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; + +public class ReaderCacheProvider { + private static final Map, ReaderCache>> CACHE_MAP = new ConcurrentHashMap<>(); + + public static ReaderCache getOrCreateReaderCache(String brokerCluster, Schema schema, PulsarClient client, + OffsetToMessageIdCache offsetToMessageIdCache) { + Map, ReaderCache> partitionReaderCache = CACHE_MAP.computeIfAbsent(brokerCluster, + key -> new ConcurrentHashMap<>()); + @SuppressWarnings("unchecked") + ReaderCache cache = (ReaderCache) partitionReaderCache.computeIfAbsent(schema, + key -> new ReaderCache<>(client, offsetToMessageIdCache, schema)); + return cache; + } +} diff --git a/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java new file mode 100644 index 0000000..894b731 --- /dev/null +++ b/pulsar-client-common-contrib/src/main/java/org/apache/pulsar/client/util/package-info.java @@ -0,0 +1,14 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.client.util; \ No newline at end of file diff --git a/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java b/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java new file mode 100644 index 0000000..392a1db --- /dev/null +++ b/pulsar-client-common-contrib/src/test/java/PulsarPullConsumerTest.java @@ -0,0 +1,233 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarPullConsumer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.impl.PulsarPullConsumerImpl; +import org.apache.pulsar.client.common.Constants; +import org.apache.pulsar.client.common.ConsumeStats; +import org.apache.pulsar.client.common.PullRequest; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +public class PulsarPullConsumerTest { + // todo: We can not add a mock test before the https://github.com/apache/pulsar/pull/24220 is released + String brokerUrl = "pulsar://127.0.0.1:6650"; + String serviceUrl = "http://127.0.0.1:8080"; + + String nonPartitionedTopic = "persistent://public/default/my-topic1"; + String partitionedTopic = "persistent://public/default/my-topic2"; + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerUrl).build(); + PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceUrl).build(); + + public PulsarPullConsumerTest() throws PulsarClientException { + } + + @BeforeClass + public void setup() throws Exception { + try { + pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopic); + pulsarAdmin.topics().createPartitionedTopic(partitionedTopic, 2); + log.info("Created topics: {}, {}", nonPartitionedTopic, partitionedTopic); + } catch (Exception e) { + log.info("Topics already exist, skipping creation: {}, {}", nonPartitionedTopic, partitionedTopic); + } + } + + @AfterClass + public void cleanup() throws Exception { + pulsarAdmin.topics().delete(nonPartitionedTopic, true); + pulsarAdmin.topics().deletePartitionedTopic(partitionedTopic, true); + if (pulsarClient != null) { + pulsarClient.close(); + } + if (pulsarAdmin != null) { + pulsarAdmin.close(); + } + } + + @DataProvider(name = "testData") + public Object[][] testData() { + return new Object[][]{ + {nonPartitionedTopic, Constants.PARTITION_NONE_INDEX}, + {partitionedTopic, 0}, + {partitionedTopic, 1} + }; + } + + /** + * Case test design: + * 1. Single partition topicA + * 1. Send one thousand messages + * 2. Create a PullConsumer, subscribe to topicA, and pull messages. + * 3. Verify message Exactly-once + * 4. Verify message consumption status + * 2. Multi-partition topicB + * 1. Send one thousand messages + * 3. Create multiple PullConsumers and subscribe to each partition of topicB. + * 4. Each PullConsumer pulls messages and verifies that the message is Exactly-once. + * 5. Verify message consumption status + */ + @Test(dataProvider = "testData") + public void testPullConsumer(String topic, int partitionIndex) throws Exception { + log.info("Starting testPullConsumer with topic: {}, partitionIndex: {}", topic, partitionIndex); + topic = partitionIndex == Constants.PARTITION_NONE_INDEX ? topic : topic + "-partition-" + partitionIndex; + String subscription = "my-subscription"; + String brokerCluster = "sit"; + @Cleanup + PulsarPullConsumer pullConsumer = new PulsarPullConsumerImpl<>(topic, subscription, + brokerCluster, Schema.BYTES, () -> pulsarClient, pulsarAdmin); + pullConsumer.start(); + + @Cleanup + Producer producer = pulsarClient + .newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .create(); + + Set sent = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + String message = "Hello-Pulsar-" + i; + MessageId messageId = producer.send(message.getBytes()); + sent.add(message); + log.info("Sent message: {} with id: {}", message, messageId); + } + + ConsumeStats consumeStats = pullConsumer.getConsumeStats(partitionIndex); + long offset = consumeStats.getLastConsumedOffset(); + Set received = new HashSet<>(); + while (true) { + List> messages = pullConsumer.pull( + PullRequest.builder() + .offset(offset) + .partition(partitionIndex) + .maxMessages(10) + .maxBytes(1024 * 1024) + .timeout(java.time.Duration.ofSeconds(10)) + .build()).getMessages(); + log.info("Pulled {} messages from topic {}", messages.size(), topic); + if (messages.isEmpty()) { + log.info("No more messages to pull, exiting..."); + break; + } + for (Message message : messages) { + if (!received.add(new String(message.getData()))) { + log.error("Duplicate message detected: {}", new String(message.getData())); + } + } + long consumedIndex = messages.get(messages.size() - 1).getIndex().get(); + pullConsumer.ack(consumedIndex, partitionIndex); + offset = consumedIndex + 1; + log.info("Acknowledged messages up to index: {}", consumedIndex); + } + offset = pullConsumer.getConsumeStats(partitionIndex).getLastConsumedOffset(); + log.info("Final consume offset for non-partitioned topic: {}", offset); + log.info("received {} unique messages from non-partitioned topic, it is equals to sent {}", received.size(), + received.equals(sent)); + assert received.equals(sent) : "Received messages do not match sent messages"; + } + + @Test(dataProvider = "testData") + public void testSearchOffset(String topic, int partitionIndex) throws Exception { + topic = partitionIndex == Constants.PARTITION_NONE_INDEX ? topic : topic + "-partition-" + partitionIndex; + String subscription = "my-subscription"; + String brokerCluster = "sit"; + @Cleanup + PulsarPullConsumer pullConsumer = new PulsarPullConsumerImpl<>(topic, subscription, + brokerCluster, Schema.BYTES, () -> pulsarClient, pulsarAdmin); + pullConsumer.start(); + + @Cleanup + Producer producer = pulsarClient + .newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .create(); + + long timestamp = 0; + MessageIdAdv messageId = null; + for (int i = 0; i < 10; i++) { + String message = "Hello-Pulsar-" + i; + timestamp = System.currentTimeMillis(); + messageId = (MessageIdAdv) producer.send(message.getBytes()); + } + for (int i = 0; i < 10; i++) { + String message = "Hello-Pulsar-" + i; + producer.send(message.getBytes()); + System.currentTimeMillis(); + } + long offset = pullConsumer.searchOffset(partitionIndex, timestamp); + MessageIdAdv searchedMessageId = (MessageIdAdv) pulsarAdmin.topics().getMessageIdByIndex(topic, offset); + assert messageId.getEntryId() == searchedMessageId.getEntryId() + && messageId.getLedgerId() == searchedMessageId.getLedgerId() : + "Searched message ID does not match expected message ID"; + } + + @Test + public void testGetConsumeStats() { + try { + String subscription = "test-subscription"; + String brokerCluster = "sit"; + @Cleanup + PulsarPullConsumer pullConsumer = new PulsarPullConsumerImpl<>(nonPartitionedTopic, subscription, + brokerCluster, Schema.BYTES, () -> pulsarClient, pulsarAdmin); + pullConsumer.start(); + + @Cleanup + Producer producer = pulsarClient + .newProducer(Schema.BYTES) + .topic(nonPartitionedTopic) + .enableBatching(false) + .create(); + + for (int i = 0; i < 10; i++) { + String message = "Hello-Pulsar-" + i; + producer.send(message.getBytes()); + } + + ConsumeStats offset = pullConsumer.getConsumeStats(Constants.PARTITION_NONE_INDEX); + log.info("Initial consume offset for topic {}: {}", nonPartitionedTopic, offset); + Assert.assertEquals(offset.getLastConsumedOffset(), -1L); + Assert.assertEquals(offset.getMaxOffset(), 9L); + Assert.assertEquals(offset.getMinOffset(), -1L); + // Simulate some message processing + pullConsumer.ack(offset.getLastConsumedOffset() + 10, Constants.PARTITION_NONE_INDEX); + Thread.sleep(1000); + ConsumeStats newOffset = pullConsumer.getConsumeStats(Constants.PARTITION_NONE_INDEX); + Assert.assertEquals(newOffset.getLastConsumedOffset(), 9L); + Assert.assertEquals(newOffset.getMaxOffset(), 9L); + Assert.assertEquals(newOffset.getMinOffset(), -1L); + log.info("New consume offset after ack: {}", newOffset); + } catch (Exception e) { + log.error("Error during testExamineConsumeStats", e); + } + } +}