Skip to content

Commit eeebbc5

Browse files
committed
support alibaba cloud rocketmq
1 parent 9f959db commit eeebbc5

File tree

4 files changed

+50
-8
lines changed

4 files changed

+50
-8
lines changed

mse-simple-demo/A/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@
5959
<artifactId>rocketmq-client</artifactId>
6060
<version>${rocketmq.version}</version>
6161
</dependency>
62+
<dependency>
63+
<groupId>org.apache.rocketmq</groupId>
64+
<artifactId>rocketmq-acl</artifactId>
65+
<version>${rocketmq.version}</version>
66+
</dependency>
6267

6368
<!-- Dubbo -->
6469
<dependency>

mse-simple-demo/A/src/main/java/com/alibabacloud/mse/demo/a/mq/RocketMqConfiguration.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22

33
import lombok.RequiredArgsConstructor;
44
import lombok.extern.slf4j.Slf4j;
5+
import org.apache.commons.lang3.StringUtils;
6+
import org.apache.rocketmq.acl.common.AclClientRPCHook;
7+
import org.apache.rocketmq.acl.common.SessionCredentials;
58
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
9+
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
610
import org.apache.rocketmq.client.exception.MQClientException;
711
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
12+
import org.apache.rocketmq.remoting.RPCHook;
813
import org.springframework.beans.factory.annotation.Autowired;
9-
import org.springframework.beans.factory.annotation.Qualifier;
1014
import org.springframework.beans.factory.annotation.Value;
1115
import org.springframework.cloud.commons.util.InetUtils;
1216
import org.springframework.context.annotation.Bean;
@@ -27,6 +31,11 @@ public class RocketMqConfiguration {
2731
@Value("${rocketmq.consumer.topic}")
2832
private String topic;
2933

34+
@Value("${middleware.mq.ak:}")
35+
private String ak;
36+
@Value("${middleware.mq.sk:}")
37+
private String sk;
38+
3039
@Autowired
3140
private RestTemplate restTemplate;
3241

@@ -43,16 +52,21 @@ public class RocketMqConfiguration {
4352
@Bean(initMethod = "start", destroyMethod = "shutdown")
4453
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
4554
log.info("正在启动rocketMq的consumer");
46-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
55+
56+
DefaultMQPushConsumer consumer;
57+
if (StringUtils.isNotEmpty(this.ak) && StringUtils.isNotEmpty(this.sk)) {
58+
log.info("middleware.mq.ak 不为空,MQ ACL信息已填充");
59+
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(this.ak, this.sk));
60+
consumer = new DefaultMQPushConsumer(this.groupName, rpcHook, new AllocateMessageQueueAveragely());
61+
} else {
62+
log.info("middleware.mq.ak 为空");
63+
consumer = new DefaultMQPushConsumer(groupName);
64+
}
4765
consumer.setNamesrvAddr(nameSrvAddr);
4866
consumer.subscribe(topic, "*");
4967
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
5068

51-
MqConsumer mqConsumer = new MqConsumer(
52-
restTemplate,
53-
inetUtils,
54-
serviceTag
55-
);
69+
MqConsumer mqConsumer = new MqConsumer(restTemplate, inetUtils, serviceTag);
5670
consumer.registerMessageListener(mqConsumer);
5771
log.info("完成启动rocketMq的consumer,subscribe:{}", topic);
5872
return consumer;

mse-simple-demo/C/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
<artifactId>rocketmq-client</artifactId>
5050
<version>${rocketmq.version}</version>
5151
</dependency>
52+
<dependency>
53+
<groupId>org.apache.rocketmq</groupId>
54+
<artifactId>rocketmq-acl</artifactId>
55+
<version>${rocketmq.version}</version>
56+
</dependency>
5257

5358

5459
<!-- Dubbo -->

mse-simple-demo/C/src/main/java/com/alibabacloud/mse/demo/c/mq/RocketMqConfiguration.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33

44
import lombok.RequiredArgsConstructor;
55
import lombok.extern.slf4j.Slf4j;
6+
import org.apache.commons.lang3.StringUtils;
7+
import org.apache.rocketmq.acl.common.AclClientRPCHook;
8+
import org.apache.rocketmq.acl.common.SessionCredentials;
69
import org.apache.rocketmq.client.producer.DefaultMQProducer;
10+
import org.apache.rocketmq.remoting.RPCHook;
711
import org.springframework.beans.factory.annotation.Value;
812
import org.springframework.context.annotation.Bean;
913
import org.springframework.context.annotation.Configuration;
@@ -19,6 +23,11 @@ public class RocketMqConfiguration {
1923
@Value("${rocketmq.consumer.group}")
2024
private String groupName;
2125

26+
@Value("${middleware.mq.ak:}")
27+
private String ak;
28+
@Value("${middleware.mq.sk:}")
29+
private String sk;
30+
2231
static {
2332
System.setProperty("rocketmq.client.log.loadconfig", "false");
2433
}
@@ -27,7 +36,16 @@ public class RocketMqConfiguration {
2736
public DefaultMQProducer getBaseProducer() throws Exception {
2837
log.info("正在启动rocketMq的producer");
2938

30-
DefaultMQProducer baseProducer = new DefaultMQProducer();
39+
DefaultMQProducer baseProducer;
40+
if (StringUtils.isNotEmpty(this.ak) && StringUtils.isNotEmpty(this.sk)) {
41+
log.info("middleware.mq.ak 不为空,MQ ACL信息已填充");
42+
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(this.ak, this.sk));
43+
baseProducer = new DefaultMQProducer(rpcHook);
44+
} else {
45+
log.info("middleware.mq.ak 为空");
46+
baseProducer = new DefaultMQProducer();
47+
}
48+
3149
baseProducer.setProducerGroup(groupName);
3250
baseProducer.setNamesrvAddr(nameSrvAddr);
3351
baseProducer.start();

0 commit comments

Comments
 (0)