diff --git a/pom.xml b/pom.xml
index 3321a33..2367ab1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,14 +21,26 @@
apache
29
- 1.0.0-SNAPSHOT
-
- 2024
pulsar-java-contrib
+ 1.0.0-SNAPSHOT
pom
Pulsar Java Contrib
+
+ pulsar-client-common-contrib
+ pulsar-loadbalance-contrib
+ pulsar-interceptor-contrib
+ pulsar-connector-contrib
+ pulsar-function-contrib
+ pulsar-bookkeeper-contrib
+ pulsar-transaction-contrib
+ pulsar-metrics-contrib
+ pulsar-auth-contrib
+ pulsar-rpc-contrib
+ pulsar-admin-mcp-contrib
+
+
1.18.32
2.0.13
@@ -52,20 +64,6 @@
5.12.0
-
- pulsar-client-common-contrib
- pulsar-loadbalance-contrib
- pulsar-interceptor-contrib
- pulsar-connector-contrib
- pulsar-function-contrib
- pulsar-bookkeeper-contrib
- pulsar-transaction-contrib
- pulsar-metrics-contrib
- pulsar-auth-contrib
- pulsar-rpc-contrib
- pulsar-admin-mcp-contrib
-
-
@@ -102,6 +100,12 @@
+
+ org.awaitility
+ awaitility
+ ${awaitility.version}
+ test
+
org.projectlombok
lombok
@@ -122,34 +126,9 @@
${org.testing.version}
test
-
- org.awaitility
- awaitility
- ${awaitility.version}
- test
-
-
- org.apache.maven.plugins
- maven-wrapper-plugin
- ${maven-wrapper-plugin.version}
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- ${maven-compiler-plugin.version}
-
-
-
- org.projectlombok
- lombok
- ${lombok.version}
-
-
-
-
com.diffplug.spotless
@@ -195,36 +174,6 @@
-
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
- ${maven-checkstyle-plugin.version}
-
-
- com.puppycrawl.tools
- checkstyle
- ${puppycrawl.checkstyle.version}
-
-
-
- ./src/main/resources/checkstyle.xml
- ./src/main/resources/suppressions.xml
- true
- UTF-8
- **/*.proto
-
-
-
-
- validate-checkstyle
- validate
-
- check
-
-
-
-
@@ -267,6 +216,57 @@
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ ${maven-checkstyle-plugin.version}
+
+ ./src/main/resources/checkstyle.xml
+ ./src/main/resources/suppressions.xml
+ true
+ UTF-8
+ **/*.proto
+
+
+
+
+ com.puppycrawl.tools
+ checkstyle
+ ${puppycrawl.checkstyle.version}
+
+
+
+
+ validate-checkstyle
+
+ check
+
+ validate
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ ${maven-compiler-plugin.version}
+
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-wrapper-plugin
+ ${maven-wrapper-plugin.version}
+
+
+ 2024
diff --git a/pulsar-admin-mcp-contrib/pom.xml b/pulsar-admin-mcp-contrib/pom.xml
index d0ff264..35cb110 100644
--- a/pulsar-admin-mcp-contrib/pom.xml
+++ b/pulsar-admin-mcp-contrib/pom.xml
@@ -14,9 +14,7 @@
limitations under the License.
-->
-
+
4.0.0
@@ -58,19 +56,47 @@
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ io.modelcontextprotocol.sdk
+ mcp
+ 0.12.0
+
+
+ org.apache.commons
+ commons-lang3
+ 3.18.0
+
+
+ org.apache.pulsar
+ pulsar-client
+
org.apache.pulsar
pulsar-client-admin
test
+
+ org.apache.pulsar
+ pulsar-client-admin
+
org.eclipse.jetty
- jetty-server
+ jetty-http
${jetty.version}
org.eclipse.jetty
- jetty-servlet
+ jetty-io
${jetty.version}
@@ -80,12 +106,12 @@
org.eclipse.jetty
- jetty-webapp
+ jetty-server
${jetty.version}
org.eclipse.jetty
- jetty-io
+ jetty-servlet
${jetty.version}
@@ -95,37 +121,9 @@
org.eclipse.jetty
- jetty-http
+ jetty-webapp
${jetty.version}
-
- com.fasterxml.jackson.core
- jackson-core
- ${jackson.version}
-
-
- com.fasterxml.jackson.core
- jackson-databind
- ${jackson.version}
-
-
- io.modelcontextprotocol.sdk
- mcp
- 0.12.0
-
-
- org.apache.pulsar
- pulsar-client
-
-
- org.apache.pulsar
- pulsar-client-admin
-
-
- org.apache.commons
- commons-lang3
- 3.18.0
-
org.mockito
mockito-core
@@ -139,8 +137,7 @@
org.springframework.boot
- spring-boot-starter-test
- test
+ spring-boot-starter
org.springframework.boot
@@ -150,7 +147,8 @@
org.springframework.boot
- spring-boot-starter-validation
+ spring-boot-starter-test
+ test
org.springframework.boot
@@ -158,14 +156,9 @@
-
- org.testcontainers
- pulsar
- test
-
org.springframework.boot
- spring-boot-starter
+ spring-boot-starter-validation
org.springframework.boot
@@ -173,10 +166,23 @@
+
+ org.testcontainers
+ pulsar
+ test
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ ${maven.compiler.source}
+ ${maven.compiler.target}
+
+
org.springframework.boot
spring-boot-maven-plugin
@@ -186,22 +192,15 @@
- repackage
+
+ repackage
+
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
- ${maven.compiler.source}
- ${maven.compiler.target}
-
-
2024
-
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/Main.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/Main.java
index f9775d3..c59ccfc 100644
--- a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/Main.java
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/Main.java
@@ -19,17 +19,17 @@
import org.slf4j.LoggerFactory;
public class Main {
- private static final Logger logger = LoggerFactory.getLogger(Main.class);
+ private static final Logger logger = LoggerFactory.getLogger(Main.class);
- public static void main(String[] args) {
- try {
- PulsarMCPCliOptions options = PulsarMCPCliOptions.parseArgs(args);
- logger.info("Starting Pulsar Admin MCP Server with options: {}", options);
- TransportLauncher.start(options);
- } catch (Exception e) {
- logger.error("Fatal error starting Pulsar Admin MCP Server: {}", e.getMessage(), e);
- System.err.println("Fatal error: " + e.getMessage());
- System.exit(-1);
- }
+ public static void main(String[] args) {
+ try {
+ PulsarMCPCliOptions options = PulsarMCPCliOptions.parseArgs(args);
+ logger.info("Starting Pulsar Admin MCP Server with options: {}", options);
+ TransportLauncher.start(options);
+ } catch (Exception e) {
+ logger.error("Fatal error starting Pulsar Admin MCP Server: {}", e.getMessage(), e);
+ System.err.println("Fatal error: " + e.getMessage());
+ System.exit(-1);
}
+ }
}
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/PulsarClientManager.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/PulsarClientManager.java
index 84e464a..f7d3e03 100644
--- a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/PulsarClientManager.java
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/client/PulsarClientManager.java
@@ -23,113 +23,115 @@
@Component
public class PulsarClientManager implements AutoCloseable {
- private PulsarAdmin pulsarAdmin;
- private PulsarClient pulsarClient;
+ private PulsarAdmin pulsarAdmin;
+ private PulsarClient pulsarClient;
- private final AtomicBoolean adminInitialized = new AtomicBoolean();
- private final AtomicBoolean clientInitialized = new AtomicBoolean();
+ private final AtomicBoolean adminInitialized = new AtomicBoolean();
+ private final AtomicBoolean clientInitialized = new AtomicBoolean();
- public void initialize() {
- getAdmin();
- getClient();
+ public void initialize() {
+ getAdmin();
+ getClient();
+ }
+
+ public synchronized PulsarAdmin getAdmin() {
+ if (!adminInitialized.get()) {
+ initializePulsarAdmin();
}
+ return pulsarAdmin;
+ }
- public synchronized PulsarAdmin getAdmin() {
- if (!adminInitialized.get()) {
- initializePulsarAdmin();
- }
- return pulsarAdmin;
+ public synchronized PulsarClient getClient() {
+ if (!clientInitialized.get()) {
+ initializePulsarClient();
}
+ return pulsarClient;
+ }
- public synchronized PulsarClient getClient() {
- if (!clientInitialized.get()) {
- initializePulsarClient();
- }
- return pulsarClient;
+ private void initializePulsarAdmin() {
+
+ if (!adminInitialized.compareAndSet(false, true)) {
+ return;
}
- private void initializePulsarAdmin() {
+ boolean success = false;
+ try {
+ String adminUrl = System.getenv().getOrDefault("PULSAR_ADMIN_URL", "http://localhost:8080");
- if (!adminInitialized.compareAndSet(false, true)) {
- return;
- }
+ PulsarAdminBuilder adminBuilder =
+ PulsarAdmin.builder()
+ .serviceHttpUrl(adminUrl)
+ .connectionTimeout(30, TimeUnit.SECONDS)
+ .readTimeout(60, TimeUnit.SECONDS);
- boolean success = false;
- try {
- String adminUrl = System.getenv().getOrDefault("PULSAR_ADMIN_URL", "http://localhost:8080");
-
- PulsarAdminBuilder adminBuilder = PulsarAdmin.builder()
- .serviceHttpUrl(adminUrl)
- .connectionTimeout(30, TimeUnit.SECONDS)
- .readTimeout(60, TimeUnit.SECONDS);
-
- pulsarAdmin = adminBuilder.build();
-
- pulsarAdmin.clusters().getClusters();
- success = true;
-
- } catch (Exception e) {
- if (pulsarAdmin != null) {
- try {
- pulsarAdmin.close();
- } catch (Exception ignore) {
-
- }
- pulsarAdmin = null;
- }
- adminInitialized.set(false);
- throw new RuntimeException("Failed to initialize PulsarAdmin", e);
- } finally {
- if (!success) {
- adminInitialized.set(false);
- }
- }
- }
+ pulsarAdmin = adminBuilder.build();
- private void initializePulsarClient() {
- if (!clientInitialized.compareAndSet(false, true)) {
- return;
- }
- boolean success = false;
+ pulsarAdmin.clusters().getClusters();
+ success = true;
+
+ } catch (Exception e) {
+ if (pulsarAdmin != null) {
try {
- String serviceUrl = System.getenv().getOrDefault("PULSAR_SERVICE_URL", "pulsar://localhost:6650");
-
- var clientBuilder = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .operationTimeout(30, TimeUnit.SECONDS)
- .connectionTimeout(30, TimeUnit.SECONDS)
- .keepAliveInterval(30, TimeUnit.SECONDS);
-
- this.pulsarClient = clientBuilder.build();
- success = true;
-
- } catch (Exception e) {
- if (pulsarClient != null) {
- try {
- pulsarClient.close();
- } catch (Exception ignore) {
- }
- pulsarClient = null;
- }
- clientInitialized.set(false);
- throw new RuntimeException("Failed to initialize PulsarClient", e);
- } finally {
- if (!success) {
- clientInitialized.set(false);
- }
+ pulsarAdmin.close();
+ } catch (Exception ignore) {
+
}
+ pulsarAdmin = null;
+ }
+ adminInitialized.set(false);
+ throw new RuntimeException("Failed to initialize PulsarAdmin", e);
+ } finally {
+ if (!success) {
+ adminInitialized.set(false);
+ }
}
+ }
- @Override
- public void close() throws Exception {
- if (pulsarClient != null) {
- pulsarClient.close();
- }
- if (pulsarAdmin != null) {
- pulsarAdmin.close();
+ private void initializePulsarClient() {
+ if (!clientInitialized.compareAndSet(false, true)) {
+ return;
+ }
+ boolean success = false;
+ try {
+ String serviceUrl =
+ System.getenv().getOrDefault("PULSAR_SERVICE_URL", "pulsar://localhost:6650");
+
+ var clientBuilder =
+ PulsarClient.builder()
+ .serviceUrl(serviceUrl)
+ .operationTimeout(30, TimeUnit.SECONDS)
+ .connectionTimeout(30, TimeUnit.SECONDS)
+ .keepAliveInterval(30, TimeUnit.SECONDS);
+
+ this.pulsarClient = clientBuilder.build();
+ success = true;
+
+ } catch (Exception e) {
+ if (pulsarClient != null) {
+ try {
+ pulsarClient.close();
+ } catch (Exception ignore) {
}
- adminInitialized.set(false);
+ pulsarClient = null;
+ }
+ clientInitialized.set(false);
+ throw new RuntimeException("Failed to initialize PulsarClient", e);
+ } finally {
+ if (!success) {
clientInitialized.set(false);
+ }
}
+ }
+ @Override
+ public void close() throws Exception {
+ if (pulsarClient != null) {
+ pulsarClient.close();
+ }
+ if (pulsarAdmin != null) {
+ pulsarAdmin.close();
+ }
+ adminInitialized.set(false);
+ clientInitialized.set(false);
+ }
}
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/PulsarMCPCliOptions.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/PulsarMCPCliOptions.java
index 163f4e6..e742349 100644
--- a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/PulsarMCPCliOptions.java
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/config/PulsarMCPCliOptions.java
@@ -18,67 +18,64 @@
@Getter
public class PulsarMCPCliOptions {
- @Getter
- public enum TransportType {
+ @Getter
+ public enum TransportType {
+ STDIO("stdio", "Standard input/output (Claude Desktop)"),
+ HTTP("http", "HTTP Streaming Events (Web application)");
+ private final String value;
+ private final String description;
- STDIO("stdio", "Standard input/output (Claude Desktop)"),
- HTTP("http", "HTTP Streaming Events (Web application)");
- private final String value;
- private final String description;
-
- TransportType(String value, String description) {
- this.value = value;
- this.description = description;
- }
+ TransportType(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
- public static TransportType fromString(String value) {
- for (TransportType t : values()) {
- if (t.value.equalsIgnoreCase(value)) {
- return t;
- }
- }
- throw new IllegalArgumentException(
- value + " is not a valid TransportType. Valid Options: stdio,http");
+ public static TransportType fromString(String value) {
+ for (TransportType t : values()) {
+ if (t.value.equalsIgnoreCase(value)) {
+ return t;
}
+ }
+ throw new IllegalArgumentException(
+ value + " is not a valid TransportType. Valid Options: stdio,http");
}
+ }
- private TransportType transport = TransportType.STDIO;
- private int httpPort = 8889;
+ private TransportType transport = TransportType.STDIO;
+ private int httpPort = 8889;
- public static PulsarMCPCliOptions parseArgs(String[] args) {
- PulsarMCPCliOptions opts = new PulsarMCPCliOptions();
+ public static PulsarMCPCliOptions parseArgs(String[] args) {
+ PulsarMCPCliOptions opts = new PulsarMCPCliOptions();
- for (int i = 0; i < args.length; i++) {
- String arg = args[i];
- switch (arg) {
- case "-t", "--transport" -> {
- if (i + 1 >= args.length) {
- throw new IllegalArgumentException("Missing value for --transport");
- }
- opts.transport = TransportType.fromString(args[++i]);
- }
- case "--port" -> {
- if (i + 1 >= args.length) {
- throw new IllegalArgumentException("Missing value for --port");
- }
- try {
- opts.httpPort = Integer.parseInt(args[++i]);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid port number for --port");
- }
- }
- default -> {
- throw new IllegalArgumentException("Unknown argument: " + arg);
- }
- }
+ for (int i = 0; i < args.length; i++) {
+ String arg = args[i];
+ switch (arg) {
+ case "-t", "--transport" -> {
+ if (i + 1 >= args.length) {
+ throw new IllegalArgumentException("Missing value for --transport");
+ }
+ opts.transport = TransportType.fromString(args[++i]);
+ }
+ case "--port" -> {
+ if (i + 1 >= args.length) {
+ throw new IllegalArgumentException("Missing value for --port");
+ }
+ try {
+ opts.httpPort = Integer.parseInt(args[++i]);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid port number for --port");
+ }
}
- return opts;
+ default -> {
+ throw new IllegalArgumentException("Unknown argument: " + arg);
+ }
+ }
}
+ return opts;
+ }
- @Override
- public String toString() {
- return "PulsarMCPCliOptions{transport=" + transport
- + ",httpPort=" + httpPort + '}';
- }
+ @Override
+ public String toString() {
+ return "PulsarMCPCliOptions{transport=" + transport + ",httpPort=" + httpPort + '}';
+ }
}
-
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/BasePulsarTools.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/BasePulsarTools.java
index d09911b..19a575b 100644
--- a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/BasePulsarTools.java
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/BasePulsarTools.java
@@ -23,181 +23,168 @@
public abstract class BasePulsarTools {
- protected static final Logger LOGGER = LoggerFactory.getLogger(BasePulsarTools.class);
- protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- protected final PulsarAdmin pulsarAdmin;
-
- public BasePulsarTools(PulsarAdmin pulsarAdmin) {
- if (pulsarAdmin == null) {
- throw new IllegalArgumentException("pulsarAdmin cannot be null");
- }
- this.pulsarAdmin = pulsarAdmin;
- }
-
- protected McpSchema.CallToolResult createSuccessResult(String message, Object data){
- StringBuilder result = new StringBuilder();
- result.append(message).append("\n");
-
- if (data != null){
- try {
- String jsonData = OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
- .writeValueAsString(data);
- result.append(jsonData)
- .append("\n");
- } catch (Exception e) {
- result.append("Result").append(data.toString()).append("\n");
- }
- }
-
- return new McpSchema.CallToolResult(
- List.of(new McpSchema.TextContent(result.toString())),
- false
- );
- }
-
- protected McpSchema.CallToolResult createErrorResult(String message){
- String errorText = "Error: " + message;
- return new McpSchema.CallToolResult(
- List.of(new McpSchema.TextContent(errorText)),
- true
- );
- }
-
- protected McpSchema.CallToolResult createErrorResult(String message, List suggestions){
- StringBuilder result = new StringBuilder();
- result.append(message).append("\n");
-
- if (suggestions != null && !suggestions.isEmpty()) {
- suggestions.forEach(s -> result.append(s).append("\n"));
- }
- return new McpSchema.CallToolResult(
- List.of(new McpSchema.TextContent(result.toString())),
- true
- );
- }
-
- protected String getStringParam(Map map, String key){
- Object value = map.get(key);
- return value == null ? "" : value.toString();
- }
-
- protected String getRequiredStringParam(Map map, String key) throws IllegalArgumentException{
- String value = getStringParam(map, key);
- if (value == null || value.trim().isEmpty()) {
- throw new IllegalArgumentException("Required parameter '" + key + "' is missing");
- }
- return value.trim();
- }
-
- protected Integer getIntParam(Map map, String key, Integer defaultValue) {
- Object value = map.get(key);
- if (value == null) {
- return defaultValue;
- }
-
- try {
- if (value instanceof Number) {
- return ((Number) value).intValue();
- } else {
- return Integer.parseInt(value.toString());
- }
- } catch (NumberFormatException e) {
- return defaultValue;
- }
- }
-
- protected Boolean getBooleanParam(Map map, String key, Boolean defaultValue) {
- Object value = map.get(key);
- if (value == null) {
- return defaultValue;
- }
- if (value instanceof Boolean) {
- return (Boolean) value;
- } else {
- return Boolean.parseBoolean(value.toString());
- }
- }
-
- protected Long getLongParam(Map arguments, String timestamp, Long defaultValue) {
- Object value = arguments.get(timestamp);
- if (value == null) {
- return defaultValue;
- }
-
- try {
- if (value instanceof Number) {
- return ((Number) value).longValue();
- } else {
- return Long.parseLong(value.toString());
- }
- } catch (NumberFormatException e) {
- return defaultValue;
- }
- }
-
- protected static McpSchema.Tool createTool (
- String name,
- String description,
- String inputSchema) {
- return McpSchema.Tool.builder()
- .name(name)
- .description(description)
- .inputSchema(inputSchema)
- .build();
- }
-
-
- protected String buildFullTopicName(Map arguments) {
- String topicName = getStringParam(arguments, "topic");
- if (topicName != null && !topicName.isBlank()) {
- if (topicName.startsWith("persistent://") || topicName.startsWith("non-persistent://")) {
- return topicName.trim();
- }
- }
-
- String tenant = (String) arguments.getOrDefault("tenant", "public");
- String namespace = (String) arguments.getOrDefault("namespace", "default");
- Boolean persistent = (Boolean) arguments.getOrDefault("persistent", true);
-
- String prefix = persistent ? "persistent://" : "non-persistent://";
- return prefix + tenant + "/" + namespace + "/" + topicName;
+ protected static final Logger LOGGER = LoggerFactory.getLogger(BasePulsarTools.class);
+ protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ protected final PulsarAdmin pulsarAdmin;
+
+ public BasePulsarTools(PulsarAdmin pulsarAdmin) {
+ if (pulsarAdmin == null) {
+ throw new IllegalArgumentException("pulsarAdmin cannot be null");
+ }
+ this.pulsarAdmin = pulsarAdmin;
+ }
+
+ protected McpSchema.CallToolResult createSuccessResult(String message, Object data) {
+ StringBuilder result = new StringBuilder();
+ result.append(message).append("\n");
+
+ if (data != null) {
+ try {
+ String jsonData = OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(data);
+ result.append(jsonData).append("\n");
+ } catch (Exception e) {
+ result.append("Result").append(data.toString()).append("\n");
+ }
+ }
+
+ return new McpSchema.CallToolResult(
+ List.of(new McpSchema.TextContent(result.toString())), false);
+ }
+
+ protected McpSchema.CallToolResult createErrorResult(String message) {
+ String errorText = "Error: " + message;
+ return new McpSchema.CallToolResult(List.of(new McpSchema.TextContent(errorText)), true);
+ }
+
+ protected McpSchema.CallToolResult createErrorResult(String message, List suggestions) {
+ StringBuilder result = new StringBuilder();
+ result.append(message).append("\n");
+
+ if (suggestions != null && !suggestions.isEmpty()) {
+ suggestions.forEach(s -> result.append(s).append("\n"));
+ }
+ return new McpSchema.CallToolResult(
+ List.of(new McpSchema.TextContent(result.toString())), true);
+ }
+
+ protected String getStringParam(Map map, String key) {
+ Object value = map.get(key);
+ return value == null ? "" : value.toString();
+ }
+
+ protected String getRequiredStringParam(Map map, String key)
+ throws IllegalArgumentException {
+ String value = getStringParam(map, key);
+ if (value == null || value.trim().isEmpty()) {
+ throw new IllegalArgumentException("Required parameter '" + key + "' is missing");
+ }
+ return value.trim();
+ }
+
+ protected Integer getIntParam(Map map, String key, Integer defaultValue) {
+ Object value = map.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+
+ try {
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ } else {
+ return Integer.parseInt(value.toString());
+ }
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ protected Boolean getBooleanParam(Map map, String key, Boolean defaultValue) {
+ Object value = map.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Boolean) {
+ return (Boolean) value;
+ } else {
+ return Boolean.parseBoolean(value.toString());
+ }
+ }
+
+ protected Long getLongParam(Map arguments, String timestamp, Long defaultValue) {
+ Object value = arguments.get(timestamp);
+ if (value == null) {
+ return defaultValue;
+ }
+
+ try {
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ } else {
+ return Long.parseLong(value.toString());
+ }
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ protected static McpSchema.Tool createTool(String name, String description, String inputSchema) {
+ return McpSchema.Tool.builder()
+ .name(name)
+ .description(description)
+ .inputSchema(inputSchema)
+ .build();
+ }
+
+ protected String buildFullTopicName(Map arguments) {
+ String topicName = getStringParam(arguments, "topic");
+ if (topicName != null && !topicName.isBlank()) {
+ if (topicName.startsWith("persistent://") || topicName.startsWith("non-persistent://")) {
+ return topicName.trim();
+ }
+ }
+
+ String tenant = (String) arguments.getOrDefault("tenant", "public");
+ String namespace = (String) arguments.getOrDefault("namespace", "default");
+ Boolean persistent = (Boolean) arguments.getOrDefault("persistent", true);
+
+ String prefix = persistent ? "persistent://" : "non-persistent://";
+ return prefix + tenant + "/" + namespace + "/" + topicName;
+ }
+
+ protected String resolveNamespace(Map arguments) {
+ String tenant = getStringParam(arguments, "tenant");
+ String namespace = getStringParam(arguments, "namespace");
+
+ if (namespace != null && namespace.contains("/")) {
+ return namespace;
+ }
+
+ if (tenant == null) {
+ tenant = "public";
+ }
+
+ if (namespace == null) {
+ namespace = "default";
+ }
+
+ return tenant + "/" + namespace;
+ }
+
+ protected void addTopicBreakdown(Map result, String fullTopicName) {
+ if (fullTopicName.startsWith("persistent://")) {
+ fullTopicName = fullTopicName.substring("persistent://".length());
+ } else if (fullTopicName.startsWith("non-persistent://")) {
+ fullTopicName = fullTopicName.substring("non-persistent://".length());
}
- protected String resolveNamespace(Map arguments) {
- String tenant = getStringParam(arguments, "tenant");
- String namespace = getStringParam(arguments, "namespace");
-
- if (namespace != null && namespace.contains("/")) {
- return namespace;
- }
-
- if (tenant == null) {
- tenant = "public";
- }
-
- if (namespace == null) {
- namespace = "default";
- }
-
- return tenant + "/" + namespace;
- }
-
- protected void addTopicBreakdown(Map result, String fullTopicName) {
- if (fullTopicName.startsWith("persistent://")) {
- fullTopicName = fullTopicName.substring("persistent://".length());
- } else if (fullTopicName.startsWith("non-persistent://")) {
- fullTopicName = fullTopicName.substring("non-persistent://".length());
- }
-
- String[] parts = fullTopicName.split("/", 3);
- if (parts.length != 3) {
- return;
- }
-
- result.put("tenant", parts[0]);
- result.put("namespace", parts[1]);
- result.put("topicName", parts[2]);
+ String[] parts = fullTopicName.split("/", 3);
+ if (parts.length != 3) {
+ return;
}
+ result.put("tenant", parts[0]);
+ result.put("namespace", parts[1]);
+ result.put("topicName", parts[2]);
+ }
}
diff --git a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/ClusterTools.java b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/ClusterTools.java
index 62c6a15..1a20e52 100644
--- a/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/ClusterTools.java
+++ b/pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/ClusterTools.java
@@ -31,82 +31,86 @@
public class ClusterTools extends BasePulsarTools {
- public ClusterTools(PulsarAdmin pulsarAdmin) {
- super(pulsarAdmin);
- }
-
- public void registerTools(McpSyncServer mcpServer){
- registerListClusters(mcpServer);
- registerGetClusterInfo(mcpServer);
- registerCreateCluster(mcpServer);
- registerDeleteCluster(mcpServer);
- registerUpdateClusterConfig(mcpServer);
- registerGetClusterStats(mcpServer);
- registerListBrokers(mcpServer);
- registerGetBrokerStats(mcpServer);
- registerGetClusterFailureDomain(mcpServer);
- registerSetClusterFailureDomain(mcpServer);
- }
-
- private void registerListClusters(McpSyncServer mcpServer){
- McpSchema.Tool tool = createTool(
- "list-clusters",
- "List all Pulsar clusters and their status",
- """
+ public ClusterTools(PulsarAdmin pulsarAdmin) {
+ super(pulsarAdmin);
+ }
+
+ public void registerTools(McpSyncServer mcpServer) {
+ registerListClusters(mcpServer);
+ registerGetClusterInfo(mcpServer);
+ registerCreateCluster(mcpServer);
+ registerDeleteCluster(mcpServer);
+ registerUpdateClusterConfig(mcpServer);
+ registerGetClusterStats(mcpServer);
+ registerListBrokers(mcpServer);
+ registerGetBrokerStats(mcpServer);
+ registerGetClusterFailureDomain(mcpServer);
+ registerSetClusterFailureDomain(mcpServer);
+ }
+
+ private void registerListClusters(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "list-clusters",
+ "List all Pulsar clusters and their status",
+ """
{
"type": "object",
"properties": {},
"required": []
}
- """
- );
-
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
- try {
- var clusters = pulsarAdmin.clusters().getClusters();
-
- Map clusterDetails = new HashMap<>();
- for (String clusterName :clusters) {
- try {
- ClusterData clusterData = pulsarAdmin.clusters().getCluster(clusterName);
- Map details = new HashMap<>();
- details.put("serviceUrl", clusterData.getServiceUrl());
- details.put("serviceUrlTls", clusterData.getServiceUrlTls());
- details.put("brokerServiceUrl", clusterData.getBrokerServiceUrl());
- details.put("brokerServiceUrlTls", clusterData.getBrokerServiceUrlTls());
- details.put("status", "active");
- clusterDetails.put(clusterName, details);
- } catch (Exception e) {
- Map details = new HashMap<>();
- details.put("status", "error");
- details.put("error", e.getMessage());
- clusterDetails.put(clusterName, details);
- }
- }
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ var clusters = pulsarAdmin.clusters().getClusters();
+
+ Map clusterDetails = new HashMap<>();
+ for (String clusterName : clusters) {
+ try {
+ ClusterData clusterData = pulsarAdmin.clusters().getCluster(clusterName);
+ Map details = new HashMap<>();
+ details.put("serviceUrl", clusterData.getServiceUrl());
+ details.put("serviceUrlTls", clusterData.getServiceUrlTls());
+ details.put("brokerServiceUrl", clusterData.getBrokerServiceUrl());
+ details.put("brokerServiceUrlTls", clusterData.getBrokerServiceUrlTls());
+ details.put("status", "active");
+ clusterDetails.put(clusterName, details);
+ } catch (Exception e) {
+ Map details = new HashMap<>();
+ details.put("status", "error");
+ details.put("error", e.getMessage());
+ clusterDetails.put(clusterName, details);
+ }
+ }
- Map result = new HashMap<>();
- result.put("clusters", clusters);
- result.put("count", clusters.size());
- result.put("clusterDetails", clusterDetails);
+ Map result = new HashMap<>();
+ result.put("clusters", clusters);
+ result.put("count", clusters.size());
+ result.put("clusterDetails", clusterDetails);
- return createSuccessResult("Cluster details", result);
+ return createSuccessResult("Cluster details", result);
- } catch (IllegalArgumentException e) {
- return createErrorResult(e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Failed to list clusters", e);
- return createErrorResult("Failed to list clusters" + e.getMessage());
- }
- }).build());
- }
-
- private void registerGetClusterInfo(McpSyncServer mcpServer) {
- McpSchema.Tool tool = createTool(
- "get-cluster-info",
- "Get details information about a specific cluster",
- """
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to list clusters", e);
+ return createErrorResult("Failed to list clusters" + e.getMessage());
+ }
+ })
+ .build());
+ }
+
+ private void registerGetClusterInfo(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "get-cluster-info",
+ "Get details information about a specific cluster",
+ """
{
"type": "object",
"properties": {
@@ -117,46 +121,49 @@ private void registerGetClusterInfo(McpSyncServer mcpServer) {
},
"required": ["clusterName"]
}
- """
- );
-
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
- try {
- String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
-
- ClusterData clusterData = pulsarAdmin.clusters().getCluster(clusterName);
-
- Map result = new HashMap<>();
- result.put("clusterName", clusterName);
- result.put("serviceUrl", clusterData.getServiceUrl());
- result.put("serviceUrlTls", clusterData.getServiceUrlTls());
- result.put("brokerServiceUrl", clusterData.getBrokerServiceUrl());
- result.put("brokerServiceUrlTls", clusterData.getBrokerServiceUrlTls());
- result.put("peerClusterNames", clusterData.getPeerClusterNames());
- result.put("proxyProtocol", clusterData.getProxyProtocol());
- result.put("authenticationPlugin", clusterData.getAuthenticationPlugin());
- result.put("authenticationParameters", clusterData.getAuthenticationParameters());
- result.put("proxyServiceUrl", clusterData.getProxyServiceUrl());
-
- return createSuccessResult("Cluster info retrieved", result);
-
- } catch (IllegalArgumentException e) {
- return createErrorResult(e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Failed to get cluster info", e);
- return createErrorResult("Failed to get cluster info: " + e.getMessage());
- }
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
+
+ ClusterData clusterData = pulsarAdmin.clusters().getCluster(clusterName);
+
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("serviceUrl", clusterData.getServiceUrl());
+ result.put("serviceUrlTls", clusterData.getServiceUrlTls());
+ result.put("brokerServiceUrl", clusterData.getBrokerServiceUrl());
+ result.put("brokerServiceUrlTls", clusterData.getBrokerServiceUrlTls());
+ result.put("peerClusterNames", clusterData.getPeerClusterNames());
+ result.put("proxyProtocol", clusterData.getProxyProtocol());
+ result.put("authenticationPlugin", clusterData.getAuthenticationPlugin());
+ result.put(
+ "authenticationParameters", clusterData.getAuthenticationParameters());
+ result.put("proxyServiceUrl", clusterData.getProxyServiceUrl());
+
+ return createSuccessResult("Cluster info retrieved", result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get cluster info", e);
+ return createErrorResult("Failed to get cluster info: " + e.getMessage());
+ }
})
- .build());
- }
-
- private void registerCreateCluster(McpSyncServer mcpServer) {
- McpSchema.Tool tool = createTool(
- "create-cluster",
- "Create a new Pulsar cluster",
- """
+ .build());
+ }
+
+ private void registerCreateCluster(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "create-cluster",
+ "Create a new Pulsar cluster",
+ """
{
"type": "object",
"properties": {
@@ -195,81 +202,85 @@ private void registerCreateCluster(McpSyncServer mcpServer) {
},
"required": ["clusterName", "serviceUrl"]
}
- """
- );
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
+ String serviceUrl = getRequiredStringParam(request.arguments(), "serviceUrl");
+ String serviceUrlTls = getStringParam(request.arguments(), "serviceUrlTls");
+ String brokerServiceUrl =
+ getStringParam(request.arguments(), "brokerServiceUrl");
+ String brokerServiceUrlTls =
+ getStringParam(request.arguments(), "brokerServiceUrlTls");
+ String proxyServiceUrl = getStringParam(request.arguments(), "proxyServiceUrl");
+ String authenticationPlugin =
+ getStringParam(request.arguments(), "authenticationPlugin");
+ String authenticationParameters =
+ getStringParam(request.arguments(), "authenticationParameters");
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
try {
- String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
- String serviceUrl = getRequiredStringParam(request.arguments(), "serviceUrl");
- String serviceUrlTls = getStringParam(request.arguments(), "serviceUrlTls");
- String brokerServiceUrl = getStringParam(request.arguments(), "brokerServiceUrl");
- String brokerServiceUrlTls = getStringParam(request.arguments(), "brokerServiceUrlTls");
- String proxyServiceUrl = getStringParam(request.arguments(), "proxyServiceUrl");
- String authenticationPlugin = getStringParam(request.arguments(), "authenticationPlugin");
- String authenticationParameters = getStringParam(
- request.arguments(),
- "authenticationParameters"
- );
-
- try {
- pulsarAdmin.clusters().getCluster(clusterName);
- return createErrorResult("Cluster already exists: " + clusterName,
- List.of("Choose a different cluster name"));
- } catch (PulsarAdminException.NotFoundException ignore) {
-
- } catch (PulsarAdminException e) {
- return createErrorResult("Failed to verify cluster existence: " + e.getMessage());
- }
+ pulsarAdmin.clusters().getCluster(clusterName);
+ return createErrorResult(
+ "Cluster already exists: " + clusterName,
+ List.of("Choose a different cluster name"));
+ } catch (PulsarAdminException.NotFoundException ignore) {
- var clusterDataBuilder = ClusterData.builder()
- .serviceUrl(serviceUrl);
+ } catch (PulsarAdminException e) {
+ return createErrorResult(
+ "Failed to verify cluster existence: " + e.getMessage());
+ }
- if (serviceUrlTls != null) {
- clusterDataBuilder.serviceUrlTls(serviceUrlTls);
- }
- if (brokerServiceUrl != null) {
- clusterDataBuilder.brokerServiceUrl(brokerServiceUrl);
- }
- if (brokerServiceUrlTls != null) {
- clusterDataBuilder.brokerServiceUrlTls(brokerServiceUrlTls);
- }
- if (proxyServiceUrl != null) {
- clusterDataBuilder.proxyServiceUrl(proxyServiceUrl);
- }
- if (authenticationPlugin != null) {
- clusterDataBuilder.authenticationPlugin(authenticationPlugin);
- }
- if (authenticationParameters != null) {
- clusterDataBuilder.authenticationParameters(authenticationParameters);
- }
+ var clusterDataBuilder = ClusterData.builder().serviceUrl(serviceUrl);
+
+ if (serviceUrlTls != null) {
+ clusterDataBuilder.serviceUrlTls(serviceUrlTls);
+ }
+ if (brokerServiceUrl != null) {
+ clusterDataBuilder.brokerServiceUrl(brokerServiceUrl);
+ }
+ if (brokerServiceUrlTls != null) {
+ clusterDataBuilder.brokerServiceUrlTls(brokerServiceUrlTls);
+ }
+ if (proxyServiceUrl != null) {
+ clusterDataBuilder.proxyServiceUrl(proxyServiceUrl);
+ }
+ if (authenticationPlugin != null) {
+ clusterDataBuilder.authenticationPlugin(authenticationPlugin);
+ }
+ if (authenticationParameters != null) {
+ clusterDataBuilder.authenticationParameters(authenticationParameters);
+ }
- pulsarAdmin.clusters().createCluster(clusterName, clusterDataBuilder.build());
+ pulsarAdmin.clusters().createCluster(clusterName, clusterDataBuilder.build());
- Map result = new HashMap<>();
- result.put("clusterName", clusterName);
- result.put("serviceUrl", serviceUrl);
- result.put("created", true);
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("serviceUrl", serviceUrl);
+ result.put("created", true);
- return createSuccessResult("Cluster created successfully", result);
+ return createSuccessResult("Cluster created successfully", result);
- } catch (IllegalArgumentException e) {
- return createErrorResult("Invalid parameter: " + e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Failed to create cluster", e);
- return createErrorResult("Failed to create cluster: " + e.getMessage());
- }
+ } catch (IllegalArgumentException e) {
+ return createErrorResult("Invalid parameter: " + e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to create cluster", e);
+ return createErrorResult("Failed to create cluster: " + e.getMessage());
+ }
})
- .build());
- }
-
- private void registerUpdateClusterConfig(McpSyncServer mcpServer) {
- McpSchema.Tool tool = createTool(
- "update-cluster-config",
- "Update configuration of an existing Pulsar cluster",
- """
+ .build());
+ }
+
+ private void registerUpdateClusterConfig(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "update-cluster-config",
+ "Update configuration of an existing Pulsar cluster",
+ """
{
"type": "object",
"properties": {
@@ -308,88 +319,95 @@ private void registerUpdateClusterConfig(McpSyncServer mcpServer) {
},
"required": ["clusterName", "serviceUrl"]
}
- """
- );
-
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
+ String serviceUrl = getStringParam(request.arguments(), "serviceUrl");
+ String serviceUrlTls = getStringParam(request.arguments(), "serviceUrlTls");
+ String brokerServiceUrl =
+ getStringParam(request.arguments(), "brokerServiceUrl");
+ String brokerServiceUrlTls =
+ getStringParam(request.arguments(), "brokerServiceUrlTls");
+ String proxyServiceUrl = getStringParam(request.arguments(), "proxyServiceUrl");
+ String authenticationPlugin =
+ getStringParam(request.arguments(), "authenticationPlugin");
+ String authenticationParameters =
+ getStringParam(request.arguments(), "authenticationParameters");
+
+ ClusterData current;
try {
- String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
- String serviceUrl = getStringParam(request.arguments(), "serviceUrl");
- String serviceUrlTls = getStringParam(request.arguments(), "serviceUrlTls");
- String brokerServiceUrl = getStringParam(request.arguments(), "brokerServiceUrl");
- String brokerServiceUrlTls = getStringParam(request.arguments(), "brokerServiceUrlTls");
- String proxyServiceUrl = getStringParam(request.arguments(), "proxyServiceUrl");
- String authenticationPlugin = getStringParam(request.arguments(), "authenticationPlugin");
- String authenticationParameters = getStringParam(
- request.arguments(),
- "authenticationParameters");
-
- ClusterData current;
- try {
- current = pulsarAdmin.clusters().getCluster(clusterName);
- } catch (PulsarAdminException.NotFoundException e) {
- return createErrorResult("Cluster not found: " + clusterName);
- }
-
- String finalServiceUrl = (serviceUrl != null && !serviceUrl.isBlank())
- ? serviceUrl.trim()
- : current.getServiceUrl();
-
- var b = ClusterData.builder()
- .serviceUrl(finalServiceUrl)
- .serviceUrlTls((serviceUrlTls != null
- && !serviceUrlTls.isBlank())
- ? serviceUrlTls
- : current.getServiceUrlTls())
- .brokerServiceUrl((brokerServiceUrl != null
- && !brokerServiceUrl.isBlank())
- ? brokerServiceUrl
- : current.getBrokerServiceUrl())
- .brokerServiceUrlTls((brokerServiceUrlTls != null
- && !brokerServiceUrlTls.isBlank())
- ? brokerServiceUrlTls
- : current.getBrokerServiceUrlTls())
- .proxyServiceUrl((proxyServiceUrl != null
- && !proxyServiceUrl.isBlank())
- ? proxyServiceUrl
- : current.getProxyServiceUrl());
-
- if (authenticationPlugin != null && !authenticationPlugin.isBlank()) {
- b.authenticationPlugin(authenticationPlugin);
- } else if (current.getAuthenticationPlugin() != null) {
- b.authenticationPlugin(current.getAuthenticationPlugin());
- }
- if (authenticationParameters != null && !authenticationParameters.isBlank()) {
- b.authenticationParameters(authenticationParameters);
- } else if (current.getAuthenticationParameters() != null) {
- b.authenticationParameters(current.getAuthenticationParameters());
- }
-
- pulsarAdmin.clusters().updateCluster(clusterName, b.build());
- Map result = new HashMap<>();
- result.put("clusterName", clusterName);
- result.put("serviceUrl", serviceUrl);
- result.put("updated", true);
-
- return createSuccessResult("Cluster configuration updated successfully", result);
+ current = pulsarAdmin.clusters().getCluster(clusterName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ return createErrorResult("Cluster not found: " + clusterName);
+ }
- } catch (IllegalArgumentException e) {
- return createErrorResult("Invalid input: " + e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Failed to update cluster config", e);
- return createErrorResult("Failed to update cluster config: " + e.getMessage());
+ String finalServiceUrl =
+ (serviceUrl != null && !serviceUrl.isBlank())
+ ? serviceUrl.trim()
+ : current.getServiceUrl();
+
+ var b =
+ ClusterData.builder()
+ .serviceUrl(finalServiceUrl)
+ .serviceUrlTls(
+ (serviceUrlTls != null && !serviceUrlTls.isBlank())
+ ? serviceUrlTls
+ : current.getServiceUrlTls())
+ .brokerServiceUrl(
+ (brokerServiceUrl != null && !brokerServiceUrl.isBlank())
+ ? brokerServiceUrl
+ : current.getBrokerServiceUrl())
+ .brokerServiceUrlTls(
+ (brokerServiceUrlTls != null && !brokerServiceUrlTls.isBlank())
+ ? brokerServiceUrlTls
+ : current.getBrokerServiceUrlTls())
+ .proxyServiceUrl(
+ (proxyServiceUrl != null && !proxyServiceUrl.isBlank())
+ ? proxyServiceUrl
+ : current.getProxyServiceUrl());
+
+ if (authenticationPlugin != null && !authenticationPlugin.isBlank()) {
+ b.authenticationPlugin(authenticationPlugin);
+ } else if (current.getAuthenticationPlugin() != null) {
+ b.authenticationPlugin(current.getAuthenticationPlugin());
+ }
+ if (authenticationParameters != null && !authenticationParameters.isBlank()) {
+ b.authenticationParameters(authenticationParameters);
+ } else if (current.getAuthenticationParameters() != null) {
+ b.authenticationParameters(current.getAuthenticationParameters());
}
+
+ pulsarAdmin.clusters().updateCluster(clusterName, b.build());
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("serviceUrl", serviceUrl);
+ result.put("updated", true);
+
+ return createSuccessResult(
+ "Cluster configuration updated successfully", result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult("Invalid input: " + e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to update cluster config", e);
+ return createErrorResult("Failed to update cluster config: " + e.getMessage());
+ }
})
- .build());
- }
-
- private void registerDeleteCluster(McpSyncServer mcpServer) {
- McpSchema.Tool tool = createTool(
- "delete-cluster",
- "Delete a Pulsar cluster by name",
- """
+ .build());
+ }
+
+ private void registerDeleteCluster(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "delete-cluster",
+ "Delete a Pulsar cluster by name",
+ """
{
"type": "object",
"properties": {
@@ -405,76 +423,80 @@ private void registerDeleteCluster(McpSyncServer mcpServer) {
},
"required": ["clusterName"]
}
- """
- );
-
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
- try {
- String clusterName = getRequiredStringParam(request.arguments(), "clusterName").trim();
- boolean force = getBooleanParam(request.arguments(), "force", false);
-
- if (!force) {
- List tenants = pulsarAdmin.tenants().getTenants();
- List referencingTenants = new ArrayList<>();
- for (String tenant : tenants) {
- var info = pulsarAdmin.tenants().getTenantInfo(tenant);
- var allowed = info != null ? info.getAllowedClusters() : null;
- if (allowed != null && allowed.contains(clusterName)) {
- referencingTenants.add(tenant);
- }
- }
-
- List referencingNamespaces = new ArrayList<>();
- for (String tenant : tenants) {
- var nss = pulsarAdmin.namespaces().getNamespaces(tenant);
- for (String ns : nss) {
- var repl = pulsarAdmin.namespaces().getNamespaceReplicationClusters(ns);
- if (repl != null && repl.contains(clusterName)) {
- referencingNamespaces.add(ns);
- }
- }
- }
-
- if (!referencingTenants.isEmpty() || !referencingNamespaces.isEmpty()) {
- StringBuilder sb = new StringBuilder();
- sb.append("Cluster '").append(clusterName)
- .append("' is still referenced. Use 'force: true' to delete anyway.");
- if (!referencingTenants.isEmpty()) {
- sb.append(" Referenced by tenants: ").append(referencingTenants);
- }
- if (!referencingNamespaces.isEmpty()) {
- sb.append(" Referenced by namespaces: ").append(referencingNamespaces);
- }
- return createErrorResult(sb.toString());
- }
- }
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ String clusterName =
+ getRequiredStringParam(request.arguments(), "clusterName").trim();
+ boolean force = getBooleanParam(request.arguments(), "force", false);
+
+ if (!force) {
+ List tenants = pulsarAdmin.tenants().getTenants();
+ List referencingTenants = new ArrayList<>();
+ for (String tenant : tenants) {
+ var info = pulsarAdmin.tenants().getTenantInfo(tenant);
+ var allowed = info != null ? info.getAllowedClusters() : null;
+ if (allowed != null && allowed.contains(clusterName)) {
+ referencingTenants.add(tenant);
+ }
+ }
+
+ List referencingNamespaces = new ArrayList<>();
+ for (String tenant : tenants) {
+ var nss = pulsarAdmin.namespaces().getNamespaces(tenant);
+ for (String ns : nss) {
+ var repl = pulsarAdmin.namespaces().getNamespaceReplicationClusters(ns);
+ if (repl != null && repl.contains(clusterName)) {
+ referencingNamespaces.add(ns);
+ }
+ }
+ }
+
+ if (!referencingTenants.isEmpty() || !referencingNamespaces.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Cluster '")
+ .append(clusterName)
+ .append("' is still referenced. Use 'force: true' to delete anyway.");
+ if (!referencingTenants.isEmpty()) {
+ sb.append(" Referenced by tenants: ").append(referencingTenants);
+ }
+ if (!referencingNamespaces.isEmpty()) {
+ sb.append(" Referenced by namespaces: ").append(referencingNamespaces);
+ }
+ return createErrorResult(sb.toString());
+ }
+ }
- pulsarAdmin.clusters().deleteCluster(clusterName);
+ pulsarAdmin.clusters().deleteCluster(clusterName);
- Map result = new HashMap<>();
- result.put("clusterName", clusterName);
- result.put("deleted", true);
- result.put("forced", force);
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("deleted", true);
+ result.put("forced", force);
- return createSuccessResult("Cluster deleted successfully", result);
+ return createSuccessResult("Cluster deleted successfully", result);
- } catch (IllegalArgumentException e) {
- return createErrorResult(e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Failed to delete cluster", e);
- return createErrorResult("Failed to delete cluster: " + e.getMessage());
- }
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to delete cluster", e);
+ return createErrorResult("Failed to delete cluster: " + e.getMessage());
+ }
})
- .build());
- }
-
- private void registerGetClusterStats(McpSyncServer mcpServer) {
- McpSchema.Tool tool = createTool(
- "get-cluster-stats",
- "Get statistics for a given Pulsar cluster",
- """
+ .build());
+ }
+
+ private void registerGetClusterStats(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "get-cluster-stats",
+ "Get statistics for a given Pulsar cluster",
+ """
{
"type": "object",
"properties": {
@@ -485,42 +507,44 @@ private void registerGetClusterStats(McpSyncServer mcpServer) {
},
"required": ["clusterName"]
}
- """
- );
-
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
- try {
- String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
- if (clusterName == null || clusterName.isBlank()) {
- return createErrorResult("Missing required parameter: clusterName");
- }
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ String clusterName = getRequiredStringParam(request.arguments(), "clusterName");
+ if (clusterName == null || clusterName.isBlank()) {
+ return createErrorResult("Missing required parameter: clusterName");
+ }
- var brokers = pulsarAdmin.brokers().getActiveBrokers(clusterName);
+ var brokers = pulsarAdmin.brokers().getActiveBrokers(clusterName);
- Map stats = new HashMap<>();
- stats.put("clusterName", clusterName);
- stats.put("activeBrokers", brokers);
- stats.put("brokerCount", brokers.size());
+ Map stats = new HashMap<>();
+ stats.put("clusterName", clusterName);
+ stats.put("activeBrokers", brokers);
+ stats.put("brokerCount", brokers.size());
- return createSuccessResult("Cluster stats retrieved successfully", stats);
+ return createSuccessResult("Cluster stats retrieved successfully", stats);
- } catch (IllegalArgumentException e){
- return createErrorResult(e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Failed to get cluster stats", e);
- return createErrorResult("Failed to get cluster stats: " + e.getMessage());
- }
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get cluster stats", e);
+ return createErrorResult("Failed to get cluster stats: " + e.getMessage());
+ }
})
- .build());
- }
-
- private void registerListBrokers(McpSyncServer mcpServer) {
- McpSchema.Tool tool = createTool(
- "list-brokers",
- "List all active brokers in a given Pulsar cluster",
- """
+ .build());
+ }
+
+ private void registerListBrokers(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "list-brokers",
+ "List all active brokers in a given Pulsar cluster",
+ """
{
"type": "object",
"properties": {
@@ -531,65 +555,72 @@ private void registerListBrokers(McpSyncServer mcpServer) {
},
"required": ["clusterName"]
}
- """
- );
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ String clusterName =
+ getRequiredStringParam(request.arguments(), "clusterName").trim();
+ if (clusterName.isEmpty()) {
+ return createErrorResult("clusterName cannot be blank");
+ }
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
try {
- String clusterName = getRequiredStringParam(request.arguments(), "clusterName").trim();
- if (clusterName.isEmpty()) {
- return createErrorResult("clusterName cannot be blank");
- }
+ pulsarAdmin.clusters().getCluster(clusterName);
+ } catch (PulsarAdminException.NotFoundException e) {
+ return createErrorResult("Cluster '" + clusterName + "' not found");
+ }
- try {
- pulsarAdmin.clusters().getCluster(clusterName);
- } catch (PulsarAdminException.NotFoundException e) {
- return createErrorResult("Cluster '" + clusterName + "' not found");
- }
+ List active =
+ new ArrayList<>(pulsarAdmin.brokers().getActiveBrokers(clusterName));
+ active.sort(String::compareTo);
- List active = new ArrayList<>(pulsarAdmin.brokers().getActiveBrokers(clusterName));
- active.sort(String::compareTo);
-
- String leader = null;
- try {
- leader = String.valueOf(pulsarAdmin.brokers().getLeaderBroker());
- } catch (Exception ignore) {}
-
- var dynamicConfigNames = pulsarAdmin.brokers().getDynamicConfigurationNames();
- List dynamicNamesSorted = dynamicConfigNames == null
- ? List.of()
- : dynamicConfigNames.stream().sorted().toList();
-
- Map result = new HashMap<>();
- result.put("clusterName", clusterName);
- result.put("activeBrokers", active);
- result.put("brokerCount", active.size());
- result.put("leaderBroker", leader);
- result.put("dynamicConfigNames", dynamicNamesSorted);
- result.put("available", !active.isEmpty());
- result.put("timestamp", System.currentTimeMillis());
-
- String msg = "List of active brokers retrieved successfully"
- + (leader != null ? "" : " (leader not available)");
- return createSuccessResult(msg, result);
-
- } catch (IllegalArgumentException e) {
- return createErrorResult(e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Failed to list brokers", e);
- return createErrorResult("Failed to list brokers: " + e.getMessage());
+ String leader = null;
+ try {
+ leader = String.valueOf(pulsarAdmin.brokers().getLeaderBroker());
+ } catch (Exception ignore) {
}
+
+ var dynamicConfigNames = pulsarAdmin.brokers().getDynamicConfigurationNames();
+ List dynamicNamesSorted =
+ dynamicConfigNames == null
+ ? List.of()
+ : dynamicConfigNames.stream().sorted().toList();
+
+ Map result = new HashMap<>();
+ result.put("clusterName", clusterName);
+ result.put("activeBrokers", active);
+ result.put("brokerCount", active.size());
+ result.put("leaderBroker", leader);
+ result.put("dynamicConfigNames", dynamicNamesSorted);
+ result.put("available", !active.isEmpty());
+ result.put("timestamp", System.currentTimeMillis());
+
+ String msg =
+ "List of active brokers retrieved successfully"
+ + (leader != null ? "" : " (leader not available)");
+ return createSuccessResult(msg, result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to list brokers", e);
+ return createErrorResult("Failed to list brokers: " + e.getMessage());
+ }
})
- .build());
- }
-
- private void registerGetBrokerStats(McpSyncServer mcpServer) {
- McpSchema.Tool tool = createTool(
- "get-broker-stats",
- "Get statistics for a specific Pulsar broker",
- """
+ .build());
+ }
+
+ private void registerGetBrokerStats(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "get-broker-stats",
+ "Get statistics for a specific Pulsar broker",
+ """
{
"type": "object",
"properties": {
@@ -600,43 +631,44 @@ private void registerGetBrokerStats(McpSyncServer mcpServer) {
},
"required": ["brokerUrl"]
}
- """
- );
-
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
- try {
- String brokerUrl = getRequiredStringParam(request.arguments(), "brokerUrl");
-
- if (brokerUrl == null || brokerUrl.isBlank()) {
- return createErrorResult("Missing required parameter: brokerUrl");
- }
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ String brokerUrl = getRequiredStringParam(request.arguments(), "brokerUrl");
+
+ if (brokerUrl == null || brokerUrl.isBlank()) {
+ return createErrorResult("Missing required parameter: brokerUrl");
+ }
- var brokerStats = pulsarAdmin.brokerStats().getTopics();
+ var brokerStats = pulsarAdmin.brokerStats().getTopics();
- Map result = new HashMap<>();
- result.put("brokerUrl", brokerUrl);
- result.put("stats", brokerStats);
+ Map result = new HashMap<>();
+ result.put("brokerUrl", brokerUrl);
+ result.put("stats", brokerStats);
- return createSuccessResult("Broker stats retrieved successfully", result);
+ return createSuccessResult("Broker stats retrieved successfully", result);
- } catch (IllegalArgumentException e) {
- return createErrorResult(e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Failed to get broker stats", e);
- return createErrorResult("Failed to get broker stats: " + e.getMessage());
- }
+ } catch (IllegalArgumentException e) {
+ return createErrorResult(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get broker stats", e);
+ return createErrorResult("Failed to get broker stats: " + e.getMessage());
+ }
})
- .build()
- );
- }
-
- private void registerGetClusterFailureDomain(McpSyncServer mcpServer) {
- McpSchema.Tool tool = createTool(
- "get-cluster-failure-domain",
- "Get failure domain(s) for a specific Pulsar cluster",
- """
+ .build());
+ }
+
+ private void registerGetClusterFailureDomain(McpSyncServer mcpServer) {
+ McpSchema.Tool tool =
+ createTool(
+ "get-cluster-failure-domain",
+ "Get failure domain(s) for a specific Pulsar cluster",
+ """
{
"type": "object",
"properties": {
@@ -656,130 +688,138 @@ private void registerGetClusterFailureDomain(McpSyncServer mcpServer) {
},
"required": ["clusterName"]
}
- """
- );
+ """);
+
+ mcpServer.addTool(
+ McpServerFeatures.SyncToolSpecification.builder()
+ .tool(tool)
+ .callHandler(
+ (exchange, request) -> {
+ try {
+ String clusterName =
+ getRequiredStringParam(request.arguments(), "clusterName").trim();
+ String domainName = getStringParam(request.arguments(), "domainName");
+ boolean includeEmpty =
+ getBooleanParam(request.arguments(), "includeEmpty", true);
+
+ if (clusterName.isEmpty()) {
+ return createErrorResult("clusterName cannot be blank");
+ }
+ if (domainName != null) {
+ domainName = domainName.trim();
+ }
- mcpServer.addTool(McpServerFeatures.SyncToolSpecification.builder()
- .tool(tool)
- .callHandler((exchange, request) -> {
try {
- String clusterName = getRequiredStringParam(request.arguments(), "clusterName").trim();
- String domainName = getStringParam(request.arguments(), "domainName");
- boolean includeEmpty = getBooleanParam(request.arguments(), "includeEmpty", true);
-
- if (clusterName.isEmpty()) {
- return createErrorResult("clusterName cannot be blank");
- }
- if (domainName != null) {
- domainName = domainName.trim();
- }
-
- try {
- pulsarAdmin.clusters().getCluster(clusterName);
- } catch (PulsarAdminException.NotFoundException e) {
- return createErrorResult("Cluster '" + clusterName + "' not found");
- }
-
- Map result = new HashMap<>();
- result.put("clusterName", clusterName);
-
- if (domainName != null && !domainName.isEmpty()) {
- try {
- FailureDomain fd =
- pulsarAdmin.clusters().getFailureDomain(clusterName, domainName);
-
- Set brokers = (fd != null && fd.getBrokers() != null)
- ? new HashSet<>(fd.getBrokers())
- : new HashSet<>();
-
- if (!includeEmpty && brokers.isEmpty()) {
- result.put("domains", List.of());
- result.put("domainCount", 0);
- result.put("available", false);
- return createSuccessResult(
- "Domain exists but filtered by includeEmpty=false", result);
- }
-
- Map item = new HashMap<>();
- item.put("domainName", domainName);
- item.put("brokers", brokers.stream().sorted().toList());
- item.put("brokerCount", brokers.size());
-
- result.put("domains", List.of(item));
- result.put("domainCount", 1);
- result.put("available", true);
-
- return createSuccessResult(
- "Cluster failure domain retrieved successfully", result);
- } catch (PulsarAdminException.NotFoundException e) {
- return createErrorResult(
- "Domain '"
- + domainName
- + "' not found in cluster '"
- + clusterName + "'");
- }
- } else {
- Map raw =
- pulsarAdmin.clusters().getFailureDomains(clusterName);
- if (raw == null) {
- raw = Map.of();
- }
-
- List