179 changes: 179 additions & 0 deletions pcip/pcip-5.md
@@ -0,0 +1,179 @@
# PCIP-4: Pull Consumer Implementation for Apache Pulsar

# Background knowledge

- **Pulsar Consumers**: Pulsar consumers are currently push-based: the broker dispatches messages to consumers under one
  of four subscription types (Exclusive, Shared, Failover, Key_Shared). This proposal adds pull-based consumption.
- **Message Positioning**: Pulsar uses composite MessageIDs (ledgerId + entryId + partitionId), while systems like
Kafka/RocketMQ use monotonic offsets.
- **Offset Mapping**: https://github.com/apache/pulsar/pull/24220 can be used to convert between offsets and Pulsar's
MessageIDs.

# Motivation

System Migration Requirement: The organization plans to migrate from RocketMQ to Pulsar, requiring a unified MQ client
abstraction layer to conceal implementation details and support seamless engine replacement.

Interface Compatibility Issues:

- Pulsar has no native interface for pulling messages by offset (a pull/fetch model).
- RocketMQ/Kafka use monotonically increasing numeric offsets to locate messages, whereas Pulsar employs a composite
MessageID (ledgerId + entryId + partitionId).

Objective: Implement a RocketMQ-like Pull Consumer to support precise offset control and reduce migration costs.

# Goals

## In Scope

| Goal | Description |
|----------------------------|---------------------------------------------------------------------------|
| **Precise Offset Control** | Supports specifying partition, offset, pull count, and byte size. |
| **Resource Efficiency** | Reuses Reader connections with LRU cache management. |
| **Easy Integration** | Compatible with Pulsar’s existing API, requiring no Broker modifications. |

## Out of Scope

NA

# High Level Design

```mermaid
graph TD
A[PullConsumer] -->|pull| B[Offset Mapping]
B -->|convert| C[MessageID]
C -->|seek| D[Reader Pool]
D -->|fetch| E[Broker]
A -->|ack| F[Offset-Based Ack]
F -->|convert| G[MessageID]
G -->|cumulative ack| H[Consumer]
```

Key components:

1. **`PulsarPullConsumer` interface**: Standard pull consumer API
2. **Offset ↔ MessageID Cache**: Partition-scoped mapping layer
3. **Reader Pool**: Managed resource pool with LRU eviction
4. **Partition Locking**: Thread-safe access coordination
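
As an illustration of component 4, the following is a minimal sketch of per-partition locking built only on the JDK. The class name `PartitionLocks`, the `withLock` helper, and the use of a string partition key are assumptions made for this example; the proposal does not specify how the lock is implemented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper (not part of this PR): one lock per partition key.
final class PartitionLocks {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Runs the action while holding the lock that belongs to the given partition key.
    <R> R withLock(String partitionKey, Supplier<R> action) {
        ReentrantLock lock = locks.computeIfAbsent(partitionKey, k -> new ReentrantLock());
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock(); // released even if the action throws
        }
    }

    // True if some thread currently holds the lock for this partition key.
    boolean isLocked(String partitionKey) {
        ReentrantLock lock = locks.get(partitionKey);
        return lock != null && lock.isLocked();
    }
}
```

Operations on different partitions do not block each other in this sketch, while operations on the same partition are serialized.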

## Detailed Design

### Design & Implementation Details

**Core Interface** `PulsarPullConsumer`:

```java
public interface PulsarPullConsumer<T> extends AutoCloseable {
    void start() throws PulsarClientException;

    List<Message<T>> pull(PullRequest request);

    void ack(long offset, int partition) throws PulsarClientException;

    long searchOffset(int partition, long timestamp) throws PulsarAdminException;

    long getConsumeStats(int partition) throws PulsarAdminException;

    class PullRequest {
        private long offset;
        private int partition;
        private int maxMessages;
        private int maxBytes;
        private Duration timeout;
    }
}
```
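
The usage examples in this proposal build requests through `PullRequest.builder()`, which the listing above does not define. The following is a minimal sketch of what such a builder might look like. The class name `PullRequestSketch`, the default values, the validation rules, and the value `-1` for `PARTITION_NONE` are assumptions for illustration, not the class shipped in this PR.

```java
import java.time.Duration;

// Hypothetical sketch of a request object with a validating builder.
final class PullRequestSketch {
    static final int PARTITION_NONE = -1; // assumed marker for non-partitioned topics

    final long offset;
    final int partition;
    final int maxMessages;
    final int maxBytes;
    final Duration timeout;

    private PullRequestSketch(Builder b) {
        this.offset = b.offset;
        this.partition = b.partition;
        this.maxMessages = b.maxMessages;
        this.maxBytes = b.maxBytes;
        this.timeout = b.timeout;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private long offset;
        private int partition = PARTITION_NONE;
        private int maxMessages = 100;                     // assumed default
        private int maxBytes = 1024 * 1024;                // assumed default
        private Duration timeout = Duration.ofSeconds(10); // assumed default

        Builder offset(long v) { this.offset = v; return this; }
        Builder partition(int v) { this.partition = v; return this; }
        Builder maxMessages(int v) { this.maxMessages = v; return this; }
        Builder maxBytes(int v) { this.maxBytes = v; return this; }
        Builder timeout(Duration v) { this.timeout = v; return this; }

        // Rejects invalid parameters before any request is sent.
        PullRequestSketch build() {
            if (offset < 0) throw new IllegalArgumentException("offset must be >= 0");
            if (partition < PARTITION_NONE) throw new IllegalArgumentException("invalid partition");
            if (maxMessages <= 0) throw new IllegalArgumentException("maxMessages must be > 0");
            if (maxBytes <= 0) throw new IllegalArgumentException("maxBytes must be > 0");
            if (timeout == null || timeout.isNegative()) {
                throw new IllegalArgumentException("timeout must be non-negative");
            }
            return new PullRequestSketch(this);
        }
    }
}
```

Validating in `build()` keeps the request immutable and lets `pull` assume well-formed input.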

**Reader Management**:

```java
public class ReaderCache<T> {
    private final Map<String, LoadingCache<Long, Reader<T>>> readerCacheMap;

    public Reader<T> getReader(String topic, long offset) {
        // 1. Acquire partition lock
        // 2. Get/create LRU cache (default: max 100 readers/partition)
        // 3. Remove reader from cache for exclusive use
    }

    public void releaseReader(String topic, long nextOffset, Reader<T> reader) {
        // Return reader to cache if still connected
    }
}
```
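
The checkout/return pattern outlined above can be sketched with JDK collections alone. This is an illustration under stated assumptions, not the implementation in this PR: `StubReader` is a hypothetical stand-in for a Pulsar `Reader`, `ReaderPoolSketch` covers a single partition, and a `LinkedHashMap` replaces the Caffeine cache the proposal uses.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical stand-in for a Pulsar Reader.
final class StubReader {
    final long createdAtOffset;
    boolean closed;

    StubReader(long createdAtOffset) {
        this.createdAtOffset = createdAtOffset;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical single-partition pool of idle readers, keyed by the offset each would read next.
final class ReaderPoolSketch {
    private final int maxIdle;
    private final LongFunction<StubReader> factory;
    // Insertion order doubles as idle-age order: the first entry has been idle longest.
    private final LinkedHashMap<Long, StubReader> idle = new LinkedHashMap<>();

    ReaderPoolSketch(int maxIdle, LongFunction<StubReader> factory) {
        this.maxIdle = maxIdle;
        this.factory = factory;
    }

    // Removes a matching idle reader for exclusive use, or creates a new one.
    synchronized StubReader acquire(long offset) {
        StubReader cached = idle.remove(offset);
        return cached != null ? cached : factory.apply(offset);
    }

    // Returns a reader to the pool under the offset it will read next.
    synchronized void release(long nextOffset, StubReader reader) {
        if (reader.closed) {
            return; // disconnected readers are not pooled
        }
        StubReader displaced = idle.remove(nextOffset);
        if (displaced != null && displaced != reader) {
            displaced.close();
        }
        idle.put(nextOffset, reader);
        if (idle.size() > maxIdle) {
            Iterator<Map.Entry<Long, StubReader>> it = idle.entrySet().iterator();
            StubReader eldest = it.next().getValue();
            it.remove();
            eldest.close(); // evict the reader that has been idle longest
        }
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

Because a reader is removed from the map while in use, two callers can never share the same reader, and sequential pulls that continue at `nextOffset` reuse the existing reader.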

**Offset Mapping**:

```java
public class OffsetToMessageIdCache {
    private final Map<String, LoadingCache<Long, MessageId>> partitionCaches;

    public MessageId getMessageIdByOffset(String topic, long offset) {
        // 1. Check caffeine cache (default: max 1000 entries/partition)
        // 2. On cache miss: pulsarAdmin.topics().getMessageIDByOffset()
        // 3. Populate cache
    }
}
```
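
The three lookup steps above can be sketched as a small bounded cache with a loader function. This example is a hedged illustration only: `OffsetCacheSketch` is a hypothetical name, it covers a single partition, an access-ordered `LinkedHashMap` stands in for Caffeine, and the `loader` function stands in for the admin lookup on a cache miss.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical single-partition offset cache; V stands in for Pulsar's MessageId.
final class OffsetCacheSketch<V> {
    private final LongFunction<V> loader; // stand-in for the admin lookup on a miss
    private final Map<Long, V> cache;
    private int misses;

    OffsetCacheSketch(int maxEntries, LongFunction<V> loader) {
        this.loader = loader;
        // Access-ordered map: the least recently used entry is dropped once the limit is exceeded.
        this.cache = new LinkedHashMap<Long, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    synchronized V get(long offset) {
        V value = cache.get(offset);       // 1. check the cache
        if (value == null) {
            misses++;
            value = loader.apply(offset);  // 2. on a miss, ask the loader
            cache.put(offset, value);      // 3. populate the cache
        }
        return value;
    }

    synchronized int misses() {
        return misses;
    }
}
```

A caller would construct it with the per-partition limit and a lookup function, for example `new OffsetCacheSketch<>(1000, offset -> lookUp(offset))`, where `lookUp` is a placeholder for the real conversion call.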

### Public-facing Changes

#### Public API

**New Interfaces**:

```java
// Entry point
PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
        nonPartitionedTopic, subscription,
        brokerCluster, Schema.BYTES,
        pulsarClient, pulsarAdmin);

// Usage
List<Message<byte[]>> messages = pullConsumer1.pull(
        PulsarPullConsumer.PullRequest.builder()
                .offset(offset)
                .partition(PulsarPullConsumer.PARTITION_NONE)
                .maxMessages(10)
                .maxBytes(1024 * 1024)
                .timeout(java.time.Duration.ofSeconds(10))
                .build());
```
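
The request above caps a pull by both `maxMessages` and `maxBytes`. One plausible way to apply both limits to a run of available payloads is sketched below. `BatchLimiter` is a hypothetical helper, and always admitting the first payload (so that a single oversized message cannot stall the consumer) is an assumption of this sketch, not documented behavior of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing how count and byte limits could bound one pull.
final class BatchLimiter {
    // Takes payloads in order until either limit would be exceeded.
    static List<byte[]> take(List<byte[]> available, int maxMessages, int maxBytes) {
        List<byte[]> batch = new ArrayList<>();
        long bytes = 0;
        for (byte[] payload : available) {
            if (batch.size() >= maxMessages) {
                break; // count limit reached
            }
            if (!batch.isEmpty() && bytes + payload.length > maxBytes) {
                break; // byte limit would be exceeded; the first payload is always admitted
            }
            batch.add(payload);
            bytes += payload.length;
        }
        return batch;
    }
}
```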

## Get Started

### Quick Start

```java
// 1. Create pull consumer
PulsarPullConsumer<byte[]> pullConsumer1 = new PulsarPullConsumerImpl<>(
        nonPartitionedTopic, subscription,
        brokerCluster, Schema.BYTES,
        pulsarClient, pulsarAdmin);

pullConsumer1.start(); // Initialize connections

// 2. Pull up to 10 messages (at most 1 MiB) starting at offset 200
long offset = 200L;
List<Message<byte[]>> batch = pullConsumer1.pull(
        PulsarPullConsumer.PullRequest.builder()
                .offset(offset)
                .partition(PulsarPullConsumer.PARTITION_NONE)
                .maxMessages(10)
                .maxBytes(1024 * 1024)
                .timeout(java.time.Duration.ofSeconds(10))
                .build());

// 3. Process messages
batch.forEach(msg -> {
    System.out.println("Received: " + new String(msg.getData()));
    // Store last offset
});

// 4. Acknowledge up to the last processed offset (250 is a placeholder value)
pullConsumer1.ack(250L, PulsarPullConsumer.PARTITION_NONE);

// 5. Close resources
pullConsumer1.close();
```
1 change: 1 addition & 0 deletions pom.xml
@@ -50,6 +50,7 @@
    <testcontainers.version>1.20.1</testcontainers.version>
    <junit.version>4.13.1</junit.version>
    <mockito.version>5.12.0</mockito.version>
    <caffeine.version>3.2.0</caffeine.version>
  </properties>

  <modules>
6 changes: 6 additions & 0 deletions pulsar-client-common-contrib/pom.xml
@@ -30,6 +30,12 @@
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client-all</artifactId>
      <version>${pulsar.version}</version>
    </dependency>
    <dependency>
      <groupId>com.github.ben-manes.caffeine</groupId>
      <artifactId>caffeine</artifactId>
      <version>${caffeine.version}</version>
    </dependency>
  </dependencies>

@@ -0,0 +1,88 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pulsar.client.api;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.common.ConsumeStats;
import org.apache.pulsar.client.common.PullRequest;
import org.apache.pulsar.client.common.PullResponse;

/**
 * Pull-based consumer interface with enhanced offset management capabilities.
 *
 * <p>Features:</p>
 * <ul>
 *     <li>Precise offset control with partition-aware operations</li>
 *     <li>Thread-safe design for concurrent access</li>
 *     <li>Support for both partitioned and non-partitioned topics</li>
 *     <li>Built-in offset to message ID mapping</li>
 * </ul>
 *
 * @param <T> message payload type
 */
public interface PulsarPullConsumer<T> extends AutoCloseable {

    /**
     * Initializes consumer resources and establishes connections.
     *
     * @throws PulsarClientException if client initialization fails
     */
    void start() throws PulsarClientException;

    /**
     * Pulls messages from the specified partition starting from the given offset.
     *
     * @param request pull request configuration
     * @return pull response for the messages read starting from the specified offset
     * @throws IllegalArgumentException for invalid request parameters
     */
    PullResponse<T> pull(PullRequest request);

    /**
     * Acknowledges all messages up to the specified offset (inclusive).
     *
     * @param offset    target offset to acknowledge
     * @param partition partition index (use -1 for non-partitioned topics)
     * @throws PulsarClientException    for acknowledgment failures
     * @throws IllegalArgumentException for invalid partition index
     */
    void ack(long offset, int partition) throws PulsarClientException;

    /**
     * Finds the latest message offset before or at the specified timestamp.
     *
     * @param partition partition index (use -1 for non-partitioned topics)
     * @param timestamp target timestamp in milliseconds
     * @return corresponding message offset
     * @throws PulsarAdminException     for admin operation failures
     * @throws IllegalArgumentException for invalid partition index
     */
    long searchOffset(int partition, long timestamp) throws PulsarAdminException;

    /**
     * Retrieves consumption statistics for the specified partition.
     *
     * @param partition partition index (use -1 for non-partitioned topics)
     * @return consumption statistics for the partition
     * @throws PulsarAdminException     for stats retrieval failures
     * @throws IllegalArgumentException for invalid partition index
     */
    ConsumeStats getConsumeStats(int partition) throws PulsarAdminException;

    /**
     * Releases all resources and closes connections gracefully.
     */
    @Override
    void close();
}