Conversation

@coderzc
Member

@coderzc coderzc commented Dec 24, 2025

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

Motivation

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the PIP and doc labels Dec 24, 2025
@coderzc coderzc changed the title PIP-451: Support label-based topic subscription [feat][broker] PIP-451: Support label-based topic subscription Dec 24, 2025
@coderzc coderzc changed the title [feat][broker] PIP-451: Support label-based topic subscription [feat][broker] PIP-451: Support Label-based Topic Subscription Dec 24, 2025
Member

@shibd shibd left a comment

I support pushing this feature forward. It effectively simplifies the pattern subscription issue and provides a multi-dimensional approach to topic management.

However, there are some implementation details that require further discussion.

pip/pip-451.md Outdated

```java
// Map<Namespace, Map<LabelKey, Map<LabelValue, Set<TopicName>>>>
Map<String, Map<String, Map<String, Set<String>>>> labelTopicInvertedIndex;
```
Member

The SystemTopicBasedTopicPoliciesService constructs the cache for a specific namespace based on the assigned namespace bundles.

In production scenarios, the bundles of a single namespace are highly likely to be evenly distributed across all brokers. This implies that every broker will effectively maintain a full copy of this cache. I am concerned about the potential memory footprint in this scenario.

Member Author

Yes, each broker needs to cache the full labelTopicInvertedIndex.
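For readers following the memory discussion, here is a minimal sketch of how such a nested inverted index could be populated and queried. The class name `LabelIndexSketch` and the methods `addTopic` and `findTopics` are hypothetical and are not taken from the PIP; only the nested map shape comes from the quoted hunk.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the nested map shape quoted in the PIP hunk.
public class LabelIndexSketch {
    // namespace -> label key -> label value -> topic names
    private final Map<String, Map<String, Map<String, Set<String>>>> index = new HashMap<>();

    /** Registers a topic under each of its labels. */
    public void addTopic(String namespace, String topic, Map<String, String> labels) {
        Map<String, Map<String, Set<String>>> byKey =
                index.computeIfAbsent(namespace, ns -> new HashMap<>());
        for (Map.Entry<String, String> e : labels.entrySet()) {
            byKey.computeIfAbsent(e.getKey(), k -> new HashMap<>())
                    .computeIfAbsent(e.getValue(), v -> new HashSet<>())
                    .add(topic);
        }
    }

    /**
     * Returns the topics in the namespace that carry every requested label
     * (AND semantics). An empty request returns an empty set in this sketch.
     */
    public Set<String> findTopics(String namespace, Map<String, String> required) {
        Map<String, Map<String, Set<String>>> byKey =
                index.getOrDefault(namespace, Collections.emptyMap());
        Set<String> result = null;
        for (Map.Entry<String, String> e : required.entrySet()) {
            Set<String> topics = byKey
                    .getOrDefault(e.getKey(), Collections.emptyMap())
                    .getOrDefault(e.getValue(), Collections.emptySet());
            if (result == null) {
                result = new HashSet<>(topics); // copy so the index is never mutated
            } else {
                result.retainAll(topics);       // intersect with the next label's topics
            }
            if (result.isEmpty()) {
                break;                          // no topic can satisfy the remaining labels
            }
        }
        return result == null ? Collections.emptySet() : result;
    }
}
```

In this shape a topic name is referenced once per label it carries, so the footprint grows roughly with (number of topics) × (labels per topic) in every broker that holds the index.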

pip/pip-451.md Outdated
public void onLabelsUpdate(TopicName topicName, @Nullable Map<String, String> allLabels) {
// allLabels can be null if the topic is deleted
boolean wasMatching = matchedTopics.contains(topicName);
boolean matchesNow = allLabels != null && labelMatcher.matches(allLabels);
Member

What does the data structure of labelMatcher look like? Could you provide an example?

Member Author

Like this:

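import java.util.Map;

public class LabelMatcher {

    // Labels a topic must carry; every entry has to match (AND semantics).
    private final Map<String, String> labels;

    public LabelMatcher(Map<String, String> labels) {
        // Map.copyOf rejects null keys and values, so the required values are never null.
        this.labels = Map.copyOf(labels);
    }

    // Returns true only if targetLabels contains every required key with an equal value.
    boolean matches(Map<String, String> targetLabels) {
        for (Map.Entry<String, String> entry : labels.entrySet()) {
            // A missing key yields null from get(), which fails the equality check.
            if (!entry.getValue().equals(targetLabels.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }
}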

Member Author

This can be simplified into a utility function.
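A minimal sketch of what such a stateless helper might look like, assuming the same AND semantics as the class above. The name `LabelMatchUtil` is hypothetical, and treating a null target as "no match" mirrors the quoted hunk's comment that `allLabels` can be null when the topic is deleted.

```java
import java.util.Map;

// Hypothetical helper; not part of the PIP or of Pulsar.
public final class LabelMatchUtil {
    private LabelMatchUtil() {
    }

    /**
     * Returns true if target carries every key/value pair in required.
     * A null target (for example, a deleted topic) never matches.
     */
    public static boolean matches(Map<String, String> required, Map<String, String> target) {
        if (target == null) {
            return false;
        }
        for (Map.Entry<String, String> e : required.entrySet()) {
            String actual = target.get(e.getKey());
            // A missing key or a different value means the topic does not match.
            if (actual == null || !actual.equals(e.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```

With a helper like this, the watcher would only need to hold the required label map itself rather than a dedicated matcher object.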

pip/pip-451.md Outdated
* Multi-Watcher Orchestration:

When `subscribe()` is called, the Client iterates over the provided namespaces.
For each namespace, it initiates a CommandWatchTopicList request.
Member

It would be better to provide a user-side example here.

pip/pip-451.md Outdated
* @param namespaces The set of namespaces to look for topics.
* If empty, defaults to the current consumer's namespace.
*/
ConsumerBuilder<T> topicsByLabel(Map<String, String> labels, Set<String> namespaces);
Member

This input parameter cannot represent specific labels for a specific namespace. Perhaps we need to wrap the parameter as:

List {
   String namespace,
   Map<String, String> labels
}
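To make the suggested shape concrete, here is a small sketch of a per-namespace selector type. `NamespaceLabels`, `NamespaceLabelsSketch`, and the namespace and label values are all hypothetical; the PIP does not define them.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration of pairing each namespace with its own label filter.
public class NamespaceLabelsSketch {

    /** Holds one namespace together with the labels to match inside it. */
    public static final class NamespaceLabels {
        private final String namespace;
        private final Map<String, String> labels;

        public NamespaceLabels(String namespace, Map<String, String> labels) {
            this.namespace = namespace;
            this.labels = Map.copyOf(labels); // defensive, immutable copy
        }

        public String namespace() {
            return namespace;
        }

        public Map<String, String> labels() {
            return labels;
        }
    }

    /** Example selector list with a different label filter per namespace. */
    public static List<NamespaceLabels> exampleSelectors() {
        return List.of(
                new NamespaceLabels("tenant-a/orders", Map.of("env", "prod")),
                new NamespaceLabels("tenant-a/audit", Map.of("env", "prod", "tier", "gold")));
    }
}
```

Compared with the single `Map<String, String> labels, Set<String> namespaces` pair in the quoted signature, a list like this lets each namespace carry its own filter, at the cost of a slightly more verbose builder call.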

Member Author

Why is it necessary to use different labels for different namespaces?

pip/pip-451.md Outdated

### Topic Labels Observation and Notification Mechanism

### TopicPoliciesService Interface Changes
Member

Suggested change
- ### TopicPoliciesService Interface Changes
+ #### TopicPoliciesService Interface Changes

pip/pip-451.md Outdated

#### Query topic associated with specific labels:

* CLI: pulsar-admin topics list <namespace> --custome-labels "k1:v1,k2:v2"
Member

I'm wondering how we should implement this feature, since not every broker maintains a full labelTopicInvertedIndex cache. In the REST API implementation, we would first need to determine which broker is assigned to any bundle of that namespace, and then send the request there.

Is this lookup operation easy to achieve?

Member Author

@coderzc coderzc Jan 4, 2026

I think the topic policy under a namespace can be loaded onto any broker, since we call prepareInitPoliciesCacheAsync(NamespaceName namespace).

@coderzc coderzc marked this pull request as draft January 5, 2026 09:00
@coderzc coderzc force-pushed the pip-451 branch 3 times, most recently from 1934ba3 to c8a0e78 Compare January 8, 2026 02:15
Labels

doc (Your PR contains doc changes, no matter whether the changes are in markdown or code files.), PIP

2 participants