From 4b517a17a133e46cc3d7515abe1155d5ddf97f00 Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 24 Dec 2025 21:11:07 +0800 Subject: [PATCH 1/2] pip-451: Support Label-based Topic Subscription --- pip/pip-451.md | 255 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 255 insertions(+) create mode 100644 pip/pip-451.md diff --git a/pip/pip-451.md b/pip/pip-451.md new file mode 100644 index 0000000000000..0f9de3ad38e51 --- /dev/null +++ b/pip/pip-451.md @@ -0,0 +1,255 @@ +# PIP-451: Support Label-based Topic Subscription + +# Motivation + +Currently, Apache Pulsar supports Pattern Subscription, which allows consumers to subscribe to multiple topics using Regular Expressions (Regex). While powerful, Regex-based subscription has several structural limitations in complex microservice architectures: + +* Coupling: It couples the consumption logic with the topic naming convention. Changing business requirements often forces topic renaming, which is operationally expensive and risky.S + +* Flexibility: It is difficult to group semantically related but differently named topics (e.g., persistent://public/default/payment-core and persistent://public/legacy/billing-v1) into a single subscription without complex regex wizardry. + +* Complexity: complex Regex can be hard to maintain and error-prone. + +Label-based Subscription solves these issues by decoupling "Identity" (Topic Name) from "Attributes" (Labels). Users can attach Key-Value metadata (e.g., env=prod, dept=finance) to topics and subscribe by specifying a label selector. + +# Goals + +## In Scope +* Management: Allow attaching, updating, and removing Key-Value labels to/from Topics via the Admin API. +* Subscription: Allow Consumers to subscribe to topics matching specific Labels within specified Namespaces. Support cross-namespace subscription via an explicit namespace list in the Client API, avoiding the complexity of background metadata polling. + + + +# High-Level Design +The design introduces a metadata-driven approach where labels are stored in TopicPolicies. +The Broker maintains an in-memory index to map labels to topics. The client utilizes a "Watch" mechanism to receive real-time updates when topics matching the labels are created or updated. +## Key points +* Storage: Labels are stored as Map inside TopicPolicies. +* Indexing: The Broker maintains an In-Memory Inverted Index (LabelKey -> LabelValue -> Set) per Namespace. This hierarchical structure ensures efficient lookups for Key-Value pairs without iterating through all topics. +* Discovery Protocol: We extend the CommandWatchTopicList protocol (PIP-179) to accept a label_selector. +* Client Implementation: The Client accepts a list of target namespaces and manages multiple watchers (one per namespace) to aggregate matching topics. + +# Detailed Design + +## Design & Implementation Details + +### Storage + +#### Topic Labels in TopicPolicies + +Add a labels field to the TopicPolicies class. Since Topic Policies are propagated via the __change_events system topic, this ensures durability and consistency across brokers. +```java +public class TopicPolicies { + // New field: Key-Value labels + private Map customLabels; +} +``` + +#### In-Memory Inverted Index for Labels-Topic Mapping + +The SystemTopicBasedTopicPoliciesService will maintain a nested map structure per Namespace to support efficient Key-Value lookups. + +**Data Structure**: + +```java +// Map>>> +Map>>> labelTopicInvertedIndex; +``` + +### Topic Labels Observation and Notification Mechanism + +### TopicPoliciesService Interface Changes + +```java +public interface TopicPoliciesService { + // ... Existing methods ... + + /** + * Register a label change listener with the service. + */ + void registerTopicLabelsListener(NamespaceName namespace, TopicLabelsListener listener); + + /** + * Unregister a label change listener with the service. + */ + void unregisterTopicLabelsListener(NamespaceName namespace, TopicLabelsListener listener); + + /** + * Query the list of all matching Topics under the specified Namespace based on the label selector. + */ + CompletableFuture> getTopicsByLabels(NamespaceName namespace, Map labels); + + /** + * Query all existing labels within a Namespace. + */ + CompletableFuture>> getAllLabels(NamespaceName namespace); +} +``` +To support real-time label change notifications, we will introduce a listener mechanism in TopicPoliciesService. + +#### Add TopicLabelsListener Interface + +```java +public interface TopicLabelsListener { + /** + * Triggered when topic labels are substantially changed. + * @param topicName The affected topic. + * @param allLabels The latest snapshot of all labels. + */ + void onLabelsUpdate(TopicName topicName, Map allLabels); +} +``` + +#### Listener Management in TopicPoliciesService + +The TopicPoliciesService will maintain a list of TopicLabelsListener and notify them when a policy update includes label changes. + +#### Notification Logic in Broker +* Change Detection: When SystemTopicBasedTopicPoliciesService consumes a policy event, it compares oldLabels from the cache with newLabels from the event, then update in-memory inverted index (labelTopicInvertedIndex). + +* Global Dispatch: If `!Objects.equals(oldLabels, newLabels)`, the service invokes `onLabelsUpdate(topicName, labels)` for all registered listeners. + +* Watcher Evaluation: The TopicListService (registered as a listener) iterates through its TopicListWatcher instances. + +#### TopicListWatcher State Machine + +Each TopicListWatcher maintains a set of currently matched topics to perform state-based updates: + +```java +public void onLabelsUpdate(TopicName topicName, @Nullable Map allLabels) { + // allLabels can be null if the topic is deleted + boolean wasMatching = matchedTopics.contains(topicName); + boolean matchesNow = allLabels != null && labelMatcher.matches(allLabels); + + List newTopics = Collections.emptyList(); + List deletedTopics = Collections.emptyList(); + + if (!wasMatching && matchesNow) { + // It did not match before, but now it matches (matches after adding or modifying labels). + newTopics = Collections.singletonList(topicName.toString()); + matchedTopics.add(topicName.toString()); + } else if (wasMatching && !matchesNow) { + // It matched before, but now it no longer matches (fails to match after the Topic is deleted or its labels are modified). + deletedTopics = Collections.singletonList(topicName.toString()); + matchedTopics.remove(topicName.toString()); + } else { + // If the state remains unchanged + return; + } + + String hash = TopicList.calculateHash(matchedTopics); + topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics); +} +``` + +### Client Implementation Details +The Client implementation acts as an orchestrator to support the "Cross-Namespace" requirement defined in topicsByLabel. + +* Multi-Watcher Orchestration: + +When `subscribe()` is called, the Client iterates over the provided namespaces. +For each namespace, it initiates a CommandWatchTopicList request. + +* Aggregation: + +The Client maintains a unified view of matching topics. +New Topic Event: If any of the namespace watchers receives a NEW_TOPIC update (meaning a topic in that namespace matched the labels), the Client adds it to the list and creates a child consumer. +Deleted Topic Event: If a watcher receives a DELETED_TOPIC update (topic deleted or label removed), the Client closes the corresponding child consumer. + +* Deterministic Scope: + +Unlike Regex subscription which might require scanning metadata for matching namespaces, this design relies on the user providing the `Set` namespaces. +### Public API & CLI Changes + +#### Set Custom Labels: + +* CLI: pulsar-admin topics set-custom-labels --labels "key1=value1,key2=value2" +* REST API: POST /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels with a JSON payload {"labels": {"key1":"value1", "key2":"value2"}} +* Action: Sets or updates custom labels for the specified topic. The broker (or admin client before sending) will validate that all provided keys (e.g., key1, key2) are present in the allowedCustomLabelKeys list defined in broker.conf. Invalid keys will result in an error. This operation will update the topic's policy and publish a change event to the system topic (__change_events) for that namespace. + +#### Get Custom Labels: + +* CLI: pulsar-admin topics get-custom-labels +* REST API: GET /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels +* Action: Retrieves the currently set custom labels for the topic. + +#### Remove Custom Labels: + +* CLI: + - pulsar-admin topics remove-custom-labels --labels "key1,key2" (to remove specific labels) + - pulsar-admin topics remove-custom-labels --all (to remove all custom labels from the topic) +* pulsar-admin topics remove-custom-labels --labels "key1,key2" (to remove specific labels) +* pulsar-admin topics remove-custom-labels --all (to remove all custom labels from the topic) +* REST API: DELETE /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels with a query params keys=k1&keys=k2 or all=true. +* Action: Removes the specified custom labels or all custom labels from the topic. This also updates the topic policy. + +#### Query topic associated with specific labels: + +* CLI: pulsar-admin topics list --custome-labels "k1:v1,k2:v2" +* REST API: GET /admin/v2/topics/{tenant}/{namespace} with a query params custome-labels=k1:v1,k2:v2 + +#### Get all existing labels within a namespace: + +* CLI: pulsar-admin namespaces list-topic-custome-labels +* REST API: GET + /admin/v2/topics/{tenant}/{namespace}/topicCustomeLabels + +### Consumer API (Client) +Introduce `topicsByLabel` in the ConsumerBuilder. +```java +public interface ConsumerBuilder { + + // ... existing methods ... + + /** + * Subscribe to topics matching the given labels within specific namespaces. + * * @param labels The Key-Value pairs that a topic must match (AND semantics). + * @param namespaces The set of namespaces to look for topics. + * If empty, defaults to the current consumer's namespace. + */ + ConsumerBuilder topicsByLabel(Map labels, Set namespaces); +} +``` +### Binary protocol + +Extend CommandWatchTopicList to include label_selector: +```protobuf +message CommandWatchTopicList { + required uint64 request_id = 1; + required uint64 watcher_id = 2; + required string namespace = 3; + required string topics_pattern = 4; + // Only present when the client reconnects: + optional string topics_hash = 5; + + + // If not empty, the broker filters topics by these labels. + // 'topics_pattern' should be ignored if this field is set. + repeated KeyValue labels = 6; +} +``` + +### Configuration Changes + +maxCustomLabelValueLength= +Description: The maximum character length for a custom label value. +Default: 128 + +# Security Considerations +Authorization: Setting labels modifies Topic Policies, which requires update-policies permission on the Namespace or Topic. + +# Backward Compatibility +Protocol: Old brokers will not understand the label_selector field in CommandWatchTopicList. Old brokers might ignore the `label_selector` field and return all topics (acting as a simple Watch). + +Mitigation: The Client library must check the Broker's protocol version or capabilities. If the Broker does not support Label Subscription, the Client should throw a NotSupportedException rather than falling back to consuming all topics (which would be dangerous). + +API: This is a purely additive change to the Public API. + +# Alternatives +Using topic properties to store labels, since we need to reverse query the topic list through labels, simply storing labels in the topic properties is insufficient. This is because it is difficult to collect all properties under a single namespace. Although this issue can be resolved by adding a secondary index of {labels}/{topic-list} in the metadata, it may introduce challenges to data consistency. Therefore, this is not considered a priority implementation solution. + +# Links + + +* Mailing List discussion thread: +* Mailing List voting thread: \ No newline at end of file From 28fc9709514e16e6085839b858466bc04be7f78e Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 8 Jan 2026 00:54:10 +0800 Subject: [PATCH 2/2] Refactoring pip --- pip/pip-451.md | 324 ++++++++++++++++++++++++------------------------- 1 file changed, 159 insertions(+), 165 deletions(-) diff --git a/pip/pip-451.md b/pip/pip-451.md index 0f9de3ad38e51..f6d06f50fee18 100644 --- a/pip/pip-451.md +++ b/pip/pip-451.md @@ -16,237 +16,231 @@ Label-based Subscription solves these issues by decoupling "Identity" (Topic Nam ## In Scope * Management: Allow attaching, updating, and removing Key-Value labels to/from Topics via the Admin API. -* Subscription: Allow Consumers to subscribe to topics matching specific Labels within specified Namespaces. Support cross-namespace subscription via an explicit namespace list in the Client API, avoiding the complexity of background metadata polling. - - +* Subscription: Allow Consumers to subscribe to topics matching specific Tags within specified Namespaces. Support cross-namespace subscription via an explicit namespace list in the Client API, avoiding the complexity of background metadata polling. # High-Level Design -The design introduces a metadata-driven approach where labels are stored in TopicPolicies. -The Broker maintains an in-memory index to map labels to topics. The client utilizes a "Watch" mechanism to receive real-time updates when topics matching the labels are created or updated. -## Key points -* Storage: Labels are stored as Map inside TopicPolicies. -* Indexing: The Broker maintains an In-Memory Inverted Index (LabelKey -> LabelValue -> Set) per Namespace. This hierarchical structure ensures efficient lookups for Key-Value pairs without iterating through all topics. -* Discovery Protocol: We extend the CommandWatchTopicList protocol (PIP-179) to accept a label_selector. -* Client Implementation: The Client accepts a list of target namespaces and manages multiple watchers (one per namespace) to aggregate matching topics. -# Detailed Design +The feature consists of three main components: -## Design & Implementation Details +* Storage: -### Storage +We reuse existing Topic Properties. -#### Topic Labels in TopicPolicies +To support Partitioned Topics, we adopt a Propagate-on-Write strategy. When properties are set on a Parent Topic, the Broker orchestrating the update will iterate and write these properties to the ManagedLedger metadata of all its sub-partitions. -Add a labels field to the TopicPolicies class. Since Topic Policies are propagated via the __change_events system topic, this ensures durability and consistency across brokers. -```java -public class TopicPolicies { - // New field: Key-Value labels - private Map customLabels; -} -``` - -#### In-Memory Inverted Index for Labels-Topic Mapping - -The SystemTopicBasedTopicPoliciesService will maintain a nested map structure per Namespace to support efficient Key-Value lookups. +* Discovery (Broker Scatter-Gather): -**Data Structure**: +A new Binary Protocol command allows clients to query "Active Topics matching properties". -```java -// Map>>> -Map>>> labelTopicInvertedIndex; -``` +The Gateway Broker (Coordinator) distributes the query to all Bundle Owners (Workers). -### Topic Labels Observation and Notification Mechanism +Workers perform a fast in-memory scan of loaded topics. -### TopicPoliciesService Interface Changes +* Client (Polling Model): -```java -public interface TopicPoliciesService { - // ... Existing methods ... +The Consumer client periodically polls the Broker using the new command. - /** - * Register a label change listener with the service. - */ - void registerTopicLabelsListener(NamespaceName namespace, TopicLabelsListener listener); +It compares the returned list with its current subscription and dynamically subscribes/unsubscribes. - /** - * Unregister a label change listener with the service. - */ - void unregisterTopicLabelsListener(NamespaceName namespace, TopicLabelsListener listener); +# Detailed Design - /** - * Query the list of all matching Topics under the specified Namespace based on the label selector. - */ - CompletableFuture> getTopicsByLabels(NamespaceName namespace, Map labels); - - /** - * Query all existing labels within a Namespace. - */ - CompletableFuture>> getAllLabels(NamespaceName namespace); -} -``` -To support real-time label change notifications, we will introduce a listener mechanism in TopicPoliciesService. +## Design & Implementation Details -#### Add TopicLabelsListener Interface +### Public API & CLI Changes -```java -public interface TopicLabelsListener { - /** - * Triggered when topic labels are substantially changed. - * @param topicName The affected topic. - * @param allLabels The latest snapshot of all labels. - */ - void onLabelsUpdate(TopicName topicName, Map allLabels); -} -``` +#### Set topic properties: (Existing API can be reused) -#### Listener Management in TopicPoliciesService +* CLI: pulsar-admin topics update-properties -p a=b -p c=d -The TopicPoliciesService will maintain a list of TopicLabelsListener and notify them when a policy update includes label changes. +#### Get topic properties: (Existing API can be reused) -#### Notification Logic in Broker -* Change Detection: When SystemTopicBasedTopicPoliciesService consumes a policy event, it compares oldLabels from the cache with newLabels from the event, then update in-memory inverted index (labelTopicInvertedIndex). +* CLI: pulsar-admin topics get-properties -* Global Dispatch: If `!Objects.equals(oldLabels, newLabels)`, the service invokes `onLabelsUpdate(topicName, labels)` for all registered listeners. +#### Remove topic properties: (Existing API can be reused) -* Watcher Evaluation: The TopicListService (registered as a listener) iterates through its TopicListWatcher instances. +* CLI: pulsar-admin topics remove-properties -k a -#### TopicListWatcher State Machine +#### Query topic associated with specific properties: -Each TopicListWatcher maintains a set of currently matched topics to perform state-based updates: +* CLI: pulsar-admin topics list --properties "k1:v1,k2:v2" +* REST API: GET /admin/v2/topics/{tenant}/{namespace} with a query params properties=k1:v1,k2:v2 +### Consumer API (Client) +Introduce `topicsByLabels` in the ConsumerBuilder. ```java -public void onLabelsUpdate(TopicName topicName, @Nullable Map allLabels) { - // allLabels can be null if the topic is deleted - boolean wasMatching = matchedTopics.contains(topicName); - boolean matchesNow = allLabels != null && labelMatcher.matches(allLabels); - - List newTopics = Collections.emptyList(); - List deletedTopics = Collections.emptyList(); - - if (!wasMatching && matchesNow) { - // It did not match before, but now it matches (matches after adding or modifying labels). - newTopics = Collections.singletonList(topicName.toString()); - matchedTopics.add(topicName.toString()); - } else if (wasMatching && !matchesNow) { - // It matched before, but now it no longer matches (fails to match after the Topic is deleted or its labels are modified). - deletedTopics = Collections.singletonList(topicName.toString()); - matchedTopics.remove(topicName.toString()); - } else { - // If the state remains unchanged - return; - } +public interface ConsumerBuilder { + + // ... existing methods ... - String hash = TopicList.calculateHash(matchedTopics); - topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics); + /** + * Subscribe to topics matching the given labels within specific namespaces. + * * @param labels The Key-Value pairs that a topic must match (AND semantics). + * @param namespaces The set of namespaces to look for topics. + * If empty, defaults to the current consumer's namespace. + */ + ConsumerBuilder topicsByLabels(Map labels, Set namespaces); } ``` -### Client Implementation Details -The Client implementation acts as an orchestrator to support the "Cross-Namespace" requirement defined in topicsByLabel. - -* Multi-Watcher Orchestration: - -When `subscribe()` is called, the Client iterates over the provided namespaces. -For each namespace, it initiates a CommandWatchTopicList request. +### Binary protocol -* Aggregation: +Add new Command to query topics by properties: -The Client maintains a unified view of matching topics. -New Topic Event: If any of the namespace watchers receives a NEW_TOPIC update (meaning a topic in that namespace matched the labels), the Client adds it to the list and creates a child consumer. -Deleted Topic Event: If a watcher receives a DELETED_TOPIC update (topic deleted or label removed), the Client closes the corresponding child consumer. +```protobuf +// New Command Types +enum Type { + // ... existing types + GET_TOPICS_OF_NAMESPACE_WITH_PROPERTIES = 101; + GET_TOPICS_OF_NAMESPACE_WITH_PROPERTIES_RESPONSE = 102; +} -* Deterministic Scope: +message KeyValue { + required string key = 1; + required string value = 2; +} -Unlike Regex subscription which might require scanning metadata for matching namespaces, this design relies on the user providing the `Set` namespaces. -### Public API & CLI Changes +message CommandGetTopicsOfNamespaceWithProperties { + required uint64 request_id = 1; + required string namespace = 2; + + // Filter criteria: A topic matches if it contains ALL specified Key-Values. + repeated KeyValue properties = 3; -#### Set Custom Labels: + // Filter Scope Flag + // false (default): Cluster-wide search. Broker acts as Coordinator (Scatter-Gather). + // true: Local-only search. Broker scans only its own memory. + optional bool only_local = 4 [default = false]; +} -* CLI: pulsar-admin topics set-custom-labels --labels "key1=value1,key2=value2" -* REST API: POST /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels with a JSON payload {"labels": {"key1":"value1", "key2":"value2"}} -* Action: Sets or updates custom labels for the specified topic. The broker (or admin client before sending) will validate that all provided keys (e.g., key1, key2) are present in the allowedCustomLabelKeys list defined in broker.conf. Invalid keys will result in an error. This operation will update the topic's policy and publish a change event to the system topic (__change_events) for that namespace. +message CommandGetTopicsOfNamespaceWithPropertiesResponse { + required uint64 request_id = 1; + repeated string topics = 2; + + // Indicates if the result might be incomplete (e.g., a peer broker timed out) + optional bool is_partial_result = 3 [default = false]; +} +``` -#### Get Custom Labels: +### Update topic properties: -* CLI: pulsar-admin topics get-custom-labels -* REST API: GET /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels -* Action: Retrieves the currently set custom labels for the topic. +If a topic is partitioned, the properties updated are propagated to all partitions. -#### Remove Custom Labels: +### Query topics by properties: Broker Scatter-Gather -* CLI: - - pulsar-admin topics remove-custom-labels --labels "key1,key2" (to remove specific labels) - - pulsar-admin topics remove-custom-labels --all (to remove all custom labels from the topic) -* pulsar-admin topics remove-custom-labels --labels "key1,key2" (to remove specific labels) -* pulsar-admin topics remove-custom-labels --all (to remove all custom labels from the topic) -* REST API: DELETE /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels with a query params keys=k1&keys=k2 or all=true. -* Action: Removes the specified custom labels or all custom labels from the topic. This also updates the topic policy. +Add new method `getTopicsUnderNamespaceByProperties` in LookupService to get topics by properties: -#### Query topic associated with specific labels: +```java +public class GetTopicsResult { + private final List topics; + private final boolean isPartialResult; -* CLI: pulsar-admin topics list --custome-labels "k1:v1,k2:v2" -* REST API: GET /admin/v2/topics/{tenant}/{namespace} with a query params custome-labels=k1:v1,k2:v2 + public GetTopicsResult(List topics, boolean isPartialResult) { + this.topics = topics; + this.isPartialResult = isPartialResult; + } -#### Get all existing labels within a namespace: + public List getTopics() { + return topics; + } -* CLI: pulsar-admin namespaces list-topic-custome-labels -* REST API: GET - /admin/v2/topics/{tenant}/{namespace}/topicCustomeLabels + public boolean isPartialResult() { + return isPartialResult; + } +} -### Consumer API (Client) -Introduce `topicsByLabel` in the ConsumerBuilder. -```java -public interface ConsumerBuilder { - - // ... existing methods ... +public interface LookupService { /** - * Subscribe to topics matching the given labels within specific namespaces. - * * @param labels The Key-Value pairs that a topic must match (AND semantics). - * @param namespaces The set of namespaces to look for topics. - * If empty, defaults to the current consumer's namespace. + * Get topics under a namespace that match the specified properties. + * + * @param namespace The namespace + * @param properties The filter labels + * @param onlyLocal If true, only scan the connected broker's memory (Internal use). + * If false, trigger cluster-wide scatter-gather (Client use). + * @return A result containing the list of topics and a partial status flag. */ - ConsumerBuilder topicsByLabel(Map labels, Set namespaces); + CompletableFuture getTopicsUnderNamespaceByProperties( + NamespaceName namespace, + Map properties, + boolean onlyLocal + ); } ``` -### Binary protocol -Extend CommandWatchTopicList to include label_selector: -```protobuf -message CommandWatchTopicList { - required uint64 request_id = 1; - required uint64 watcher_id = 2; - required string namespace = 3; - required string topics_pattern = 4; - // Only present when the client reconnects: - optional string topics_hash = 5; +When a Broker receives a `CommandGetTopicsOfNamespaceWithProperties` with only_local=false, it acts as the Coordinator and performs the following steps: +```mermaid +sequenceDiagram + autonumber + participant Client + participant ServerCnx as ServerCnx (Proxxy) + participant NS as NamespaceService + participant LocalBS as BrokerService (Local) + participant RemoteBroker as Remote Broker + + Note over Client, ServerCnx: (Mode = Proxy) + Client->>ServerCnx: Command(Namespace, Props, only_local=false) + activate ServerCnx + ServerCnx->>NS: GetOwnedBundles(Namespace) + NS-->>ServerCnx: {Bundle A: Local, Bundle B: Remote} - // If not empty, the broker filters topics by these labels. - // 'topics_pattern' should be ignored if this field is set. - repeated KeyValue labels = 6; -} + rect rgb(240, 248, 255) + note right of ServerCnx: Scatter Phase + par Parallel Execution + ServerCnx->>LocalBS: getTopicsWithPropertiesLocal(Props) + + ServerCnx->>RemoteBroker: Command(Ns, Props, only_local=true) + end + end + + Note right of LocalBS: Scan Active Topics
Check ManagedLedger + LocalBS-->>ServerCnx: List [Topic-A] + + Note right of RemoteBroker: Scan Active Topics
Check ManagedLedger + RemoteBroker-->>ServerCnx: List [Topic-B] + + rect rgb(245, 245, 220) + note right of ServerCnx: Gather Phase + ServerCnx->>ServerCnx: Merge Lists: [Topic-A, Topic-B] + end + + ServerCnx->>Client: Response(Topics=[Topic-A, Topic-B]) + deactivate ServerCnx + ``` +When admin server receives topics list by properties request, it forwards to one of the brokers in the cluster which performs the scatter-gather operation as described above. + +### Client Implementation Details + +Since the architecture relies on Active Topics and does not use event-based notifications (like System Topics), the client must use a Polling (Pull) model to detect changes. + +Implement `PropertyMultiTopicsConsumerImpl`, which inherits or refers to the design pattern of MultiTopicsConsumerImpl and extends the capability of cross-namespace dynamic discovery based on Metadata Properties. + +Internally starts a single-threaded ScheduledExecutorService to periodically execute the recheckTopics() method, with the process as follows: + +1. Discovery (Server-side Filtering) Invokes LookupService.getTopicsUnderNamespaceByProperties to directly retrieve the pre-filtered list of topics that match the specified metadata properties, effectively offloading the filtering logic to the server side. + +2. Rebalance (State Synchronization) Performs a differential comparison between this retrieved target list and the active subscription map, executing incremental subscribe or close operations to maintain state consistency. + + ### Configuration Changes -maxCustomLabelValueLength= -Description: The maximum character length for a custom label value. -Default: 128 # Security Considerations -Authorization: Setting labels modifies Topic Policies, which requires update-policies permission on the Namespace or Topic. + # Backward Compatibility -Protocol: Old brokers will not understand the label_selector field in CommandWatchTopicList. Old brokers might ignore the `label_selector` field and return all topics (acting as a simple Watch). +Protocol: Old brokers will not understand the CommandGetTopicsOfNamespaceWithProperties. Mitigation: The Client library must check the Broker's protocol version or capabilities. If the Broker does not support Label Subscription, the Client should throw a NotSupportedException rather than falling back to consuming all topics (which would be dangerous). -API: This is a purely additive change to the Public API. # Alternatives -Using topic properties to store labels, since we need to reverse query the topic list through labels, simply storing labels in the topic properties is insufficient. This is because it is difficult to collect all properties under a single namespace. Although this issue can be resolved by adding a secondary index of {labels}/{topic-list} in the metadata, it may introduce challenges to data consistency. Therefore, this is not considered a priority implementation solution. + +* Store labels in an external metadata service (e.g., ZooKeeper, etcd) instead of Topic Properties. + - Pros: Decouples label storage from topics, allowing more complex querying and indexing. + - Cons: Increases system complexity, introduces additional latency, and requires maintaining consistency between the external store and topic states. # Links