diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java index 1cb324a07..b5be076c2 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java @@ -19,6 +19,7 @@ package org.dromara.hutool.extra.mq.engine.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.MixAll; import org.dromara.hutool.core.lang.Assert; import org.dromara.hutool.extra.mq.Consumer; import org.dromara.hutool.extra.mq.MQConfig; @@ -35,6 +36,8 @@ import org.dromara.hutool.extra.mq.engine.MQEngine; public class RocketMQEngine implements MQEngine { private MQConfig config; + private String producerGroup; + private String consumerGroup; /** * 默认构造 @@ -42,6 +45,30 @@ public class RocketMQEngine implements MQEngine { public RocketMQEngine() { // SPI方式加载时检查库是否引入 Assert.notNull( org.apache.rocketmq.common.message.Message.class); + this.producerGroup = MixAll.DEFAULT_PRODUCER_GROUP; + this.consumerGroup = MixAll.DEFAULT_CONSUMER_GROUP; + } + + /** + * 设置生产者组 + * + * @param producerGroup 生产者组 + * @return this + */ + public RocketMQEngine setProducerGroup(final String producerGroup) { + this.producerGroup = producerGroup; + return this; + } + + /** + * 设置消费者组 + * + * @param consumerGroup 消费者组 + * @return this + */ + public RocketMQEngine setConsumerGroup(final String consumerGroup) { + this.consumerGroup = consumerGroup; + return this; } @Override @@ -52,7 +79,7 @@ public class RocketMQEngine implements MQEngine { @Override public Producer getProducer() { - final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); + final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup); defaultMQProducer.setNamesrvAddr(config.getBrokerUrl()); try { defaultMQProducer.start(); @@ -64,7 +91,7 @@ public class RocketMQEngine implements MQEngine { @Override public Consumer getConsumer() { - final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(); + final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup); defaultMQPushConsumer.setNamesrvAddr(config.getBrokerUrl()); return new RocketMQConsumer(defaultMQPushConsumer); }